이론: 어떻게 해야할까?

본 섹션은 GenOS 워크플로우에서 Flowise로 구현하기 까다로운 요구사항을 Python 코드로 직접 구현한 것(이하 "파이썬 앱")을, GenOS 채팅 어플리케이션에 연결하는 과정을 설명합니다.


배경

Flowise를 이용해 개발하다보면 Flowise에서 지원하지 않는 사소한 기능 때문에 대체 방법을 찾아가며 개발하며 발생하는 비효율과 피로감이 존재합니다. 특히 LLM 호출 config, 채팅 히스토리 관리, States 관리 등에서 많은 어려움이 존재해 워크플로우가 복잡해지면 복잡해질수록 개발에 불편함이 발생합니다. Flowise는 개발자가 이용하기에 기능이 제한적이고, 개발의 자유도를 낮춘다는 한계가 존재하기 때문에 복잡한 파이프라인을 가진 에이전트일수록 코드 기반의 개발 필요성이 강해집니다.

이전에도 "파이썬 앱" 개발 후 GenOS 채팅과 연결해 보려는 시도는 존재했습니다. 그러나 이전까지는 워크플로우에서 "파이썬 앱"을 단순 호출하는 방식으로 작동했기에 채팅 어플리케이션 상에서 추론 과정이나 최종 답변의 스트리밍이 지원되지 않는 문제가 있었습니다.

이러한 문제점으로 인해 추론 과정을 GenOS 채팅에 표시할 수 있으면서도 코드 개발의 자유성을 보장하는 방법이 필요했습니다.


GenOS Workflow와 채팅은 어떻게 동작하는가

우선 이 프로젝트에 대한 설명 이전에 GenOS 워크플로우와 채팅이 어떤 관계에 있고 어떻게 동작하는지 알아보겠습니다.

Figure 1. 관계도
  • 하나의 GenOS 채팅은 하나의 GenOS 워크플로우에 연결된다.

  • 사용자가 채팅에서 질문을 입력하면 GenOS 채팅은 연결된 워크플로우를 호출한다.

  • 워크플로우 호출은 워크플로우 내의 스텝에 따라 다음과 같이 진행된다.

    • 워크플로우의 스텝이 Flowise Step일 경우 해당되는 Flowise를 호출한다.

    • 워크플로우의 스텝이 Python Step일 경우 파이썬 코드를 실행한다.

  • 워크플로우 실행 결과를 GenOS 채팅에 전송한다.

    • Flowise Step: Flowise에서 스트리밍되는 각각의 이벤트와 데이터가 도착하면 실시간으로 socketio를 이용해 GenOS 채팅에 SSE(Server-Sent Events)를 보내고, 최종 데이터를 리턴한다.

    • Python Step: Python 코드 실행 결과를 리턴한다.

  • 도착한 SSE 정보를 정해진 규칙에 따라 GenOS 채팅에 표출한다.

Flowise Step은 Flowise에서 스트리밍되는 데이터들을 SSE 형식으로 GenOS 채팅에 전송하지만 Python Step은 최종 결과만 전송합니다. 그런데 우리가 하려는 것은 Flowise Step을 사용하지 않고 Python Step에서 "파이썬 앱"을 호출한 뒤 Flowise Step 처럼 SSE 데이터를 채팅에 전송하는 것입니다. 그러기 위해선 다음의 두 가지가 만족되어야 합니다.

  1. Python Step이 Flowise Step처럼 "파이썬 앱"으로부터 실시간 데이터를 받아와서 GenOS 채팅에 SSE를 전송해야 한다.

  2. 우리가 직접 개발하는 "파이썬 앱"이 Flowise처럼 주요한 정보를 스트리밍 해주어야 한다.


무엇을 만들 것인가

1. Python Step

"파이썬 앱"이 띄워진 엔드포인트에 POST를 보내어 StreamingResponse를 받아온 뒤, 주요한 정보를 SSE로 채팅에 전송합니다.

2. 파이썬 앱 개발

StreamingResponse로 토큰·리저닝·툴 실행 결과 등을 스트리밍 전송하는 FastAPI 기반의 앱을 개발합니다.

  • LLM 스트리밍 응답(SSE): token, reasoning_token, result, error 이벤트 전송

  • 툴 실행 노출 이벤트: agentFlowExecutedData 이벤트로 GenOS UI에 실행 과정을 노출

  • Docker 배포: 손쉬운 빌드/실행으로 운영 편의성 제공


Python Step

GenOS 워크플로우에서 Python Step을 만들고, 아래 예시 코드의 <your-end-point>를 배포한 "파이썬 앱"의 엔드포인트로 교체하여 사용합니다.

from main_socketio import sio_server
import aiohttp, json

async def run(data: dict) -> dict:
    sid = data.get('socketIOClientId')
    endpoint = "<your-end-point>"  # 예: http://localhost:6666/your-endpoint
    result = {}
    text_acc = ""

    BIG = 2**30
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=None),
        read_bufsize=BIG, max_line_size=BIG
    ) as session:
        async with session.post(endpoint, json=data, headers={"x-request-from":"internal"}) as resp:
            reasoning = ""
            async for line in resp.content:
                if not line:
                    continue
                decoded = line.decode("utf-8").strip()
                if decoded.startswith("data:"):
                    decoded = decoded.removeprefix("data:")
                try:
                    payload = json.loads(decoded)
                except json.JSONDecodeError:
                    continue

                event = payload.get("event")
                ev_data = payload.get("data")

                # Event 분류 및 SSE 전송
                if event == "reasoning_token":
                    reasoning += ev_data
                elif reasoning:
                    result.setdefault('agentFlowExecutedData', []).append({
                        "nodeLabel": "Visible Reasoner",
                        "data": {"output": {"content": json.dumps({"visible_rationale": reasoning}, ensure_ascii=False)}}
                    })
                    reasoning = ""
                    if sid:
                        await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
                
                if event == "token":
                    if isinstance(ev_data, str):
                        text_acc += ev_data
                    if sid:
                        await sio_server.emit("token", ev_data, room=sid)
                elif event == "agentFlowExecutedData":
                    result.setdefault('agentFlowExecutedData', []).append(ev_data)
                    if sid:
                        await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
                elif event == "error":
                    result["message"] = ev_data
                    result["success"] = False
                    result['statusCode'] = 500
                elif event == "result":
                    if text_acc and "text" not in result:
                        result["text"] = text_acc
                    if sid:
                        await sio_server.emit("result", result, room=sid)
                
                prev_event = event

    data.update(result)
    return data

위 파이썬 코드는 GenOS 내부의 파이썬 스텝에서 동작되며, 정의된 엔드포인트로부터 async for으로 StreamingResponse를 읽어온 뒤, 이벤트를 분류하여 GenOS 채팅의 sid로 SSE 정보를 보냅니다.

SSE GenOS UI

SSE를 전송하는 코드는 await sio_server.emit(event, data, room=sid)입니다. 이때 GenOS 채팅에서는 수신되는 SSE 데이터 중 다음의 두 가지만 UI에 표출합니다.

이벤트
data 타입
설명
data 예시

token

string

모델의 일반 답변 토큰(텍스트 조각)

-

agentFlowExecutedData

object

툴 실행 결과를 UI(에이전트 플로우 노드)에 노출하기 위한 이벤트

{"nodeLabel": "Visible Query Generator", "data": {"output": {"content": "{\"visible_web_search_query\":[\"python 3.12 change\"]}"}}}

  • 중요: 이벤트가 agentFlowExecutedData인 경우, data.data.output.content에 GenOS 프론트와 약속된 규칙에 맞춘 문자열(JSON 직렬화 결과)이 들어가야 UI에 노출됩니다. 자세한 포맷은 리서치 에이전트 가이드의 규칙을 참고하세요.

  • 참고: GenOS 채팅에서 화면에 변화를 주는 eventtokenagentFlowExecutedData뿐이지만 "파이썬 앱"에서 수신되는 event에는 reasoning_token, error, result로 여러가지입니다. 이는 편의를 위해 필자가 임의로 정한 규칙이므로 참고하시기 바랍니다. 특히 reasoning_token의 경우 reasoning LLM의 추론 과정을 표출하기 위해 만든 것입니다. 실제로 위 파이썬 스텝 코드를 확인해보면 reasoning token을 모았다가 agentFlowExecutedData 이벤트로 리서치 에이전트 가이드의 규칙에 따라 visible_rationale 키 값에 넣어서 SSE 데이터를 전송한 것을 확인할 수 있습니다.


파이썬 앱 개발

코드로 구현하고 싶은 파이프라인을 개발합니다. 이때, 배포할 엔드포인트에 연결된 서버는 각 SSE 청크를 StreamingResponse로 보내주어야 합니다. 실제 SSE 라인은 아래 형식입니다.

  • 본문: data: {"event": string, "data": any} + 빈 줄

예시 이벤트 타입과 data 규칙

이벤트
data 타입
설명
data 예시

token

string

모델의 일반 답변 토큰(텍스트 조각)

-

reasoning_token

string

모델의 추론(리저닝) 토큰(텍스트 조각)

-

agentFlowExecutedData

object

툴 실행 결과를 UI(에이전트 플로우 노드)에 노출하기 위한 이벤트

{"nodeLabel": "Visible Query Generator", "data": {"output": {"content": "{\"visible_web_search_query\":[\"python 3.12 change\"]}"}}}

error

string

오류 메시지

-

result

null

최종 완료 신호

-

이 SSE 이벤트는 GenOS 채팅으로 바로 보내는 것이 아닌 워크플로우의 파이썬 스텝으로 보내는 것입니다. 따라서 위의 예시와 달리 eventdata를 자유롭게 정해도 좋으나 최종적으로 파이썬 스텝에서 규칙에 맞게 처리하여 GenOS 채팅으로 보내주어야 합니다.

Last updated

Was this helpful?