diff --git a/deploy/k8s/overlays/prod/kustomization.yaml b/deploy/k8s/overlays/prod/kustomization.yaml index ae18130..b6ff869 100644 --- a/deploy/k8s/overlays/prod/kustomization.yaml +++ b/deploy/k8s/overlays/prod/kustomization.yaml @@ -16,5 +16,5 @@ commonLabels: # 이미지 태그 설정 (ArgoCD Image Updater가 자동으로 업데이트) images: - name: gitea0213.kro.kr/bluemayne/mas - newTag: main-sha-6e40a54165d3617184471a4cc4a68c2f155c1fbe + newTag: main-sha-fc91b3cbe4395f640cd842e15569b9a6b559ef5b diff --git a/services/backend/agents.py b/services/backend/agents.py deleted file mode 100644 index 35ab7d4..0000000 --- a/services/backend/agents.py +++ /dev/null @@ -1,689 +0,0 @@ -""" -MAS (Multi-Agent System) 에이전트 정의 -""" -from typing import Annotated, Literal, TypedDict, Optional -from langchain_anthropic import ChatAnthropic -from langchain_openai import ChatOpenAI -from langgraph.graph import StateGraph, END -from langgraph.prebuilt import ToolNode -from langchain_core.messages import HumanMessage, SystemMessage -from langchain_core.tools import tool -import os -import subprocess -import json -import requests -from datetime import datetime -from kubernetes import client, config -from kubernetes.client.rest import ApiException -import psycopg2 -from urllib.parse import quote_plus - - -class AgentState(TypedDict): - """에이전트 간 공유되는 상태""" - messages: list - current_agent: str - task_type: str - result: dict - - -# ===== Kubernetes Client 초기화 ===== -try: - # Try in-cluster config first (Pod 내부에서 실행 시) - config.load_incluster_config() - print("✅ Loaded in-cluster Kubernetes config") -except config.ConfigException: - # Fallback to local kubeconfig (로컬 개발 시) - try: - config.load_kube_config() - print("✅ Loaded local Kubernetes config") - except config.ConfigException: - print("⚠️ No Kubernetes config found - K8s tools will fail") - -# Kubernetes API clients -k8s_core_v1 = client.CoreV1Api() -k8s_apps_v1 = client.AppsV1Api() -k8s_batch_v1 = client.BatchV1Api() -k8s_networking_v1 = client.NetworkingV1Api() - - -# ===== Configure all Git repositories on startup ===== -def configure_git_repositories(): - """ - Configure Git user for all repositories in /app/projects (hostPath mount). - /app/projects is mounted from host /home/ubuntu/Projects. - """ - projects_path = "/app/projects" - - if not os.path.exists(projects_path): - print(f"⚠️ Projects directory not found at {projects_path}") - print(" Make sure hostPath volume is mounted correctly") - return - - try: - # Add safe.directory to allow Git operations on mounted directories - # This is needed because the pod runs as root but files are owned by host user - subprocess.run(["git", "config", "--global", "--add", "safe.directory", "*"], - timeout=5, check=True, capture_output=True) - print("✅ Added Git safe.directory configuration") - - # Configure git user for all repositories - repos = [d for d in os.listdir(projects_path) - if os.path.isdir(os.path.join(projects_path, d)) and - os.path.exists(os.path.join(projects_path, d, ".git"))] - - if not repos: - print(f"⚠️ No git repositories found in {projects_path}") - return - - for repo in repos: - repo_path = os.path.join(projects_path, repo) - try: - subprocess.run(["git", "-C", repo_path, "config", "user.name", "mas-agent"], - timeout=5, check=True, capture_output=True) - subprocess.run(["git", "-C", repo_path, "config", "user.email", "mas-agent@mas.local"], - timeout=5, check=True, capture_output=True) - print(f"✅ Configured Git for: {repo}") - except Exception as e: - print(f"⚠️ Failed to configure Git for {repo}: {e}") - - print(f"✅ Git configuration complete for {len(repos)} repositories") - - except Exception as e: - print(f"❌ Failed to configure Git repositories: {e}") - -# Configure git on module import -configure_git_repositories() - - -# ===== Universal Tools (Bash-centric approach) ===== - -@tool -def bash_command(command: str, timeout: int = 120) -> str: - """ - Execute any bash command in the container. - - Examples: - - kubectl get pods -n mas - - cat /app/projects/portfolio/README.md - - git -C /app/projects/mas status - - npm test - - python script.py - - psql -U bluemayne -c 'SELECT * FROM users' - - Args: - command: The bash command to execute - timeout: Timeout in seconds (default: 120) - - Returns: - Command output (stdout and stderr) - """ - try: - result = subprocess.run( - command, - shell=True, - capture_output=True, - text=True, - timeout=timeout, - cwd="/app" - ) - - output = "" - if result.returncode == 0: - output = f"✅ Success (exit code: 0)\n\n{result.stdout}" - else: - output = f"❌ Failed (exit code: {result.returncode})\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}" - - return output - except subprocess.TimeoutExpired: - return f"❌ Command timed out after {timeout} seconds" - except Exception as e: - return f"❌ Error executing command: {str(e)}" - - -@tool -def read_file(file_path: str, max_lines: int = 1000) -> str: - """ - Read a file from the filesystem. - - Args: - file_path: Absolute path to the file (e.g., /app/projects/portfolio/README.md) - max_lines: Maximum number of lines to read (default: 1000) - - Returns: - File contents - """ - try: - with open(file_path, 'r', encoding='utf-8') as f: - lines = f.readlines() - if len(lines) > max_lines: - content = ''.join(lines[:max_lines]) - return f"📄 {file_path} (showing first {max_lines} of {len(lines)} lines):\n\n{content}\n\n... (truncated)" - else: - return f"📄 {file_path}:\n\n{''.join(lines)}" - except FileNotFoundError: - return f"❌ File not found: {file_path}" - except Exception as e: - return f"❌ Error reading file: {str(e)}" - - -@tool -def write_file(file_path: str, content: str) -> str: - """ - Write content to a file. - - Args: - file_path: Absolute path to the file - content: Content to write - - Returns: - Success or error message - """ - try: - import os - os.makedirs(os.path.dirname(file_path), exist_ok=True) - - with open(file_path, 'w', encoding='utf-8') as f: - f.write(content) - - return f"✅ Successfully wrote {len(content)} characters to {file_path}" - except Exception as e: - return f"❌ Error writing file: {str(e)}" - - -# MCP Tools Collection -# Read-only tools (available to ALL agents including Groq) -# ===== Universal Tools (Bash-centric, Claude Code style) ===== -# All agents get the same 3 tools. Behavior is controlled by prompts, not tool restrictions. - -universal_tools = [ - bash_command, # Execute any bash command (kubectl, git, npm, python, etc.) - read_file, # Read files (convenience wrapper for 'cat') - write_file, # Write files (convenience wrapper for 'echo >') -] - - -# ===== 1. Claude Code - Orchestrator ===== -claude_orchestrator = ChatAnthropic( - model="claude-sonnet-4-5", # Latest Claude Sonnet 4.5 (Sep 2025) - api_key=os.getenv("ANTHROPIC_API_KEY"), - temperature=0 -).bind_tools(universal_tools) # Bash-centric: bash, read, write - -ORCHESTRATOR_PROMPT = """당신은 MAS의 총괄 조율자이자 DevOps 전문가입니다. - -**역할**: -- 사용자 요청을 분석하여 적절한 에이전트에게 작업 할당 -- Kubernetes, ArgoCD, Helm, Kustomize 관리 -- CI/CD 파이프라인 구성 -- 최종 코드 리뷰 및 승인 - -**사용 가능한 에이전트**: -1. backend_developer: FastAPI, Node.js 백엔드 개발 -2. frontend_developer: Next.js, React 프론트엔드 개발 -3. sre_specialist: 모니터링, 성능 최적화, 보안 -4. yaml_manager: Kubernetes YAML 파일 생성 및 관리, Git 배포 - -**사용 가능한 도구 (3개만 - 단순하고 강력함)**: - -1. **bash_command(command, timeout)** - 가장 중요! 모든 것을 할 수 있음 - 예시: - - `bash_command("kubectl get pods -n mas")` - Kubernetes 조회 - - `bash_command("cat /app/projects/portfolio/README.md")` - 파일 읽기 - - `bash_command("ls /app/projects")` - 디렉토리 목록 - - `bash_command("git -C /app/projects/mas status")` - Git 상태 - - `bash_command("psql -U bluemayne -d mas -c 'SELECT * FROM users'")` - DB 쿼리 - - `bash_command("curl http://prometheus:9090/api/v1/query?query=up")` - Prometheus - - `bash_command("npm test")` - 테스트 실행 - - `bash_command("python script.py")` - Python 실행 - -2. **read_file(file_path, max_lines)** - 파일 읽기 (편의성) - 예시: `read_file("/app/projects/portfolio/README.md")` - -3. **write_file(file_path, content)** - 파일 쓰기 (편의성) - 예시: `write_file("/app/projects/test.txt", "내용")` - -**중요 경로**: -- `/app/projects/`: 모든 Git 레포지토리 (portfolio, mas, cluster-infrastructure 등 11개) -- `/app/`: 현재 작업 디렉토리 - -**사용 방법**: -- **bash_command를 적극 활용**하세요. kubectl, git, cat, ls, npm, python 등 모든 CLI 도구 사용 가능 -- 파일을 읽을 때는 read_file 또는 `bash_command("cat file")` -- 추측하지 말고, 도구를 통해 실제 데이터를 확인하세요 -- 복잡한 작업은 여러 bash 명령을 순차적으로 실행하세요 - -요청을 분석하고 필요한 도구를 사용한 후, 적절한 에이전트에게 작업을 할당하세요. -""" - - -# ===== 2. Groq #1 - Backend Developer ===== -# Groq OpenAI-compatible endpoint -GROQ_API_BASE = os.getenv("GROQ_API_BASE", "https://api.groq.com/openai/v1") -GROQ_API_KEY = os.getenv("GROQ_API_KEY", "") - -groq_backend = ChatOpenAI( - model=os.getenv("GROQ_BACKEND_MODEL", "llama-3.3-70b-specdec"), - base_url=GROQ_API_BASE, - api_key=GROQ_API_KEY, - temperature=0.7, -).bind_tools(universal_tools) # Bash-centric: bash, read, write - -BACKEND_PROMPT = """당신은 백엔드 개발 전문가입니다. - -**역할**: -- FastAPI, Node.js 백엔드 개발 -- REST API 설계 및 구현 -- 데이터베이스 쿼리 최적화 -- 비즈니스 로직 구현 - -요청된 백엔드 작업을 수행하고 코드를 생성하세요. -""" - - -# ===== 3. Groq #2 - Frontend Developer ===== -groq_frontend = ChatOpenAI( - model=os.getenv("GROQ_FRONTEND_MODEL", "llama-3.1-8b-instant"), - base_url=GROQ_API_BASE, - api_key=GROQ_API_KEY, - temperature=0.7, -).bind_tools(universal_tools) # Bash-centric: bash, read, write - -FRONTEND_PROMPT = """당신은 프론트엔드 개발 전문가입니다. - -**역할**: -- Next.js, React 컴포넌트 개발 -- UI/UX 구현 -- 상태 관리 -- 반응형 디자인 - -요청된 프론트엔드 작업을 수행하고 코드를 생성하세요. -""" - - -# ===== 4. Groq #3 - SRE Specialist ===== -groq_sre = ChatOpenAI( - model=os.getenv("GROQ_SRE_MODEL", "llama-3.1-8b-instant"), - base_url=GROQ_API_BASE, - api_key=GROQ_API_KEY, - temperature=0.3, -).bind_tools(universal_tools) # Bash-centric: bash, read, write - -SRE_PROMPT = """당신은 SRE(Site Reliability Engineer) 전문가입니다. - -**역할**: -- 시스템 모니터링 (Prometheus, Grafana, Loki) -- 로그 분석 및 알람 설정 -- 성능 튜닝 -- 보안 취약점 점검 - -**중요한 원칙**: -- 실제 시스템 메트릭이나 로그에 접근할 수 없으므로 추측하지 마세요 -- 구체적인 확인이 필요한 경우 "kubectl logs", "kubectl top" 등의 명령어를 제안하세요 -- 일반적인 모범 사례와 트러블슈팅 가이드를 제공하세요 - -요청된 SRE 작업을 수행하고 솔루션을 제시하세요. -""" - - -# ===== 5. Groq #4 - YAML Manager ===== -groq_yaml_manager = ChatOpenAI( - model=os.getenv("GROQ_YAML_MODEL", "llama-3.3-70b-specdec"), - base_url=GROQ_API_BASE, - api_key=GROQ_API_KEY, - temperature=0.3, -).bind_tools(universal_tools) # Bash-centric: bash, read, write - -YAML_MANAGER_PROMPT = """당신은 Kubernetes YAML 파일 관리 및 자동 배포 전문가입니다. - -**역할**: -- Kubernetes 애플리케이션 완전 자동 배포 -- YAML 파일 생성 (Deployment, Service, Ingress) -- ArgoCD Application 자동 생성 및 설정 -- Git 저장소에 자동 커밋 및 푸시 -- 배포 상태 모니터링 및 보고 - -**🌟 추천 도구: yaml_deploy_application** -새로운 애플리케이션을 배포할 때는 **yaml_deploy_application**을 사용하세요. -이 도구는 모든 것을 자동으로 처리합니다: -- ✅ Deployment, Service, Ingress YAML 생성 -- ✅ ArgoCD Application 생성 (auto-sync 활성화) -- ✅ Git commit & push -- ✅ 배포 요약 및 다음 단계 안내 - -**사용 예시**: -``` -사용자: "myapp을 배포하고 싶어. 이미지는 nginx:latest, 포트 80, myapp.example.com으로 접속" - -→ yaml_deploy_application( - app_name="myapp", - image="nginx:latest", - port=80, - host="myapp.example.com" -) -``` - -**개별 도구**: -- yaml_create_deployment: Deployment만 생성 -- yaml_create_service: Service만 생성 -- yaml_create_ingress: Ingress만 생성 -- yaml_create_argocd_application: ArgoCD Application만 생성 -- yaml_apply_to_cluster: 생성된 YAML을 클러스터에 직접 적용 -- git_show_file_changes: Git 변경사항 확인 -- git_push: Git 푸시 - -**작업 흐름**: -1. 사용자 요구사항 분석 (앱 이름, 이미지, 포트, 도메인) -2. yaml_deploy_application 실행 (한 번에 모두 처리!) -3. 결과 확인 및 사용자에게 보고 -4. 필요시 추가 설정 (환경 변수, 리소스 제한 등) - -**중요**: -- ArgoCD Application은 자동으로 Git 저장소를 모니터링 -- Git push 후 약 30초 내에 자동 배포 시작 -- Auto-sync가 활성화되어 있어 Git 변경사항이 자동 반영됨 - -요청된 배포 작업을 수행하세요. -""" - - -def orchestrator_node(state: AgentState) -> AgentState: - """Claude Code - 작업 분석 및 할당 (도구 사용 가능)""" - messages = state["messages"] - - # Claude 호출 - response = claude_orchestrator.invoke([ - SystemMessage(content=ORCHESTRATOR_PROMPT), - HumanMessage(content=messages[-1]["content"]) - ]) - - # 도구 호출이 있는 경우 처리 - tool_outputs = [] - if hasattr(response, 'tool_calls') and response.tool_calls: - for tool_call in response.tool_calls: - tool_name = tool_call['name'] - tool_args = tool_call.get('args', {}) - - # 도구 실행 - try: - tool_func = next(t for t in mcp_tools if t.name == tool_name) - tool_result = tool_func.invoke(tool_args) - tool_outputs.append(f"\n🔧 **{tool_name}**: {tool_result}") - except Exception as e: - tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") - - # 도구 결과를 포함하여 다시 Claude 호출 - if tool_outputs: - tool_context = "\n".join(tool_outputs) - response = claude_orchestrator.invoke([ - SystemMessage(content=ORCHESTRATOR_PROMPT), - HumanMessage(content=messages[-1]["content"]), - HumanMessage(content=f"도구 실행 결과:\n{tool_context}") - ]) - - # 응답 내용 추출 - content = response.content if isinstance(response.content, str) else str(response.content) - - # 도구 출력 추가 - if tool_outputs: - content = "\n".join(tool_outputs) + "\n\n" + content - - # 작업 타입 결정 - content_lower = content.lower() - if "yaml" in content_lower or "deployment" in content_lower or "kubernetes" in content_lower or "k8s" in content_lower or "manifests" in content_lower: - next_agent = "yaml_manager" - elif "backend" in content_lower or "api" in content_lower or "fastapi" in content_lower: - next_agent = "backend_developer" - elif "frontend" in content_lower or "ui" in content_lower or "react" in content_lower: - next_agent = "frontend_developer" - elif "monitoring" in content_lower or "performance" in content_lower or "sre" in content_lower: - next_agent = "sre_specialist" - else: - next_agent = "orchestrator" # 자신이 직접 처리 - - state["messages"].append({ - "role": "orchestrator", - "content": content - }) - state["current_agent"] = next_agent - - return state - - -def backend_node(state: AgentState) -> AgentState: - """Groq #1 - 백엔드 개발""" - messages = state["messages"] - - response = groq_backend.invoke([ - SystemMessage(content=BACKEND_PROMPT), - HumanMessage(content=messages[-1]["content"]) - ]) - - # Handle tool calls if any - tool_outputs = [] - if hasattr(response, 'tool_calls') and response.tool_calls: - for tool_call in response.tool_calls: - tool_name = tool_call['name'] - tool_args = tool_call.get('args', {}) - - try: - tool_func = next(t for t in universal_tools if t.name == tool_name) - tool_result = tool_func.invoke(tool_args) - tool_outputs.append(f"\n🔧 **{tool_name}**: {tool_result}") - except Exception as e: - tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") - - # Call agent again with tool results - if tool_outputs: - tool_context = "\n".join(tool_outputs) - response = groq_backend.invoke([ - SystemMessage(content=BACKEND_PROMPT), - HumanMessage(content=messages[-1]["content"]), - HumanMessage(content=f"도구 실행 결과:\n{tool_context}") - ]) - - content = response.content if isinstance(response.content, str) else str(response.content) - if tool_outputs: - content = "\n".join(tool_outputs) + "\n\n" + content - - state["messages"].append({ - "role": "backend_developer", - "content": content - }) - state["current_agent"] = "orchestrator" - - return state - - -def frontend_node(state: AgentState) -> AgentState: - """Groq #2 - 프론트엔드 개발""" - messages = state["messages"] - - response = groq_frontend.invoke([ - SystemMessage(content=FRONTEND_PROMPT), - HumanMessage(content=messages[-1]["content"]) - ]) - - # Handle tool calls if any - tool_outputs = [] - if hasattr(response, 'tool_calls') and response.tool_calls: - for tool_call in response.tool_calls: - tool_name = tool_call['name'] - tool_args = tool_call.get('args', {}) - - try: - tool_func = next(t for t in universal_tools if t.name == tool_name) - tool_result = tool_func.invoke(tool_args) - tool_outputs.append(f"\n🔧 **{tool_name}**: {tool_result}") - except Exception as e: - tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") - - # Call agent again with tool results - if tool_outputs: - tool_context = "\n".join(tool_outputs) - response = groq_frontend.invoke([ - SystemMessage(content=FRONTEND_PROMPT), - HumanMessage(content=messages[-1]["content"]), - HumanMessage(content=f"도구 실행 결과:\n{tool_context}") - ]) - - content = response.content if isinstance(response.content, str) else str(response.content) - if tool_outputs: - content = "\n".join(tool_outputs) + "\n\n" + content - - state["messages"].append({ - "role": "frontend_developer", - "content": content - }) - state["current_agent"] = "orchestrator" - - return state - - -def sre_node(state: AgentState) -> AgentState: - """Groq #3 - SRE 작업""" - messages = state["messages"] - - response = groq_sre.invoke([ - SystemMessage(content=SRE_PROMPT), - HumanMessage(content=messages[-1]["content"]) - ]) - - # Handle tool calls if any - tool_outputs = [] - if hasattr(response, 'tool_calls') and response.tool_calls: - for tool_call in response.tool_calls: - tool_name = tool_call['name'] - tool_args = tool_call.get('args', {}) - - try: - tool_func = next(t for t in universal_tools if t.name == tool_name) - tool_result = tool_func.invoke(tool_args) - tool_outputs.append(f"\n🔧 **{tool_name}**: {tool_result}") - except Exception as e: - tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") - - # Call agent again with tool results - if tool_outputs: - tool_context = "\n".join(tool_outputs) - response = groq_sre.invoke([ - SystemMessage(content=SRE_PROMPT), - HumanMessage(content=messages[-1]["content"]), - HumanMessage(content=f"도구 실행 결과:\n{tool_context}") - ]) - - content = response.content if isinstance(response.content, str) else str(response.content) - if tool_outputs: - content = "\n".join(tool_outputs) + "\n\n" + content - - state["messages"].append({ - "role": "sre_specialist", - "content": content - }) - state["current_agent"] = "orchestrator" - - return state - - -def yaml_manager_node(state: AgentState) -> AgentState: - """Groq #4 - YAML Manager""" - messages = state["messages"] - - response = groq_yaml_manager.invoke([ - SystemMessage(content=YAML_MANAGER_PROMPT), - HumanMessage(content=messages[-1]["content"]) - ]) - - # Handle tool calls if any - tool_outputs = [] - if hasattr(response, 'tool_calls') and response.tool_calls: - for tool_call in response.tool_calls: - tool_name = tool_call['name'] - tool_args = tool_call.get('args', {}) - - # Execute tool - try: - tool_func = next(t for t in universal_tools if t.name == tool_name) - tool_result = tool_func.invoke(tool_args) - tool_outputs.append(f"\n🔧 **{tool_name}**: {tool_result}") - except Exception as e: - tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") - - # Call agent again with tool results - if tool_outputs: - tool_context = "\n".join(tool_outputs) - response = groq_yaml_manager.invoke([ - SystemMessage(content=YAML_MANAGER_PROMPT), - HumanMessage(content=messages[-1]["content"]), - HumanMessage(content=f"도구 실행 결과:\n{tool_context}") - ]) - - content = response.content if isinstance(response.content, str) else str(response.content) - - # Add tool outputs to content - if tool_outputs: - content = "\n".join(tool_outputs) + "\n\n" + content - - state["messages"].append({ - "role": "yaml_manager", - "content": content - }) - state["current_agent"] = "orchestrator" - - return state - - -def router(state: AgentState) -> Literal["backend_developer", "frontend_developer", "sre_specialist", "yaml_manager", "end"]: - """다음 에이전트 라우팅""" - current = state.get("current_agent", "orchestrator") - - if current == "backend_developer": - return "backend_developer" - elif current == "frontend_developer": - return "frontend_developer" - elif current == "sre_specialist": - return "sre_specialist" - elif current == "yaml_manager": - return "yaml_manager" - else: - return "end" - - -# ===== LangGraph 워크플로우 구성 ===== -def create_mas_graph(): - """MAS 워크플로우 그래프 생성""" - workflow = StateGraph(AgentState) - - # 노드 추가 - workflow.add_node("orchestrator", orchestrator_node) - workflow.add_node("backend_developer", backend_node) - workflow.add_node("frontend_developer", frontend_node) - workflow.add_node("sre_specialist", sre_node) - workflow.add_node("yaml_manager", yaml_manager_node) - - # 엣지 정의 - workflow.set_entry_point("orchestrator") - workflow.add_conditional_edges( - "orchestrator", - router, - { - "backend_developer": "backend_developer", - "frontend_developer": "frontend_developer", - "sre_specialist": "sre_specialist", - "yaml_manager": "yaml_manager", - "end": END - } - ) - - # 각 에이전트는 작업 후 orchestrator로 복귀 - workflow.add_edge("backend_developer", "orchestrator") - workflow.add_edge("frontend_developer", "orchestrator") - workflow.add_edge("sre_specialist", "orchestrator") - workflow.add_edge("yaml_manager", "orchestrator") - - return workflow.compile() - - -# 그래프 인스턴스 생성 -mas_graph = create_mas_graph() - diff --git a/services/backend/agents/__init__.py b/services/backend/agents/__init__.py new file mode 100644 index 0000000..d8c68cb --- /dev/null +++ b/services/backend/agents/__init__.py @@ -0,0 +1,22 @@ +""" +MAS Agents Package +""" +from .state import AgentState +from .orchestrator import orchestrator_node +from .planning_agent import planning_node +from .research_agent import research_node +from .code_backend_agent import backend_code_node +from .code_frontend_agent import frontend_code_node +from .code_infrastructure_agent import infrastructure_code_node +from .review_agent import review_node + +__all__ = [ + 'AgentState', + 'orchestrator_node', + 'planning_node', + 'research_node', + 'backend_code_node', + 'frontend_code_node', + 'infrastructure_code_node', + 'review_node', +] diff --git a/services/backend/agents/code_backend_agent.py b/services/backend/agents/code_backend_agent.py new file mode 100644 index 0000000..516e960 --- /dev/null +++ b/services/backend/agents/code_backend_agent.py @@ -0,0 +1,124 @@ +""" +Backend Code Agent (Groq) +백엔드 코드 작성/수정 전문 (FastAPI, Node.js, Database) +""" +from langchain_openai import ChatOpenAI +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +from tools.bash_tool import bash_tools +import os + + +# Groq 모델 초기화 +groq_backend = ChatOpenAI( + model="llama-3.3-70b-versatile", + api_key=os.getenv("GROQ_API_KEY"), + base_url="https://api.groq.com/openai/v1", + temperature=0.5 +) + + +BACKEND_PROMPT = """당신은 Multi-Agent System의 **Backend Code Agent**입니다. + +## 역할 +- FastAPI, Node.js 백엔드 코드 작성 +- RESTful API 설계 및 구현 +- 데이터베이스 스키마 설계 및 마이그레이션 +- ORM (SQLAlchemy, Prisma 등) 활용 +- 인증/인가 로직 구현 + +## 기술 스택 +- Python: FastAPI, SQLAlchemy, Pydantic +- Node.js: Express, NestJS, Prisma +- Database: PostgreSQL, Redis +- Tools: execute_bash로 모든 작업 수행 가능 + +## 코드 작성 가이드라인 +1. **코드 품질**: + - 타입 힌트 사용 (Python) / TypeScript 사용 (Node.js) + - 명확한 함수/변수명 + - 적절한 에러 처리 + +2. **보안**: + - SQL Injection 방지 + - XSS 방지 + - 비밀번호 해싱 + - JWT 토큰 검증 + +3. **성능**: + - 효율적인 쿼리 + - 캐싱 활용 (Redis) + - 비동기 처리 + +## execute_bash 활용 예시: +- 파일 생성: execute_bash("cat > /app/repos/project/api/users.py << 'EOF'\\n코드내용\\nEOF") +- Git 커밋: execute_bash("cd /app/repos/project && git add . && git commit -m 'Add user API'") +- 테스트 실행: execute_bash("cd /app/repos/project && pytest tests/") +- DB 마이그레이션: execute_bash("cd /app/repos/project && alembic upgrade head") + +## 출력 형식 +생성한 파일 목록과 간단한 설명을 제공하세요. +""" + + +def backend_code_node(state: AgentState) -> AgentState: + """ + Backend Code 노드: 백엔드 코드 작성 + """ + messages = state["messages"] + task_plan = state.get("task_plan", {}) + research_data = state.get("research_data", {}) + + # Groq에 bash 도구 바인딩 + groq_with_tools = groq_backend.bind_tools(bash_tools) + + # 코드 작성 요청 구성 + code_request = f""" +작업 계획: {task_plan.get('summary', '')} +수집된 정보: {research_data.get('summary', '')} + +다음 백엔드 코드를 작성해주세요. +""" + + # Groq 호출 + response = groq_with_tools.invoke([ + SystemMessage(content=BACKEND_PROMPT), + HumanMessage(content=code_request) + ]) + + # Tool calls 처리 + tool_outputs = [] + if hasattr(response, 'tool_calls') and response.tool_calls: + for tool_call in response.tool_calls: + tool_name = tool_call['name'] + tool_args = tool_call.get('args', {}) + + try: + tool_func = bash_tools[0] + tool_result = tool_func.invoke(tool_args) + tool_outputs.append(f"\n🔧 **{tool_name}({tool_args.get('command', '')[:50]}...)**:\n{tool_result}") + except Exception as e: + tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") + + # Tool 결과와 함께 재호출 + if tool_outputs: + tool_context = "\n".join(tool_outputs) + response = groq_backend.invoke([ + SystemMessage(content=BACKEND_PROMPT), + HumanMessage(content=code_request), + HumanMessage(content=f"도구 실행 결과:\n{tool_context}\n\n작업 결과를 요약해주세요.") + ]) + + content = response.content + if tool_outputs: + content = "\n".join(tool_outputs) + "\n\n" + content + + # 상태 업데이트 + state["code_outputs"]["backend"] = content + state["messages"].append({ + "role": "backend_developer", + "content": content + }) + state["current_agent"] = "orchestrator" + + return state diff --git a/services/backend/agents/code_frontend_agent.py b/services/backend/agents/code_frontend_agent.py new file mode 100644 index 0000000..dcf9318 --- /dev/null +++ b/services/backend/agents/code_frontend_agent.py @@ -0,0 +1,123 @@ +""" +Frontend Code Agent (Groq) +프론트엔드 코드 작성/수정 전문 (React, Next.js, Vue) +""" +from langchain_openai import ChatOpenAI +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +from tools.bash_tool import bash_tools +import os + + +# Groq 모델 초기화 +groq_frontend = ChatOpenAI( + model="llama-3.3-70b-versatile", + api_key=os.getenv("GROQ_API_KEY"), + base_url="https://api.groq.com/openai/v1", + temperature=0.5 +) + + +FRONTEND_PROMPT = """당신은 Multi-Agent System의 **Frontend Code Agent**입니다. + +## 역할 +- React, Next.js, Vue 프론트엔드 코드 작성 +- 반응형 UI/UX 구현 +- 상태 관리 (Redux, Zustand, Pinia 등) +- API 연동 및 데이터 페칭 +- CSS/Tailwind를 활용한 스타일링 + +## 기술 스택 +- React: TypeScript, Hooks, Context API +- Next.js: App Router, Server Components +- Vue: Composition API, Pinia +- Styling: Tailwind CSS, CSS Modules, Styled Components +- UI Libraries: shadcn/ui, Ant Design, Material-UI + +## 코드 작성 가이드라인 +1. **코드 품질**: + - TypeScript 사용 + - 컴포넌트 재사용성 + - Props 타입 정의 + +2. **성능**: + - 메모이제이션 (useMemo, useCallback) + - 코드 스플리팅 + - 이미지 최적화 + +3. **접근성**: + - 시맨틱 HTML + - ARIA 속성 + - 키보드 네비게이션 + +## execute_bash 활용 예시: +- 컴포넌트 생성: execute_bash("cat > /app/repos/project/src/components/UserCard.tsx << 'EOF'\\n코드\\nEOF") +- 스타일 추가: execute_bash("cat > /app/repos/project/src/styles/UserCard.module.css << 'EOF'\\n스타일\\nEOF") +- 빌드 테스트: execute_bash("cd /app/repos/project && npm run build") + +## 출력 형식 +생성한 컴포넌트/파일 목록과 사용 방법을 설명하세요. +""" + + +def frontend_code_node(state: AgentState) -> AgentState: + """ + Frontend Code 노드: 프론트엔드 코드 작성 + """ + messages = state["messages"] + task_plan = state.get("task_plan", {}) + research_data = state.get("research_data", {}) + + # Groq에 bash 도구 바인딩 + groq_with_tools = groq_frontend.bind_tools(bash_tools) + + # 코드 작성 요청 구성 + code_request = f""" +작업 계획: {task_plan.get('summary', '')} +수집된 정보: {research_data.get('summary', '')} + +다음 프론트엔드 코드를 작성해주세요. +""" + + # Groq 호출 + response = groq_with_tools.invoke([ + SystemMessage(content=FRONTEND_PROMPT), + HumanMessage(content=code_request) + ]) + + # Tool calls 처리 + tool_outputs = [] + if hasattr(response, 'tool_calls') and response.tool_calls: + for tool_call in response.tool_calls: + tool_name = tool_call['name'] + tool_args = tool_call.get('args', {}) + + try: + tool_func = bash_tools[0] + tool_result = tool_func.invoke(tool_args) + tool_outputs.append(f"\n🔧 **{tool_name}({tool_args.get('command', '')[:50]}...)**:\n{tool_result}") + except Exception as e: + tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") + + # Tool 결과와 함께 재호출 + if tool_outputs: + tool_context = "\n".join(tool_outputs) + response = groq_frontend.invoke([ + SystemMessage(content=FRONTEND_PROMPT), + HumanMessage(content=code_request), + HumanMessage(content=f"도구 실행 결과:\n{tool_context}\n\n작업 결과를 요약해주세요.") + ]) + + content = response.content + if tool_outputs: + content = "\n".join(tool_outputs) + "\n\n" + content + + # 상태 업데이트 + state["code_outputs"]["frontend"] = content + state["messages"].append({ + "role": "frontend_developer", + "content": content + }) + state["current_agent"] = "orchestrator" + + return state diff --git a/services/backend/agents/code_infrastructure_agent.py b/services/backend/agents/code_infrastructure_agent.py new file mode 100644 index 0000000..b684880 --- /dev/null +++ b/services/backend/agents/code_infrastructure_agent.py @@ -0,0 +1,123 @@ +""" +Infrastructure Code Agent (Groq) +인프라/DevOps 코드 작성 전문 (Kubernetes, YAML, Docker) +""" +from langchain_openai import ChatOpenAI +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +from tools.bash_tool import bash_tools +import os + + +# Groq 모델 초기화 +groq_infrastructure = ChatOpenAI( + model="llama-3.3-70b-versatile", + api_key=os.getenv("GROQ_API_KEY"), + base_url="https://api.groq.com/openai/v1", + temperature=0.3 # 인프라는 더 정확하게 +) + + +INFRASTRUCTURE_PROMPT = """당신은 Multi-Agent System의 **Infrastructure Code Agent**입니다. + +## 역할 +- Kubernetes Deployment, Service, Ingress YAML 작성 +- Docker 컨테이너 설정 +- CI/CD 파이프라인 구성 +- 모니터링 및 로깅 설정 +- ArgoCD, Tekton 등 GitOps 도구 활용 + +## 기술 스택 +- Kubernetes: Deployment, Service, Ingress, ConfigMap, Secret +- Helm Charts +- Docker & Dockerfile +- ArgoCD, Tekton +- Prometheus, Grafana + +## YAML 작성 가이드라인 +1. **구조**: + - 명확한 네임스페이스 분리 + - Label/Selector 일관성 + - Resource limits/requests 설정 + +2. **보안**: + - Secret 사용 + - RBAC 설정 + - Network Policy + +3. **모니터링**: + - Liveness/Readiness Probe + - Prometheus ServiceMonitor + - Logging 설정 + +## execute_bash 활용 예시: +- Deployment YAML: execute_bash("cat > /app/repos/cluster-infrastructure/apps/myapp/deployment.yaml << 'EOF'\\nYAML내용\\nEOF") +- kubectl apply: execute_bash("kubectl apply -f /app/repos/cluster-infrastructure/apps/myapp/") +- ArgoCD sync: execute_bash("kubectl apply -f /app/repos/cluster-infrastructure/argocd/myapp.yaml") + +## 출력 형식 +생성한 YAML 파일 목록과 배포 방법을 설명하세요. +""" + + +def infrastructure_code_node(state: AgentState) -> AgentState: + """ + Infrastructure Code 노드: 인프라 코드 작성 + """ + messages = state["messages"] + task_plan = state.get("task_plan", {}) + research_data = state.get("research_data", {}) + + # Groq에 bash 도구 바인딩 + groq_with_tools = groq_infrastructure.bind_tools(bash_tools) + + # 코드 작성 요청 구성 + code_request = f""" +작업 계획: {task_plan.get('summary', '')} +수집된 정보: {research_data.get('summary', '')} + +다음 인프라 코드/YAML을 작성해주세요. +""" + + # Groq 호출 + response = groq_with_tools.invoke([ + SystemMessage(content=INFRASTRUCTURE_PROMPT), + HumanMessage(content=code_request) + ]) + + # Tool calls 처리 + tool_outputs = [] + if hasattr(response, 'tool_calls') and response.tool_calls: + for tool_call in response.tool_calls: + tool_name = tool_call['name'] + tool_args = tool_call.get('args', {}) + + try: + tool_func = bash_tools[0] + tool_result = tool_func.invoke(tool_args) + tool_outputs.append(f"\n🔧 **{tool_name}({tool_args.get('command', '')[:50]}...)**:\n{tool_result}") + except Exception as e: + tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") + + # Tool 결과와 함께 재호출 + if tool_outputs: + tool_context = "\n".join(tool_outputs) + response = groq_infrastructure.invoke([ + SystemMessage(content=INFRASTRUCTURE_PROMPT), + HumanMessage(content=code_request), + HumanMessage(content=f"도구 실행 결과:\n{tool_context}\n\n작업 결과를 요약해주세요.") + ]) + + content = response.content + if tool_outputs: + content = "\n".join(tool_outputs) + "\n\n" + content + + # 상태 업데이트 + state["code_outputs"]["infrastructure"] = content + state["messages"].append({ + "role": "infrastructure_engineer", + "content": content + }) + state["current_agent"] = "orchestrator" + + return state diff --git a/services/backend/agents/orchestrator.py b/services/backend/agents/orchestrator.py new file mode 100644 index 0000000..47b8ff8 --- /dev/null +++ b/services/backend/agents/orchestrator.py @@ -0,0 +1,106 @@ +""" +Orchestrator Agent (Claude 4.5) +전체 조율 및 최종 의사결정 +""" +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +import os + + +# Claude 4.5 모델 초기화 +claude_orchestrator = ChatAnthropic( + model="claude-sonnet-4-20250514", + api_key=os.getenv("ANTHROPIC_API_KEY"), + temperature=0.7 +) + + +ORCHESTRATOR_PROMPT = """당신은 Multi-Agent System의 **총괄 조율자(Orchestrator)**입니다. + +## 역할 +- 사용자 요청을 분석하고 적절한 에이전트에게 작업 위임 +- 각 에이전트의 결과를 검토하고 다음 단계 결정 +- 최종 출력물의 품질 보증 +- 에러 발생 시 복구 전략 수립 + +## 워크플로우 +1. 사용자 요청 분석 +2. Planning Agent에게 작업 계획 수립 요청 +3. 계획에 따라 Research → Code → Review 순환 관리 +4. Review Agent 피드백 기반 재작업 여부 결정 (최대 3회 반복) +5. 최종 승인 시 사용자에게 결과 전달 + +## 다음 단계 결정 기준 +- planning: 아직 계획이 없는 경우 +- research: 정보 수집이 필요한 경우 +- code_backend: 백엔드 코드 작성 필요 +- code_frontend: 프론트엔드 코드 작성 필요 +- code_infrastructure: Kubernetes/YAML/인프라 작업 필요 +- review: 코드 검토 및 품질 검증 필요 +- end: 작업 완료 또는 최대 반복 도달 + +## 출력 형식 +다음 에이전트와 이유를 명시하세요: +NEXT_AGENT: planning +REASON: 이유 설명 +MESSAGE: 해당 에이전트에게 전달할 메시지 + +## 주의사항 +- 반복 횟수(iteration_count) 확인 (최대 3회) +- Review Agent의 피드백을 신중히 검토 +- 에러 발생 시 적절한 복구 조치 +""" + + +def orchestrator_node(state: AgentState) -> AgentState: + """ + Orchestrator 노드: 전체 워크플로우 조율 + """ + messages = state["messages"] + iteration_count = state.get("iteration_count", 0) + + # 컨텍스트 구성 + context_parts = [f"현재 반복 횟수: {iteration_count}/3"] + + if state.get("task_plan"): + context_parts.append(f"작업 계획: {state['task_plan']}") + + if state.get("research_data"): + context_parts.append(f"수집된 정보: {state['research_data']}") + + if state.get("code_outputs"): + context_parts.append(f"생성된 코드: {state['code_outputs']}") + + if state.get("review_feedback"): + context_parts.append(f"리뷰 피드백: {state['review_feedback']}") + + context = "\n".join(context_parts) + + # 사용자 요청 + user_request = messages[-1]["content"] if messages else "" + + # Claude 호출 + response = claude_orchestrator.invoke([ + SystemMessage(content=ORCHESTRATOR_PROMPT), + HumanMessage(content=f"사용자 요청: {user_request}\n\n현재 상태:\n{context}") + ]) + + content = response.content + + # 다음 에이전트 파싱 + next_agent = "planning" # 기본값 + if "NEXT_AGENT:" in content: + for line in content.split("\n"): + if line.startswith("NEXT_AGENT:"): + next_agent = line.split(":")[1].strip() + break + + # 메시지 추가 + state["messages"].append({ + "role": "orchestrator", + "content": content + }) + state["current_agent"] = next_agent + + return state diff --git a/services/backend/agents/planning_agent.py b/services/backend/agents/planning_agent.py new file mode 100644 index 0000000..65c58aa --- /dev/null +++ b/services/backend/agents/planning_agent.py @@ -0,0 +1,125 @@ +""" +Planning Agent (Claude 4.5) +작업 계획 수립 및 단계별 태스크 정의 +""" +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +import os +import json + + +# Claude 4.5 모델 초기화 +claude_planning = ChatAnthropic( + model="claude-sonnet-4-20250514", + api_key=os.getenv("ANTHROPIC_API_KEY"), + temperature=0.3 # 계획은 더 deterministic하게 +) + + +PLANNING_PROMPT = """당신은 Multi-Agent System의 **Planning Agent**입니다. + +## 역할 +- 사용자 요청을 분석하여 구체적인 작업 계획 수립 +- 단계별 태스크 정의 및 우선순위 설정 +- 필요한 정보 수집 항목 명시 +- 성공 기준 정의 + +## 계획 수립 프로세스 +1. 요청 분석: 사용자가 원하는 것이 무엇인지 명확히 파악 +2. 작업 분류: Backend / Frontend / Infrastructure 중 어느 영역인지 +3. 단계 분해: 큰 작업을 작은 단계로 나누기 +4. 정보 필요성 파악: 어떤 정보를 수집해야 하는지 +5. 성공 기준 설정: 언제 작업이 완료된 것으로 볼지 + +## 출력 형식 (JSON) +반드시 다음 JSON 형식으로 출력하세요: +```json +{ + "task_type": "backend | frontend | infrastructure | mixed", + "summary": "작업 요약 (1-2문장)", + "steps": [ + {"step": 1, "description": "단계 설명", "agent": "research|code_backend|code_frontend|code_infrastructure"}, + {"step": 2, "description": "단계 설명", "agent": "research|code_backend|code_frontend|code_infrastructure"} + ], + "research_needed": [ + "수집할 정보 1", + "수집할 정보 2" + ], + "success_criteria": [ + "성공 기준 1", + "성공 기준 2" + ] +} +``` + +## 예시 +요청: "PostgreSQL 데이터베이스에 사용자 테이블 추가" +출력: +```json +{ + "task_type": "backend", + "summary": "PostgreSQL에 users 테이블을 생성하고 기본 스키마 정의", + "steps": [ + {"step": 1, "description": "현재 DB 스키마 확인", "agent": "research"}, + {"step": 2, "description": "users 테이블 마이그레이션 스크립트 작성", "agent": "code_backend"}, + {"step": 3, "description": "테이블 생성 및 검증", "agent": "code_backend"} + ], + "research_needed": [ + "기존 PostgreSQL 테이블 목록", + "현재 사용 중인 ORM/마이그레이션 도구" + ], + "success_criteria": [ + "users 테이블이 정상적으로 생성됨", + "기본 컬럼(id, name, email, created_at)이 포함됨" + ] +} +``` +""" + + +def planning_node(state: AgentState) -> AgentState: + """ + Planning 노드: 작업 계획 수립 + """ + messages = state["messages"] + user_request = messages[0]["content"] if messages else "" + + # Claude 호출 + response = claude_planning.invoke([ + SystemMessage(content=PLANNING_PROMPT), + HumanMessage(content=f"사용자 요청: {user_request}") + ]) + + content = response.content + + # JSON 파싱 시도 + try: + # JSON 블록 추출 + if "```json" in content: + json_str = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + json_str = content.split("```")[1].split("```")[0].strip() + else: + json_str = content + + task_plan = json.loads(json_str) + except Exception as e: + task_plan = { + "task_type": "mixed", + "summary": "계획 파싱 실패, Research Agent로 이동", + "steps": [{"step": 1, "description": "정보 수집", "agent": "research"}], + "research_needed": ["사용자 요청 관련 정보"], + "success_criteria": ["작업 완료"], + "error": str(e) + } + + # 상태 업데이트 + state["task_plan"] = task_plan + state["messages"].append({ + "role": "planning", + "content": content + }) + state["current_agent"] = "orchestrator" # 다시 orchestrator로 반환 + + return state diff --git a/services/backend/agents/research_agent.py b/services/backend/agents/research_agent.py new file mode 100644 index 0000000..0bb952b --- /dev/null +++ b/services/backend/agents/research_agent.py @@ -0,0 +1,159 @@ +""" +Research Agent (Groq) +정보 수집 및 문서/코드베이스 검색 +""" +from langchain_openai import ChatOpenAI +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +from tools.bash_tool import bash_tools +import os +import json + + +# Groq 모델 초기화 (OpenAI 호환) +groq_research = ChatOpenAI( + model="llama-3.3-70b-versatile", + api_key=os.getenv("GROQ_API_KEY"), + base_url="https://api.groq.com/openai/v1", + temperature=0.3 +) + + +RESEARCH_PROMPT = """당신은 Multi-Agent System의 **Research Agent**입니다. + +## 역할 +- Bash 명령어를 활용하여 필요한 정보 수집 +- Kubernetes 클러스터 상태 조회 +- PostgreSQL 데이터베이스 스키마/데이터 탐색 +- Git 레포지토리 분석 +- 파일 시스템 검색 +- Prometheus 메트릭 수집 + +## 사용 가능한 도구 +**execute_bash**: 모든 bash 명령어를 실행할 수 있습니다. + +### 예시 명령어: + +**Kubernetes 조회:** +- kubectl get pods -n mas +- kubectl get deployments -A +- kubectl describe pod -n +- kubectl logs -n --tail=50 + +**PostgreSQL 조회:** +- psql -U bluemayne -d postgres -c "\\dt" # 테이블 목록 +- psql -U bluemayne -d postgres -c "SELECT * FROM users LIMIT 10" +- psql -U bluemayne -d postgres -c "\\d users" # 테이블 스키마 + +**Git 조회:** +- git log -10 --oneline +- git status +- git diff +- git branch -a + +**파일 시스템:** +- ls -la /app/repos/ +- cat /app/repos/cluster-infrastructure/README.md +- find /app/repos -name "*.yaml" -type f +- grep -r "keyword" /app/repos/ + +**Prometheus 메트릭:** +- curl -s "http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090/api/v1/query?query=up" +- curl -s "http://prometheus:9090/api/v1/query?query=node_cpu_seconds_total" + +## 출력 형식 (JSON) +수집한 정보를 JSON 형식으로 정리하세요: +```json +{ + "summary": "수집한 정보 요약", + "findings": [ + {"category": "카테고리", "data": "발견한 데이터"}, + {"category": "카테고리", "data": "발견한 데이터"} + ], + "recommendations": ["추천 사항 1", "추천 사항 2"] +} +``` + +## 주의사항 +- 여러 명령어를 실행하여 충분한 정보 수집 +- 에러 발생 시 대안 명령어 시도 +- 보안에 민감한 정보는 마스킹 +""" + + +def research_node(state: AgentState) -> AgentState: + """ + Research 노드: 정보 수집 + """ + messages = state["messages"] + task_plan = state.get("task_plan", {}) + research_needed = task_plan.get("research_needed", []) + + # Groq에 bash 도구 바인딩 + groq_with_tools = groq_research.bind_tools(bash_tools) + + # 연구 요청 구성 + research_request = f"다음 정보를 수집해주세요:\n" + "\n".join(f"- {item}" for item in research_needed) + + # Groq 호출 + response = groq_with_tools.invoke([ + SystemMessage(content=RESEARCH_PROMPT), + HumanMessage(content=research_request) + ]) + + # Tool calls 처리 + tool_outputs = [] + if hasattr(response, 'tool_calls') and response.tool_calls: + for tool_call in response.tool_calls: + tool_name = tool_call['name'] + tool_args = tool_call.get('args', {}) + + # 도구 실행 + try: + tool_func = bash_tools[0] # execute_bash + tool_result = tool_func.invoke(tool_args) + tool_outputs.append(f"\n🔧 **{tool_name}({tool_args.get('command', '')})**:\n{tool_result}") + except Exception as e: + tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}") + + # Tool 결과와 함께 재호출 + if tool_outputs: + tool_context = "\n".join(tool_outputs) + response = groq_research.invoke([ + SystemMessage(content=RESEARCH_PROMPT), + HumanMessage(content=research_request), + HumanMessage(content=f"도구 실행 결과:\n{tool_context}\n\n이제 JSON 형식으로 정리해주세요.") + ]) + + content = response.content + + # Tool outputs를 content에 포함 + if tool_outputs: + content = "\n".join(tool_outputs) + "\n\n" + content + + # JSON 파싱 시도 + try: + if "```json" in content: + json_str = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + json_str = content.split("```")[1].split("```")[0].strip() + else: + json_str = content + + research_data = json.loads(json_str) + except Exception: + research_data = { + "summary": "정보 수집 완료", + "findings": [{"category": "raw", "data": content}], + "recommendations": [] + } + + # 상태 업데이트 + state["research_data"] = research_data + state["messages"].append({ + "role": "research", + "content": content + }) + state["current_agent"] = "orchestrator" + + return state diff --git a/services/backend/agents/review_agent.py b/services/backend/agents/review_agent.py new file mode 100644 index 0000000..f8207d4 --- /dev/null +++ b/services/backend/agents/review_agent.py @@ -0,0 +1,161 @@ +""" +Review & Test Agent (Claude) +코드 리뷰, 품질 검증, 테스트 전략 수립 +""" +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import SystemMessage, HumanMessage +from .state import AgentState +import os +import json + + +# Claude 모델 초기화 (리뷰는 고품질 모델 사용) +claude_review = ChatAnthropic( + model="claude-sonnet-4-20250514", + api_key=os.getenv("ANTHROPIC_API_KEY"), + temperature=0.3 +) + + +REVIEW_PROMPT = """당신은 Multi-Agent System의 **Review & Test Agent**입니다. + +## 역할 +- 생성된 코드의 품질 검증 +- 보안 취약점 검사 +- 성능 및 확장성 평가 +- 테스트 전략 수립 +- 개선 사항 제안 + +## 검토 항목 + +### 1. 코드 품질 +- 가독성 및 유지보수성 +- 네이밍 컨벤션 +- 코드 중복 제거 +- 에러 처리 + +### 2. 보안 +- SQL Injection +- XSS (Cross-Site Scripting) +- CSRF +- 인증/인가 로직 +- 민감 정보 노출 + +### 3. 성능 +- 쿼리 최적화 +- 캐싱 전략 +- 비동기 처리 +- 리소스 사용 + +### 4. 테스트 +- 단위 테스트 필요성 +- 통합 테스트 시나리오 +- E2E 테스트 계획 + +### 5. 인프라 (Kubernetes YAML) +- Resource limits/requests +- Liveness/Readiness probes +- Security context +- Network policies + +## 출력 형식 (JSON) +반드시 다음 JSON 형식으로 출력하세요: +```json +{ + "approved": true/false, + "overall_score": 85, + "summary": "전체 평가 요약", + "issues": [ + { + "severity": "high|medium|low", + "category": "security|performance|quality|test", + "description": "문제 설명", + "recommendation": "개선 방안" + } + ], + "strengths": ["강점 1", "강점 2"], + "next_steps": ["다음 단계 1", "다음 단계 2"] +} +``` + +## 승인 기준 +- **approved: true**: 심각한 문제 없음, 배포 가능 +- **approved: false**: 중대한 보안/품질 이슈, 재작업 필요 + +## 점수 기준 +- 90-100: Excellent +- 80-89: Good +- 70-79: Acceptable +- 60-69: Needs Improvement +- < 60: Major Issues +""" + + +def review_node(state: AgentState) -> AgentState: + """ + Review 노드: 코드 리뷰 및 품질 검증 + """ + messages = state["messages"] + code_outputs = state.get("code_outputs", {}) + task_plan = state.get("task_plan", {}) + + # 코드 리뷰 요청 구성 + code_summary = "\n\n".join([ + f"### {agent_type.upper()}\n{code}" + for agent_type, code in code_outputs.items() + ]) + + review_request = f""" +작업 계획: {task_plan.get('summary', '')} +성공 기준: {task_plan.get('success_criteria', [])} + +생성된 코드: +{code_summary} + +위 코드를 검토하고 JSON 형식으로 피드백을 제공해주세요. +""" + + # Claude 호출 + response = claude_review.invoke([ + SystemMessage(content=REVIEW_PROMPT), + HumanMessage(content=review_request) + ]) + + content = response.content + + # JSON 파싱 시도 + try: + if "```json" in content: + json_str = content.split("```json")[1].split("```")[0].strip() + elif "```" in content: + json_str = content.split("```")[1].split("```")[0].strip() + else: + json_str = content + + review_feedback = json.loads(json_str) + except Exception as e: + review_feedback = { + "approved": False, + "overall_score": 50, + "summary": "리뷰 파싱 실패", + "issues": [{"severity": "high", "category": "quality", "description": f"JSON 파싱 에러: {str(e)}", "recommendation": "재검토 필요"}], + "strengths": [], + "next_steps": ["피드백 재생성"] + } + + # 상태 업데이트 + state["review_feedback"] = review_feedback + state["is_approved"] = review_feedback.get("approved", False) + state["messages"].append({ + "role": "review", + "content": content + }) + + # 승인되지 않았고 반복 횟수가 3 미만이면 재작업 + if not state["is_approved"] and state["iteration_count"] < 3: + state["iteration_count"] += 1 + state["current_agent"] = "orchestrator" # 재작업을 위해 orchestrator로 + else: + state["current_agent"] = "end" # 승인되었거나 최대 반복 도달 + + return state diff --git a/services/backend/agents/state.py b/services/backend/agents/state.py new file mode 100644 index 0000000..7b756bf --- /dev/null +++ b/services/backend/agents/state.py @@ -0,0 +1,17 @@ +""" +공유 상태 정의 (AgentState) +""" +from typing import TypedDict, Optional + + +class AgentState(TypedDict): + """에이전트 간 공유되는 상태""" + messages: list # 대화 메시지 이력 + current_agent: str # 현재 활성 에이전트 + task_plan: Optional[dict] # Planning Agent 출력 + research_data: Optional[dict] # Research Agent 출력 + code_outputs: dict # Code Agent(s) 출력 + review_feedback: Optional[dict] # Review Agent 출력 + iteration_count: int # 반복 횟수 (최대 3회) + is_approved: bool # 최종 승인 여부 + error: Optional[str] # 에러 메시지 diff --git a/services/backend/chainlit_app.py b/services/backend/chainlit_app.py index 6be85b7..abe40c6 100644 --- a/services/backend/chainlit_app.py +++ b/services/backend/chainlit_app.py @@ -2,7 +2,8 @@ Chainlit UI for MAS Platform """ import chainlit as cl -from agents import mas_graph, AgentState +from workflow import mas_graph +from agents import AgentState import os from dotenv import load_dotenv @@ -13,13 +14,19 @@ load_dotenv() async def start(): """채팅 시작 시""" await cl.Message( - content="🤖 **Multi-Agent System**에 오신 것을 환영합니다!\n\n" + content="🤖 **Multi-Agent System v2.0**에 오신 것을 환영합니다!\n\n" "저는 다음 전문가 팀과 함께 작업합니다:\n\n" - "- 🎼 **Claude Code**: 총괄 조율자 & DevOps 전문가\n" - "- ⚙️ **Groq Backend**: 백엔드 개발자\n" - "- 🎨 **Groq Frontend**: 프론트엔드 개발자\n" - "- 📊 **Groq SRE**: 모니터링 & 성능 전문가\n" - "- 📝 **Groq YAML Manager**: Kubernetes YAML 파일 관리\n\n" + "**계획 & 조율**\n" + "- 🎼 **Orchestrator** (Claude 4.5): 전체 워크플로우 조율\n" + "- 📋 **Planning Agent** (Claude 4.5): 작업 계획 수립\n\n" + "**정보 수집**\n" + "- 🔍 **Research Agent** (Groq): 정보 수집 및 분석\n\n" + "**코드 작성**\n" + "- ⚙️ **Backend Agent** (Groq): 백엔드 개발\n" + "- 🎨 **Frontend Agent** (Groq): 프론트엔드 개발\n" + "- 🏗️ **Infrastructure Agent** (Groq): K8s/DevOps\n\n" + "**품질 보증**\n" + "- ✅ **Review Agent** (Claude): 코드 리뷰 & 테스트\n\n" "무엇을 도와드릴까요?" ).send() @@ -32,8 +39,13 @@ async def main(message: cl.Message): initial_state: AgentState = { "messages": [{"role": "user", "content": message.content}], "current_agent": "orchestrator", - "task_type": "", - "result": {} + "task_plan": None, + "research_data": None, + "code_outputs": {}, + "review_feedback": None, + "iteration_count": 0, + "is_approved": False, + "error": None } # 응답 메시지 생성 @@ -51,12 +63,14 @@ async def main(message: cl.Message): # 에이전트별 아이콘 agent_icons = { "orchestrator": "🎼", + "planning": "📋", + "research": "🔍", "backend_developer": "⚙️", "frontend_developer": "🎨", - "sre_specialist": "📊", - "yaml_manager": "📝" + "infrastructure_engineer": "🏗️", + "review": "✅" } - + icon = agent_icons.get(agent_name, "🤖") # 스트리밍 업데이트 @@ -78,11 +92,13 @@ async def setup_agent(settings): def rename(orig_author: str): """에이전트 이름 매핑""" rename_dict = { - "orchestrator": "Claude Code (Orchestrator)", - "backend_developer": "Groq Backend Dev", - "frontend_developer": "Groq Frontend Dev", - "sre_specialist": "Groq SRE", - "yaml_manager": "Groq YAML Manager" + "orchestrator": "Orchestrator (Claude 4.5)", + "planning": "Planning Agent (Claude 4.5)", + "research": "Research Agent (Groq)", + "backend_developer": "Backend Agent (Groq)", + "frontend_developer": "Frontend Agent (Groq)", + "infrastructure_engineer": "Infrastructure Agent (Groq)", + "review": "Review Agent (Claude)" } return rename_dict.get(orig_author, orig_author) diff --git a/services/backend/tools/__init__.py b/services/backend/tools/__init__.py new file mode 100644 index 0000000..ba5b73a --- /dev/null +++ b/services/backend/tools/__init__.py @@ -0,0 +1,6 @@ +""" +MAS Tools Package +""" +from .bash_tool import bash_tools, execute_bash + +__all__ = ['bash_tools', 'execute_bash'] diff --git a/services/backend/tools/bash_tool.py b/services/backend/tools/bash_tool.py new file mode 100644 index 0000000..abd9e57 --- /dev/null +++ b/services/backend/tools/bash_tool.py @@ -0,0 +1,55 @@ +""" +Bash 명령어 실행 도구 +""" +import subprocess +from langchain_core.tools import tool +from typing import Optional + + +@tool +def execute_bash(command: str, timeout: int = 30, cwd: Optional[str] = None) -> str: + """ + Execute a bash command and return the output. + + Args: + command: Bash command to execute + timeout: Command timeout in seconds (default: 30) + cwd: Working directory (default: None) + + Returns: + Command output or error message + + Examples: + - execute_bash("kubectl get pods -n mas") + - execute_bash("git log -5 --oneline", cwd="/app/repos/cluster-infrastructure") + - execute_bash("psql -U bluemayne -d postgres -c 'SELECT * FROM users LIMIT 10'") + - execute_bash("curl -s http://prometheus:9090/api/v1/query?query=up") + """ + try: + result = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + timeout=timeout, + cwd=cwd + ) + + # Combine stdout and stderr + output = result.stdout + if result.stderr: + output += f"\n[STDERR]:\n{result.stderr}" + + if result.returncode != 0: + return f"❌ Command failed (exit code {result.returncode}):\n{output}" + + return f"✅ Command executed successfully:\n{output}" + + except subprocess.TimeoutExpired: + return f"❌ Command timed out after {timeout} seconds" + except Exception as e: + return f"❌ Error executing command: {str(e)}" + + +# Export the tool +bash_tools = [execute_bash] diff --git a/services/backend/workflow.py b/services/backend/workflow.py new file mode 100644 index 0000000..1f7ba2b --- /dev/null +++ b/services/backend/workflow.py @@ -0,0 +1,114 @@ +""" +LangGraph Iterative Workflow +반복적 워크플로우: Planning → Research → Code → Review (최대 3회 반복) +""" +from typing import Literal +from langgraph.graph import StateGraph, END +from agents import ( + AgentState, + orchestrator_node, + planning_node, + research_node, + backend_code_node, + frontend_code_node, + infrastructure_code_node, + review_node +) + + +def router(state: AgentState) -> Literal[ + "planning", + "research", + "code_backend", + "code_frontend", + "code_infrastructure", + "review", + "end" +]: + """ + 다음 에이전트 라우팅 로직 + """ + current = state.get("current_agent", "orchestrator") + + # 명시적으로 지정된 다음 에이전트로 이동 + if current in [ + "planning", + "research", + "code_backend", + "code_frontend", + "code_infrastructure", + "review" + ]: + return current + + # end 상태 + if current == "end": + return "end" + + # 기본값: planning부터 시작 + return "planning" + + +def create_mas_workflow(): + """ + MAS Iterative Workflow 생성 + + 워크플로우: + User Request + ↓ + Orchestrator → Planning → Orchestrator + ↓ + Research → Orchestrator + ↓ + Code (Backend/Frontend/Infrastructure) → Orchestrator + ↓ + Review → Orchestrator + ↓ (if not approved and iteration < 3) + Research (반복) + ↓ (if approved or iteration >= 3) + End + """ + workflow = StateGraph(AgentState) + + # 노드 추가 + workflow.add_node("orchestrator", orchestrator_node) + workflow.add_node("planning", planning_node) + workflow.add_node("research", research_node) + workflow.add_node("code_backend", backend_code_node) + workflow.add_node("code_frontend", frontend_code_node) + workflow.add_node("code_infrastructure", infrastructure_code_node) + workflow.add_node("review", review_node) + + # 시작점: Orchestrator + workflow.set_entry_point("orchestrator") + + # Orchestrator의 조건부 라우팅 + workflow.add_conditional_edges( + "orchestrator", + router, + { + "planning": "planning", + "research": "research", + "code_backend": "code_backend", + "code_frontend": "code_frontend", + "code_infrastructure": "code_infrastructure", + "review": "review", + "end": END + } + ) + + # 각 에이전트는 작업 후 Orchestrator로 복귀 + workflow.add_edge("planning", "orchestrator") + workflow.add_edge("research", "orchestrator") + workflow.add_edge("code_backend", "orchestrator") + workflow.add_edge("code_frontend", "orchestrator") + workflow.add_edge("code_infrastructure", "orchestrator") + + # Review는 승인 여부에 따라 처리 (review_node 내부에서 current_agent 설정) + workflow.add_edge("review", "orchestrator") + + return workflow.compile() + + +# 그래프 인스턴스 생성 +mas_graph = create_mas_workflow()