실전: 예시 프로젝트(리서치 에이전트)와 함께 이해하기

다음의 깃허브 레포지토리(mindsandcompany/mocking-flowise)를 기준으로, 파이썬 코드로 구현한 리서치 에이전트를 GenOS 채팅과 연동하는 전 과정을 따라가며 이해합니다.


1) 프로젝트 개요

  • 목적: Flowise로 구현이 까다로운 흐름을 Python(FastAPI) 코드로 구현하고, GenOS 채팅 UI에 추론/툴 실행 과정을 그대로 스트리밍 노출

  • 핵심 포인트

    • SSE 스트리밍: token, reasoning_token, agentFlowExecutedData, error, result

    • 툴 호출 및 UI 노출: OpenAI 함수 호출 포맷 스키마 + agentFlowExecutedData 규칙

    • Docker 배포로 손쉬운 실행

디렉터리 구조(요약):

mock_workflow/
  ├─ app/
  │  ├─ __init__.py
  │  ├─ main.py                 # FastAPI 엔트리 및 라우터 포함
  │  ├─ api/
  │  │  ├─ __init__.py
  │  │  ├─ chat.py              # /chat/stream
  │  │  └─ health.py            # /health
  │  ├─ prompts/
  │  │  └─ system.txt           # 시스템 프롬프트
  │  ├─ stores/
  │  │  ├─ __init__.py
  │  │  └─ session_store.py     # Redis 기반 세션 저장/복구
  │  ├─ tools/
  │  │  ├─ __init__.py          # get_tool_map, get_tools_for_llm 정의
  │  │  ├─ open_url.py          # 크롤링 툴
  │  │  └─ web_search.py        # 웹 검색 툴
  │  ├─ mcp/
  │  │  ├─ __init__.py          # get_mcp_tool_map 정의
  │  │  └─ mcp_tools.py         # MCP 서버에서 가져온 툴 관리
  │  ├─ utils.py                # OpenRouter 클라이언트, 스트림 유틸 등
  │  └─ .env                    # 주요한 환경변수 관리
  ├─ tests/
  │  ├─ test_mcp.py             # MCP 서버 연동 확인 스크립트
  │  └─ test_chat.py            # SSE 수신 예제 스크립트
  ├─ .gitignore
  ├─ docker-compose.yml         # API(5555) + Redis 구성
  ├─ Dockerfile
  ├─ README.md
  └─ requirements.txt

2) 요구 사항 및 환경 변수

  • Python 3.11+

  • OpenRouter API Key: OPENROUTER_API_KEY

  • searchapi.io API Key: SEARCHAPI_KEY (웹 검색 툴 사용 시)

.env 예시:

OPENROUTER_API_KEY=<your_openrouter_api_key>
DEFAULT_MODEL=openai/gpt-4o-mini
SEARCHAPI_KEY=<your_searchapi_key>
MCP_SERVER_ID=<your_mcp_server_id> # if you want to use tools from multiple mcp server, separate each GenOS MCP server id with comma (",") (e.g., 1, 2, ...)
GENOS_ID=<your_genos_id>
GENOS_PW=<your_genos_password>

3) 클론 및 실행

A. 레포지토리 클론

git clone https://github.com/mindsandcompany/mocking-flowise.git
cd mocking-flowise/mock_workflow

B. Docker Compose

docker compose up -d --build
# 상태 확인
docker compose ps
# 헬스체크
curl -s http://localhost:5555/health | cat
  • 컨테이너 기본 포트는 5555입니다. (Dockerfile - EXPOSE 5555docker-compose.yml - 5555:5555)

  • 코드/의존성/Dockerfile 변경 후에는 --build를 붙여 최신 상태로 반영하세요.

  • 서버 내에 같은 이름의 도커 이미지나 컨테이너가 존재할 경우 충돌이 발생할 수 있으므로 주의하세요.


4) 엔드포인트 및 SSE 규칙 확인

  • 엔드포인트: app/api/chat.py에서 @app.post("/<path>") 데코레이터로 정의된 라우트를 확인하세요.

    • 예: POST /run 또는 POST /chat 등 프로젝트 버전에 따라 다를 수 있음

  • SSE 규칙: 본 문서의 상위 가이드인 파이썬 코드 기반 FastAPI 앱과 GenOS 채팅 연동하기의 “SSE 이벤트 규칙”을 반드시 준수해야 GenOS 채팅 UI에서 올바르게 표현됩니다.

    • 아래의 app/api/chat.py를 참고하여 코드 구현하시길 바랍니다.

@app.post("/chat/stream")
async def chat_stream(req: GenerateRequest):
    queue: asyncio.Queue[str] = asyncio.Queue()
    SENTINEL = "__STREAM_DONE__"

    async def emit(
        event: Literal["token", "reasoning_token", "agentFlowExecutedData", "error", "result"],
        data: str | dict | None
      ):
        """
        GenOS 채팅 어플리케이션에 표현되기 원하는 정보를 SSE 이벤트로 전송합니다.
        """
        payload = {"event": event, "data": data}
        await queue.put(f"data: {json.dumps(payload, ensure_ascii=False)}\n\n")

    async def heartbeat():
        while True:
            await asyncio.sleep(10)
            await queue.put(": keep-alive\n\n")

    async def runner():
        try:
            """
            실제 비즈니스 로직을 이곳에 구현해 주세요.
            """
            await emit("token", "This is an example sentence.")
        except Exception as e:
            await emit("error", str(e))
        finally:
            await emit("result", None)
            await queue.put(SENTINEL)

    async def sse():
        producer = asyncio.create_task(runner())
        pinger = asyncio.create_task(heartbeat())
        try:
            while True:
                chunk = await queue.get()
                if chunk == SENTINEL:
                    break
                yield chunk
        finally:
            producer.cancel()
            pinger.cancel()

    return StreamingResponse(
        sse(), 
        media_type="text/event-stream", 
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
    )
  • emit: SSE 이벤트를 클라이언트로 전송하는 함수입니다. 이벤트 이름과 데이터를 받아 JSON 형태로 직렬화하여 스트림 큐에 넣습니다.

  • heartbeat: 연결이 유지되고 있음을 알리기 위해 주기적으로(예: 10초마다) keep-alive 신호를 스트림 큐에 넣는 함수입니다.

  • runner: 실제 비즈니스 로직(예: LLM 호출 등)을 실행하고, 중간 결과나 최종 결과를 emit을 통해 스트림으로 전송하는 함수입니다. 오류 발생 시 에러 메시지도 전송하며, 마지막에 스트림 종료 신호를 보냅니다.

간단 cURL 테스트(예시):

curl -N -X POST \
  -H "Content-Type: application/json" \
  -H "x-request-from: internal" \
  -d '{"text":"hello"}' \
  http://localhost:5555/chat/stream
  • 서버는 data: {"event": string, "data": any} 형식의 라인을 스트리밍으로 전송해야 합니다.

  • agentFlowExecutedData 이벤트의 data.data.output.content에는 규칙에 맞춘 문자열(JSON 직렬화 결과)이 들어가야 합니다.


  • 스키마/실행: tools/web_search.py

    • Agent에게 전달되는 툴의 스키마: WEB_SEARCH

    • 실제 툴의 동작: web_search()

  • 서버 등록: tools/__init__.py

    • get_tool_map: 스키마에 정의된 툴 이름과 실제 실행 함수 매핑

    • get_tools_for_llm: Agent에게 전달될 도구 스키마 리스트

새 툴 추가(요약):

  1. tools/my_tool.py에 실행 함수, WEB_MY_TOOL 스키마

  2. tools/__init__.pyget_tool_map, get_tools_for_llm에 등록


6) MCP 서버 연결 및 도구 관리

.env 파일에 연결하고 싶은 MCP 서버 ID를 적으면 해당 서버에 등록된 모든 툴을 에이전트에 연결합니다.

만약 MCP 서버에 연결된 도구의 아웃풋을 후처리하여 LLM에게 전달하고 싶다면, app/mcp/mcp_tools.py 스크립트 내에 get_mcp_toolcall_mcp_tool 함수 내부에서 후처리 로직을 구현하시면 됩니다.


7) GenOS 워크플로우 연동(Python Step)

  • 워크플로우에 Python Step을 만들고, 상위 가이드의 샘플 코드에서 endpoint를 본 예시 서버의 엔드포인트로 교체해 사용하세요.

  • 본 예제를 위한 파이썬 스텝 코드:

from main_socketio import sio_server
import aiohttp, json, re
from urllib.parse import urlparse

async def run(data: dict) -> dict:
    sid = data.get('socketIOClientId')  # 프론트 소켓 ID
    endpoint = "http://192.168.74.188:1003/chat/stream"

    result = {}
    text_acc = ""

    BIG = 2**30
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=None),
        read_bufsize=BIG, max_line_size=BIG
    ) as session:
        async with session.post(endpoint, json=data, headers={"x-request-from":"internal"}) as resp:
            reasoning = ""
            tool_state = None
            citation_buffer = ""
            inside_citation = False

            def replace_citation_segment(segment: str) -> str:
                # segment: "【...】" 형태. 내부의 turn:id 만 URL로 치환하고 나머지는 제거
                try:
                    if not (segment.startswith("【") and segment.endswith("】")):
                        return segment
                    body = segment[1:-1]
                    ids = re.findall(r"(\d+:\d+)", body)
                    if not ids:
                        return ""
                    id_to_url = {}
                    if isinstance(tool_state, dict):
                        id_to_url = tool_state.get("id_to_url", {}) or {}
                    mapped = [id_to_url[idv] for idv in ids if idv in id_to_url]
                    
                    def get_domain(url):
                        try:
                            # 1. URL 파싱하여 'netloc' (도메인 부분) 추출
                            parsed_url = urlparse(url)
                            domain = parsed_url.netloc

                            # 2. 'www.'가 있다면 제거
                            if domain.startswith('www.'):
                                domain = domain[4:]
                                
                            return domain
                        except Exception:
                            return "link"
                    
                    styled = [
                        f'<a href="{url}" target="_blank" class="btn__chip"> <strong>{get_domain(url)}</strong></a>'
                        for url in mapped
                    ]
                    return " ".join(styled)
                except Exception:
                    return segment
            
            async def process_token(ev_text: str):
                nonlocal citation_buffer, inside_citation, text_acc
                i = 0
                n = len(ev_text)
                while i < n:
                    if inside_citation:
                        citation_buffer += ev_text[i]
                        i += 1
                        # 버퍼에 닫힘이 생기면 한 세그먼트 처리
                        close_idx = citation_buffer.find("】")
                        if close_idx != -1:
                            segment = citation_buffer[:close_idx+1]
                            remainder = citation_buffer[close_idx+1:]
                            try:
                                if re.fullmatch(r"【\d+†chart】", segment):
                                    key = segment[1:-1]
                                    id_to_iframe = {}
                                    if isinstance(tool_state, dict):
                                        id_to_iframe = tool_state.get("id_to_iframe", {}) or {}
                                    replaced = id_to_iframe.get(key, "")
                                else:
                                    replaced = replace_citation_segment(segment)
                            except Exception:
                                replaced = ""
                            text_acc += replaced
                            if sid:
                                await sio_server.emit("token", replaced, room=sid)
                            citation_buffer = ""
                            inside_citation = False
                            if remainder:
                                # remainder 내에 추가 인용이 있을 수 있으므로 재귀적으로 처리
                                await process_token(remainder)
                            # 이 케이스는 remainder 처리까지 끝났으므로 루프 계속
                    else:
                        # 인용 시작 찾기
                        start_idx = ev_text.find("【", i)
                        if start_idx == -1:
                            chunk = ev_text[i:]
                            if chunk:
                                text_acc += chunk
                                if sid:
                                    await sio_server.emit("token", chunk, room=sid)
                            break
                        # 인용 시작 전 일반 텍스트 출력
                        if start_idx > i:
                            chunk = ev_text[i:start_idx]
                            text_acc += chunk
                            if sid:
                                await sio_server.emit("token", chunk, room=sid)
                        # 인용 시작부터 버퍼링 시작
                        inside_citation = True
                        citation_buffer = "【"
                        i = start_idx + 1
                # 루프 종료

            async for line in resp.content:
                if not line:
                    continue
                decoded = line.decode("utf-8").strip()
                if decoded.startswith("data:"):
                    decoded = decoded.removeprefix("data:")
                try:
                    payload = json.loads(decoded)
                except json.JSONDecodeError:
                    continue

                event = payload.get("event")
                ev_data = payload.get("data")

                if event == "tool_state":
                    tool_state = ev_data
                elif event == "reasoning_token":
                    reasoning += ev_data
                elif reasoning:
                    result.setdefault('agentFlowExecutedData', []).append({
                        "nodeLabel": "Visible Reasoner",
                        "data": {"output": {"content": json.dumps({"visible_rationale": reasoning}, ensure_ascii=False)}}
                    })
                    reasoning = ""
                    if sid:
                        await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
                
                if event == "token":
                    if isinstance(ev_data, str):
                        await process_token(ev_data)
                elif event == "agentFlowExecutedData":
                    result.setdefault('agentFlowExecutedData', []).append(ev_data)
                    if sid:
                        await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
                elif event == "error":
                    result["message"] = ev_data
                    result["success"] = False
                    result['statusCode'] = 500
                elif event == "result":
                    # 토큰만 왔다가 최종 result가 없으면 text로 보강
                    if text_acc and "text" not in result:
                        result["text"] = text_acc
                    if sid:
                        await sio_server.emit("result", result, room=sid)

    # 워크플로우 다음 스텝으로 넘길 데이터 머지 후 반환
    data.update(result)
    return data

위의 파이썬 스텝 코드는 스트리밍과 동시에 inline citation, 차트 삽입을 위한 로직이 샘플 코드에 추가된 것입니다.

에이전트는 미리 정의된 토큰 ("【", "】") 사이에 특정한 ID를 기입하여 inline citation이나 차트 삽입을 수행하는데, 만약 스트리밍된 토큰이 "【"로 시작한다면 GenOS 채팅에 토큰 정보를 보내는 것을 중단하고, "】"이 나왔을 때, 특수 토큰 사이의 ID를 추출해 적절한 값으로 바꾸어서 출력하는 방식으로 동작합니다.

연동 체크리스트:


8) 자주 겪는 이슈 및 해결법

  • SSE가 끊기거나 표시되지 않음

    • 서버가 data: ... 한 줄 + 빈 줄 규칙을 지키는지 확인

    • app/api/chat.pyrunner 함수 내부의 try: ... except: ... finally: ... 구문 밖에서 에러가 발생하는 경우, POST 요청이 끊기지도 않고 파이프라인 진행도 안되는 현상이 발생할 수 있습니다. 가능하다면 try: ... 구문 내에 로직을 작성해주세요.

  • UI에 툴 실행 과정이 안 보임

    • agentFlowExecutedData 이벤트를 실제로 보내는지 확인

    • data.data.output.content가 JSON 문자열인지 확인(객체가 아닌 문자열)

    • 포맷은 규칙 링크 준수

  • 키/호출 오류

    • OPENROUTER_API_KEY, SEARCHAPI_KEY 주입 여부 확인

    • 외부 API 할당량/권한 확인

  • 포트 충돌

    • Docker -p 매핑을 변경하거나, 로컬 실행 포트 조정

Last updated

Was this helpful?