실전: 예시 프로젝트(리서치 에이전트)와 함께 이해하기
다음의 깃허브 레포지토리(mindsandcompany/mocking-flowise)를 기준으로, 파이썬 코드로 구현한 리서치 에이전트를 GenOS 채팅과 연동하는 전 과정을 따라가며 이해합니다.
1) 프로젝트 개요
목적: Flowise로 구현이 까다로운 흐름을 Python(FastAPI) 코드로 구현하고, GenOS 채팅 UI에 추론/툴 실행 과정을 그대로 스트리밍 노출
핵심 포인트
SSE 스트리밍:
token
,reasoning_token
,agentFlowExecutedData
,error
,result
툴 호출 및 UI 노출: OpenAI 함수 호출 포맷 스키마 +
agentFlowExecutedData
규칙Docker 배포로 손쉬운 실행
디렉터리 구조(요약):
mock_workflow/
├─ app/
│ ├─ __init__.py
│ ├─ main.py # FastAPI 엔트리 및 라우터 포함
│ ├─ api/
│ │ ├─ __init__.py
│ │ ├─ chat.py # /chat/stream
│ │ └─ health.py # /health
│ ├─ prompts/
│ │ └─ system.txt # 시스템 프롬프트
│ ├─ stores/
│ │ ├─ __init__.py
│ │ └─ session_store.py # Redis 기반 세션 저장/복구
│ ├─ tools/
│ │ ├─ __init__.py # get_tool_map, get_tools_for_llm 정의
│ │ ├─ open_url.py # 크롤링 툴
│ │ └─ web_search.py # 웹 검색 툴
│ ├─ mcp/
│ │ ├─ __init__.py # get_mcp_tool_map 정의
│ │ └─ mcp_tools.py # MCP 서버에서 가져온 툴 관리
│ ├─ utils.py # OpenRouter 클라이언트, 스트림 유틸 등
│ └─ .env # 주요한 환경변수 관리
├─ tests/
│ ├─ test_mcp.py # MCP 서버 연동 확인 스크립트
│ └─ test_chat.py # SSE 수신 예제 스크립트
├─ .gitignore
├─ docker-compose.yml # API(5555) + Redis 구성
├─ Dockerfile
├─ README.md
└─ requirements.txt
2) 요구 사항 및 환경 변수
Python 3.11+
OpenRouter API Key:
OPENROUTER_API_KEY
searchapi.io API Key:
SEARCHAPI_KEY
(웹 검색 툴 사용 시)
.env
예시:
OPENROUTER_API_KEY=<your_openrouter_api_key>
DEFAULT_MODEL=openai/gpt-4o-mini
SEARCHAPI_KEY=<your_searchapi_key>
MCP_SERVER_ID=<your_mcp_server_id> # if you want to use tools from multiple mcp server, separate each GenOS MCP server id with comma (",") (e.g., 1, 2, ...)
GENOS_ID=<your_genos_id>
GENOS_PW=<your_genos_password>
3) 클론 및 실행
A. 레포지토리 클론
git clone https://github.com/mindsandcompany/mocking-flowise.git
cd mocking-flowise/mock_workflow
B. Docker Compose
docker compose up -d --build
# 상태 확인
docker compose ps
# 헬스체크
curl -s http://localhost:5555/health | cat
컨테이너 기본 포트는 5555입니다. (
Dockerfile
-EXPOSE 5555
및docker-compose.yml
-5555:5555
)코드/의존성/Dockerfile 변경 후에는
--build
를 붙여 최신 상태로 반영하세요.서버 내에 같은 이름의 도커 이미지나 컨테이너가 존재할 경우 충돌이 발생할 수 있으므로 주의하세요.
4) 엔드포인트 및 SSE 규칙 확인
엔드포인트:
app/api/chat.py
에서@app.post("/<path>")
데코레이터로 정의된 라우트를 확인하세요.예:
POST /run
또는POST /chat
등 프로젝트 버전에 따라 다를 수 있음
SSE 규칙: 본 문서의 상위 가이드인 파이썬 코드 기반 FastAPI 앱과 GenOS 채팅 연동하기의 “SSE 이벤트 규칙”을 반드시 준수해야 GenOS 채팅 UI에서 올바르게 표현됩니다.
아래의
app/api/chat.py
를 참고하여 코드 구현하시길 바랍니다.
@app.post("/chat/stream")
async def chat_stream(req: GenerateRequest):
queue: asyncio.Queue[str] = asyncio.Queue()
SENTINEL = "__STREAM_DONE__"
async def emit(
event: Literal["token", "reasoning_token", "agentFlowExecutedData", "error", "result"],
data: str | dict | None
):
"""
GenOS 채팅 어플리케이션에 표현되기 원하는 정보를 SSE 이벤트로 전송합니다.
"""
payload = {"event": event, "data": data}
await queue.put(f"data: {json.dumps(payload, ensure_ascii=False)}\n\n")
async def heartbeat():
while True:
await asyncio.sleep(10)
await queue.put(": keep-alive\n\n")
async def runner():
try:
"""
실제 비즈니스 로직을 이곳에 구현해 주세요.
"""
await emit("token", "This is an example sentence.")
except Exception as e:
await emit("error", str(e))
finally:
await emit("result", None)
await queue.put(SENTINEL)
async def sse():
producer = asyncio.create_task(runner())
pinger = asyncio.create_task(heartbeat())
try:
while True:
chunk = await queue.get()
if chunk == SENTINEL:
break
yield chunk
finally:
producer.cancel()
pinger.cancel()
return StreamingResponse(
sse(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
)
emit
: SSE 이벤트를 클라이언트로 전송하는 함수입니다. 이벤트 이름과 데이터를 받아 JSON 형태로 직렬화하여 스트림 큐에 넣습니다.heartbeat
: 연결이 유지되고 있음을 알리기 위해 주기적으로(예: 10초마다) keep-alive 신호를 스트림 큐에 넣는 함수입니다.runner
: 실제 비즈니스 로직(예: LLM 호출 등)을 실행하고, 중간 결과나 최종 결과를emit
을 통해 스트림으로 전송하는 함수입니다. 오류 발생 시 에러 메시지도 전송하며, 마지막에 스트림 종료 신호를 보냅니다.
간단 cURL 테스트(예시):
curl -N -X POST \
-H "Content-Type: application/json" \
-H "x-request-from: internal" \
-d '{"text":"hello"}' \
http://localhost:5555/chat/stream
서버는
data: {"event": string, "data": any}
형식의 라인을 스트리밍으로 전송해야 합니다.agentFlowExecutedData
이벤트의data.data.output.content
에는 규칙에 맞춘 문자열(JSON 직렬화 결과)이 들어가야 합니다.
5) 툴(예: web_search) 구조 이해
스키마/실행:
tools/web_search.py
Agent에게 전달되는 툴의 스키마:
WEB_SEARCH
실제 툴의 동작:
web_search()
서버 등록:
tools/__init__.py
get_tool_map
: 스키마에 정의된 툴 이름과 실제 실행 함수 매핑get_tools_for_llm
: Agent에게 전달될 도구 스키마 리스트
새 툴 추가(요약):
tools/my_tool.py
에 실행 함수,WEB_MY_TOOL
스키마tools/__init__.py
의get_tool_map
,get_tools_for_llm
에 등록
6) MCP 서버 연결 및 도구 관리
.env
파일에 연결하고 싶은 MCP 서버 ID를 적으면 해당 서버에 등록된 모든 툴을 에이전트에 연결합니다.
만약 MCP 서버에 연결된 도구의 아웃풋을 후처리하여 LLM에게 전달하고 싶다면, app/mcp/mcp_tools.py
스크립트 내에 get_mcp_tool
의 call_mcp_tool
함수 내부에서 후처리 로직을 구현하시면 됩니다.
7) GenOS 워크플로우 연동(Python Step)
워크플로우에 Python Step을 만들고, 상위 가이드의 샘플 코드에서
endpoint
를 본 예시 서버의 엔드포인트로 교체해 사용하세요.샘플 코드: 상위 가이드의 예시 코드
본 예제를 위한 파이썬 스텝 코드:
from main_socketio import sio_server
import aiohttp, json, re
from urllib.parse import urlparse
async def run(data: dict) -> dict:
sid = data.get('socketIOClientId') # 프론트 소켓 ID
endpoint = "http://192.168.74.188:1003/chat/stream"
result = {}
text_acc = ""
BIG = 2**30
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=None),
read_bufsize=BIG, max_line_size=BIG
) as session:
async with session.post(endpoint, json=data, headers={"x-request-from":"internal"}) as resp:
reasoning = ""
tool_state = None
citation_buffer = ""
inside_citation = False
def replace_citation_segment(segment: str) -> str:
# segment: "【...】" 형태. 내부의 turn:id 만 URL로 치환하고 나머지는 제거
try:
if not (segment.startswith("【") and segment.endswith("】")):
return segment
body = segment[1:-1]
ids = re.findall(r"(\d+:\d+)", body)
if not ids:
return ""
id_to_url = {}
if isinstance(tool_state, dict):
id_to_url = tool_state.get("id_to_url", {}) or {}
mapped = [id_to_url[idv] for idv in ids if idv in id_to_url]
def get_domain(url):
try:
# 1. URL 파싱하여 'netloc' (도메인 부분) 추출
parsed_url = urlparse(url)
domain = parsed_url.netloc
# 2. 'www.'가 있다면 제거
if domain.startswith('www.'):
domain = domain[4:]
return domain
except Exception:
return "link"
styled = [
f'<a href="{url}" target="_blank" class="btn__chip"> <strong>{get_domain(url)}</strong></a>'
for url in mapped
]
return " ".join(styled)
except Exception:
return segment
async def process_token(ev_text: str):
nonlocal citation_buffer, inside_citation, text_acc
i = 0
n = len(ev_text)
while i < n:
if inside_citation:
citation_buffer += ev_text[i]
i += 1
# 버퍼에 닫힘이 생기면 한 세그먼트 처리
close_idx = citation_buffer.find("】")
if close_idx != -1:
segment = citation_buffer[:close_idx+1]
remainder = citation_buffer[close_idx+1:]
try:
if re.fullmatch(r"【\d+†chart】", segment):
key = segment[1:-1]
id_to_iframe = {}
if isinstance(tool_state, dict):
id_to_iframe = tool_state.get("id_to_iframe", {}) or {}
replaced = id_to_iframe.get(key, "")
else:
replaced = replace_citation_segment(segment)
except Exception:
replaced = ""
text_acc += replaced
if sid:
await sio_server.emit("token", replaced, room=sid)
citation_buffer = ""
inside_citation = False
if remainder:
# remainder 내에 추가 인용이 있을 수 있으므로 재귀적으로 처리
await process_token(remainder)
# 이 케이스는 remainder 처리까지 끝났으므로 루프 계속
else:
# 인용 시작 찾기
start_idx = ev_text.find("【", i)
if start_idx == -1:
chunk = ev_text[i:]
if chunk:
text_acc += chunk
if sid:
await sio_server.emit("token", chunk, room=sid)
break
# 인용 시작 전 일반 텍스트 출력
if start_idx > i:
chunk = ev_text[i:start_idx]
text_acc += chunk
if sid:
await sio_server.emit("token", chunk, room=sid)
# 인용 시작부터 버퍼링 시작
inside_citation = True
citation_buffer = "【"
i = start_idx + 1
# 루프 종료
async for line in resp.content:
if not line:
continue
decoded = line.decode("utf-8").strip()
if decoded.startswith("data:"):
decoded = decoded.removeprefix("data:")
try:
payload = json.loads(decoded)
except json.JSONDecodeError:
continue
event = payload.get("event")
ev_data = payload.get("data")
if event == "tool_state":
tool_state = ev_data
elif event == "reasoning_token":
reasoning += ev_data
elif reasoning:
result.setdefault('agentFlowExecutedData', []).append({
"nodeLabel": "Visible Reasoner",
"data": {"output": {"content": json.dumps({"visible_rationale": reasoning}, ensure_ascii=False)}}
})
reasoning = ""
if sid:
await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
if event == "token":
if isinstance(ev_data, str):
await process_token(ev_data)
elif event == "agentFlowExecutedData":
result.setdefault('agentFlowExecutedData', []).append(ev_data)
if sid:
await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
elif event == "error":
result["message"] = ev_data
result["success"] = False
result['statusCode'] = 500
elif event == "result":
# 토큰만 왔다가 최종 result가 없으면 text로 보강
if text_acc and "text" not in result:
result["text"] = text_acc
if sid:
await sio_server.emit("result", result, room=sid)
# 워크플로우 다음 스텝으로 넘길 데이터 머지 후 반환
data.update(result)
return data
위의 파이썬 스텝 코드는 스트리밍과 동시에 inline citation, 차트 삽입을 위한 로직이 샘플 코드에 추가된 것입니다.
에이전트는 미리 정의된 토큰 ("【", "】") 사이에 특정한 ID를 기입하여 inline citation이나 차트 삽입을 수행하는데, 만약 스트리밍된 토큰이 "【"로 시작한다면 GenOS 채팅에 토큰 정보를 보내는 것을 중단하고, "】"이 나왔을 때, 특수 토큰 사이의 ID를 추출해 적절한 값으로 바꾸어서 출력하는 방식으로 동작합니다.
연동 체크리스트:
8) 자주 겪는 이슈 및 해결법
SSE가 끊기거나 표시되지 않음
서버가
data: ...
한 줄 + 빈 줄 규칙을 지키는지 확인app/api/chat.py
의runner
함수 내부의try: ... except: ... finally: ...
구문 밖에서 에러가 발생하는 경우, POST 요청이 끊기지도 않고 파이프라인 진행도 안되는 현상이 발생할 수 있습니다. 가능하다면try: ...
구문 내에 로직을 작성해주세요.
UI에 툴 실행 과정이 안 보임
agentFlowExecutedData
이벤트를 실제로 보내는지 확인data.data.output.content
가 JSON 문자열인지 확인(객체가 아닌 문자열)포맷은 규칙 링크 준수
키/호출 오류
OPENROUTER_API_KEY
,SEARCHAPI_KEY
주입 여부 확인외부 API 할당량/권한 확인
포트 충돌
Docker
-p
매핑을 변경하거나, 로컬 실행 포트 조정
Last updated
Was this helpful?