diff --git a/README.md b/README.md index d6b58260..3155bda2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,9 @@ 8. [주요 구성 요소 및 역할](#8-주요-구성-요소-및-역할) 9. [프로젝트 디렉토리 구조](#9-프로젝트-디렉토리-구조) 10. [환경 변수 관리 전략](#10-환경-변수-관리-전략) -11. [시연 영상](#11-시연-영상) +11. [실행 방법 (Getting Started)](#11-실행-방법-getting-started) +12. [배포 방법 (Deployment)](#12-배포-방법-deployment) +13. [시연 영상](#13-시연-영상) --- @@ -223,14 +225,201 @@ pre-processing-service/ ## 10. 환경 변수 관리 전략 -추후 작성 예정 -* **FastAPI (`pre-processing-service`)**: -* **Spring Boot (`user-service`)**: + +각 서비스는 환경별 설정 관리를 위해 다음과 같은 전략을 사용합니다. + + + +### 10.1. FastAPI (`pre-processing-service`) + + + +`.env` 파일을 통해 환경 변수를 로드합니다. `apps/pre-processing-service/app/core/config.py`의 `BaseSettings`를 기반으로 동작합니다. + + + +* **필수 변수**: + + * `DB_HOST`, `DB_PORT`, `DB_USER`, `DB_PASS`, `DB_NAME`: MariaDB 연결 정보 + + * `LOKI_HOST`, `LOKI_PORT`: Loki 로깅 서버 정보 + + * `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `S3_BUCKET_NAME`: AWS S3 저장소 설정 (OCR 및 이미지 업로드) + +* **선택 변수**: + + * `OPENAI_API_KEY`: AI 콘텐츠 생성 기능 사용 시 필요 + + * `MODE`: `dev` 또는 `prd` (기본값: `dev`) + + + +### 10.2. Spring Boot (`user-service`) + + + +Spring Profiles(`develop`, `production`)를 사용하여 환경을 분리합니다. + + + +* **Local (Develop)**: `application-develop.yml`을 사용하며, 로컬 Docker 인프라(localhost)에 맞춰져 있습니다. + +* **Production**: `application-production.yml`을 사용하며, 민감한 정보(DB 비밀번호, API 키 등)는 **배포 시점의 환경 변수** 또는 **Docker Compose의 environment** 설정을 통해 주입받습니다. + + --- -## 11. 시연 영상 + + +## 11. 실행 방법 (Getting Started) + + + +로컬 개발 환경에서 프로젝트를 실행하는 방법입니다. + + + +### 11.1. 사전 준비 사항 (Prerequisites) + + + +* **Java 21** (Amazon Corretto 21 권장) + +* **Python 3.11** (Poetry 패키지 매니저 설치 필요) + +* **Docker & Docker Compose** + + + +### 11.2. 1단계: 인프라 실행 (Database & Monitoring) + + + +프로젝트 실행에 필요한 DB(MariaDB)와 모니터링 도구(Loki, Promtail, Grafana)를 Docker로 실행합니다. + + + +```bash + +cd docker/local + +docker-compose up -d + +``` + + + +### 11.3. 2단계: Backend (FastAPI - Worker) 실행 + + + +Python 기반의 AI/전처리 워커 서비스를 실행합니다. + + + +```bash + +cd apps/pre-processing-service + + + +# 1. 의존성 설치 + +poetry install + + + +# 2. 서비스 실행 (Uvicorn) + +poetry run uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + +``` + + + +### 11.4. 3단계: Backend (Spring Boot - Orchestrator) 실행 + + + +Java 기반의 메인 오케스트레이터 서비스를 실행합니다. + + + +```bash + +cd apps/user-service + + + +# develop 프로파일로 실행 + +./gradlew bootRun --args='--spring.profiles.active=develop' + +``` + + + +* **API 문서 (Swagger)**: + + * Spring Boot: `http://localhost:8081/swagger-ui/index.html` (설정 필요 시) + + * FastAPI: `http://localhost:8000/docs` + + + +--- + + + +## 12. 배포 방법 (Deployment) + + + +본 프로젝트는 **GitHub Actions**를 통해 CI/CD 파이프라인이 구축되어 있습니다. + + + +### 12.1. CI/CD 파이프라인 + + + +1. **CI (Continuous Integration)**: + + * `main` 브랜치에 Push 또는 PR 시 자동으로 Java/Python 테스트 및 빌드가 수행됩니다. + +2. **CD (Continuous Deployment)**: + + * 릴리즈 태그 생성 시 Docker Image를 빌드하여 Docker Hub에 Push 합니다. + + * AWS EC2 인스턴스에 SSH로 접속하여 최신 이미지를 Pull 하고 `docker-compose`를 재실행합니다. + + + +### 12.2. 프로덕션 실행 설정 + + + +프로덕션 환경의 Docker 설정은 `docker/production` 디렉토리에 위치합니다. + + + +```bash + +cd docker/production + +docker-compose up -d + +``` + + + +--- + + + +## 13. 시연 영상 [https://www.youtube.com/watch?v=1vApNttVxVg](https://www.youtube.com/watch?v=1vApNttVxVg) [![Video Label](http://img.youtube.com/vi/1vApNttVxVg/0.jpg)](https://www.youtube.com/watch?v=1vApNttVxVg) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java index 66ef57aa..47c8c5e2 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/RequestContextDto.java @@ -19,4 +19,14 @@ public class RequestContextDto { public static RequestContextDto forScheduler(String traceId) { return new RequestContextDto(traceId, "scheduler", "quartz-scheduler"); } + + /** + * 시스템 복구 실행용 컨텍스트를 생성하는 정적 팩토리 메서드입니다. + * + * @param traceId 기존 실행에서 사용하던 추적 ID + * @return 복구용 RequestContext 객체 + */ + public static RequestContextDto forRecovery(String traceId) { + return new RequestContextDto(traceId, "system-recovery", "workflow-system-recovery"); + } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java index a3546069..55e6197d 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/JobRunMapper.java @@ -1,6 +1,7 @@ package site.icebang.domain.workflow.mapper; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import site.icebang.domain.workflow.model.JobRun; @@ -9,4 +10,7 @@ public interface JobRunMapper { void insert(JobRun jobRun); void update(JobRun jobRun); + + JobRun findSuccessfulJobByWorkflowRunId( + @Param("workflowRunId") Long workflowRunId, @Param("jobId") Long jobId); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java index 267e931a..5ac48e81 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/TaskRunMapper.java @@ -1,8 +1,7 @@ package site.icebang.domain.workflow.mapper; -import java.util.Optional; - import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import site.icebang.domain.workflow.model.TaskRun; @@ -12,5 +11,6 @@ public interface TaskRunMapper { void update(TaskRun taskRun); - Optional findLatestSuccessRunInJob(Long jobRunId, String taskName); + TaskRun findSuccessfulTaskRunByWorkflowRunId( + @Param("workflowRunId") Long workflowRunId, @Param("taskName") String taskName); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java index 64bbcbc6..c644c974 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowRunMapper.java @@ -1,6 +1,9 @@ package site.icebang.domain.workflow.mapper; +import java.util.List; + import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; import site.icebang.domain.workflow.model.WorkflowRun; @@ -9,4 +12,6 @@ public interface WorkflowRunMapper { void insert(WorkflowRun workflowRun); void update(WorkflowRun workflowRun); + + List findByStatus(@Param("status") String status); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java index bbdff181..39fb3aaf 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowContextService.java @@ -13,6 +13,7 @@ import site.icebang.domain.workflow.mapper.TaskIoDataMapper; import site.icebang.domain.workflow.mapper.TaskRunMapper; import site.icebang.domain.workflow.model.JobRun; +import site.icebang.domain.workflow.model.TaskIoData; @Slf4j @Service @@ -24,30 +25,32 @@ public class WorkflowContextService { private final ObjectMapper objectMapper; /** - * 특정 Job 실행 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. + * 전체 워크플로우 실행 범위(WorkflowRun) 내에서, 이전에 성공한 Task의 이름으로 결과(Output)를 조회합니다. Resume(이어하기) 시, 이전 Job이 + * 스킵되더라도 DB에서 전체 이력을 조회하여 데이터를 가져옵니다. * - * @param jobRun 현재 실행중인 JobRun + * @param jobRun 현재 실행중인 JobRun (내부의 workflowRunId를 사용하여 전체 범위 조회) * @param sourceTaskName 결과를 조회할 이전 Task의 이름 * @return 조회된 결과 데이터 (JsonNode) */ public Optional getPreviousTaskOutput(JobRun jobRun, String sourceTaskName) { + Long workflowRunId = jobRun.getWorkflowRunId(); try { - return taskRunMapper - .findLatestSuccessRunInJob(jobRun.getId(), sourceTaskName) + return Optional.ofNullable( + taskRunMapper.findSuccessfulTaskRunByWorkflowRunId(workflowRunId, sourceTaskName)) .flatMap(taskRun -> taskIoDataMapper.findOutputByTaskRunId(taskRun.getId())) - .map( - ioData -> { - try { - return objectMapper.readTree(ioData.getDataValue()); - } catch (Exception e) { - log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); - return null; - } - }); + .map(this::parseJson); } catch (Exception e) { - log.error( - "이전 Task 결과 조회 중 오류 발생: JobRunId={}, TaskName={}", jobRun.getId(), sourceTaskName, e); + log.error("워크플로우 데이터 조회 실패: WorkflowRunId={}, TaskName={}", workflowRunId, sourceTaskName, e); return Optional.empty(); } } + + private JsonNode parseJson(TaskIoData ioData) { + try { + return objectMapper.readTree(ioData.getDataValue()); + } catch (Exception e) { + log.error("TaskIoData JSON 파싱 실패: TaskIoDataId={}", ioData.getId(), e); + return null; + } + } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index b18b87fe..6089c1cc 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -85,6 +85,18 @@ public void executeWorkflow(Long workflowId, RequestContextDto context) { for (JobDto jobDto : jobDtos) { Job job = new Job(jobDto); mdcManager.setJobContext(job.getId()); + + // 📌 이미 성공한 Job인지 확인하여 중복 실행 방지 (Resume 기능) + JobRun existingSuccessfulJob = + jobRunMapper.findSuccessfulJobByWorkflowRunId(workflowRun.getId(), job.getId()); + if (existingSuccessfulJob != null) { + workflowLogger.info( + "---------- Job 스킵 (이미 성공함): JobId={}, PreviousJobRunId={} ----------", + job.getId(), + existingSuccessfulJob.getId()); + continue; + } + JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); workflowLogger.info( diff --git a/apps/user-service/src/main/java/site/icebang/global/config/WorkflowRecoveryInitializer.java b/apps/user-service/src/main/java/site/icebang/global/config/WorkflowRecoveryInitializer.java new file mode 100644 index 00000000..33cd20bd --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/global/config/WorkflowRecoveryInitializer.java @@ -0,0 +1,79 @@ +package site.icebang.global.config; + +import java.util.List; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import site.icebang.domain.workflow.dto.RequestContextDto; +import site.icebang.domain.workflow.mapper.WorkflowRunMapper; +import site.icebang.domain.workflow.model.WorkflowRun; +import site.icebang.domain.workflow.service.WorkflowExecutionService; + +/** + * 애플리케이션 시작 시, 비정상 종료된 워크플로우를 감지하고 '실제로 복구(재실행)'하는 클래스입니다. + * + *

서버가 시작될 때 DB에 'RUNNING' 상태로 남아있는 워크플로우는 시스템 장애(전원 차단, OOM 등)로 인해 중단된 작업입니다. 이 클래스는 해당 작업들을 + * 찾아내어 이전 기록을 정리하고, 작업을 자동으로 재시작합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class WorkflowRecoveryInitializer implements ApplicationRunner { + + private final WorkflowRunMapper workflowRunMapper; + private final WorkflowExecutionService workflowExecutionService; + + @Override + public void run(ApplicationArguments args) { + try { + List runningWorkflows = workflowRunMapper.findByStatus("RUNNING"); + + if (runningWorkflows.isEmpty()) { + return; + } + + log.warn("비정상 종료된 워크플로우 {}건 발견. 자동 복구(재실행) 프로세스를 시작합니다.", runningWorkflows.size()); + + int recoveredCount = 0; + for (WorkflowRun run : runningWorkflows) { + if (recoverAndRestart(run)) { + recoveredCount++; + } + } + log.info("총 {}건의 워크플로우가 성공적으로 복구(재실행 요청)되었습니다.", recoveredCount); + + } catch (Exception e) { + log.error("워크플로우 복구 프로세스 전체 실패", e); + } + } + + private boolean recoverAndRestart(WorkflowRun run) { + try { + // 1. 기존 실행 이력 마감 (Cleaning) + // 중단된 시점의 상태를 FAILED로 확정지어 데이터 정합성을 맞춥니다. + run.finish("FAILED"); + workflowRunMapper.update(run); + log.info("[복구 1단계] 기존 이력 정리 완료: RunID={}, WorkflowID={}", run.getId(), run.getWorkflowId()); + + // 2. 워크플로우 재실행 (Restarting) + // 기존 Trace ID를 승계하여 로그 추적성을 유지하며 서비스를 다시 시작합니다. + RequestContextDto recoveryContext = RequestContextDto.forRecovery(run.getTraceId()); + + workflowExecutionService.executeWorkflow(run.getWorkflowId(), recoveryContext); + log.info( + "[복구 2단계] 워크플로우 재실행 요청 완료: WorkflowID={}, TraceID={}", + run.getWorkflowId(), + run.getTraceId()); + + return true; + } catch (Exception e) { + log.error("워크플로우 개별 복구 실패: RunID={}", run.getId(), e); + return false; + } + } +} diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml index 2cc51d78..90f20565 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml @@ -25,4 +25,14 @@ WHERE id = #{id} + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml index 0bdf8cb0..2a3016b3 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/TaskRunMapper.xml @@ -23,17 +23,20 @@ UPDATE task_run SET status = #{status}, + message = #{message}, finished_at = #{finishedAt} WHERE id = #{id} - SELECT tr.* FROM task_run tr - JOIN task t ON tr.task_id = t.id - where t.name = #{taskName} + JOIN job_run jr ON tr.job_run_id = jr.id + JOIN task t ON tr.task_id = t.id + WHERE jr.workflow_run_id = #{workflowRunId} + AND t.name = #{taskName} AND tr.status = 'SUCCESS' ORDER BY tr.id DESC - LIMIT 1 + LIMIT 1 \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml index 8011fc6c..74883182 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml @@ -25,4 +25,8 @@ WHERE id = #{id} + + \ No newline at end of file