MCP Tool Example (Web Crawling)
The code below is an example MCP tool that crawls web pages.
Paste it into the code field of the 도구 > MCP 도구 > MCP 도구 상세 (Tools > MCP Tools > MCP Tool Detail) screen in GenOS.
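The tool assumes that an MCP server object named mcp is already in scope; in GenOS that object is provided by the platform, so only the decorated function is pasted into the code field. For orientation, a minimal sketch of the standalone FastMCP boilerplate that would normally surround such a tool is shown below (the server name "web-tools" is an arbitrary placeholder, not something GenOS requires):

# Illustrative FastMCP scaffold only; this is NOT part of what you paste into GenOS.
# The server name "web-tools" is a placeholder.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("web-tools")

# ... @mcp.tool() definitions such as open_url go here ...

if __name__ == "__main__":
    mcp.run()

The tool code to paste into the code field follows.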
@mcp.tool()
async def open_url(opens: list) -> str:
    """
    Combined web-crawling tool that fetches the full page contents of multiple URLs.
    Args:
        opens (list): List of dictionaries containing URL information,
            e.g. [{"url": "https://example.com"}, {"url": "https://another.com"}]
    Returns:
        str: List of crawl results in JSON format
    """
    import aiohttp
    import asyncio
    import logging
    import unicodedata
    import re
    import textwrap
    import json
    from typing import Optional, Dict, Any, List, Set
    from urllib.parse import urljoin, urlparse
    from bs4 import BeautifulSoup, Comment, NavigableString

    # ===== Input validation =====
    if not opens or not isinstance(opens, list):
        return json.dumps([{"open_index": "ERROR", "url": "invalid", "contents": "[Error] opens is not a valid list"}])

    print(f"🕷️ Starting combined crawl of {len(opens)} URL(s)")

    # ===== Cache =====
    _download_cache = {}

    # ===== Async download helper (copied from open_url.py) =====
    async def _download_async(url: str, timeout: int = 15) -> str:
        # Return a cached copy if this URL was already downloaded
        if url in _download_cache:
            return _download_cache[url]
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
            'Referer': 'https://www.google.com/',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
        }
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
                async with session.get(url, headers=headers) as resp:
                    resp.raise_for_status()
                    content = await resp.text()
                    # Store in the cache (bounded size)
                    if len(_download_cache) < 128:
                        _download_cache[url] = content
                    return content
        except aiohttp.ClientError as e:
            raise Exception(f"Download failed: {e}")
        except Exception as e:
            raise Exception(f"Unexpected error: {e}")

    # ===== HTMLTextExtractor class (copied in full from open_url.py) =====
    class HTMLTextExtractor:
        """Robust, fault-tolerant HTML text extractor"""

        REMOVE_TAGS = {
            'script', 'style', 'noscript', 'iframe', 'embed', 'object',
            'form', 'input', 'button', 'select', 'textarea', 'option',
            'nav', 'header', 'footer', 'aside', 'advertisement', 'ads'
        }
        BLOCK_TAGS = {
            'div', 'p', 'section', 'article', 'main', 'aside',
            'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
            'ul', 'ol', 'li', 'dl', 'dt', 'dd',
            'blockquote', 'pre', 'code', 'table', 'tr', 'td', 'th',
            'figure', 'figcaption', 'details', 'summary'
        }
        INLINE_TAGS = {
            'span', 'a', 'strong', 'b', 'em', 'i', 'u', 'mark',
            'small', 'sub', 'sup', 'abbr', 'cite', 'code', 'kbd', 'samp'
        }

        def __init__(self):
            self.stats = {
                'method_used': None,
                'original_length': 0,
                'extracted_length': 0,
                'lines_processed': 0,
                'duplicates_removed': 0
            }

        def extract_readable_text(
            self,
            html: str,
            wrap: int = -1,
            min_line_len: int = 5,
            remove_duplicate: bool = True,
            preserve_structure: bool = True,
            base_url: Optional[str] = None,
            language: str = 'auto',
            extract_metadata: bool = False
        ) -> Dict[str, Any]:
            self.stats['original_length'] = len(html)
            html = self._preprocess_html(html)
            clean_html = html  # Skip main-content extraction (keeps library dependencies minimal)
            soup = self._parse_html(clean_html)
            if not soup:
                return self._empty_result()
            metadata = {}
            if extract_metadata:
                metadata = self._extract_metadata(soup)
            links = self._extract_links(soup, base_url) if base_url else []
            lines = self._extract_structured_text(
                soup, wrap, preserve_structure, language
            )
            lines = self._postprocess_lines(
                lines, min_line_len, remove_duplicate
            )
            lines = self._merge_fragments(lines)
            lines = self._add_line_number(lines)
            self.stats['lines_processed'] = len(lines)
            self.stats['extracted_length'] = sum(len(line) for line in lines)
            return {
                'lines': lines,
                'metadata': metadata,
                'stats': self.stats.copy(),
                'links': links
            }

        def _preprocess_html(self, html: str) -> str:
            if not html or not html.strip():
                return ""
            if isinstance(html, bytes):
                html = html.decode('utf-8', errors='ignore')
            html = html.lstrip('\ufeff')
            html = unicodedata.normalize('NFKC', html)
            html = re.sub(r'>\s+<', '><', html)
            return html

        def _parse_html(self, html: str) -> Optional[BeautifulSoup]:
            try:
                soup = BeautifulSoup(html, 'html.parser')
                for tag_name in self.REMOVE_TAGS:
                    for tag in soup.find_all(tag_name):
                        tag.decompose()
                for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
                    comment.extract()
                return soup
            except Exception as e:
                logging.error(f"Failed to parse HTML: {e}")
                return None

        def _extract_metadata(self, soup: BeautifulSoup) -> Dict[str, str]:
            metadata = {}
            title_tag = soup.find('title')
            if title_tag:
                metadata['title'] = title_tag.get_text().strip()
            meta_mappings = {
                'description': ['description'],
                'keywords': ['keywords'],
                'author': ['author'],
                'published': ['article:published_time', 'pubdate'],
                'modified': ['article:modified_time', 'lastmod']
            }
            for key, names in meta_mappings.items():
                for name in names:
                    meta = soup.find('meta', attrs={'name': name}) or \
                           soup.find('meta', attrs={'property': name})
                    if meta and meta.get('content'):
                        metadata[key] = meta['content'].strip()
                        break
            og_tags = soup.find_all('meta', attrs={'property': lambda x: x and x.startswith('og:')})
            for tag in og_tags:
                prop = tag.get('property', '').replace('og:', '')
                content = tag.get('content', '').strip()
                if prop and content:
                    metadata[f'og_{prop}'] = content
            return metadata

        def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]:
            links = []
            for link in soup.find_all('a', href=True):
                href = link['href'].strip()
                text = link.get_text().strip()
                if not href or href.startswith(('#', 'javascript:', 'mailto:')):
                    continue
                try:
                    full_url = urljoin(base_url, href)
                    if urlparse(full_url).scheme in ('http', 'https'):
                        links.append({
                            'url': full_url,
                            'text': text,
                            'title': link.get('title', '').strip()
                        })
                except Exception:
                    continue
            return links

        def _extract_structured_text(
            self,
            soup: BeautifulSoup,
            wrap: int,
            preserve_structure: bool,
            language: str
        ) -> List[str]:
            lines = []

            def add_text(text: str, prefix: str = "", suffix: str = ""):
                if not text:
                    return
                text = self._normalize_text(text)
                if not text:
                    return
                if wrap > 0:
                    wrapped_lines = textwrap.wrap(text, width=wrap)
                else:
                    wrapped_lines = [text]
                for i, line in enumerate(wrapped_lines):
                    if prefix and i == 0:
                        line = prefix + line
                    if suffix and i == len(wrapped_lines) - 1:
                        line = line + suffix
                    lines.append(line)

            self._traverse_elements(soup, add_text, preserve_structure)
            return lines

        def _traverse_elements(self, element, add_text_func, preserve_structure: bool):
            if not element:
                return
            if isinstance(element, NavigableString):
                if not isinstance(element, Comment):
                    text = str(element).strip()
                    if text:
                        add_text_func(text)
                return
            tag_name = getattr(element, 'name', '')
            if not tag_name:
                return
            if preserve_structure and tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
                level = int(tag_name[1])
                text = element.get_text(" ", strip=True)
                if text:
                    prefix = "#" * level + " " if preserve_structure else ""
                    add_text_func(text, prefix)
                    add_text_func("")
            elif preserve_structure and tag_name == 'li':
                text = element.get_text(" ", strip=True)
                if text:
                    add_text_func(text, "- ")
            elif preserve_structure and tag_name == 'blockquote':
                text = element.get_text(" ", strip=True)
                if text:
                    add_text_func(text, "> ")
                    add_text_func("")
            elif tag_name in ['pre', 'code']:
                text = element.get_text()
                if text.strip():
                    if preserve_structure:
                        add_text_func("```")
                        add_text_func(text.rstrip())
                        add_text_func("```")
                    else:
                        add_text_func(text.strip())
                    add_text_func("")
            elif tag_name == 'table':
                self._extract_table_text(element, add_text_func, preserve_structure)
            elif tag_name in self.BLOCK_TAGS:
                for child in element.children:
                    self._traverse_elements(child, add_text_func, preserve_structure)
                if tag_name in ['p', 'div']:
                    add_text_func("")
            else:
                for child in element.children:
                    self._traverse_elements(child, add_text_func, preserve_structure)

        def _extract_table_text(self, table, add_text_func, preserve_structure: bool):
            rows = []
            for tr in table.find_all('tr'):
                cells = []
                for cell in tr.find_all(['td', 'th']):
                    cell_text = cell.get_text(" ", strip=True)
                    cells.append(cell_text)
                if cells and any(cell.strip() for cell in cells):
                    if preserve_structure:
                        row_text = " | ".join(cells)
                        rows.append(row_text)
                    else:
                        rows.extend([cell for cell in cells if cell.strip()])
            if rows:
                for row in rows:
                    add_text_func(row)
                add_text_func("")

        def _normalize_text(self, text: str) -> str:
            if not text:
                return ""
            text = unicodedata.normalize('NFKC', text)
            text = text.strip()
            text = text.replace('\u00a0', ' ')
            text = text.replace('\u200b', '')
            text = text.replace('\ufeff', '')
            return text

        def _postprocess_lines(
            self,
            lines: List[str],
            min_line_len: int,
            remove_duplicate: bool
        ) -> List[str]:
            if not lines:
                return []
            processed = []
            seen: Set[str] = set()
            duplicate_count = 0
            for line in lines:
                if not line.strip():
                    if processed and processed[-1].strip():
                        processed.append("")
                    continue
                if len(line) < min_line_len:
                    if not any(line.startswith(marker) for marker in ['#', '-', '*', '>', '```']):
                        continue
                if remove_duplicate:
                    line_key = self._normalize_text(line).lower()
                    if line_key in seen:
                        duplicate_count += 1
                        continue
                    seen.add(line_key)
                processed.append(line)
            while processed and not processed[-1].strip():
                processed.pop()
            self.stats['duplicates_removed'] = duplicate_count
            return processed

        def _empty_result(self) -> Dict[str, Any]:
            return {
                'lines': [],
                'metadata': {},
                'stats': self.stats.copy(),
                'links': []
            }

        def _add_line_number(self, lines: List[str]) -> List[str]:
            return [f"L{idx}: {line}" for idx, line in enumerate(lines)]

        def _merge_fragments(self, lines: List[str]) -> List[str]:
            """Merge short fragments into coherent lines"""
            if not lines:
                return []
            merged = []
            current_line = ""
            for line in lines:
                line = line.strip()
                if not line:
                    if current_line:
                        merged.append(current_line)
                        current_line = ""
                    merged.append("")
                    continue
                # If the line starts with a markdown marker, is long enough, or ends a sentence,
                # treat it as a separate line
                if (line.startswith(('#', '-', '*', '>', '```')) or
                        len(line) > 80 or
                        line.endswith(('.', '!', '?', ':', ';'))):
                    if current_line:
                        merged.append(current_line + " " + line)
                        current_line = ""
                    else:
                        merged.append(line)
                else:
                    # Otherwise keep accumulating into the previous fragment
                    if current_line:
                        current_line += " " + line
                    else:
                        current_line = line
            if current_line:
                merged.append(current_line)
            return merged

    # ===== WebCrawler-based single-URL handler =====
    async def crawl_single_url(url_info: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Crawl a single URL"""
        url = url_info.get('url', '').strip()
        # Normalize the URL
        if not url:
            return {
                "open_index": f"URL{index}",
                "url": "invalid",
                "contents": "[Crawl error] No URL was provided"
            }
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        try:
            print(f"  🕷️ Crawling [{index}]: {url}")
            # Download the HTML
            html = await _download_async(url, timeout=15)
            if not html or not html.strip():
                return {
                    "open_index": f"URL{index}",
                    "url": url,
                    "contents": f"[Crawl error] Empty content: {url}"
                }
            # Extract readable text from the HTML
            extractor = HTMLTextExtractor()
            result = extractor.extract_readable_text(html)
            lines = result['lines']
            if not lines:
                return {
                    "open_index": f"URL{index}",
                    "url": url,
                    "contents": f"[Crawl error] Text extraction failed: {url}"
                }
            # Cap the number of lines (default 100)
            max_lines = 100
            used_lines = lines[:max_lines]

            # Smart merge (same logic as _smart_merge_lines in open_url.py)
            def smart_merge_lines(lines: List[str]) -> List[str]:
                if not lines:
                    return []
                merged = []
                current_line = ""
                for line in lines:
                    line = line.strip()
                    if not line:
                        if current_line:
                            merged.append(current_line)
                            current_line = ""
                        merged.append("")
                        continue
                    if (line.startswith(('#', '-', '*', '>', '```')) or
                            len(line) > 80 or
                            line.endswith(('.', '!', '?', ':', ';'))):
                        if current_line:
                            merged.append(current_line + " " + line)
                            current_line = ""
                        else:
                            merged.append(line)
                    else:
                        if current_line:
                            current_line += " " + line
                        else:
                            current_line = line
                if current_line:
                    merged.append(current_line)
                return merged

            merged_lines = smart_merge_lines(used_lines)
            final_content = "\n".join(merged_lines)
            print(f"  ✅ Finished [{index}]: {len(merged_lines)} lines, {len(final_content):,} characters")
            return {
                "open_index": f"URL{index}",
                "url": url,
                "contents": final_content
            }
        except Exception as e:
            error_msg = f"[Crawl error] {str(e)}: {url}"
            print(f"  ❌ Failed [{index}]: {error_msg}")
            return {
                "open_index": f"URL{index}",
                "url": url,
                "contents": error_msg
            }

    # ===== Main execution =====
    try:
        # Crawl all URLs in parallel
        tasks = []
        for idx, url_info in enumerate(opens):
            if isinstance(url_info, str):
                # If a bare string was passed, wrap it as {"url": "..."}
                url_info = {"url": url_info}
            tasks.append(crawl_single_url(url_info, idx))
        # Run the tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect results and handle exceptions
        final_results = []
        success_count = 0
        for idx, result in enumerate(results):
            if isinstance(result, Exception):
                final_results.append({
                    "open_index": f"URL{idx}",
                    "url": "exception",
                    "contents": f"[Crawl error] Exception raised: {str(result)}"
                })
            else:
                final_results.append(result)
                if not result["contents"].startswith("[Crawl error]"):
                    success_count += 1
        print(f"🎉 Crawl finished: {success_count}/{len(results)} URL(s) succeeded")
        # Return the results as JSON
        return json.dumps(final_results, ensure_ascii=False, indent=2)
    except Exception as e:
        error_msg = f"[Overall crawl error] {str(e)}"
        print(f"❌ {error_msg}")
        return json.dumps([{
            "open_index": "ERROR",
            "url": "system_error",
            "contents": error_msg
        }], ensure_ascii=False)
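For reference, the snippet below is a hypothetical direct invocation (for example from a local test script) that illustrates the expected input and output shape; inside GenOS the tool is normally invoked by the model through the MCP server rather than called directly.

# Hypothetical local test of open_url; assumes the function above is importable
# and still callable as a plain coroutine after the @mcp.tool() registration.
import asyncio
import json

async def demo():
    raw = await open_url([
        {"url": "https://example.com"},
        "https://www.python.org",  # bare strings are wrapped as {"url": ...}
    ])
    for item in json.loads(raw):
        # Each entry carries "open_index", "url", and "contents"
        # (numbered text lines, or a "[Crawl error] ..." message on failure).
        print(item["open_index"], item["url"], item["contents"][:80])

asyncio.run(demo())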