RAG를 위한 기본 전처리기 사용

RAG 워크플로우를 구현하기 위해서는 검색할 문서가 저장된 벡터 DB를 'Vector Stores' 노드('Deepsearfing', 'Weaviate' 등)에서 지정해야 합니다. 벡터 DB를 생성하고 문서를 적재하기 위해서는 문서 적재에 사용할 전처리기를 지정해야 합니다. 이 때 보편적으로 사용되는 문서 전처리를 수행할 '기본 전처리기'를 배포하는 법을 설명합니다.

전처리기 생성하기

  • GenOS 메뉴의 '관리' > '리소스' > '전처리기' 를 클릭합니다.

  • 전처리기 목록 화면 왼쪽 상단의 '전처리기 생성' 버튼을 클릭합니다.

  • '제목'과 '관리 그룹'을 원하는 값으로 입력합니다.

  • '확장자'에 적재하고자 하는 문서의 확장자 리스트를 쉼표로 구분하여 입력합니다.

    • 예) md, docx, pptx, pdf

    • 지원하지 않는 확장자를 입력할 경우 "유효하지 않다"고 메시지가 뜹니다.

  • '코드'에 디폴트로 '기본 전처리기' 코드가 입력되어 있습니다. 전체 코드는 아래 '기본 전처리기 코드' 섹션을 참고 해주세요.

    • '기본 전처리기'에서 지원하는 확장자: txt, md, json, ppt, pptx, doc, docx, pdf

    • 생성 시 혹은 생성 후 필요한 내용으로 코드를 수정할 수 있습니다.

  • '파라미터'에 벡터 DB에 적재 시 필요한 파라미터를 입력/조정할 수 있습니다.

  • 'GPU 할당량'에서 전처리기 코드 실행에 사용할 GPU의 종류 및 개수를 지정할 수 있습니다.

  • 생성 후 '전처리기 상세' 화면의 '기본 정보' 탭에서 생성할 때 입력한 정보를 확인할 수 있습니다.

전처리기 배포하기

  • '전처리기 상세'의 '배포 이력' 탭 > 오른쪽 상단의 '배포' 버튼을 클릭합니다.

  • 배포를 위한 '도커 이미지', '인스턴스 타입', '복제본' 개수를 지정합니다.

  • '생성' 버튼을 클릭하면 '배포 이력'에서 배포 관련 정보를 확인할 수 있습니다.

  • '배포 중지' 버튼을 클릭하면 현재 배포된 컨테이너를 내리고 재배포할 수 있습니다.

  • 전처리기 코드 수정 시 재배포를 해야 수정 내용이 반영됩니다.

기본 전처리기 코드

import json
from multiprocessing import set_forkserver_preload
import os
import platform
import shutil
import subprocess
import sys
import uuid
from collections import defaultdict
from datetime import datetime

import fitz
from fastapi import Request
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.document_loaders import (
    PyMuPDFLoader,                    # PDF
    UnstructuredWordDocumentLoader,   # DOC and DOCX
    UnstructuredPowerPointLoader,     # PPT and PPTX
    UnstructuredFileLoader            # Generic fallback
)
from pydantic import BaseModel
from weasyprint import HTML

from genos_utils import genos_import
from utils import assert_cancelled


def is_libreoffice_installed():
    return shutil.which("libreoffice") is not None


class GenOSVectorMeta(BaseModel):
    text: str
    n_char: int | None = None
    n_word: int | None = None
    n_line: int | None = None
    i_page: int | None = None
    e_page: int | None = None
    i_chunk_on_page: int | None = None
    n_chunk_of_page: int | None = None
    i_chunk_on_doc: int | None = None
    n_chunk_of_doc: int | None = None
    n_page: int | None = None
    reg_date: str | None = None
    chunk_bboxes: str = None


# office 확장자
OFFICE_EXTENSIONS = ['.docx', '.pptx', '.doc', '.ppt']

# 텍스트 타입 확장자
TEXT_TYPE_EXTENTIONS = ['.txt', '.json', '.md']


def _get_pdf_path(file_path: str) -> str:
    ext = os.path.splitext(file_path)[-1].lower()
    if ext in TEXT_TYPE_EXTENTIONS:
        pdf_path = file_path.replace(ext, '.pdf')
    elif ext in OFFICE_EXTENSIONS:
        if is_libreoffice_installed():
            pdf_path = convert_to_pdf(file_path)
        else:
            # pdf 없어서 뷰어에서 에러가 발생하기는 하는데, 적재는 됨
            pdf_path = file_path.replace(ext, '.pdf')
    else:
        pdf_path = file_path
    return pdf_path


def get_korean_font():
    """시스템에 따른 한글 폰트 반환"""
    system = platform.system()
    if system == "Darwin":  # macOS
        return ["Apple SD Gothic Neo", "AppleGothic"] 
    elif system == "Windows": 
        return ["Malgun Gothic", "맑은 고딕"]
    else:  # Linux
        return ["Noto Sans CJK KR", "DejaVu Sans"]
    

def get_html_content(content: str):
    korean_fonts = get_korean_font()
    font_family = ", ".join([f"'{font}'" for font in korean_fonts])
    return f"""<!DOCTYPE html>
<html lang="ko">
<head>
    <meta charset="UTF-8">
    <style>
        body {{
            font-family: {font_family}, sans-serif;
            font-size: 12px;
            line-height: 1.6;
        }}
    </style>
</head>
<body>
    <pre>{content}</pre>
</body>
</html>"""


def convert_to_pdf(file_path: str):
    try:
        print("file_path: ", file_path)
        ext = os.path.splitext(file_path)[-1].lower()
        out_path = os.path.dirname(file_path)
        subprocess.run(['soffice', '--headless', '--convert-to', 'pdf', '--outdir', out_path, file_path], check=True)
        pdf_path = file_path.replace(ext, '.pdf')
        return pdf_path
    except subprocess.CalledProcessError as e:
        print("Error converting PPT to PDF: ", e)
        return False
    except Exception as e:
        print(f"Error converting PPT to PDF: {e}")
        return False
        

class TextLoader:
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.output_dir = os.path.join('/tmp', str(uuid.uuid4()))
        os.makedirs(self.output_dir, exist_ok=True)
    def load(self):
        try:
            with open(self.file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            html_content = get_html_content(content)
            html_file_path = os.path.join(self.output_dir, 'temp.html')
            with open(html_file_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            pdf_save_path = _get_pdf_path(self.file_path)
            HTML(html_file_path).write_pdf(pdf_save_path)
            loader = PyMuPDFLoader(pdf_save_path)
            return loader.load()
        except Exception as e:
            print(f"Failed to convert {self.file_path} to XHTML")
            raise e
        finally:
            if os.path.exists(self.output_dir):
                shutil.rmtree(self.output_dir)


class DocumentProcessor:
    def __init__(self):
        self.page_chunk_counts = defaultdict(int)
        self.pdf_path = None

    def get_loader(self, file_path: str):
        ext = os.path.splitext(file_path)[-1].lower()
        if ext == '.pdf':
            return PyMuPDFLoader(file_path)
        elif ext in OFFICE_EXTENSIONS:
            pdf_path = _get_pdf_path(file_path)
            self.pdf_path = pdf_path
            return PyMuPDFLoader(pdf_path)
        elif ext in TEXT_TYPE_EXTENTIONS:
            return TextLoader(file_path)
        else:
            return UnstructuredFileLoader(file_path)

    def load_documents(self, file_path: str, **kwargs: dict) -> list[Document]:
        loader = self.get_loader(file_path)
        documents = loader.load()
        return documents

    def split_documents(self, documents, **kwargs: dict) -> list[Document]:
        splitter_params = {}
        chunk_size = kwargs.get('chunk_size')
        chunk_overlap = kwargs.get('chunk_overlap')
        
        if chunk_size is not None:
            splitter_params['chunk_size'] = chunk_size
            
        if chunk_overlap is not None:
            splitter_params['chunk_overlap'] = chunk_overlap
        
        text_splitter = RecursiveCharacterTextSplitter(**splitter_params)
        chunks = text_splitter.split_documents(documents)
        chunks = [chunk for chunk in chunks if chunk.page_content]
        if not chunks:
            raise Exception('Empty document')

        for chunk in chunks:
            page = chunk.metadata.get('page', 0)
            self.page_chunk_counts[page] += 1
        return chunks

    def compose_vectors(self, file_path: str, chunks: list[Document], **kwargs: dict) -> list[dict]:
        if self.pdf_path is None:
            pdf_path = _get_pdf_path(file_path)
        else:
            pdf_path = self.pdf_path
        doc = None
        total_pages = 0

        if os.path.exists(pdf_path):
            doc = fitz.open(pdf_path)
            total_pages = len(doc)

        global_metadata = dict(
            n_chunk_of_doc = len(chunks),
            n_page = max([chunk.metadata.get('page', 0) + 1 for chunk in chunks]),
            reg_date = datetime.now().isoformat(timespec='seconds') + 'Z'
        )

        current_page = None
        chunk_index_on_page = 0
        vectors = []

        for chunk_idx, chunk in enumerate(chunks):
            page = chunk.metadata.get('page', 0)
            text = chunk.page_content

            if page != current_page:
                current_page = page
                chunk_index_on_page = 0

            vectors.append(GenOSVectorMeta.model_validate({
                'text': text,
                'n_char': len(text),
                'n_word': len(text.split()),
                'n_line': len(text.splitlines()),
                'i_page': page,
                'e_page': page,
                'i_chunk_on_page': chunk_index_on_page,
                'n_chunk_of_page': self.page_chunk_counts[page],
                'i_chunk_on_doc': chunk_idx,
                **global_metadata
            }))
            chunk_index_on_page += 1

        return vectors

    async def __call__(self, request: Request, file_path: str, **kwargs: dict):
        documents: list[Document] = self.load_documents(file_path, **kwargs)
        await assert_cancelled(request)
        
        chunks: list[Document] = self.split_documents(documents, **kwargs)
        await assert_cancelled(request)

        vectors: list[dict] = self.compose_vectors(file_path, chunks, **kwargs)
        return vectors
      

Last updated

Was this helpful?