REFACTOR(repo): simplify project structure
Some checks failed
Build Docker Image / build-and-push (push) Has been cancelled

- Move services/backend to langgraph/
- Move deploy/docker/Dockerfile to Dockerfile
- Remove deploy/, services/ folders
- Update GitHub Actions workflow paths
- Remove kustomization update logic (managed by K3S-HOME/applications)
This commit is contained in:
2026-01-05 16:45:33 +09:00
parent 49083910c6
commit 013140f02c
30 changed files with 4 additions and 1094 deletions

View File

@@ -0,0 +1,19 @@
"""
MAS Agents Package
K8s Infrastructure Planning System
"""
from .state import AgentState
from .orchestrator import orchestrator_node
from .planning_agent import planning_node
from .research_agent import research_node
from .decision_agent import decision_node
from .prompt_generator_agent import prompt_generator_node
__all__ = [
'AgentState',
'orchestrator_node',
'planning_node',
'research_node',
'decision_node',
'prompt_generator_node',
]

View File

@@ -0,0 +1,156 @@
"""
Decision Agent (Claude 4.5)
Planning과 Research 결과를 분석하여 최종 의사결정 (추천/비추천)
"""
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from .state import AgentState
import os
import json
# Claude 4.5 모델 초기화
claude_decision = ChatAnthropic(
model="claude-sonnet-4-20250514",
api_key=os.getenv("ANTHROPIC_API_KEY"),
temperature=0.5
)
DECISION_SYSTEM = """You are the Decision Agent.
## Role
Analyze planning and research data to make final deployment decision (추천/비추천).
## Input
- Planning data: deployment requirements, resources needed
- Research data: current cluster state, existing tools
## Output Format (Korean Markdown)
Make a clear decision with reasoning:
```markdown
# [도구명] 도입 분석 결과
## 📊 현재 클러스터 상태
- **Kubernetes 버전**: [version]
- **노드 구성**: [nodes info]
- **기존 도구**: [existing tools]
- **리소스 상태**: [available resources]
## 💡 권장사항: [✅ 도입 추천 / ❌ 도입 비추천]
### 결정 이유
1. [이유 1]
2. [이유 2]
3. [이유 3]
### 🔄 대안 (비추천인 경우)
- [대안 1]: [설명]
- [대안 2]: [설명]
### 📌 고려사항 (추천인 경우)
- **필요 리소스**: [CPU, Memory]
- **예상 작업 시간**: [estimate]
- **복잡도**: [level]
## 🎯 결론
[1-2문장으로 최종 권장사항 요약]
```
## Guidelines
1. **한국어로 작성**
2. **명확한 결론** (✅ 추천 or ❌ 비추천)
3. **구체적인 이유** 제공
4. **사용자 친화적** (기술 용어 최소화)
5. 이모지 사용으로 가독성 향상
## Decision Output
Also output a JSON with decision:
{"recommendation": "approve" or "reject", "tool_name": "..."}
"""
def decision_node(state: AgentState) -> AgentState:
"""
Decision 노드: 최종 의사결정 (추천/비추천)
"""
messages = state["messages"]
task_plan = state.get("task_plan", {})
research_data = state.get("research_data", {})
# 입력 데이터 준비
plan_summary = json.dumps(task_plan, indent=2, ensure_ascii=False) if task_plan else "No plan available"
research_summary = json.dumps(research_data, indent=2, ensure_ascii=False) if research_data else "No research data"
# 사용자 원래 요청
user_request = messages[0]["content"] if messages else "Deploy infrastructure"
print(f"\n{'='*80}")
print(f"Decision Agent - Making final decision")
print(f"{'='*80}")
# Claude 호출
response = claude_decision.invoke([
SystemMessage(content=DECISION_SYSTEM),
HumanMessage(content=f"""분석 결과를 바탕으로 최종 의사결정을 내려주세요:
**사용자 요청:** {user_request}
**계획 데이터:**
```json
{plan_summary}
```
**클러스터 분석 결과:**
```json
{research_summary}
```
위 정보를 바탕으로:
1. 현재 클러스터 상태 요약
2. **도입 추천/비추천 명확히 결정**
3. 구체적인 이유 제시
4. 대안 또는 고려사항 제공
5. 최종 결론
**중요**: 한국어로 작성하고, 사용자 친화적으로 작성해주세요.
마지막에 JSON 형식으로 결정도 포함: {{"recommendation": "approve" or "reject", "tool_name": "..."}}
""")
])
content = response.content
# 추천/비추천 판단 (JSON 파싱 시도)
recommendation = "reject" # 기본값
try:
if '{"recommendation"' in content or "```json" in content:
import re
json_match = re.search(r'\{[^{}]*"recommendation"[^{}]*\}', content)
if json_match:
decision_json = json.loads(json_match.group(0))
recommendation = decision_json.get("recommendation", "reject")
except:
# 텍스트 기반 판단
if "✅ 도입 추천" in content or "추천" in content:
recommendation = "approve"
print(f"✅ Decision made: {recommendation}")
# 상태 업데이트
state["decision_report"] = {
"content": content,
"recommendation": recommendation
}
state["messages"].append({
"role": "decision",
"content": content
})
# 추천이면 prompt_generator로, 비추천이면 end
if recommendation == "approve":
state["current_agent"] = "orchestrator" # Orchestrator가 prompt_generator로 보냄
else:
state["current_agent"] = "end" # 비추천이면 바로 종료
return state

View File

@@ -0,0 +1,192 @@
"""
Orchestrator Agent (Claude 4.5)
전체 조율 및 최종 의사결정
"""
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from .state import AgentState
from tools.bash_tool import bash_tools
import os
# Claude 4.5 모델 초기화
claude_orchestrator = ChatAnthropic(
model="claude-sonnet-4-20250514",
api_key=os.getenv("ANTHROPIC_API_KEY"),
temperature=0.7
)
ORCHESTRATOR_PROMPT = """You are the Orchestrator of a K8s Analysis & Decision System.
## Role
Determine request type and route to appropriate agents.
## Request Types
### Type 1: Information Query (정보 조회)
Keywords: "알려줘", "조회", "확인", "보여줘", "찾아줘", "검색", "상태", "비밀번호", "목록", "리스트"
Examples:
- "PostgreSQL 비밀번호 알려줘"
- "현재 Pod 상태 확인해줘"
- "Secret 목록 보여줘"
Workflow: research → end
### Type 2: Deployment Decision (도입 결정)
Keywords: "도입", "설치", "배포", "필요", "결정", "추천", "분석", "사용"
Examples:
- "Tekton 도입할까?"
- "Harbor가 필요한지 분석해줘"
Workflow: planning → research → prompt_generator → end
## Available Agents
- planning: Plan deployment requirements (deployment_decision only)
- research: Analyze cluster state or retrieve information
- decision: Make final decision (추천/비추천) (deployment_decision only)
- prompt_generator: Generate implementation guide for other AI (deployment_decision, only if approved)
- end: Complete the task
## Decision Logic
**First, determine request_type (첫 호출 시만):**
- If user wants information → request_type = "information_query"
- If user wants deployment decision → request_type = "deployment_decision"
**Then route based on request_type:**
### For information_query:
- Current state: start → NEXT_AGENT: research
- Current state: research done → NEXT_AGENT: end
### For deployment_decision:
- Current state: start → NEXT_AGENT: planning
- Current state: planning done → NEXT_AGENT: research
- Current state: research done → NEXT_AGENT: decision
- Current state: decision done (추천) → NEXT_AGENT: prompt_generator
- Current state: decision done (비추천) → NEXT_AGENT: end
- Current state: prompt_generator done → NEXT_AGENT: end
Check state.get("task_plan"), state.get("research_data"), state.get("decision_report"), state.get("implementation_prompt") to determine current progress.
## Output Format
REQUEST_TYPE: <information_query|deployment_decision>
NEXT_AGENT: <agent_name>
REASON: <brief reason>
Analyze user intent carefully to choose the correct request type.
"""
def orchestrator_node(state: AgentState) -> AgentState:
"""
Orchestrator 노드: 전체 워크플로우 조율
"""
messages = state["messages"]
iteration_count = state.get("iteration_count", 0)
# 컨텍스트 구성
context_parts = [f"현재 반복 횟수: {iteration_count}/2"]
if state.get("task_plan"):
context_parts.append(f"✅ 계획 수립 완료")
if state.get("research_data"):
context_parts.append(f"✅ 클러스터 분석 완료")
if state.get("implementation_prompt"):
context_parts.append(f"✅ 구현 프롬프트 생성 완료")
context = "\n".join(context_parts)
# 사용자 요청
user_request = messages[-1]["content"] if messages else ""
# Claude에 bash 도구 바인딩
claude_with_tools = claude_orchestrator.bind_tools(bash_tools)
# Claude 호출
response = claude_with_tools.invoke([
SystemMessage(content=ORCHESTRATOR_PROMPT),
HumanMessage(content=f"사용자 요청: {user_request}\n\n현재 상태:\n{context}")
])
# Tool calls 처리
tool_outputs = []
if hasattr(response, 'tool_calls') and response.tool_calls:
for tool_call in response.tool_calls:
tool_name = tool_call['name']
tool_args = tool_call.get('args', {})
try:
# tool_name에 따라 올바른 도구 선택
from tools.bash_tool import execute_bash, execute_host
if tool_name == "execute_host":
tool_func = execute_host
else:
tool_func = execute_bash
tool_result = tool_func.invoke(tool_args)
tool_outputs.append(f"\n🔧 **Orchestrator {tool_name}({tool_args.get('command', '')[:50]}...)**:\n{tool_result}")
except Exception as e:
tool_outputs.append(f"\n❌ **{tool_name}** failed: {str(e)}")
# Tool 결과와 함께 재호출
if tool_outputs:
tool_context = "\n".join(tool_outputs)
response = claude_orchestrator.invoke([
SystemMessage(content=ORCHESTRATOR_PROMPT),
HumanMessage(content=f"사용자 요청: {user_request}\n\n현재 상태:\n{context}"),
HumanMessage(content=f"도구 실행 결과:\n{tool_context}")
])
content = response.content
if tool_outputs:
content = "\n".join(tool_outputs) + "\n\n" + content
# 요청 타입 파싱
request_type = state.get("request_type") # 기존 값 유지
if "REQUEST_TYPE:" in content and not request_type:
for line in content.split("\n"):
if line.startswith("REQUEST_TYPE:"):
request_type = line.split(":")[1].strip()
state["request_type"] = request_type
break
# 다음 에이전트 파싱
next_agent = "planning" # 기본값
if "NEXT_AGENT:" in content:
for line in content.split("\n"):
if line.startswith("NEXT_AGENT:"):
next_agent = line.split(":")[1].strip()
break
# request_type에 따른 라우팅 보정
if request_type == "information_query":
# 정보 조회: Planning 건너뛰기
if next_agent == "planning":
next_agent = "research"
elif request_type == "deployment_decision":
# 의사결정: 순서 보장 (planning → research → decision → prompt_generator(추천시만) → end)
task_plan = state.get("task_plan")
research_data = state.get("research_data")
decision_report = state.get("decision_report")
implementation_prompt = state.get("implementation_prompt")
if not task_plan:
next_agent = "planning"
elif not research_data:
next_agent = "research"
elif not decision_report:
next_agent = "decision"
elif decision_report and decision_report.get("recommendation") == "approve" and not implementation_prompt:
next_agent = "prompt_generator"
else:
next_agent = "end"
# 메시지 추가
state["messages"].append({
"role": "orchestrator",
"content": content
})
state["current_agent"] = next_agent
return state

View File

@@ -0,0 +1,153 @@
"""
Planning Agent (Claude 4.5)
작업 계획 수립 및 단계별 태스크 정의
"""
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from .state import AgentState
import os
import json
# Claude 4.5 모델 초기화
claude_planning = ChatAnthropic(
model="claude-sonnet-4-20250514",
api_key=os.getenv("ANTHROPIC_API_KEY"),
temperature=0.3 # 계획은 더 deterministic하게
)
PLANNING_PROMPT = """You are the K8s Infrastructure Planning Agent.
## Role
Analyze user requests for Kubernetes infrastructure and create implementation plans.
## Your Mission
When a user wants to deploy something (e.g., "Tekton", "Harbor", "Prometheus"):
1. Understand what they want to deploy
2. Design high-level folder structure
3. Identify what K8s resources would be needed
4. Determine what cluster information to gather
## Output Format (JSON)
```json
{
"task_type": "k8s_infrastructure",
"summary": "Deploy X to Kubernetes cluster",
"target_tool": "Name of the tool/service to deploy",
"folder_structure": {
"base_path": "deploy/X",
"directories": ["base", "overlays/prod"]
},
"k8s_resources": [
{"type": "Namespace", "name": "X"},
{"type": "Deployment", "name": "X"},
{"type": "Service", "name": "X-svc"}
],
"research_needed": [
"Check Kubernetes version",
"Check existing similar tools",
"Verify available resources",
"Check storage classes"
],
"requirements": {
"min_k8s_version": "1.24",
"estimated_resources": {"cpu": "2", "memory": "4Gi"},
"dependencies": ["tool1", "tool2"]
}
}
```
Keep it simple and high-level. Focus on what needs to be checked, not detailed YAML structures.
"""
def planning_node(state: AgentState) -> AgentState:
"""
Planning 노드: 작업 계획 수립
"""
messages = state["messages"]
user_request = messages[0]["content"] if messages else ""
# Claude 호출
response = claude_planning.invoke([
SystemMessage(content=PLANNING_PROMPT),
HumanMessage(content=f"사용자 요청: {user_request}")
])
content = response.content
# JSON 파싱 시도
try:
# JSON 블록 추출
if "```json" in content:
json_str = content.split("```json")[1].split("```")[0].strip()
elif "```" in content:
json_str = content.split("```")[1].split("```")[0].strip()
else:
json_str = content
task_plan = json.loads(json_str)
# 사용자 친화적인 한국어 요약 생성
summary_parts = []
target_tool = task_plan.get("target_tool", "알 수 없음")
summary_parts.append(f"📋 **{target_tool}** 요구사항 분석 완료\n")
# 필요 조건
requirements = task_plan.get("requirements", {})
if requirements:
summary_parts.append("**필요 조건**")
if requirements.get("min_k8s_version"):
summary_parts.append(f"- Kubernetes 버전: 최소 {requirements['min_k8s_version']} 이상")
resources = requirements.get("estimated_resources", {})
if resources:
cpu = resources.get("cpu", "")
memory = resources.get("memory", "")
storage = resources.get("storage", "")
resource_str = []
if cpu:
resource_str.append(f"CPU {cpu}코어")
if memory:
resource_str.append(f"메모리 {memory}")
if storage:
resource_str.append(f"스토리지 {storage}")
if resource_str:
summary_parts.append(f"- 예상 리소스: {', '.join(resource_str)}")
dependencies = requirements.get("dependencies", [])
if dependencies:
deps_str = ", ".join(dependencies)
summary_parts.append(f"- 의존성: {deps_str}")
# 확인이 필요한 사항
research_needed = task_plan.get("research_needed", [])
if research_needed:
summary_parts.append("\n**확인이 필요한 사항**")
for item in research_needed[:5]: # 최대 5개
# 영어를 한국어로 간단히 변환
item_ko = item.replace("Check", "확인:").replace("Verify", "검증:").replace("Analyze", "분석:")
summary_parts.append(f"- {item_ko}")
user_friendly_content = "\n".join(summary_parts)
except Exception as e:
task_plan = {
"task_type": "mixed",
"summary": "계획 파싱 실패",
"research_needed": ["클러스터 상태 확인"],
"error": str(e)
}
user_friendly_content = "📋 요구사항 분석 중...\n\n기본 정보를 확인하겠습니다."
# 상태 업데이트
state["task_plan"] = task_plan
state["messages"].append({
"role": "planning",
"content": user_friendly_content
})
state["current_agent"] = "orchestrator" # 다시 orchestrator로 반환
return state

View File

@@ -0,0 +1,178 @@
"""
Prompt Generator Agent (Claude 4.5)
Decision Agent의 추천 결과를 바탕으로 다른 AI에게 전달할 구현 프롬프트 생성
"""
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from .state import AgentState
import os
import json
# Claude 4.5 모델 초기화
claude_prompt_gen = ChatAnthropic(
model="claude-sonnet-4-20250514",
api_key=os.getenv("ANTHROPIC_API_KEY"),
temperature=0.3
)
PROMPT_GEN_SYSTEM = """You are the Implementation Prompt Generator.
## Role
Generate structured deployment prompts for other AI assistants based on existing project patterns.
## Environment Context
- **Projects Root**: `/home/ubuntu/Projects/`
- **Git Sync**: Local ↔️ Server auto-sync
- **ArgoCD**: All apps managed by ArgoCD
- **Vault**: Secrets managed by Vault ExternalSecrets
- **Kustomize**: All resources use Kustomization
## Project Structure Categories
### 1. Applications (`/home/ubuntu/Projects/applications/`)
**용도**: User-facing applications, development tools
**예시**: gitea, code-server, kubernetes-dashboard, homer, umami
**패턴**:
```
applications/{app-name}/
├── argocd/{app-name}.yaml # ArgoCD Application
├── helm-values/{app-name}.yaml # (Optional) Helm values
├── vault/*.yaml # (Optional) ExternalSecrets
└── kustomization.yaml # Resource list
```
### 2. Cluster Infrastructure (`/home/ubuntu/Projects/cluster-infrastructure/`)
**용도**: Cluster-level infrastructure tools
**예시**: cert-manager, ingress-nginx, vault, external-secrets, reloader
**패턴**: Same as applications
### 3. Monitoring (`/home/ubuntu/Projects/monitoring/`)
**용도**: Monitoring and observability tools
**예시**: prometheus, grafana, loki
### 4. Databases (`/home/ubuntu/Projects/databases/`)
**용도**: Database services
**예시**: postgresql, redis, mongodb
### 5. Individual Projects (`/home/ubuntu/Projects/{project-name}/`)
**용도**: Standalone application projects
**예시**: mas, jaejadle, joossam, portfolio
**패턴**:
```
{project-name}/
├── deploy/
│ ├── argocd/{project-name}.yaml
│ └── k8s/
│ ├── base/
│ └── overlays/prod/
└── services/
```
## Output Format (Markdown)
Create a CONCISE guide (MAX 25 lines total):
```markdown
# [도구명] 배포 가이드
## 📍 배치
`/home/ubuntu/Projects/[category]/[tool-name]/`
**이유**: [1줄 설명]
**참고**: [category]/[example]/ 구조 동일
## 📂 구조
\`\`\`
[category]/[tool-name]/
├── argocd/[tool-name].yaml
├── kustomization.yaml
└── vault/*.yaml (선택)
\`\`\`
## 📋 파일
- **argocd/**: ArgoCD Application (repoURL, path, namespace)
- **kustomization.yaml**: 리소스 목록
- **vault/**: ExternalSecret (평문 금지)
## ✅ 필수
- ArgoCD 통합
- `/home/ubuntu/Projects/[category]/kustomization.yaml` 업데이트
```
CRITICAL: Response MUST be under 25 lines!
## Guidelines
1. **폴더 구조와 파일 역할**만 명시 (세부 YAML은 AI가 생성)
2. **카테고리 선택 기준** 명확히 제시
3. **기존 프로젝트 패턴** 반드시 준수
4. **ArgoCD, Vault, Kustomize 통합** 필수
5. **참고 예시** 제공하여 AI가 따라할 수 있도록
"""
def prompt_generator_node(state: AgentState) -> AgentState:
"""
Prompt Generator 노드: 다른 AI에게 전달할 구현 프롬프트 생성
"""
messages = state["messages"]
task_plan = state.get("task_plan", {})
research_data = state.get("research_data", {})
decision_report = state.get("decision_report", {})
# 입력 데이터 준비
plan_summary = json.dumps(task_plan, indent=2, ensure_ascii=False) if task_plan else "No plan"
research_summary = json.dumps(research_data, indent=2, ensure_ascii=False) if research_data else "No research"
# 사용자 원래 요청
user_request = messages[0]["content"] if messages else "Deploy infrastructure"
tool_name = task_plan.get("target_tool", "Unknown") if task_plan else "Unknown"
print(f"\n{'='*80}")
print(f"Prompt Generator - Creating implementation guide")
print(f"{'='*80}")
# Claude 호출
response = claude_prompt_gen.invoke([
SystemMessage(content=PROMPT_GEN_SYSTEM),
HumanMessage(content=f"""다른 AI에게 전달할 구현 가이드를 생성해주세요:
**사용자 요청:** {user_request}
**배포 대상:** {tool_name}
**계획 데이터:**
```json
{plan_summary}
```
**클러스터 상태:**
```json
{research_summary}
```
위 정보를 바탕으로:
1. **적절한 카테고리 선택** (applications, cluster-infrastructure, monitoring, databases)
2. **폴더 구조만 제시** (세부 YAML은 다른 AI가 생성)
3. **파일별 역할 설명** (필수 필드와 용도만 명시)
4. **기존 패턴 준수** (ArgoCD, Vault, Kustomize 통합)
5. **참고 예시 제공** (동일 카테고리 프로젝트)
**중요**:
- 구조와 역할만 설명하고, 세부 YAML 내용은 생성하지 마세요
- 다른 AI가 이 가이드를 보고 YAML을 직접 생성할 수 있도록 간결하게 작성
- 응답은 간결하게 유지 (너무 길면 잘립니다)
""")
])
content = response.content
print(f"✅ Implementation guide generated ({len(content)} characters)")
# 상태 업데이트
state["implementation_prompt"] = content
state["messages"].append({
"role": "prompt_generator",
"content": content
})
state["current_agent"] = "end" # 완료
return state

View File

@@ -0,0 +1,343 @@
"""
Research Agent (Claude)
정보 수집 및 문서/코드베이스 검색
JSON 기반 명령어 생성 방식으로 재작성
"""
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage
from .state import AgentState
import os
import json
import re
# Claude 4.5 모델 초기화
claude_research = ChatAnthropic(
model="claude-sonnet-4-20250514",
api_key=os.getenv("ANTHROPIC_API_KEY"),
temperature=0.3
)
RESEARCH_PROMPT = """Research Agent: Analyze cluster or retrieve information.
## Two Modes
### Mode 1: Information Query (정보 조회)
User wants specific information (password, status, list, storage capacity, etc.)
- Execute kubectl commands to get the information
- Provide a clear, natural language answer
- Focus on exactly what the user asked
### Mode 2: Deployment Analysis (배포 분석)
User wants deployment decision
- Analyze cluster state comprehensively
- Collect version, tools, resources
- Provide structured findings
## Request commands in JSON:
{"commands": [{"tool": "execute_host", "command": "kubectl get nodes", "use_sudo": true}]}
Rules:
- Request 1-2 commands at a time
- Use execute_host for kubectl commands (with use_sudo: true)
- Output ONLY JSON when requesting commands
- For storage queries, use: kubectl get pvc, df -h, du -sh
- For memory queries, use: kubectl top nodes, kubectl top pods
- Be precise: storage ≠ memory
## Final report format
### For Information Query (IMPORTANT - Answer in natural Korean, NOT JSON):
Provide a direct answer in natural Korean language. Examples:
- "Gitea의 공유 스토리지는 10GB 할당되어 있으며, 현재 약 3.2GB를 사용 중입니다."
- "현재 클러스터에는 3개의 노드가 실행 중입니다."
DO NOT use JSON format for information queries. Just answer naturally.
### For Deployment Analysis:
{
"summary": "클러스터 상태 요약",
"cluster_info": {
"k8s_version": "v1.x.x",
"nodes": "3 nodes",
"existing_tools": ["ArgoCD", "Gitea"]
},
"findings": [{"category": "...", "data": "..."}]
}
Choose the appropriate format based on the user's request.
"""
def research_node(state: AgentState) -> AgentState:
"""
Research 노드: 정보 수집 (JSON 기반 명령어 방식)
"""
messages = state["messages"]
request_type = state.get("request_type", "deployment_decision")
task_plan = state.get("task_plan") or {}
research_needed = task_plan.get("research_needed", []) if isinstance(task_plan, dict) else []
# 사용자 원래 요청 찾기
user_message = None
for msg in reversed(messages):
if msg.get("role") == "user":
user_message = msg.get("content", "")
break
# 연구 요청 구성
if request_type == "information_query":
# 정보 조회 모드: 사용자 요청을 그대로 전달
research_request = f"사용자가 다음 정보를 요청했습니다:\n\n{user_message}\n\n해당 정보를 kubectl 명령어로 조회하여 결과를 반환해주세요."
elif research_needed:
# 배포 결정 모드: Planning의 지시 따름
research_request = f"다음 정보를 수집해주세요:\n" + "\n".join(f"- {item}" for item in research_needed)
else:
# 기본 모드
if user_message:
research_request = f"사용자 요청: {user_message}\n\n위 요청에 필요한 정보를 수집하고 분석해주세요."
else:
research_request = "현재 시스템 상태를 분석하고 필요한 정보를 수집해주세요."
# 대화 히스토리 (도구 실행 결과 포함)
conversation = [
SystemMessage(content=RESEARCH_PROMPT),
HumanMessage(content=research_request)
]
tool_outputs = []
max_iterations = 2
iteration = 0
while iteration < max_iterations:
iteration += 1
print(f"\n{'='*80}")
print(f"Research Agent - Iteration {iteration}/{max_iterations}")
print(f"{'='*80}")
# Claude 호출
response = claude_research.invoke(conversation)
response_text = response.content
print(f"Response: {response_text[:500]}...")
print(f"\n📝 Full Response:\n{response_text}\n") # 디버깅용 전체 응답 출력
# JSON 명령어 추출 시도
commands_executed = False
is_final_answer = False
# 방법 1: ```json ... ``` 블록에서 추출
json_match = re.search(r'```json\s*(\{.*?\})\s*```', response_text, re.DOTALL)
if not json_match:
# 방법 2: 단순 {...} 블록 추출
json_match = re.search(r'(\{[^{}]*"commands"[^{}]*\[.*?\][^{}]*\})', response_text, re.DOTALL)
if json_match:
try:
commands_data = json.loads(json_match.group(1))
# commands가 있으면 실행
if "commands" in commands_data and commands_data["commands"]:
commands_executed = True
results = []
for cmd_spec in commands_data["commands"][:2]: # 최대 2개까지만 (토큰 절약)
tool_name = cmd_spec.get("tool", "execute_bash")
command = cmd_spec.get("command", "")
use_sudo = cmd_spec.get("use_sudo", False)
if not command:
continue
print(f"\n🔧 Executing: {tool_name}('{command[:80]}...')")
# 도구 실행
try:
from tools.bash_tool import execute_bash, execute_host
if tool_name == "execute_host":
result = execute_host.invoke({"command": command, "use_sudo": use_sudo})
else:
result = execute_bash.invoke({"command": command})
results.append(f"Command: {command}\nResult: {result}")
print(f"✅ Success")
except Exception as e:
error_msg = f"❌ Error: {str(e)}"
results.append(f"Command: {command}\nResult: {error_msg}")
print(error_msg)
# 결과를 대화에 추가 (최신 것만 유지)
results_text = "\n\n".join(results)
tool_outputs.append(results_text)
# 요청 유형에 따라 다른 지시
if request_type == "information_query":
# 정보 조회: 자연어로 답변 지시
next_instruction = f"명령어 실행 결과:\n\n{results_text}\n\n**이제 위 결과를 바탕으로 사용자의 질문에 자연스러운 한국어로 답변해주세요. JSON이 아닌 일반 문장으로 작성하세요. 핵심 정보만 간결하게 전달하세요.**"
else:
# 배포 분석: 선택권 제공
next_instruction = f"명령어 실행 결과:\n\n{results_text}\n\n계속 정보가 필요하면 추가 명령어를 요청하고, 충분한 정보를 수집했으면 최종 리포트를 JSON으로 제공해주세요."
# 전체 히스토리 대신 시스템 프롬프트 + 초기 요청 + 최신 결과만 유지
conversation = [
SystemMessage(content=RESEARCH_PROMPT),
HumanMessage(content=research_request),
HumanMessage(content=next_instruction)
]
continue # 다음 반복으로
# 최종 리포트인 경우
elif "summary" in commands_data and "findings" in commands_data:
print("\n✅ 최종 리포트 수신")
is_final_answer = True
# 요청 유형에 따라 다른 포맷
if request_type == "information_query":
# 정보 조회: result 필드가 있으면 그것을 자연어 답변으로 사용
result = commands_data.get("result", "")
if result:
# result가 있으면 그대로 사용 (자연어 답변)
final_content = result.strip()
else:
# result가 없으면 findings에서 추출
findings = commands_data.get("findings", [])
summary_parts = []
for finding in findings[:3]:
data = finding.get("data", "")
if data:
summary_parts.append(data)
final_content = "\n".join(summary_parts) if summary_parts else "정보를 찾을 수 없습니다."
# 정보 조회는 바로 종료
state["current_agent"] = "end"
else:
# 배포 분석: 간단한 상태만 표시 (Decision agent가 상세 결과 표시)
final_content = "✅ 분석 완료"
# 배포 분석은 orchestrator로 돌아감 (decision으로 이동)
state["current_agent"] = "orchestrator"
state["research_data"] = commands_data
state["messages"].append({
"role": "research",
"content": final_content
})
return state
except json.JSONDecodeError as e:
print(f"⚠️ JSON 파싱 실패: {e}")
# 명령어도 없고 최종 리포트도 아니면 자연어 답변으로 간주
if not commands_executed and not is_final_answer:
print("\n✅ 자연어 답변 수신")
# 요청 유형에 따라 다른 출력
if request_type == "information_query":
# 정보 조회: Claude 응답을 간결하게 표시
# JSON이 아닌 자연어 답변인지 확인
if not response_text.strip().startswith('{'):
content = response_text.strip()
else:
# 만약 JSON이면 파싱해서 표시
try:
data = json.loads(response_text)
if "result" in data:
content = data["result"]
else:
content = response_text
except:
content = response_text
state["current_agent"] = "end"
else:
# 배포 분석: 간단한 메시지만 (Decision agent가 상세 결과 표시)
content = "✅ 분석 완료"
state["current_agent"] = "orchestrator"
state["research_data"] = {
"summary": "정보 수집 완료",
"findings": [{"category": "분석", "data": response_text}],
"recommendations": []
}
state["messages"].append({
"role": "research",
"content": content
})
return state
# 최대 반복 도달
print(f"\n⚠️ 최대 반복 횟수 도달 ({max_iterations})")
# 요청 유형에 따라 다른 출력
if request_type == "information_query":
# 정보 조회: 수집된 정보를 바탕으로 사용자 친화적인 답변 생성
if tool_outputs:
outputs_text = "\n\n".join(tool_outputs)
# Claude에게 결과 해석 요청
print("\n📝 결과 해석 요청 중...")
interpretation_prompt = f"""수집된 정보를 바탕으로 사용자 질문에 답변해주세요.
**사용자 질문:** {user_message}
**수집된 정보:**
{outputs_text}
위 정보를 바탕으로:
1. 사용자 질문에 직접적으로 답변
2. 한국어로 간결하게 작성
3. 핵심 정보만 포함
4. 기술적 세부사항은 필요시에만 포함
답변:"""
interpretation_response = claude_research.invoke([
HumanMessage(content=interpretation_prompt)
])
content = f"✅ 조회 완료\n\n{interpretation_response.content}"
state["research_data"] = {
"summary": "정보 수집 완료",
"findings": [{"category": "클러스터 정보", "data": outputs_text}],
"recommendations": []
}
else:
content = "✅ 조회 완료\n\n⚠️ 충분한 정보를 수집하지 못했습니다."
state["research_data"] = {
"summary": "정보 수집 불완전",
"findings": [{"category": "경고", "data": "추가 정보 필요"}],
"recommendations": []
}
state["current_agent"] = "end"
else:
# 배포 분석: 간단한 메시지만 (Decision agent가 상세 결과 표시)
content = "✅ 분석 완료"
if tool_outputs:
outputs_text = "\n\n".join(tool_outputs)
state["research_data"] = {
"summary": "정보 수집 완료",
"findings": [{"category": "클러스터 정보", "data": outputs_text}],
"recommendations": []
}
else:
state["research_data"] = {
"summary": "정보 수집 불완전",
"findings": [{"category": "경고", "data": "추가 정보 필요"}],
"recommendations": []
}
state["current_agent"] = "orchestrator"
state["messages"].append({
"role": "research",
"content": content
})
return state

18
langgraph/agents/state.py Normal file
View File

@@ -0,0 +1,18 @@
"""
공유 상태 정의 (AgentState)
K8s 인프라 분석 및 계획 수립에 특화
"""
from typing import TypedDict, Optional
class AgentState(TypedDict):
"""에이전트 간 공유되는 상태"""
messages: list # 대화 메시지 이력
current_agent: str # 현재 활성 에이전트
request_type: Optional[str] # 요청 유형: "information_query" or "deployment_decision"
task_plan: Optional[dict] # Planning Agent 출력 (폴더 구조, YAML 설계)
research_data: Optional[dict] # Research Agent 출력 (K8s 클러스터 상태)
decision_report: Optional[dict] # Decision Agent 출력 (추천/비추천 결정)
implementation_prompt: Optional[str] # Prompt Generator 출력 (구현 가이드)
iteration_count: int # 반복 횟수 (최대 2회)
error: Optional[str] # 에러 메시지

10
langgraph/chainlit.md Normal file
View File

@@ -0,0 +1,10 @@
# Welcome to Chainlit! 🚀🤖
Hi there, Developer! 👋 We're excited to have you on board. Chainlit is a powerful tool designed to help you prototype, debug and share applications built on top of LLMs.
## Useful Links 🔗
- **Documentation:** Get started with our comprehensive [Chainlit Documentation](https://docs.chainlit.io) 📚
- **Discord Community:** Join our friendly [Chainlit Discord](https://discord.gg/k73SQ3FyUh) to ask questions, share your projects, and connect with other developers! 💬
We can't wait to see what you create with Chainlit! Happy coding! 💻😊

165
langgraph/chainlit_app.py Normal file
View File

@@ -0,0 +1,165 @@
"""
Chainlit UI for MAS Platform
"""
import chainlit as cl
from workflow import mas_graph
from agents import AgentState
import os
from dotenv import load_dotenv
import contextvars
load_dotenv()
# Chainlit의 local_steps ContextVar 초기화
try:
from chainlit.step import local_steps
local_steps.set([])
except:
pass
@cl.on_chat_start
async def start():
"""채팅 시작 시"""
await cl.Message(
content="☸️ **K8s 인프라 분석 & 의사결정 시스템**에 오신 것을 환영합니다!\n\n"
"클러스터를 분석하고, 도구 도입 여부를 결정해드립니다.\n\n"
"**어떻게 작동하나요?**\n"
"1. 📋 도구 분석 → 2. 🔍 클러스터 상태 확인 → 3. 💡 추천/비추천 결정\n\n"
"**사용 예시**\n"
"```\n"
"Tekton 도입 여부를 결정해줘\n"
"Harbor가 필요한지 분석해줘\n"
"Prometheus를 설치해야 할까?\n"
"```\n\n"
"**결과물**\n"
"✅ 도입 추천 또는 ❌ 도입 비추천 (이유 포함)\n"
"🔄 대안 제시\n"
"📌 구현 가이드 (도입하는 경우)\n\n"
"궁금한 도구를 알려주세요!"
).send()
@cl.on_message
async def main(message: cl.Message):
"""메시지 수신 시"""
# local_steps ContextVar 초기화
try:
from chainlit.step import local_steps
local_steps.set([])
except:
pass
try:
# 초기 상태
initial_state: AgentState = {
"messages": [{"role": "user", "content": message.content}],
"current_agent": "orchestrator",
"request_type": None, # Orchestrator가 결정
"task_plan": None,
"research_data": None,
"decision_report": None,
"implementation_prompt": None,
"iteration_count": 0,
"error": None
}
# 응답 메시지 생성
response_msg = cl.Message(content="")
await response_msg.send()
# 상태 표시용 메시지
status_msg = cl.Message(content="⏳ 작업 중...")
await status_msg.send()
# MAS 그래프 실행
async for event in mas_graph.astream(initial_state):
for node_name, state in event.items():
if node_name != "__end__":
last_message = state["messages"][-1]
agent_name = last_message["role"]
agent_content = last_message["content"]
# 사용자에게 보여줄 에이전트만 필터링
user_facing_agents = ["planning", "research", "decision", "prompt_generator"]
if agent_name in user_facing_agents:
# 에이전트별 아이콘
agent_icons = {
"planning": "📋",
"research": "🔍",
"decision": "💡",
"prompt_generator": "📝"
}
agent_display_names = {
"planning": "도구 요구사항 분석",
"research": "클러스터 상태 분석",
"decision": "의사결정 분석",
"prompt_generator": "구현 가이드 생성"
}
icon = agent_icons.get(agent_name, "🤖")
display_name = agent_display_names.get(agent_name, agent_name)
# 내부 라우팅 정보 제거 (NEXT_AGENT, REASON 등)
cleaned_content = agent_content
for keyword in ["NEXT_AGENT:", "REASON:", "MESSAGE:"]:
if keyword in cleaned_content:
# 라우팅 정보가 포함된 경우 해당 부분 제거
lines = cleaned_content.split("\n")
cleaned_lines = [line for line in lines if not line.strip().startswith(keyword.replace(":", ""))]
cleaned_content = "\n".join(cleaned_lines)
# 스트리밍 업데이트
response_msg.content += f"\n\n{icon} **{display_name}**:\n{cleaned_content.strip()}"
await response_msg.update()
elif agent_name == "orchestrator":
# Orchestrator는 간단한 상태 메시지만 표시
current_agent = state.get("current_agent", "")
status_icons = {
"planning": "📋 도구 요구사항 분석 중...",
"research": "🔍 클러스터 상태 분석 중...",
"decision": "💡 의사결정 분석 중...",
"prompt_generator": "📝 구현 가이드 생성 중...",
"end": "✨ 분석 완료!"
}
status_text = status_icons.get(current_agent, "⏳ 작업 중...")
status_msg.content = status_text
await status_msg.update()
# 상태 메시지 제거
await status_msg.remove()
# 최종 업데이트
await response_msg.update()
except Exception as e:
error_msg = f"❌ 오류가 발생했습니다: {str(e)}"
await cl.Message(content=error_msg).send()
print(f"Error in main: {e}")
import traceback
traceback.print_exc()
@cl.on_settings_update
async def setup_agent(settings):
"""설정 업데이트"""
print(f"Settings updated: {settings}")
# 사이드바 설정
@cl.author_rename
def rename(orig_author: str):
"""에이전트 이름 매핑"""
rename_dict = {
"orchestrator": "조율자 (Claude 4.5)",
"planning": "요구사항 분석 (Claude 4.5)",
"research": "클러스터 분석 (Groq)",
"decision": "의사결정 (Claude 4.5)",
"prompt_generator": "구현 가이드 (Claude 4.5)"
}
return rename_dict.get(orig_author, orig_author)

View File

@@ -0,0 +1,35 @@
# LangGraph & LangChain
langgraph==0.2.53
langchain==0.3.13
langchain-anthropic==0.3.0
langchain-openai==0.2.14
langchain-google-genai==2.0.8
# Chainlit (UI) - 최신 버전으로 업그레이드
chainlit>=2.0.0
# Pydantic
pydantic>=2.0.0
pydantic-settings>=2.0.0
# Database
sqlalchemy==2.0.36
asyncpg==0.30.0
psycopg2-binary==2.9.10
# MCP (Model Context Protocol)
mcp==1.1.2
httpx==0.28.1
kubernetes==31.0.0
psycopg2-binary==2.9.10
GitPython==3.1.43
# YAML
PyYAML==6.0.2
# Utilities
python-dotenv==1.0.1
redis==5.2.1
aioredis==2.0.1
httpx==0.28.1

View File

@@ -0,0 +1,6 @@
"""
MAS Tools Package
"""
from .bash_tool import bash_tools, execute_bash, execute_host
__all__ = ['bash_tools', 'execute_bash', 'execute_host']

View File

@@ -0,0 +1,132 @@
"""
Bash 명령어 실행 도구
"""
import subprocess
import shlex
from langchain_core.tools import tool
from typing import Optional
@tool
def execute_bash(command: str, timeout: int = 30, cwd: Optional[str] = None) -> str:
"""
Execute a bash command in the container.
Args:
command: Bash command to execute
timeout: Command timeout in seconds (default: 30)
cwd: Working directory (default: None)
Returns:
Command output or error message
Examples:
- execute_bash("ls -la /app")
- execute_bash("python --version")
- execute_bash("curl -s http://prometheus:9090/api/v1/query?query=up")
"""
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=timeout,
cwd=cwd
)
# Combine stdout and stderr
output = result.stdout
if result.stderr:
output += f"\n[STDERR]:\n{result.stderr}"
if result.returncode != 0:
return f"❌ Command failed (exit code {result.returncode}):\n{output}"
return f"✅ Command executed successfully:\n{output}"
except subprocess.TimeoutExpired:
return f"❌ Command timed out after {timeout} seconds"
except Exception as e:
return f"❌ Error executing command: {str(e)}"
@tool
def execute_host(command: str, timeout: int = 30, use_sudo: bool = False) -> str:
"""
Execute command on the HOST system using nsenter (NO SSH needed!).
USE THIS for accessing the host system:
- kubectl commands (Kubernetes cluster management)
- Accessing /home/ubuntu/Projects/ (Git repositories)
- PostgreSQL queries (via psql)
- Git operations on host repositories
- File system operations on host
- ALL host system operations
This works by entering the host's namespaces directly from the container.
Much faster than SSH and no authentication needed!
NOTE: Projects folder is located at /home/ubuntu/Projects/ on oracle-master server.
Args:
command: Command to run on the host system
timeout: Command timeout in seconds (default: 30)
use_sudo: Whether to prepend 'sudo' to the command (default: False)
Returns:
Command output or error message
Examples:
- execute_host("kubectl get pods -n mas", use_sudo=True)
- execute_host("ls -la /home/ubuntu/Projects")
- execute_host("cat /home/ubuntu/Projects/mas/README.md")
- execute_host("cd /home/ubuntu/Projects/mas && git log -5 --oneline")
- execute_host("psql -U bluemayne -h postgresql-primary.postgresql.svc.cluster.local -d postgres -c 'SELECT version()'")
"""
try:
# Use nsenter to enter host namespaces
# -t 1: target PID 1 (init process on host)
# -m: mount namespace
# -u: UTS namespace (hostname)
# -n: network namespace
# -i: IPC namespace
# Run as ubuntu user to avoid git "dubious ownership" errors
# Use 'su ubuntu -c' (without -) to preserve current directory context
# This allows commands to work from SSH initial directory
if use_sudo:
# For sudo commands, run directly with sudo
# Use shlex.quote to safely quote the command while preserving shell expansion
nsenter_command = f"nsenter -t 1 -m -u -n -i -- sh -c {shlex.quote(f'sudo {command}')}"
else:
# For regular commands, run as ubuntu user
# Use 'su ubuntu -c' (not 'su - ubuntu -c') to preserve current directory
# This matches SSH behavior where you start from the initial directory
nsenter_command = f"nsenter -t 1 -m -u -n -i -- su ubuntu -c {shlex.quote(command)}"
result = subprocess.run(
nsenter_command,
shell=True,
capture_output=True,
text=True,
timeout=timeout
)
# Combine stdout and stderr
output = result.stdout
if result.stderr:
output += f"\n[STDERR]:\n{result.stderr}"
if result.returncode != 0:
return f"❌ Host command failed (exit code {result.returncode}):\n{output}"
return f"✅ Host command executed successfully:\n{output}"
except subprocess.TimeoutExpired:
return f"❌ Host command timed out after {timeout} seconds"
except Exception as e:
return f"❌ Error executing host command: {str(e)}"
# Export both tools
bash_tools = [execute_bash, execute_host]

100
langgraph/workflow.py Normal file
View File

@@ -0,0 +1,100 @@
"""
LangGraph K8s Infrastructure Planning Workflow
워크플로우: Planning → Research → Decision → Prompt Generation → End
"""
from typing import Literal
from langgraph.graph import StateGraph, END
from agents import (
AgentState,
orchestrator_node,
planning_node,
research_node,
decision_node,
prompt_generator_node
)
def router(state: AgentState) -> Literal[
"planning",
"research",
"decision",
"prompt_generator",
"end"
]:
"""
다음 에이전트 라우팅 로직
정보 조회: research → end
도입 결정: planning → research → decision → prompt_generator (추천시) → end
"""
current = state.get("current_agent", "orchestrator")
# 명시적으로 지정된 다음 에이전트로 이동
if current in ["planning", "research", "decision", "prompt_generator"]:
return current
# end 상태
if current == "end":
return "end"
# 기본값: orchestrator가 결정
return "planning"
def create_mas_workflow():
"""
K8s Infrastructure Analysis & Decision Workflow 생성
워크플로우 1 (정보 조회):
User Request (e.g., "PostgreSQL 비밀번호 알려줘")
Orchestrator → Research → Orchestrator → End
워크플로우 2 (도입 결정):
User Request (e.g., "Tekton 도입할까?")
Orchestrator → Planning → Orchestrator
Research (K8s cluster analysis) → Orchestrator
Decision (추천/비추천) → Orchestrator
Prompt Generator (추천시만, 구현 가이드) → Orchestrator
End
"""
workflow = StateGraph(AgentState)
# 노드 추가
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("planning", planning_node)
workflow.add_node("research", research_node)
workflow.add_node("decision", decision_node)
workflow.add_node("prompt_generator", prompt_generator_node)
# 시작점: Orchestrator
workflow.set_entry_point("orchestrator")
# Orchestrator의 조건부 라우팅
workflow.add_conditional_edges(
"orchestrator",
router,
{
"planning": "planning",
"research": "research",
"decision": "decision",
"prompt_generator": "prompt_generator",
"end": END
}
)
# 각 에이전트는 작업 후 Orchestrator로 복귀
workflow.add_edge("planning", "orchestrator")
workflow.add_edge("research", "orchestrator")
workflow.add_edge("decision", "orchestrator")
workflow.add_edge("prompt_generator", "orchestrator")
return workflow.compile()
# 그래프 인스턴스 생성
mas_graph = create_mas_workflow()