import html
import json
import os
import platform
import shutil
import subprocess
import sys
import uuid
from collections import defaultdict
from datetime import datetime, timezone
import fitz
from fastapi import Request
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.document_loaders import (
PyMuPDFLoader, # PDF
UnstructuredWordDocumentLoader, # DOC and DOCX
UnstructuredPowerPointLoader, # PPT and PPTX
UnstructuredFileLoader # Generic fallback
)
from pydantic import BaseModel
from weasyprint import HTML
from genos_utils import genos_import
from utils import assert_cancelled
def is_libreoffice_installed():
    # LibreOffice may be on PATH as either 'libreoffice' or 'soffice'.
    return shutil.which("libreoffice") is not None or shutil.which("soffice") is not None
class GenOSVectorMeta(BaseModel):
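    """Per-chunk metadata schema attached to each stored vector."""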
text: str
n_char: int | None = None
n_word: int | None = None
n_line: int | None = None
i_page: int | None = None
e_page: int | None = None
i_chunk_on_page: int | None = None
n_chunk_of_page: int | None = None
i_chunk_on_doc: int | None = None
n_chunk_of_doc: int | None = None
n_page: int | None = None
reg_date: str | None = None
    chunk_bboxes: str | None = None
# Office extensions
OFFICE_EXTENSIONS = ['.docx', '.pptx', '.doc', '.ppt']
# Text-type extensions
TEXT_TYPE_EXTENSIONS = ['.txt', '.json', '.md']
def _get_pdf_path(file_path: str) -> str:
    """Return the path of the PDF rendition that corresponds to file_path."""
    ext = os.path.splitext(file_path)[-1].lower()
    if ext in TEXT_TYPE_EXTENSIONS:
        pdf_path = os.path.splitext(file_path)[0] + '.pdf'
    elif ext in OFFICE_EXTENSIONS:
        if is_libreoffice_installed():
            pdf_path = convert_to_pdf(file_path)
            if pdf_path is None:
                pdf_path = os.path.splitext(file_path)[0] + '.pdf'
        else:
            # Without LibreOffice no PDF is produced; the viewer will show an
            # error for the missing file, but ingestion itself still succeeds.
            pdf_path = os.path.splitext(file_path)[0] + '.pdf'
    else:
        pdf_path = file_path
    return pdf_path
def get_korean_font():
"""시스템에 따른 한글 폰트 반환"""
system = platform.system()
if system == "Darwin": # macOS
return ["Apple SD Gothic Neo", "AppleGothic"]
elif system == "Windows":
return ["Malgun Gothic", "맑은 고딕"]
else: # Linux
return ["Noto Sans CJK KR", "DejaVu Sans"]
def get_html_content(content: str):
korean_fonts = get_korean_font()
font_family = ", ".join([f"'{font}'" for font in korean_fonts])
return f"""<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<style>
body {{
font-family: {font_family}, sans-serif;
font-size: 12px;
line-height: 1.6;
}}
</style>
</head>
<body>
    <pre>{safe_content}</pre>
</body>
</html>"""
def convert_to_pdf(file_path: str):
    """Convert an Office document to PDF with LibreOffice; return the PDF path, or None on failure."""
    try:
        out_dir = os.path.dirname(file_path)
        # 'soffice' is LibreOffice's CLI entry point; fall back to 'libreoffice'.
        soffice = shutil.which('soffice') or 'libreoffice'
        subprocess.run([soffice, '--headless', '--convert-to', 'pdf', '--outdir', out_dir, file_path],
                       check=True)
        # LibreOffice writes <basename>.pdf into the output directory.
        return os.path.splitext(file_path)[0] + '.pdf'
    except Exception as e:
        print(f"Error converting {file_path} to PDF: {e}")
        return None
class TextLoader:
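    """Renders a text-type file (.txt/.json/.md) to PDF via an intermediate
    HTML page, then loads the PDF with PyMuPDFLoader."""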
def __init__(self, file_path: str):
self.file_path = file_path
self.output_dir = os.path.join('/tmp', str(uuid.uuid4()))
os.makedirs(self.output_dir, exist_ok=True)
def load(self):
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
html_content = get_html_content(content)
html_file_path = os.path.join(self.output_dir, 'temp.html')
with open(html_file_path, 'w', encoding='utf-8') as f:
f.write(html_content)
pdf_save_path = _get_pdf_path(self.file_path)
HTML(html_file_path).write_pdf(pdf_save_path)
loader = PyMuPDFLoader(pdf_save_path)
return loader.load()
        except Exception:
            print(f"Failed to convert {self.file_path} to PDF")
            raise
finally:
if os.path.exists(self.output_dir):
shutil.rmtree(self.output_dir)
class DocumentProcessor:
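    """Pipeline entry point: load a document, split it into chunks, and compose
    per-chunk vector metadata."""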
def __init__(self):
self.page_chunk_counts = defaultdict(int)
self.pdf_path = None
def get_loader(self, file_path: str):
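        # PDFs load directly; Office files are converted to PDF first (so page
        # metadata is preserved); text-type files go through TextLoader; any
        # other extension falls back to UnstructuredFileLoader.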
ext = os.path.splitext(file_path)[-1].lower()
if ext == '.pdf':
return PyMuPDFLoader(file_path)
elif ext in OFFICE_EXTENSIONS:
pdf_path = _get_pdf_path(file_path)
self.pdf_path = pdf_path
return PyMuPDFLoader(pdf_path)
        elif ext in TEXT_TYPE_EXTENSIONS:
return TextLoader(file_path)
else:
return UnstructuredFileLoader(file_path)
def load_documents(self, file_path: str, **kwargs: dict) -> list[Document]:
loader = self.get_loader(file_path)
documents = loader.load()
return documents
def split_documents(self, documents, **kwargs: dict) -> list[Document]:
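        # Pass chunk_size / chunk_overlap through only when provided, so the
        # splitter's own defaults apply otherwise.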
splitter_params = {}
chunk_size = kwargs.get('chunk_size')
chunk_overlap = kwargs.get('chunk_overlap')
if chunk_size is not None:
splitter_params['chunk_size'] = chunk_size
if chunk_overlap is not None:
splitter_params['chunk_overlap'] = chunk_overlap
text_splitter = RecursiveCharacterTextSplitter(**splitter_params)
chunks = text_splitter.split_documents(documents)
        # Drop chunks that contain only whitespace.
        chunks = [chunk for chunk in chunks if chunk.page_content.strip()]
        if not chunks:
            raise ValueError('Empty document')
for chunk in chunks:
page = chunk.metadata.get('page', 0)
self.page_chunk_counts[page] += 1
return chunks
def compose_vectors(self, file_path: str, chunks: list[Document], **kwargs: dict) -> list[dict]:
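        """Build one GenOSVectorMeta record per chunk, tracking per-page and
        per-document chunk indices."""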
        # Prefer the PDF path cached by get_loader(); derive it otherwise.
        pdf_path = self.pdf_path if self.pdf_path is not None else _get_pdf_path(file_path)

        total_pages = 0
        if pdf_path and os.path.exists(pdf_path):
            with fitz.open(pdf_path) as doc:
                total_pages = len(doc)

        global_metadata = dict(
            n_chunk_of_doc=len(chunks),
            # Use the real page count when a PDF rendition exists; otherwise fall
            # back to the highest page index observed in the chunks.
            n_page=total_pages or max(chunk.metadata.get('page', 0) + 1 for chunk in chunks),
            reg_date=datetime.now(timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z'),
        )
current_page = None
chunk_index_on_page = 0
vectors = []
for chunk_idx, chunk in enumerate(chunks):
page = chunk.metadata.get('page', 0)
text = chunk.page_content
if page != current_page:
current_page = page
chunk_index_on_page = 0
vectors.append(GenOSVectorMeta.model_validate({
'text': text,
'n_char': len(text),
'n_word': len(text.split()),
'n_line': len(text.splitlines()),
'i_page': page,
'e_page': page,
'i_chunk_on_page': chunk_index_on_page,
'n_chunk_of_page': self.page_chunk_counts[page],
'i_chunk_on_doc': chunk_idx,
**global_metadata
}))
chunk_index_on_page += 1
return vectors
async def __call__(self, request: Request, file_path: str, **kwargs: dict):
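        # Cooperative cancellation: check between the load / split / compose
        # stages so an aborted request stops early.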
documents: list[Document] = self.load_documents(file_path, **kwargs)
await assert_cancelled(request)
chunks: list[Document] = self.split_documents(documents, **kwargs)
await assert_cancelled(request)
vectors: list[dict] = self.compose_vectors(file_path, chunks, **kwargs)
return vectors
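
# A minimal local usage sketch (hypothetical file path; bypasses the FastAPI
# Request / cancellation flow by calling the synchronous steps directly):
#
#   processor = DocumentProcessor()
#   docs = processor.load_documents('/tmp/sample.pdf')
#   chunks = processor.split_documents(docs, chunk_size=1000, chunk_overlap=100)
#   vectors = processor.compose_vectors('/tmp/sample.pdf', chunks)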