RAG를 위한 기본 전처리기 사용
RAG 워크플로우를 구현하기 위해서는 검색할 문서가 저장된 벡터 DB를 'Vector Stores' 노드('Deepsearfing', 'Weaviate' 등)에서 지정해야 합니다. 벡터 DB를 생성하고 문서를 적재하기 위해서는 문서 적재에 사용할 전처리기를 지정해야 합니다. 이 때 보편적으로 사용되는 문서 전처리를 수행할 '기본 전처리기'를 배포하는 법을 설명합니다.
전처리기 생성하기

GenOS 메뉴의 '관리' > '리소스' > '전처리기' 를 클릭합니다.
전처리기 목록 화면 왼쪽 상단의 '전처리기 생성' 버튼을 클릭합니다.

'제목'과 '관리 그룹'을 원하는 값으로 입력합니다.
'확장자'에 적재하고자 하는 문서의 확장자 리스트를 쉼표로 구분하여 입력합니다.
예)
md, docx, pptx, pdf
지원하지 않는 확장자를 입력할 경우 "유효하지 않다"고 메시지가 뜹니다.
'코드'에 디폴트로 '기본 전처리기' 코드가 입력되어 있습니다. 전체 코드는 아래 '기본 전처리기 코드' 섹션을 참고 해주세요.
'기본 전처리기'에서 지원하는 확장자:
txt, md, json, ppt, pptx, doc, docx, pdf
생성 시 혹은 생성 후 필요한 내용으로 코드를 수정할 수 있습니다.
'파라미터'에 벡터 DB에 적재 시 필요한 파라미터를 입력/조정할 수 있습니다.
'GPU 할당량'에서 전처리기 코드 실행에 사용할 GPU의 종류 및 개수를 지정할 수 있습니다.

생성 후 '전처리기 상세' 화면의 '기본 정보' 탭에서 생성할 때 입력한 정보를 확인할 수 있습니다.
전처리기 배포하기

'전처리기 상세'의 '배포 이력' 탭 > 오른쪽 상단의 '배포' 버튼을 클릭합니다.
배포를 위한 '도커 이미지', '인스턴스 타입', '복제본' 개수를 지정합니다.

'생성' 버튼을 클릭하면 '배포 이력'에서 배포 관련 정보를 확인할 수 있습니다.
'배포 중지' 버튼을 클릭하면 현재 배포된 컨테이너를 내리고 재배포할 수 있습니다.
전처리기 코드 수정 시 재배포를 해야 수정 내용이 반영됩니다.
기본 전처리기 코드
import json
from multiprocessing import set_forkserver_preload
import os
import platform
import shutil
import subprocess
import sys
import uuid
from collections import defaultdict
from datetime import datetime
import fitz
from fastapi import Request
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.document_loaders import (
PyMuPDFLoader, # PDF
UnstructuredWordDocumentLoader, # DOC and DOCX
UnstructuredPowerPointLoader, # PPT and PPTX
UnstructuredFileLoader # Generic fallback
)
from pydantic import BaseModel
from weasyprint import HTML
from genos_utils import genos_import
from utils import assert_cancelled
def is_libreoffice_installed():
return shutil.which("libreoffice") is not None
class GenOSVectorMeta(BaseModel):
text: str
n_char: int | None = None
n_word: int | None = None
n_line: int | None = None
i_page: int | None = None
e_page: int | None = None
i_chunk_on_page: int | None = None
n_chunk_of_page: int | None = None
i_chunk_on_doc: int | None = None
n_chunk_of_doc: int | None = None
n_page: int | None = None
reg_date: str | None = None
chunk_bboxes: str = None
# office 확장자
OFFICE_EXTENSIONS = ['.docx', '.pptx', '.doc', '.ppt']
# 텍스트 타입 확장자
TEXT_TYPE_EXTENTIONS = ['.txt', '.json', '.md']
def _get_pdf_path(file_path: str) -> str:
ext = os.path.splitext(file_path)[-1].lower()
if ext in TEXT_TYPE_EXTENTIONS:
pdf_path = file_path.replace(ext, '.pdf')
elif ext in OFFICE_EXTENSIONS:
if is_libreoffice_installed():
pdf_path = convert_to_pdf(file_path)
else:
# pdf 없어서 뷰어에서 에러가 발생하기는 하는데, 적재는 됨
pdf_path = file_path.replace(ext, '.pdf')
else:
pdf_path = file_path
return pdf_path
def get_korean_font():
"""시스템에 따른 한글 폰트 반환"""
system = platform.system()
if system == "Darwin": # macOS
return ["Apple SD Gothic Neo", "AppleGothic"]
elif system == "Windows":
return ["Malgun Gothic", "맑은 고딕"]
else: # Linux
return ["Noto Sans CJK KR", "DejaVu Sans"]
def get_html_content(content: str):
korean_fonts = get_korean_font()
font_family = ", ".join([f"'{font}'" for font in korean_fonts])
return f"""<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<style>
body {{
font-family: {font_family}, sans-serif;
font-size: 12px;
line-height: 1.6;
}}
</style>
</head>
<body>
<pre>{content}</pre>
</body>
</html>"""
def convert_to_pdf(file_path: str):
try:
print("file_path: ", file_path)
ext = os.path.splitext(file_path)[-1].lower()
out_path = os.path.dirname(file_path)
subprocess.run(['soffice', '--headless', '--convert-to', 'pdf', '--outdir', out_path, file_path], check=True)
pdf_path = file_path.replace(ext, '.pdf')
return pdf_path
except subprocess.CalledProcessError as e:
print("Error converting PPT to PDF: ", e)
return False
except Exception as e:
print(f"Error converting PPT to PDF: {e}")
return False
class TextLoader:
def __init__(self, file_path: str):
self.file_path = file_path
self.output_dir = os.path.join('/tmp', str(uuid.uuid4()))
os.makedirs(self.output_dir, exist_ok=True)
def load(self):
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
html_content = get_html_content(content)
html_file_path = os.path.join(self.output_dir, 'temp.html')
with open(html_file_path, 'w', encoding='utf-8') as f:
f.write(html_content)
pdf_save_path = _get_pdf_path(self.file_path)
HTML(html_file_path).write_pdf(pdf_save_path)
loader = PyMuPDFLoader(pdf_save_path)
return loader.load()
except Exception as e:
print(f"Failed to convert {self.file_path} to XHTML")
raise e
finally:
if os.path.exists(self.output_dir):
shutil.rmtree(self.output_dir)
class DocumentProcessor:
def __init__(self):
self.page_chunk_counts = defaultdict(int)
self.pdf_path = None
def get_loader(self, file_path: str):
ext = os.path.splitext(file_path)[-1].lower()
if ext == '.pdf':
return PyMuPDFLoader(file_path)
elif ext in OFFICE_EXTENSIONS:
pdf_path = _get_pdf_path(file_path)
self.pdf_path = pdf_path
return PyMuPDFLoader(pdf_path)
elif ext in TEXT_TYPE_EXTENTIONS:
return TextLoader(file_path)
else:
return UnstructuredFileLoader(file_path)
def load_documents(self, file_path: str, **kwargs: dict) -> list[Document]:
loader = self.get_loader(file_path)
documents = loader.load()
return documents
def split_documents(self, documents, **kwargs: dict) -> list[Document]:
splitter_params = {}
chunk_size = kwargs.get('chunk_size')
chunk_overlap = kwargs.get('chunk_overlap')
if chunk_size is not None:
splitter_params['chunk_size'] = chunk_size
if chunk_overlap is not None:
splitter_params['chunk_overlap'] = chunk_overlap
text_splitter = RecursiveCharacterTextSplitter(**splitter_params)
chunks = text_splitter.split_documents(documents)
chunks = [chunk for chunk in chunks if chunk.page_content]
if not chunks:
raise Exception('Empty document')
for chunk in chunks:
page = chunk.metadata.get('page', 0)
self.page_chunk_counts[page] += 1
return chunks
def compose_vectors(self, file_path: str, chunks: list[Document], **kwargs: dict) -> list[dict]:
if self.pdf_path is None:
pdf_path = _get_pdf_path(file_path)
else:
pdf_path = self.pdf_path
doc = None
total_pages = 0
if os.path.exists(pdf_path):
doc = fitz.open(pdf_path)
total_pages = len(doc)
global_metadata = dict(
n_chunk_of_doc = len(chunks),
n_page = max([chunk.metadata.get('page', 0) + 1 for chunk in chunks]),
reg_date = datetime.now().isoformat(timespec='seconds') + 'Z'
)
current_page = None
chunk_index_on_page = 0
vectors = []
for chunk_idx, chunk in enumerate(chunks):
page = chunk.metadata.get('page', 0)
text = chunk.page_content
if page != current_page:
current_page = page
chunk_index_on_page = 0
vectors.append(GenOSVectorMeta.model_validate({
'text': text,
'n_char': len(text),
'n_word': len(text.split()),
'n_line': len(text.splitlines()),
'i_page': page,
'e_page': page,
'i_chunk_on_page': chunk_index_on_page,
'n_chunk_of_page': self.page_chunk_counts[page],
'i_chunk_on_doc': chunk_idx,
**global_metadata
}))
chunk_index_on_page += 1
return vectors
async def __call__(self, request: Request, file_path: str, **kwargs: dict):
documents: list[Document] = self.load_documents(file_path, **kwargs)
await assert_cancelled(request)
chunks: list[Document] = self.split_documents(documents, **kwargs)
await assert_cancelled(request)
vectors: list[dict] = self.compose_vectors(file_path, chunks, **kwargs)
return vectors
Last updated
Was this helpful?