Files
mas/services/backend/workflow.py
Mayne0213 89c616bca5 REFACTOR(app): change ai model to iterative
- Change AI model configuration
- Switch to iterative approach
2025-12-24 12:55:23 +09:00

115 lines
3.0 KiB
Python

"""
LangGraph Iterative Workflow
반복적 워크플로우: Planning → Research → Code → Review (최대 3회 반복)
"""
from typing import Literal
from langgraph.graph import StateGraph, END
from agents import (
AgentState,
orchestrator_node,
planning_node,
research_node,
backend_code_node,
frontend_code_node,
infrastructure_code_node,
review_node
)
def router(state: AgentState) -> Literal[
"planning",
"research",
"code_backend",
"code_frontend",
"code_infrastructure",
"review",
"end"
]:
"""
다음 에이전트 라우팅 로직
"""
current = state.get("current_agent", "orchestrator")
# 명시적으로 지정된 다음 에이전트로 이동
if current in [
"planning",
"research",
"code_backend",
"code_frontend",
"code_infrastructure",
"review"
]:
return current
# end 상태
if current == "end":
return "end"
# 기본값: planning부터 시작
return "planning"
def create_mas_workflow():
"""
MAS Iterative Workflow 생성
워크플로우:
User Request
Orchestrator → Planning → Orchestrator
Research → Orchestrator
Code (Backend/Frontend/Infrastructure) → Orchestrator
Review → Orchestrator
↓ (if not approved and iteration < 3)
Research (반복)
↓ (if approved or iteration >= 3)
End
"""
workflow = StateGraph(AgentState)
# 노드 추가
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("planning", planning_node)
workflow.add_node("research", research_node)
workflow.add_node("code_backend", backend_code_node)
workflow.add_node("code_frontend", frontend_code_node)
workflow.add_node("code_infrastructure", infrastructure_code_node)
workflow.add_node("review", review_node)
# 시작점: Orchestrator
workflow.set_entry_point("orchestrator")
# Orchestrator의 조건부 라우팅
workflow.add_conditional_edges(
"orchestrator",
router,
{
"planning": "planning",
"research": "research",
"code_backend": "code_backend",
"code_frontend": "code_frontend",
"code_infrastructure": "code_infrastructure",
"review": "review",
"end": END
}
)
# 각 에이전트는 작업 후 Orchestrator로 복귀
workflow.add_edge("planning", "orchestrator")
workflow.add_edge("research", "orchestrator")
workflow.add_edge("code_backend", "orchestrator")
workflow.add_edge("code_frontend", "orchestrator")
workflow.add_edge("code_infrastructure", "orchestrator")
# Review는 승인 여부에 따라 처리 (review_node 내부에서 current_agent 설정)
workflow.add_edge("review", "orchestrator")
return workflow.compile()
# 그래프 인스턴스 생성
mas_graph = create_mas_workflow()