이론: 어떻게 해야할까?
본 섹션은 GenOS 워크플로우에서 Flowise로 구현하기 까다로운 요구사항을 Python 코드로 직접 구현한 것(이하 "파이썬 앱")을, GenOS 채팅 어플리케이션에 연결하는 과정을 설명합니다.
배경
Flowise를 이용해 개발하다보면 Flowise에서 지원하지 않는 사소한 기능 때문에 대체 방법을 찾아가며 개발하며 발생하는 비효율과 피로감이 존재합니다. 특히 LLM 호출 config, 채팅 히스토리 관리, States 관리 등에서 많은 어려움이 존재해 워크플로우가 복잡해지면 복잡해질수록 개발에 불편함이 발생합니다. Flowise는 개발자가 이용하기에 기능이 제한적이고, 개발의 자유도를 낮춘다는 한계가 존재하기 때문에 복잡한 파이프라인을 가진 에이전트일수록 코드 기반의 개발 필요성이 강해집니다.
이전에도 "파이썬 앱" 개발 후 GenOS 채팅과 연결해 보려는 시도는 존재했습니다. 그러나 이전까지는 워크플로우에서 "파이썬 앱"을 단순 호출하는 방식으로 작동했기에 채팅 어플리케이션 상에서 추론 과정이나 최종 답변의 스트리밍이 지원되지 않는 문제가 있었습니다.
이러한 문제점으로 인해 추론 과정을 GenOS 채팅에 표시할 수 있으면서도 코드 개발의 자유성을 보장하는 방법이 필요했습니다.
GenOS Workflow와 채팅은 어떻게 동작하는가
우선 이 프로젝트에 대한 설명 이전에 GenOS 워크플로우와 채팅이 어떤 관계에 있고 어떻게 동작하는지 알아보겠습니다.

하나의 GenOS 채팅은 하나의 GenOS 워크플로우에 연결된다.
사용자가 채팅에서 질문을 입력하면 GenOS 채팅은 연결된 워크플로우를 호출한다.
워크플로우 호출은 워크플로우 내의 스텝에 따라 다음과 같이 진행된다.
워크플로우의 스텝이 Flowise Step일 경우 해당되는 Flowise를 호출한다.
워크플로우의 스텝이 Python Step일 경우 파이썬 코드를 실행한다.
워크플로우 실행 결과를 GenOS 채팅에 전송한다.
Flowise Step: Flowise에서 스트리밍되는 각각의 이벤트와 데이터가 도착하면 실시간으로
socketio
를 이용해 GenOS 채팅에 SSE(Server-Sent Events)를 보내고, 최종 데이터를 리턴한다.Python Step: Python 코드 실행 결과를 리턴한다.
도착한 SSE 정보를 정해진 규칙에 따라 GenOS 채팅에 표출한다.
Flowise Step은 Flowise에서 스트리밍되는 데이터들을 SSE 형식으로 GenOS 채팅에 전송하지만 Python Step은 최종 결과만 전송합니다. 그런데 우리가 하려는 것은 Flowise Step을 사용하지 않고 Python Step에서 "파이썬 앱"을 호출한 뒤 Flowise Step 처럼 SSE 데이터를 채팅에 전송하는 것입니다. 그러기 위해선 다음의 두 가지가 만족되어야 합니다.
Python Step이 Flowise Step처럼 "파이썬 앱"으로부터 실시간 데이터를 받아와서 GenOS 채팅에 SSE를 전송해야 한다.
우리가 직접 개발하는 "파이썬 앱"이 Flowise처럼 주요한 정보를 스트리밍 해주어야 한다.
무엇을 만들 것인가
1. Python Step
"파이썬 앱"이 띄워진 엔드포인트에 POST를 보내어 StreamingResponse를 받아온 뒤, 주요한 정보를 SSE로 채팅에 전송합니다.
2. 파이썬 앱 개발
StreamingResponse로 토큰·리저닝·툴 실행 결과 등을 스트리밍 전송하는 FastAPI 기반의 앱을 개발합니다.
LLM 스트리밍 응답(SSE):
token
,reasoning_token
,result
,error
이벤트 전송툴 실행 노출 이벤트:
agentFlowExecutedData
이벤트로 GenOS UI에 실행 과정을 노출Docker 배포: 손쉬운 빌드/실행으로 운영 편의성 제공
Python Step
GenOS 워크플로우에서 Python Step을 만들고, 아래 예시 코드의 <your-end-point>
를 배포한 "파이썬 앱"의 엔드포인트로 교체하여 사용합니다.
from main_socketio import sio_server
import aiohttp, json
async def run(data: dict) -> dict:
sid = data.get('socketIOClientId')
endpoint = "<your-end-point>" # 예: http://localhost:6666/your-endpoint
result = {}
text_acc = ""
BIG = 2**30
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=None),
read_bufsize=BIG, max_line_size=BIG
) as session:
async with session.post(endpoint, json=data, headers={"x-request-from":"internal"}) as resp:
reasoning = ""
async for line in resp.content:
if not line:
continue
decoded = line.decode("utf-8").strip()
if decoded.startswith("data:"):
decoded = decoded.removeprefix("data:")
try:
payload = json.loads(decoded)
except json.JSONDecodeError:
continue
event = payload.get("event")
ev_data = payload.get("data")
# Event 분류 및 SSE 전송
if event == "reasoning_token":
reasoning += ev_data
elif reasoning:
result.setdefault('agentFlowExecutedData', []).append({
"nodeLabel": "Visible Reasoner",
"data": {"output": {"content": json.dumps({"visible_rationale": reasoning}, ensure_ascii=False)}}
})
reasoning = ""
if sid:
await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
if event == "token":
if isinstance(ev_data, str):
text_acc += ev_data
if sid:
await sio_server.emit("token", ev_data, room=sid)
elif event == "agentFlowExecutedData":
result.setdefault('agentFlowExecutedData', []).append(ev_data)
if sid:
await sio_server.emit("agentFlowExecutedData", result['agentFlowExecutedData'], room=sid)
elif event == "error":
result["message"] = ev_data
result["success"] = False
result['statusCode'] = 500
elif event == "result":
if text_acc and "text" not in result:
result["text"] = text_acc
if sid:
await sio_server.emit("result", result, room=sid)
prev_event = event
data.update(result)
return data
위 파이썬 코드는 GenOS 내부의 파이썬 스텝에서 동작되며, 정의된 엔드포인트로부터 async for
으로 StreamingResponse
를 읽어온 뒤, 이벤트를 분류하여 GenOS 채팅의 sid
로 SSE 정보를 보냅니다.
SSE GenOS UI
SSE를 전송하는 코드는 await sio_server.emit(event, data, room=sid)
입니다. 이때 GenOS 채팅에서는 수신되는 SSE 데이터 중 다음의 두 가지만 UI에 표출합니다.
token
string
모델의 일반 답변 토큰(텍스트 조각)
-
agentFlowExecutedData
object
툴 실행 결과를 UI(에이전트 플로우 노드)에 노출하기 위한 이벤트
{"nodeLabel": "Visible Query Generator", "data": {"output": {"content": "{\"visible_web_search_query\":[\"python 3.12 change\"]}"}}}
중요: 이벤트가
agentFlowExecutedData
인 경우,data.data.output.content
에 GenOS 프론트와 약속된 규칙에 맞춘 문자열(JSON 직렬화 결과)이 들어가야 UI에 노출됩니다. 자세한 포맷은 리서치 에이전트 가이드의 규칙을 참고하세요.참고: GenOS 채팅에서 화면에 변화를 주는
event
는token
과agentFlowExecutedData
뿐이지만 "파이썬 앱"에서 수신되는 event에는reasoning_token
,error
,result
로 여러가지입니다. 이는 편의를 위해 필자가 임의로 정한 규칙이므로 참고하시기 바랍니다. 특히reasoning_token
의 경우 reasoning LLM의 추론 과정을 표출하기 위해 만든 것입니다. 실제로 위 파이썬 스텝 코드를 확인해보면 reasoning token을 모았다가agentFlowExecutedData
이벤트로리서치 에이전트 가이드의 규칙
에 따라visible_rationale
키 값에 넣어서 SSE 데이터를 전송한 것을 확인할 수 있습니다.
파이썬 앱 개발
코드로 구현하고 싶은 파이프라인을 개발합니다. 이때, 배포할 엔드포인트에 연결된 서버는 각 SSE 청크를 StreamingResponse
로 보내주어야 합니다. 실제 SSE 라인은 아래 형식입니다.
본문:
data: {"event": string, "data": any}
+ 빈 줄
예시 이벤트 타입과 data
규칙
data
규칙token
string
모델의 일반 답변 토큰(텍스트 조각)
-
reasoning_token
string
모델의 추론(리저닝) 토큰(텍스트 조각)
-
agentFlowExecutedData
object
툴 실행 결과를 UI(에이전트 플로우 노드)에 노출하기 위한 이벤트
{"nodeLabel": "Visible Query Generator", "data": {"output": {"content": "{\"visible_web_search_query\":[\"python 3.12 change\"]}"}}}
error
string
오류 메시지
-
result
null
최종 완료 신호
-
이 SSE 이벤트는 GenOS 채팅으로 바로 보내는 것이 아닌 워크플로우의 파이썬 스텝으로 보내는 것입니다. 따라서 위의 예시와 달리
event
나data
를 자유롭게 정해도 좋으나 최종적으로 파이썬 스텝에서 규칙에 맞게 처리하여 GenOS 채팅으로 보내주어야 합니다.
Last updated
Was this helpful?