INIT(api): add FastAPI application

- Initialize FastAPI project structure
- Add basic API configuration
This commit is contained in:
2025-12-01 14:34:20 +09:00
commit 615fe6e574
19 changed files with 1418 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

158
.github/workflows/build.yml vendored Normal file
View File

@@ -0,0 +1,158 @@
name: Build Docker Image
on:
push:
branches: [main]
tags:
- 'v*'
workflow_dispatch:
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
build-and-push:
runs-on: ubuntu-latest
permissions:
contents: write
packages: write
outputs:
image-tag: ${{ steps.meta.outputs.tags }}
image-digest: ${{ steps.build.outputs.digest }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Lowercase repository name
id: lowercase
run: |
echo "repo=$(echo ${{ github.repository }} | tr '[:upper:]' '[:lower:]')" >> $GITHUB_OUTPUT
- name: Extract metadata (tags, labels)
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ steps.lowercase.outputs.repo }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha,prefix={{branch}}-sha-,format=long
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push Docker image
id: build
uses: docker/build-push-action@v5
with:
context: ./services/fastapi
file: ./deploy/docker/Dockerfile.prod
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Extract SHA tag
id: extract-tag
run: |
# Extract the SHA-based tag from the tags list
TAGS="${{ steps.meta.outputs.tags }}"
echo "All tags:"
echo "$TAGS"
echo "---"
# Get commit SHA (full 40 characters)
COMMIT_SHA="${{ github.sha }}"
# Method 1: Extract the full SHA tag from docker/metadata-action output
# docker/metadata-action creates: main-sha-<full-40-char-sha>
SHA_TAG=$(echo "$TAGS" | grep -oE 'main-sha-[a-f0-9]{40}' | head -n 1)
# Method 2: If not found, try to extract any main-sha- tag (fallback)
if [ -z "$SHA_TAG" ]; then
SHA_TAG=$(echo "$TAGS" | grep -oE 'main-sha-[a-f0-9]+' | head -n 1)
if [ -n "$SHA_TAG" ]; then
echo "⚠️ Found SHA tag (may not be full 40 chars): $SHA_TAG"
fi
fi
# Method 3: Fallback to commit SHA directly (construct the tag)
if [ -z "$SHA_TAG" ]; then
SHA_TAG="main-sha-$COMMIT_SHA"
echo "⚠️ Could not extract from tags, using commit SHA: $SHA_TAG"
fi
if [ -z "$SHA_TAG" ]; then
echo "❌ ERROR: Failed to extract SHA tag"
exit 1
fi
echo "sha-tag=$SHA_TAG" >> $GITHUB_OUTPUT
echo "✅ Extracted SHA tag: $SHA_TAG"
- name: Update kustomization with new image tag
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
git config --global user.name "github-actions[bot]"
git config --global user.email "github-actions[bot]@users.noreply.github.com"
# Validate that SHA_TAG is not empty
SHA_TAG="${{ steps.extract-tag.outputs.sha-tag }}"
if [ -z "$SHA_TAG" ]; then
echo "❌ ERROR: SHA_TAG is empty, cannot update kustomization"
exit 1
fi
echo "📝 Updating kustomization.yaml with tag: $SHA_TAG"
# Update kustomization.yaml with new image tag
# Handle both cases: newTag: (with value) and newTag: (empty)
sed -i.bak "s|newTag:.*|newTag: $SHA_TAG|" deploy/k8s/overlays/prod/kustomization.yaml
# Verify the update was successful
if grep -q "newTag: $SHA_TAG" deploy/k8s/overlays/prod/kustomization.yaml; then
echo "✅ Successfully updated kustomization.yaml"
rm -f deploy/k8s/overlays/prod/kustomization.yaml.bak
else
echo "❌ ERROR: Failed to update kustomization.yaml"
cat deploy/k8s/overlays/prod/kustomization.yaml
exit 1
fi
# Commit and push if there are changes
if git diff --quiet; then
echo "No changes to commit"
else
git add deploy/k8s/overlays/prod/kustomization.yaml
git commit -m "Update image to $SHA_TAG"
git push
echo "✅ Kustomization updated with new image tag: $SHA_TAG"
fi
- name: Display image information
run: |
echo "✅ Image built and pushed successfully!"
echo "📦 Image tags:"
echo "${{ steps.meta.outputs.tags }}"
echo "🔖 SHA tag: ${{ steps.extract-tag.outputs.sha-tag }}"
echo "🔖 Digest: ${{ steps.build.outputs.digest }}"
echo ""
echo "🚀 Kustomization updated with new image tag"
echo " ArgoCD will automatically detect and deploy this new image"
echo " Monitor deployment at your ArgoCD dashboard"

48
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,48 @@
name: CI
on:
push:
branches: [main, develop]
pull_request:
branches: [main, develop]
jobs:
lint-and-test:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
cache-dependency-path: services/fastapi/requirements.txt
- name: Install dependencies
working-directory: services/fastapi
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install ruff pytest httpx
- name: Run Ruff linter
working-directory: services/fastapi
run: ruff check . --ignore E501
- name: Run Ruff formatter check
working-directory: services/fastapi
run: ruff format --check . || true
- name: Test FastAPI application import
working-directory: services/fastapi
run: |
python -c "from main import app; print('✅ FastAPI app imported successfully')"
- name: Check application health endpoint
working-directory: services/fastapi
run: |
echo "✅ CI completed successfully"

View File

@@ -0,0 +1,42 @@
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: joossam
namespace: argocd
annotations:
argocd-image-updater.argoproj.io/image-list: joossam=ghcr.io/mayne0213/joossam
argocd-image-updater.argoproj.io/joossam.update-strategy: latest
argocd-image-updater.argoproj.io/write-back-method: argocd
finalizers:
- resources-finalizer.argocd.argoproj.io
spec:
project: default
source:
repoURL: https://github.com/Mayne0213/joossam.git
targetRevision: main
path: deploy/k8s/overlays/prod
destination:
server: https://kubernetes.default.svc
namespace: joossam
syncPolicy:
automated:
prune: true # 매니페스트에서 제거된 리소스 자동 삭제
selfHeal: true # 클러스터에서 수동 변경 시 자동 복구
allowEmpty: false
syncOptions:
- CreateNamespace=true # namespace가 없으면 자동 생성
- PrunePropagationPolicy=foreground
- PruneLast=true
retry:
limit: 5
backoff:
duration: 5s
factor: 2
maxDuration: 3m
revisionHistoryLimit: 10

View File

@@ -0,0 +1,34 @@
# Development Dockerfile for Joossam FastAPI OMR Grading Service
FROM python:3.11-slim
# 시스템 의존성 설치 (OpenCV용)
RUN apt-get update && apt-get install -y --no-install-recommends \
libgl1-mesa-glx \
libglib2.0-0 \
libsm6 \
libxext6 \
libxrender-dev \
curl \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 개발용 의존성 추가
RUN pip install --no-cache-dir watchfiles
# 애플리케이션 코드 복사
COPY . .
EXPOSE 8000
# 헬스체크
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 개발 서버 실행 (hot-reload 활성화)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

View File

@@ -0,0 +1,39 @@
# Production Dockerfile for Joossam FastAPI OMR Grading Service
FROM python:3.11-slim AS base
# 시스템 의존성 설치 (OpenCV용)
RUN apt-get update && apt-get install -y --no-install-recommends \
libgl1-mesa-glx \
libglib2.0-0 \
libsm6 \
libxext6 \
libxrender-dev \
curl \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 애플리케이션 코드 복사
COPY . .
# 비루트 사용자 생성 및 전환
RUN addgroup --system --gid 1001 appgroup && \
adduser --system --uid 1001 --gid 1001 appuser && \
chown -R appuser:appgroup /app
USER appuser
EXPOSE 8000
ENV HOST=0.0.0.0
ENV PORT=8000
# 헬스체크
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -0,0 +1,25 @@
services:
# Development Joossam FastAPI Application
app:
build:
context: ../../services/fastapi
dockerfile: ../../deploy/docker/Dockerfile.dev
container_name: joossam-app-dev
restart: unless-stopped
labels:
kompose.namespace: joossam
ports:
- 8001:8000
environment:
- ENV=development
networks:
- joossam-network
volumes:
- ../../services/fastapi:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
networks:
joossam-network:
driver: bridge
name: joossam-network-dev

View File

@@ -0,0 +1,23 @@
services:
# Production Joossam FastAPI Application
app:
image: joossam-app
build:
context: ../../services/fastapi
dockerfile: ../../deploy/docker/Dockerfile.prod
container_name: joossam-app-prod
restart: unless-stopped
labels:
kompose.namespace: joossam
ports:
- 8001:8000
environment:
- ENV=production
networks:
- joossam-network
networks:
joossam-network:
driver: bridge
name: joossam-network-prod

View File

@@ -0,0 +1,51 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: joossam-app
labels:
app: joossam-app
spec:
replicas: 1
selector:
matchLabels:
app: joossam-app
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 1
template:
metadata:
labels:
app: joossam-app
spec:
containers:
- name: joossam-app
image: ghcr.io/mayne0213/joossam:latest
imagePullPolicy: Always
ports:
- containerPort: 8000
protocol: TCP
env:
- name: ENV
value: production
resources:
requests:
memory: "150Mi"
cpu: "100m"
limits:
memory: "300Mi"
cpu: "300m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
restartPolicy: Always

View File

@@ -0,0 +1,14 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- deployment.yaml
- service.yaml
commonLabels:
app.kubernetes.io/name: joossam
app.kubernetes.io/component: api
images:
- name: ghcr.io/mayne0213/joossam
newTag: latest

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: joossam-service
labels:
app: joossam-app
spec:
type: ClusterIP
ports:
- name: http
port: 80
targetPort: 8000
protocol: TCP
selector:
app: joossam-app

View File

@@ -0,0 +1,19 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: joossam-app
labels:
environment: production
spec:
replicas: 1
template:
spec:
containers:
- name: joossam-app
resources:
requests:
memory: "100Mi"
cpu: "50m"
limits:
memory: "200Mi"
cpu: "200m"

View File

@@ -0,0 +1,19 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: joossam
resources:
- ../../base
- resourcequota.yaml
commonLabels:
environment: production
# 이미지 태그 설정
images:
- name: ghcr.io/mayne0213/joossam
newTag: latest
patchesStrategicMerge:
- deployment-patch.yaml

View File

@@ -0,0 +1,12 @@
apiVersion: v1
kind: ResourceQuota
metadata:
name: joossam-quota
namespace: joossam
spec:
hard:
requests.memory: "300Mi"
requests.cpu: "200m"
limits.memory: "400Mi"
limits.cpu: "400m"
pods: "3"

172
scripts/common.sh Executable file
View File

@@ -0,0 +1,172 @@
#!/bin/bash
# Joossam 스크립트 공통 유틸리티 함수들
# 모든 Joossam 스크립트에서 사용할 수 있는 공통 기능들을 정의
set -e
# 공통 스크립트의 절대 경로 기반 디렉토리 상수
JOOSSAM_SCRIPTS_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
JOOSSAM_ROOT="$(dirname "$JOOSSAM_SCRIPTS_DIR")"
# 색상 정의
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 로깅 함수들
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
log_debug() {
echo -e "${BLUE}[DEBUG]${NC} $1"
}
# 경로 계산 함수
get_joossam_root() {
echo "$JOOSSAM_ROOT"
}
get_mayne_root() {
local joossam_root="$(get_joossam_root)"
echo "$(dirname "$(dirname "$joossam_root")")"
}
# 확인 함수
confirm_action() {
local message="$1"
local default="${2:-N}"
if [[ "$default" == "Y" ]]; then
read -p "$message (Y/n): " -n 1 -r
echo
if [[ $REPLY =~ ^[Nn]$ ]]; then
return 1
fi
return 0
else
read -p "$message (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
return 0
fi
return 1
fi
}
# 필수 디렉토리 확인
check_required_dirs() {
local joossam_root="$(get_joossam_root)"
local dirs=("$@")
for dir in "${dirs[@]}"; do
if [ ! -d "$joossam_root/$dir" ]; then
log_error "필수 디렉토리가 없습니다: $dir"
exit 1
fi
done
}
# 필수 파일 확인
check_required_files() {
local joossam_root="$(get_joossam_root)"
local files=("$@")
for file in "${files[@]}"; do
if [ ! -f "$joossam_root/$file" ]; then
log_error "필수 파일이 없습니다: $file"
exit 1
fi
done
}
# Docker 관련 유틸리티
docker_cleanup_joossam() {
log_info "Joossam 관련 Docker 리소스 정리 중..."
# 컨테이너 중지 및 삭제
docker-compose -p joossam -f deploy/docker/docker-compose.yml down --remove-orphans 2>/dev/null || true
docker-compose -p joossam -f deploy/docker/docker-compose.dev.yml down --remove-orphans 2>/dev/null || true
local containers=$(docker ps -aq --filter "name=joossam" 2>/dev/null)
if [[ -n "$containers" ]]; then
echo "$containers" | xargs docker rm -f 2>/dev/null || true
fi
# 이미지 삭제
local images=$(docker images --filter "reference=joossam*" -q 2>/dev/null)
if [[ -n "$images" ]]; then
echo "$images" | xargs docker rmi -f 2>/dev/null || true
fi
images=$(docker images --filter "reference=*joossam*" -q 2>/dev/null)
if [[ -n "$images" ]]; then
echo "$images" | xargs docker rmi -f 2>/dev/null || true
fi
# 볼륨 삭제
local volumes=$(docker volume ls -q --filter "name=joossam" 2>/dev/null)
if [[ -n "$volumes" ]]; then
echo "$volumes" | xargs docker volume rm 2>/dev/null || true
fi
# 네트워크 삭제
docker network rm joossam-network-dev joossam-network-prod 2>/dev/null || true
# 시스템 정리
docker system prune -f
log_info "Docker 리소스 정리 완료"
}
# 환경 변수 로드
load_env_file() {
local joossam_root="$(get_joossam_root)"
local env_file="$joossam_root/.env"
if [ -f "$env_file" ]; then
log_info "환경 변수 파일 로드: $env_file"
source "$env_file"
return 0
else
log_warn "환경 변수 파일이 없습니다: $env_file"
return 1
fi
}
# 에러 처리
handle_error() {
local exit_code=$?
log_error "스크립트 실행 중 오류가 발생했습니다 (종료 코드: $exit_code)"
exit $exit_code
}
# 스크립트 시작 시 공통 설정
setup_script() {
trap handle_error ERR
cd "$(get_joossam_root)"
log_info "스크립트 시작: $(basename "${BASH_SOURCE[1]}")"
}
# 스크립트 종료 시 정리
cleanup_script() {
local exit_code=$?
if [ $exit_code -eq 0 ]; then
log_info "스크립트 완료: $(basename "${BASH_SOURCE[1]}")"
else
log_error "스크립트 실패: $(basename "${BASH_SOURCE[1]}") (종료 코드: $exit_code)"
fi
exit $exit_code
}
# 스크립트 종료 시 정리 함수 등록
trap cleanup_script EXIT

98
scripts/docker-build.sh Executable file
View File

@@ -0,0 +1,98 @@
#!/bin/bash
# Joossam Docker 빌드 및 실행 스크립트
# 공통 유틸리티 함수 로드
source "$(dirname "${BASH_SOURCE[0]}")/common.sh"
# 스크립트 설정
setup_script
log_info "🚀 Joossam Docker 빌드 및 실행 시작..."
# 필수 디렉토리 및 파일 확인
log_info "📁 폴더 구조 확인 중..."
check_required_dirs "services" "deploy/docker"
log_info "📄 필수 파일 확인 중..."
check_required_files "deploy/docker/docker-compose.yml" "deploy/docker/docker-compose.dev.yml" "deploy/docker/Dockerfile.prod" "deploy/docker/Dockerfile.dev"
log_info "✅ 폴더 구조 및 필수 파일 확인 완료!"
# 환경 선택
echo ""
log_info "🎯 실행할 환경을 선택하세요:"
echo "1) 개발 환경 (Development)"
echo "2) 프로덕션 환경 (Production)"
echo "3) 빌드만 (Build Only)"
read -p "선택 (1-3): " -n 1 -r
echo
case ${REPLY} in
1)
log_info "🔧 개발 환경 빌드 및 실행 중..."
cd deploy/docker
docker-compose -p joossam -f docker-compose.dev.yml build --no-cache
docker-compose -p joossam -f docker-compose.dev.yml up -d
JOOSSAM_ROOT=$(get_joossam_root)
cd "${JOOSSAM_ROOT}"
ENV_TYPE="development"
COMPOSE_FILE_PATH="deploy/docker/docker-compose.dev.yml"
;;
2)
log_info "🏭 프로덕션 환경 빌드 및 실행 중..."
cd deploy/docker
docker-compose -p joossam -f docker-compose.yml build --no-cache
docker-compose -p joossam -f docker-compose.yml up -d
JOOSSAM_ROOT=$(get_joossam_root)
cd "${JOOSSAM_ROOT}"
ENV_TYPE="production"
COMPOSE_FILE_PATH="deploy/docker/docker-compose.yml"
;;
3)
log_info "🔨 이미지 빌드만 실행 중..."
cd deploy/docker
log_info " - 개발 이미지 빌드 중..."
docker-compose -p joossam -f docker-compose.dev.yml build --no-cache
log_info " - 프로덕션 이미지 빌드 중..."
docker-compose -p joossam -f docker-compose.yml build --no-cache
JOOSSAM_ROOT=$(get_joossam_root)
cd "${JOOSSAM_ROOT}"
log_info "✅ 빌드 완료! 실행하려면 다시 이 스크립트를 실행하고 환경을 선택하세요."
exit 0
;;
*)
log_error "잘못된 선택입니다."
exit 1
;;
esac
# 서비스 상태 확인
echo ""
log_info "⏳ 서비스 시작 대기 중..."
sleep 10
echo ""
log_info "📊 서비스 상태 확인:"
docker-compose -p joossam -f "$COMPOSE_FILE_PATH" ps
echo ""
log_info "🔍 컨테이너 로그 확인:"
echo " - 애플리케이션 로그: docker-compose -p joossam -f $COMPOSE_FILE_PATH logs -f app"
echo ""
log_info "🌐 접속 URL:"
if [ "$ENV_TYPE" = "development" ]; then
echo " - API: http://localhost:8001"
echo " - API 문서: http://localhost:8001/docs"
else
echo " - API: http://localhost:8001"
echo " - API 문서: http://localhost:8001/docs"
fi
echo ""
log_info "✅ Docker 빌드 및 실행 완료!"
echo ""
log_info "📋 유용한 명령어:"
echo " - 서비스 중지: docker-compose -p joossam -f $COMPOSE_FILE_PATH down"
echo " - 로그 확인: docker-compose -p joossam -f $COMPOSE_FILE_PATH logs -f"
echo " - 서비스 재시작: docker-compose -p joossam -f $COMPOSE_FILE_PATH restart"

21
scripts/docker-cleanup.sh Executable file
View File

@@ -0,0 +1,21 @@
#!/bin/bash
# Joossam Docker Cleanup Script
# 공통 유틸리티 함수 로드
source "$(dirname "${BASH_SOURCE[0]}")/common.sh"
# 스크립트 설정
setup_script
log_info "🧹 Joossam Docker 리소스 정리 시작..."
# 확인 메시지
if ! confirm_action "모든 Joossam 관련 Docker 리소스를 정리하시겠습니까?"; then
log_info "정리를 취소했습니다."
exit 0
fi
# Docker 리소스 정리 실행
docker_cleanup_joossam
log_info "✅ Joossam Docker 리소스 정리 완료!"

620
services/fastapi/main.py Normal file
View File

@@ -0,0 +1,620 @@
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional
import json
import tempfile
import os
import sys
import cv2
import numpy as np
app = FastAPI(
title="Joossam OMR Grading API",
description="OMR 채점을 위한 FastAPI 서비스",
version="1.0.0"
)
# CORS 설정 - Vercel에서 호스팅되는 프론트엔드 허용
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://joossameng.vercel.app",
"https://*.vercel.app",
"http://localhost:3000",
"http://localhost:3001",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- 상수 및 설정 ---
TARGET_WIDTH = 2480
TARGET_HEIGHT = 3508
# --- 함수 정의 ---
def load_image_from_bytes(image_bytes: bytes):
"""바이트 데이터에서 이미지 로드"""
nparr = np.frombuffer(image_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
raise ValueError("이미지를 읽을 수 없습니다")
return img, img.shape[:2]
def resize_image_to_target(img, target_width=TARGET_WIDTH, target_height=TARGET_HEIGHT):
"""이미지를 타겟 크기로 리사이징 (비율 유지)"""
h, w = img.shape[:2]
if w == target_width and h == target_height:
return img, 1.0, 1.0
scale_x = target_width / w
scale_y = target_height / h
new_width = int(w * scale_x)
new_height = int(h * scale_y)
resized_img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
return resized_img, scale_x, scale_y
def gamma_correction(img, gamma=0.7):
"""감마 보정으로 밝기 곡선 조정"""
inv_gamma = 1.0 / gamma
table = np.array([((i / 255.0) ** inv_gamma) * 255 for i in np.arange(0, 256)]).astype("uint8")
return cv2.LUT(img, table)
def unsharp_mask(img, kernel_size=(5, 5), sigma=1.0, amount=1.0):
"""언샤프 마스킹으로 경계 선명화"""
if len(img.shape) == 3:
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(img, kernel_size, sigma)
sharpened = cv2.addWeighted(img, 1.0 + amount, blurred, -amount, 0)
return sharpened
def deskew_image_with_barcodes(img):
"""바코드를 기준으로 이미지 기울기 보정"""
try:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY_INV)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
img_height, img_width = img.shape[:2]
top_area_threshold = img_height * 0.15
top_rectangles = []
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
if (y < top_area_threshold and
w > 10 and h > 10 and
w < 300 and h < 300):
center_x = x + w // 2
center_y = y + h // 2
top_rectangles.append({'center': (center_x, center_y)})
if len(top_rectangles) < 15:
return img
top_rectangles.sort(key=lambda x: x['center'][1])
top_rectangles = top_rectangles[:23]
top_rectangles.sort(key=lambda x: x['center'][0])
if len(top_rectangles) >= 10:
left_points = top_rectangles[:5]
right_points = top_rectangles[-5:]
left_avg_x = np.mean([p['center'][0] for p in left_points])
left_avg_y = np.mean([p['center'][1] for p in left_points])
right_avg_x = np.mean([p['center'][0] for p in right_points])
right_avg_y = np.mean([p['center'][1] for p in right_points])
delta_y = right_avg_y - left_avg_y
delta_x = right_avg_x - left_avg_x
if delta_x == 0:
return img
angle_rad = np.arctan2(delta_y, delta_x)
angle_deg = np.degrees(angle_rad)
if abs(angle_deg) < 0.3:
return img
if abs(angle_deg) > 10:
return img
center = (img_width // 2, img_height // 2)
rotation_matrix = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
deskewed = cv2.warpAffine(
img, rotation_matrix, (img_width, img_height),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE
)
return deskewed
return img
except Exception:
return img
def preprocess_omr_image(img):
"""OMR 이미지 전처리 강화"""
denoised = cv2.GaussianBlur(img, (3, 3), 0)
gamma_corrected = gamma_correction(denoised, gamma=0.7)
if len(gamma_corrected.shape) == 3:
gray = cv2.cvtColor(gamma_corrected, cv2.COLOR_BGR2GRAY)
else:
gray = gamma_corrected
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)
sharpened = unsharp_mask(enhanced, amount=0.8)
result = cv2.cvtColor(sharpened, cv2.COLOR_GRAY2BGR)
return result
def calculate_marking_density(img, x, y, width=30, height=60):
"""특정 좌표 주변 영역의 마킹 밀도 계산"""
h, w = img.shape[:2]
x1 = max(0, x - width//2)
y1 = max(0, y - height//2)
x2 = min(w, x + width//2)
y2 = min(h, y + height//2)
region = img[y1:y2, x1:x2]
if region.size == 0:
return 0.0
if len(region.shape) == 3:
region = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
avg_darkness = 255 - np.mean(region)
dark_pixels = np.sum(region < 180)
total_pixels = region.size
dark_ratio = dark_pixels / total_pixels
medium_dark_pixels = np.sum(region < 160)
medium_dark_ratio = medium_dark_pixels / total_pixels
very_dark_pixels = np.sum(region < 120)
very_dark_ratio = very_dark_pixels / total_pixels
density_score = (avg_darkness / 255.0) * 0.2 + dark_ratio * 0.2 + medium_dark_ratio * 0.4 + very_dark_ratio * 0.2
return density_score
upperValueSquare = 180
def find_top_black_rectangles(img):
"""상단 검은색 사각형들을 찾아서 좌표 반환"""
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, upperValueSquare, 255, cv2.THRESH_BINARY_INV)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
top_rectangles = []
img_height = img.shape[0]
top_area_threshold = img_height * 0.15
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
if (y < top_area_threshold and
w > 10 and h > 10 and
w < 300 and h < 300):
center_x = x + w // 2
center_y = y + h // 2
top_rectangles.append({
'center': (center_x, center_y),
'bbox': (x, y, w, h),
'area': w * h
})
top_rectangles.sort(key=lambda x: x['center'][1])
selected_rectangles = top_rectangles[:23]
selected_rectangles.sort(key=lambda x: x['center'][0])
return selected_rectangles
def find_side_black_rectangles(img):
"""좌우측 검은색 사각형들을 찾아서 좌표 반환 (Y축 계산용)"""
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, upperValueSquare, 255, cv2.THRESH_BINARY_INV)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
img_height, img_width = img.shape[:2]
left_area_threshold = img_width * 0.15
right_area_start = img_width * 0.85
left_rectangles = []
right_rectangles = []
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
center_x = x + w // 2
center_y = y + h // 2
if (w > 20 and h > 20 and
w < 70 and h < 70):
if center_x < left_area_threshold:
left_rectangles.append({
'center': (center_x, center_y),
'bbox': (x, y, w, h),
'area': w * h
})
elif center_x > right_area_start:
right_rectangles.append({
'center': (center_x, center_y),
'bbox': (x, y, w, h),
'area': w * h
})
left_rectangles.sort(key=lambda x: x['center'][0])
left_selected = left_rectangles[:10]
left_selected.sort(key=lambda x: x['center'][1])
right_rectangles.sort(key=lambda x: x['center'][0])
right_selected = right_rectangles[-20:] if len(right_rectangles) >= 20 else right_rectangles
right_selected.sort(key=lambda x: x['center'][1])
return left_selected, right_selected
def define_phone_positions(img_width, img_height, top_rectangles, left_rectangles):
"""전화번호 전용 위치 계산 (1-8번 사각형, 1-10번 숫자 자리)"""
phone_positions = {}
for rect_index in range(8):
if rect_index < len(top_rectangles):
rect_center_x = top_rectangles[rect_index]['center'][0]
digit_position = rect_index + 1
if len(left_rectangles) >= 10:
phone_positions[digit_position] = {}
for digit in range(10):
if digit < len(left_rectangles):
y = left_rectangles[digit]['center'][1]
phone_positions[digit_position][str(digit)] = (rect_center_x, y)
return phone_positions
def define_answer_positions(img_width, img_height, top_rectangles, left_rectangles, right_rectangles):
"""답안 전용 위치 계산 (9-23번 사각형, 1-45번 문제)"""
positions = {}
for rect_index in range(8, 13):
if rect_index < len(top_rectangles):
rect_center_x = top_rectangles[rect_index]['center'][0]
choice_num = rect_index - 7
if len(right_rectangles) >= 20:
for q in range(1, 21):
if q not in positions:
positions[q] = {}
question_index = q - 1
if question_index < len(right_rectangles):
y = right_rectangles[question_index]['center'][1]
positions[q][str(choice_num)] = (rect_center_x, y)
for rect_index in range(13, 18):
if rect_index < len(top_rectangles):
rect_center_x = top_rectangles[rect_index]['center'][0]
choice_num = rect_index - 12
if len(right_rectangles) >= 20:
for q in range(21, 41):
if q not in positions:
positions[q] = {}
question_index = q - 21
if question_index < len(right_rectangles):
y = right_rectangles[question_index]['center'][1]
positions[q][str(choice_num)] = (rect_center_x, y)
for rect_index in range(18, 23):
if rect_index < len(top_rectangles):
rect_center_x = top_rectangles[rect_index]['center'][0]
choice_num = rect_index - 17
if len(right_rectangles) >= 5:
for q in range(41, 46):
if q not in positions:
positions[q] = {}
question_index = q - 41
if question_index < len(right_rectangles):
y = right_rectangles[question_index]['center'][1]
positions[q][str(choice_num)] = (rect_center_x, y)
return positions
def estimate_phone_number_with_density(img, phone_positions, min_density=0.17):
"""전화번호 추정"""
phone_selected = {}
for digit_pos, digit_choices in phone_positions.items():
if not digit_choices:
phone_selected[digit_pos] = "0"
continue
digit_densities = {}
for digit, coord in digit_choices.items():
x, y = coord
density = calculate_marking_density(img, x, y)
digit_densities[digit] = density
highest_digit, highest_density = max(digit_densities.items(), key=lambda x: x[1])
if highest_density >= min_density:
phone_selected[digit_pos] = highest_digit
else:
phone_selected[digit_pos] = "0"
return phone_selected
def estimate_selected_answers_with_density(img, answer_positions, min_density=0.2):
"""답안 추정"""
selected = {}
for q_num, choices in answer_positions.items():
if not choices:
selected[str(q_num)] = "무효"
continue
choice_densities = {}
for choice, coord in choices.items():
x, y = coord
density = calculate_marking_density(img, x, y)
choice_densities[choice] = density
highest_choice, highest_density = max(choice_densities.items(), key=lambda x: x[1])
if highest_density >= min_density:
selected[str(q_num)] = highest_choice
else:
selected[str(q_num)] = "무효"
return selected
def extract_phone_number(phone_selected):
"""전화번호 8자리 추출"""
phone_digits = []
for i in range(1, 9):
if i in phone_selected:
digit = phone_selected[i]
if digit and digit != "무효":
try:
digit_int = int(digit)
if 0 <= digit_int <= 9:
phone_digits.append(str(digit_int))
else:
phone_digits.append("0")
except ValueError:
phone_digits.append("0")
else:
phone_digits.append("0")
else:
phone_digits.append("0")
phone_number = "".join(phone_digits)
return phone_number
def calculate_total_score(selected_answers, correct_answers, question_scores):
"""총점 계산"""
total = 0
for q_num, correct_answer in correct_answers.items():
if q_num in selected_answers:
student_answer = selected_answers[q_num]
if student_answer == correct_answer:
score = question_scores.get(q_num, 0)
total += score
return total
def calculate_grade(total_score):
"""등급 계산"""
if total_score >= 90:
return 1
elif total_score >= 80:
return 2
elif total_score >= 70:
return 3
elif total_score >= 60:
return 4
elif total_score >= 50:
return 5
elif total_score >= 40:
return 6
elif total_score >= 30:
return 7
elif total_score >= 20:
return 8
else:
return 9
def create_results_array(selected_answers, correct_answers, question_scores, question_types):
"""결과 배열 생성"""
results = []
for q_num in sorted(correct_answers.keys(), key=lambda x: int(x)):
try:
q_num_int = int(q_num)
student_answer = selected_answers.get(q_num, "무효")
correct_answer = correct_answers[q_num]
score = question_scores.get(q_num, 0)
question_type = question_types.get(q_num, "기타")
earned_score = score if student_answer == correct_answer else 0
results.append({
"questionNumber": q_num_int,
"studentAnswer": str(student_answer),
"correctAnswer": correct_answer,
"score": score,
"earnedScore": earned_score,
"questionType": question_type
})
except (ValueError, TypeError):
continue
return results
def grade_omr_from_bytes(image_bytes: bytes, correct_answers: Dict, question_scores: Dict, question_types: Dict):
"""OMR 채점 메인 함수 (바이트 입력)"""
try:
img, (h, w) = load_image_from_bytes(image_bytes)
deskewed_img = deskew_image_with_barcodes(img)
expected_ratio = TARGET_WIDTH / TARGET_HEIGHT
actual_ratio = w / h
resized_img, scale_x, scale_y = resize_image_to_target(deskewed_img)
resized_h, resized_w = resized_img.shape[:2]
preprocessed_img = preprocess_omr_image(resized_img)
top_rectangles = find_top_black_rectangles(resized_img)
left_rectangles, right_rectangles = find_side_black_rectangles(resized_img)
phone_positions = define_phone_positions(resized_w, resized_h, top_rectangles, left_rectangles)
answer_positions = define_answer_positions(resized_w, resized_h, top_rectangles, left_rectangles, right_rectangles)
phone_selected = estimate_phone_number_with_density(preprocessed_img, phone_positions)
phone_number = extract_phone_number(phone_selected)
selected_answers = estimate_selected_answers_with_density(preprocessed_img, answer_positions)
correct_count = 0
for q_num, correct_answer in correct_answers.items():
if q_num in selected_answers:
student_answer = selected_answers[q_num]
if str(student_answer) == str(correct_answer):
correct_count += 1
total_score = calculate_total_score(selected_answers, correct_answers, question_scores)
grade = calculate_grade(total_score)
results = create_results_array(selected_answers, correct_answers, question_scores, question_types)
final_result = {
"totalScore": total_score,
"grade": grade,
"phoneNumber": phone_number,
"results": results,
"imageInfo": {
"originalSize": f"{w}x{h}",
"resizedSize": f"{resized_w}x{resized_h}",
"scaleFactors": {"x": scale_x, "y": scale_y},
"aspectRatio": {"expected": expected_ratio, "actual": actual_ratio}
}
}
return final_result
except Exception as e:
raise HTTPException(status_code=500, detail=f"OMR 채점 실패: {str(e)}")
# API 엔드포인트들
@app.get("/")
async def root():
"""헬스체크 엔드포인트"""
return {"status": "healthy", "message": "Joossam OMR Grading API is running"}
@app.get("/health")
async def health():
"""헬스체크 엔드포인트"""
return {"status": "healthy"}
class GradingRequest(BaseModel):
correct_answers: Dict[str, str]
question_scores: Dict[str, int]
question_types: Dict[str, str]
@app.post("/api/omr/grade")
async def grade_omr(
image: UploadFile = File(...),
correct_answers: str = Form(...),
question_scores: str = Form(...),
question_types: str = Form(...)
):
"""
OMR 채점 API
- image: OMR 이미지 파일
- correct_answers: 정답 JSON (예: {"1": "3", "2": "1", ...})
- question_scores: 문제별 점수 JSON (예: {"1": 2, "2": 2, ...})
- question_types: 문제 유형 JSON (예: {"1": "어휘", "2": "문법", ...})
"""
try:
# JSON 파싱
correct_answers_dict = json.loads(correct_answers)
question_scores_dict = json.loads(question_scores)
question_types_dict = json.loads(question_types)
# 이미지 읽기
image_bytes = await image.read()
# OMR 채점 실행
result = grade_omr_from_bytes(
image_bytes,
correct_answers_dict,
question_scores_dict,
question_types_dict
)
return result
except json.JSONDecodeError as e:
raise HTTPException(status_code=400, detail=f"JSON 파싱 오류: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"채점 오류: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,8 @@
fastapi==0.109.0
uvicorn[standard]==0.27.0
python-multipart==0.0.6
opencv-python-headless==4.8.1.78
numpy==1.24.3
Pillow==10.3.0
pydantic==2.5.3