REFACTOR(app): use native Python clients

Major changes:
- Kubernetes tools: Replace subprocess kubectl calls with kubernetes-client library
  - Supports in-cluster config for pod execution
  - Fallback to local kubeconfig for development
  - All k8s tools (nodes, pods, deployments, logs, describe) now use Python API

- PostgreSQL tools: Replace kubectl exec psql with direct psycopg2 connection
  - Connect via Kubernetes service DNS
  - Support for environment-based configuration
  - Improved error handling with proper pgcode/pgerror

- Prometheus tools: Replace kubectl exec wget with direct HTTP requests
  - Use requests library to query Prometheus API
  - Connect via Kubernetes service DNS
  - Configurable via PROMETHEUS_URL env var

- Deployment updates: Add explicit PostgreSQL connection env vars
  - POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER
  - Already had POSTGRES_PASSWORD from secret

Benefits:
- No longer requires kubectl binary in container
- Faster execution (no subprocess overhead)
- Better error handling and type safety
- Works seamlessly in Kubernetes pods with RBAC
This commit is contained in:
2025-12-24 00:47:17 +09:00
parent 7197e94baf
commit 6f984e5b6f
3 changed files with 333 additions and 128 deletions

View File

@@ -42,6 +42,12 @@ spec:
# SQLAlchemy format (if needed)
- name: DATABASE_URL
value: "postgresql+asyncpg://bluemayne:$(POSTGRES_PASSWORD)@postgresql-primary.postgresql.svc.cluster.local:5432/mas"
- name: POSTGRES_HOST
value: "postgresql-primary.postgresql.svc.cluster.local"
- name: POSTGRES_PORT
value: "5432"
- name: POSTGRES_USER
value: "bluemayne"
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:

View File

@@ -16,5 +16,5 @@ commonLabels:
# 이미지 태그 설정 (ArgoCD Image Updater가 자동으로 업데이트)
images:
- name: gitea0213.kro.kr/bluemayne/mas
newTag: main-sha-28c6e6f8b827900a25c03f3e63c874ddedecf535
newTag: main-sha-22b0840afde85b864df82a5c9408da8c78e28195

View File

@@ -13,6 +13,10 @@ import subprocess
import json
import requests
from datetime import datetime
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import psycopg2
from urllib.parse import quote_plus
class AgentState(TypedDict):
@@ -23,6 +27,26 @@ class AgentState(TypedDict):
result: dict
# ===== Kubernetes client initialization =====
# Runs once at import time: prefer the in-cluster service-account config,
# then fall back to the developer's local kubeconfig.
try:
    # Try in-cluster config first (when running inside a Pod)
    config.load_incluster_config()
    print("✅ Loaded in-cluster Kubernetes config")
except config.ConfigException:
    # Fallback to local kubeconfig (for local development)
    try:
        config.load_kube_config()
        print("✅ Loaded local Kubernetes config")
    except config.ConfigException:
        # No config at all — the clients below are still constructed,
        # but every API call will fail until a config is available.
        print("⚠️ No Kubernetes config found - K8s tools will fail")

# Kubernetes API clients (module-level singletons shared by all tools)
k8s_core_v1 = client.CoreV1Api()
k8s_apps_v1 = client.AppsV1Api()
k8s_batch_v1 = client.BatchV1Api()
k8s_networking_v1 = client.NetworkingV1Api()
# ===== MCP Tools =====
# === 1. Kubernetes MCP Tools ===
def k8s_get_nodes() -> str:
    """
    Get Kubernetes cluster nodes information including status, roles, CPU and memory.

    Returns:
        A formatted multi-line summary, or an error string on failure.
    """
    try:
        nodes = k8s_core_v1.list_node()
        info = []
        for node in nodes.items:
            name = node.metadata.name
            labels = node.metadata.labels or {}
            # Extract roles from labels (node-role.kubernetes.io/<role>)
            roles = [k.split("/")[1] for k in labels if "node-role.kubernetes.io" in k]
            role_str = ",".join(roles) if roles else "worker"
            # Get node readiness from the "Ready" condition
            status = "Unknown"
            for cond in node.status.conditions:
                if cond.type == "Ready":
                    status = "Ready" if cond.status == "True" else "NotReady"
            # Get capacity (string-valued map, e.g. cpu="4", memory="16Gi")
            capacity = node.status.capacity
            cpu = capacity.get("cpu", "?")
            mem = capacity.get("memory", "?")
            info.append(f"{name} [{role_str}]: {status} | CPU: {cpu}, Memory: {mem}")
        return f"📦 Kubernetes Nodes ({len(info)}):\n" + "\n".join(info)
    except ApiException as e:
        return f"❌ Kubernetes API error: {e.status} {e.reason}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
@tool
def k8s_get_pods(namespace: str = "", label_selector: str = "") -> str:
    """
    Get Kubernetes pods with their phase and restart counts.

    Args:
        namespace: Filter by namespace (empty = all namespaces)
        label_selector: Filter by labels (e.g., "app=myapp")
    """
    try:
        # Namespace-scoped listing when a namespace is given, cluster-wide otherwise
        if namespace:
            pods = k8s_core_v1.list_namespaced_pod(
                namespace=namespace,
                label_selector=label_selector if label_selector else None
            )
        else:
            pods = k8s_core_v1.list_pod_for_all_namespaces(
                label_selector=label_selector if label_selector else None
            )
        info = []
        for pod in pods.items:
            name = pod.metadata.name
            ns = pod.metadata.namespace
            phase = pod.status.phase
            # Total restarts across all containers (container_statuses may be None)
            restarts = 0
            if pod.status.container_statuses:
                restarts = sum(c.restart_count for c in pod.status.container_statuses)
            # NOTE(review): the Running/other-phase glyphs appear stripped in the
            # diff rendering — confirm the original emoji before shipping
            emoji = "" if phase == "Running" else "⚠️" if phase == "Pending" else ""
            info.append(f"{emoji} {ns}/{name}: {phase} (restarts: {restarts})")
        return f"🐳 Pods ({len(info)}):\n" + "\n".join(info[:50])  # Limit to 50
    except ApiException as e:
        return f"❌ Kubernetes API error: {e.status} {e.reason}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
@tool
def k8s_get_deployments(namespace: str = "") -> str:
    """
    Get Kubernetes deployments with ready/desired replica counts.

    Args:
        namespace: Filter by namespace (empty = all namespaces)
    """
    try:
        # Namespace-scoped listing when a namespace is given, cluster-wide otherwise
        if namespace:
            deployments = k8s_apps_v1.list_namespaced_deployment(namespace=namespace)
        else:
            deployments = k8s_apps_v1.list_deployment_for_all_namespaces()
        info = []
        for deploy in deployments.items:
            name = deploy.metadata.name
            ns = deploy.metadata.namespace
            # spec.replicas / status.ready_replicas may be None — treat as 0
            desired = deploy.spec.replicas or 0
            ready = deploy.status.ready_replicas or 0
            # NOTE(review): the all-ready glyph appears stripped in the diff
            # rendering — confirm the original emoji before shipping
            emoji = "" if ready == desired else "⚠️"
            info.append(f"{emoji} {ns}/{name}: {ready}/{desired} ready")
        return f"📦 Deployments ({len(info)}):\n" + "\n".join(info[:30])
    except ApiException as e:
        return f"❌ Kubernetes API error: {e.status} {e.reason}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
@tool
def k8s_get_pod_logs(namespace: str, pod_name: str, tail: int = 50) -> str:
    """
    Get recent logs from a Kubernetes pod.

    Args:
        namespace: Pod namespace
        pod_name: Pod name
        tail: Number of lines to show (default: 50)
    """
    try:
        logs = k8s_core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            tail_lines=tail
        )
        return f"📜 Logs for {namespace}/{pod_name}:\n```\n{logs}\n```"
    except ApiException as e:
        return f"❌ Kubernetes API error: {e.status} {e.reason}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
@tool
def k8s_describe_resource(resource_type: str, name: str, namespace: str = "default") -> str:
    """
    Describe a Kubernetes resource (rough analogue of `kubectl describe`).

    Args:
        resource_type: Resource kind, case-insensitive; common aliases accepted
            (pod/pods, deployment/deploy, service/svc, statefulset/sts,
            daemonset/ds, ingress/ing)
        name: Resource name
        namespace: Namespace (default: default)
    """
    try:
        resource_type = resource_type.lower()
        # Dispatch to the right typed read call per resource kind
        if resource_type in ["pod", "pods"]:
            resource = k8s_core_v1.read_namespaced_pod(name=name, namespace=namespace)
        elif resource_type in ["deployment", "deployments", "deploy"]:
            resource = k8s_apps_v1.read_namespaced_deployment(name=name, namespace=namespace)
        elif resource_type in ["service", "services", "svc"]:
            resource = k8s_core_v1.read_namespaced_service(name=name, namespace=namespace)
        elif resource_type in ["statefulset", "statefulsets", "sts"]:
            resource = k8s_apps_v1.read_namespaced_stateful_set(name=name, namespace=namespace)
        elif resource_type in ["daemonset", "daemonsets", "ds"]:
            resource = k8s_apps_v1.read_namespaced_daemon_set(name=name, namespace=namespace)
        elif resource_type in ["ingress", "ingresses", "ing"]:
            resource = k8s_networking_v1.read_namespaced_ingress(name=name, namespace=namespace)
        else:
            return f"❌ Unsupported resource type: {resource_type}"
        # Format basic metadata
        result_lines = [
            f"Name: {resource.metadata.name}",
            f"Namespace: {resource.metadata.namespace}",
            f"Created: {resource.metadata.creation_timestamp}",
            f"Labels: {resource.metadata.labels}",
            f"Annotations: {resource.metadata.annotations}",
        ]
        # Add status if available
        if hasattr(resource, 'status') and resource.status:
            result_lines.append(f"Status: {resource.status}")
        output = "\n".join(str(line) for line in result_lines)
        return f"🔍 Describe {resource_type}/{name} in {namespace}:\n```\n{output}\n```"
    except ApiException as e:
        return f"❌ Kubernetes API error: {e.status} {e.reason}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
# === 2. PostgreSQL MCP Tools ===
def get_postgres_connection(database: str = "postgres"):
    """
    Create a PostgreSQL connection from environment variables, defaulting to
    the in-cluster Kubernetes service DNS name.

    Args:
        database: Database name to connect to (default: "postgres")

    Returns:
        An open psycopg2 connection; the caller is responsible for closing it.

    Raises:
        ValueError: if POSTGRES_PORT is not a valid integer.
        psycopg2.OperationalError: if the connection cannot be established.
    """
    pg_host = os.getenv("POSTGRES_HOST", "postgresql-primary.postgresql.svc.cluster.local")
    # Cast the port eagerly so a malformed env var fails fast with a clear error
    pg_port = int(os.getenv("POSTGRES_PORT", "5432"))
    pg_user = os.getenv("POSTGRES_USER", "bluemayne")
    pg_password = os.getenv("POSTGRES_PASSWORD", "")
    return psycopg2.connect(
        host=pg_host,
        port=pg_port,
        user=pg_user,
        password=pg_password,
        database=database,
        connect_timeout=10
    )
@tool
def postgres_query(query: str, database: str = "postgres") -> str:
    """
    Run a read-only SQL query against PostgreSQL and return a text table.

    Args:
        query: SQL SELECT statement to execute
        database: Target database name (default: "postgres")
    """
    # Safety guard: only read-only SELECT statements are allowed
    if not query.strip().lower().startswith("select"):
        return "❌ Only SELECT queries are allowed for safety"
    conn = None
    try:
        conn = get_postgres_connection(database)
        cursor = conn.cursor()
        cursor.execute(query)
        # Fetch results
        rows = cursor.fetchall()
        colnames = [desc[0] for desc in cursor.description]
        # Format output: header, separator, then rows
        result_lines = [" | ".join(colnames)]
        result_lines.append("-" * len(result_lines[0]))
        for row in rows[:100]:  # Limit to 100 rows
            result_lines.append(" | ".join(str(val) for val in row))
        cursor.close()
        return f"📊 Query Result ({len(rows)} rows):\n```\n" + "\n".join(result_lines) + "\n```"
    except psycopg2.Error as e:
        return f"❌ PostgreSQL error: {e.pgcode} - {e.pgerror}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
    finally:
        # Always release the connection, even when the query fails
        # (the original only closed it on the success path)
        if conn is not None:
            conn.close()
@tool
def postgres_list_databases() -> str:
    """
    List all non-template databases in the PostgreSQL cluster with their sizes.
    """
    conn = None
    try:
        conn = get_postgres_connection("postgres")
        cursor = conn.cursor()
        cursor.execute("SELECT datname, pg_size_pretty(pg_database_size(datname)) AS size FROM pg_database WHERE datistemplate = false ORDER BY datname;")
        rows = cursor.fetchall()
        result_lines = ["Database | Size", "---------|-----"]
        for row in rows:
            result_lines.append(f"{row[0]} | {row[1]}")
        cursor.close()
        return f"🗄️ Databases ({len(rows)}):\n```\n" + "\n".join(result_lines) + "\n```"
    except psycopg2.Error as e:
        return f"❌ PostgreSQL error: {e.pgcode} - {e.pgerror}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
    finally:
        # Always release the connection, even when the query fails
        # (the original only closed it on the success path)
        if conn is not None:
            conn.close()
@tool
@@ -261,9 +347,9 @@ def git_recent_commits(repo: str, limit: int = 10) -> str:
limit: Number of commits to show (default: 10)
"""
try:
repo_path = f"/Users/bluemayne/Projects/{repo}"
repo_path = f"/app/repos/{repo}" # Container 내부 경로
if not os.path.exists(repo_path):
return f"❌ Repository not found: {repo_path}"
return f"❌ Repository not found: {repo_path}. Use git_clone_repo first."
result = subprocess.run(
["git", "-C", repo_path, "log", f"-{limit}", "--oneline"],
@@ -276,6 +362,111 @@ def git_recent_commits(repo: str, limit: int = 10) -> str:
return f"❌ Git error: {str(e)}"
@tool
def git_clone_repo(repo_url: str, repo_name: str = "") -> str:
    """
    Clone a Git repository into /app/repos.

    Args:
        repo_url: Full repository URL (e.g., https://gitea0213.kro.kr/bluemayne/harbor.git)
        repo_name: Optional local directory name (default: extracted from URL)
    """
    try:
        if not repo_name:
            repo_name = repo_url.split("/")[-1].replace(".git", "")
        repo_path = f"/app/repos/{repo_name}"
        os.makedirs("/app/repos", exist_ok=True)
        if os.path.exists(repo_path):
            return f"✅ Repository already exists: {repo_path}"
        # Use Gitea token if available
        token = os.getenv("GITEA_TOKEN", "")
        if token and "gitea0213.kro.kr" in repo_url:
            # Inject token into URL for authenticated clone
            repo_url = repo_url.replace("https://", f"https://{token}@")
        result = subprocess.run(
            ["git", "clone", repo_url, repo_path],
            capture_output=True, text=True, timeout=60
        )
        if result.returncode == 0:
            return f"✅ Cloned repository: {repo_name} to {repo_path}"
        # Redact the token so it never leaks into the error text returned
        # to the agent (git echoes the full URL on failure)
        stderr = result.stderr.replace(token, "***") if token else result.stderr
        return f"❌ Error: {stderr}"
    except Exception as e:
        return f"❌ Git clone error: {str(e)}"
@tool
def git_create_file(repo_name: str, file_path: str, content: str, commit_message: str = "") -> str:
    """
    Create or update a file in a cloned Git repository and commit it.

    Args:
        repo_name: Repository name under /app/repos (must be cloned first)
        file_path: File path relative to repo root
        content: File content
        commit_message: Commit message (default: "Add/Update {file_path}")
    """
    try:
        repo_path = f"/app/repos/{repo_name}"
        if not os.path.exists(repo_path):
            return f"❌ Repository not found: {repo_path}. Use git_clone_repo first."
        full_path = os.path.join(repo_path, file_path)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "w") as f:
            f.write(content)
        # Stage the file; capture output so a failure surfaces as an exception
        # instead of spamming the process's stdout/stderr
        subprocess.run(["git", "-C", repo_path, "add", file_path],
                       check=True, capture_output=True, timeout=10)
        if not commit_message:
            commit_message = f"Add/Update {file_path}"
        result = subprocess.run(
            ["git", "-C", repo_path, "commit", "-m", commit_message],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            return f"✅ Created/Updated {file_path} and committed"
        # git exits non-zero when the tree is unchanged — that is not a failure
        if "nothing to commit" in result.stdout + result.stderr:
            return f"✅ {file_path} already up to date (nothing to commit)"
        return f"⚠️ File created but commit failed: {result.stderr}"
    except Exception as e:
        return f"❌ Git file error: {str(e)}"
@tool
def git_push(repo_name: str, branch: str = "main") -> str:
    """
    Push local commits to the remote "origin".

    Args:
        repo_name: Repository name under /app/repos (must be cloned first)
        branch: Branch name (default: main)
    """
    try:
        repo_path = f"/app/repos/{repo_name}"
        if not os.path.exists(repo_path):
            return f"❌ Repository not found: {repo_path}"
        # Ensure a commit identity exists; capture output so these best-effort
        # calls don't write to the process's stdout/stderr
        subprocess.run(["git", "-C", repo_path, "config", "user.name", "mas-agent"],
                       capture_output=True, timeout=5)
        subprocess.run(["git", "-C", repo_path, "config", "user.email", "mas-agent@mas.local"],
                       capture_output=True, timeout=5)
        result = subprocess.run(
            ["git", "-C", repo_path, "push", "origin", branch],
            capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0:
            return f"✅ Pushed to {branch} branch"
        return f"❌ Push failed: {result.stderr}"
    except Exception as e:
        return f"❌ Git push error: {str(e)}"
# === 4. Prometheus MCP Tools ===
@tool
def prometheus_query(query: str) -> str:
    """
    Execute a PromQL instant query against the Prometheus HTTP API.

    Args:
        query: PromQL query (e.g., "up", "node_cpu_seconds_total")
    """
    try:
        # Access Prometheus via Kubernetes service DNS (override with PROMETHEUS_URL)
        prometheus_url = os.getenv(
            "PROMETHEUS_URL",
            "http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090"
        )
        # Make HTTP request
        response = requests.get(
            f"{prometheus_url}/api/v1/query",
            params={"query": query},
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                results = data.get("data", {}).get("result", [])
                output = []
                for r in results:
                    metric = r.get("metric", {})
                    value = r.get("value", [None, "N/A"])[1]
                    output.append(f"{metric}: {value}")
                return f"📈 Prometheus Query Result ({len(results)} metrics):\n" + "\n".join(output)
            return f"❌ Query failed: {data}"
        return f"❌ HTTP Error: {response.status_code} - {response.text}"
    except requests.RequestException as e:
        return f"❌ Prometheus request error: {str(e)}"
    except Exception as e:
        return f"❌ Error: {str(e)}"
@tool
@@ -327,8 +526,8 @@ def fs_read_file(file_path: str, max_lines: int = 100) -> str:
max_lines: Maximum lines to read (default: 100)
"""
try:
# Only allow reading from safe directories
safe_dirs = ["/var/log", "/tmp", os.path.expanduser("~/Projects")]
# Allow reading from safe directories
safe_dirs = ["/app/repos", "/var/log", "/tmp"]
if not any(file_path.startswith(d) for d in safe_dirs):
return f"❌ Access denied: {file_path} (not in safe directories)"