diff --git a/apps/user-service/src/main/java/site/icebang/domain/WorkflowLogInsertExampleController.java b/apps/user-service/src/main/java/site/icebang/domain/WorkflowLogInsertExampleController.java deleted file mode 100644 index c3e225b7..00000000 --- a/apps/user-service/src/main/java/site/icebang/domain/WorkflowLogInsertExampleController.java +++ /dev/null @@ -1,34 +0,0 @@ -package site.icebang.domain; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import lombok.extern.slf4j.Slf4j; - -import site.icebang.common.dto.ApiResponse; - -@RestController -@RequestMapping("/v0/check-execution-log-insert") -@Slf4j -public class WorkflowLogInsertExampleController { - private static final Logger workflowLogger = LoggerFactory.getLogger("WORKFLOW_HISTORY"); - - @GetMapping("") - public ApiResponse test() { - log.info("@@"); - // MDC.put("traceId", UUID.randomUUID().toString()); - MDC.put("sourceId", "o1"); - MDC.put("executionType", "WORKFLOW"); - // MDC.put("sourceId", "test-controller"); - - // 이 로그는 DB에 저장됨 - workflowLogger.info("SLF4J로 찍은 워크플로우 로그"); - - MDC.clear(); - return ApiResponse.success("hi"); - } -} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/manager/ExecutionMdcManager.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/manager/ExecutionMdcManager.java new file mode 100644 index 00000000..e61faa75 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/manager/ExecutionMdcManager.java @@ -0,0 +1,30 @@ +package site.icebang.domain.workflow.manager; + +import org.slf4j.MDC; +import org.springframework.stereotype.Component; + +@Component +public class ExecutionMdcManager { + private static final String SOURCE_ID = "sourceId"; + private static final String EXECUTION_TYPE = "executionType"; + + public void setWorkflowContext(Long workflowId) { + MDC.put(SOURCE_ID, workflowId.toString()); + MDC.put(EXECUTION_TYPE, "WORKFLOW"); + } + + public void setJobContext(Long jobRunId) { + MDC.put(SOURCE_ID, jobRunId.toString()); + MDC.put(EXECUTION_TYPE, "JOB"); + } + + public void setTaskContext(Long taskRunId) { + MDC.put(SOURCE_ID, taskRunId.toString()); + MDC.put(EXECUTION_TYPE, "TASK"); + } + + public void clearExecutionContext() { + MDC.remove(SOURCE_ID); + MDC.remove(EXECUTION_TYPE); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 1d25b856..c6be9ac9 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -24,6 +26,7 @@ import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.execution.model.WorkflowRun; import site.icebang.domain.workflow.dto.TaskDto; +import site.icebang.domain.workflow.manager.ExecutionMdcManager; import site.icebang.domain.workflow.mapper.JobMapper; import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.Task; @@ -34,7 +37,7 @@ @Service @RequiredArgsConstructor public class WorkflowExecutionService { - + private static final Logger workflowLogger = LoggerFactory.getLogger("WORKFLOW_HISTORY"); private final JobMapper jobMapper; private final WorkflowRunMapper workflowRunMapper; private final JobRunMapper jobRunMapper; @@ -42,56 +45,73 @@ public class WorkflowExecutionService { private final Map taskRunners; private final ObjectMapper objectMapper; private final List bodyBuilders; + private final ExecutionMdcManager mdcManager; @Transactional @Async("traceExecutor") public void executeWorkflow(Long workflowId) { - log.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); - WorkflowRun workflowRun = WorkflowRun.start(workflowId); - workflowRunMapper.insert(workflowRun); - - Map workflowContext = new HashMap<>(); - List jobs = jobMapper.findJobsByWorkflowId(workflowId); - log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); - - for (Job job : jobs) { - JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); - jobRunMapper.insert(jobRun); - log.info( - "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - - boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); - - jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); - jobRunMapper.update(jobRun); - - if (!jobSucceeded) { - workflowRun.finish("FAILED"); - workflowRunMapper.update(workflowRun); - log.error("Job 실패로 인해 워크플로우 실행을 중단합니다: WorkflowRunId={}", workflowRun.getId()); - return; + mdcManager.setWorkflowContext(workflowId); + + try { + workflowLogger.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); + + WorkflowRun workflowRun = WorkflowRun.start(workflowId); + workflowRunMapper.insert(workflowRun); + + Map workflowContext = new HashMap<>(); + List jobs = jobMapper.findJobsByWorkflowId(workflowId); + workflowLogger.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); + + for (Job job : jobs) { + JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); + jobRunMapper.insert(jobRun); + + // Job 컨텍스트로 전환 + mdcManager.setJobContext(jobRun.getId()); + workflowLogger.info( + "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + + boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); + + jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); + jobRunMapper.update(jobRun); + + if (!jobSucceeded) { + workflowRun.finish("FAILED"); + workflowRunMapper.update(workflowRun); + workflowLogger.error("Job 실패로 인해 워크플로우 실행을 중단합니다: WorkflowRunId={}", workflowRun.getId()); + return; + } + + workflowLogger.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); + + // 다시 워크플로우 컨텍스트로 복원 + mdcManager.setWorkflowContext(workflowId); } - log.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); - } - workflowRun.finish("SUCCESS"); - workflowRunMapper.update(workflowRun); - log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); + workflowRun.finish("SUCCESS"); + workflowRunMapper.update(workflowRun); + workflowLogger.info( + "========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); + + } finally { + mdcManager.clearExecutionContext(); + } } private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { - // 📌 Mapper로부터 TaskDto 리스트를 조회합니다. List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - - // 📌 convertToTask 메소드를 사용하여 Task 모델 리스트로 변환합니다. List tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList()); - log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); + workflowLogger.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); for (Task task : tasks) { TaskRun taskRun = TaskRun.start(jobRun.getId(), task.getId()); taskRunMapper.insert(taskRun); - log.info("Task 실행 시작: TaskId={}, TaskRunId={}", task.getId(), taskRun.getId()); + + // Task 컨텍스트로 전환 + mdcManager.setTaskContext(taskRun.getId()); + workflowLogger.info("Task 실행 시작: TaskId={}, TaskRunId={}", task.getId(), taskRun.getId()); String runnerBeanName = task.getType().toLowerCase() + "TaskRunner"; TaskRunner runner = taskRunners.get(runnerBeanName); @@ -99,7 +119,8 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow if (runner == null) { taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + task.getType()); taskRunMapper.update(taskRun); - log.error("Task 실행 실패 (미지원 타입): TaskRunId={}, Type={}", taskRun.getId(), task.getType()); + workflowLogger.error("Task 실행 실패 (미지원 타입): Type={}", task.getType()); + mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원 return false; } @@ -115,21 +136,26 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow taskRunMapper.update(taskRun); if (result.isFailure()) { - log.error("Task 실행 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message()); + workflowLogger.error("Task 실행 실패: Message={}", result.message()); + mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원 return false; } try { JsonNode resultJson = objectMapper.readTree(result.message()); workflowContext.put(task.getName(), resultJson); - // TODO: task_io_data 테이블에 requestBody(INPUT)와 resultJson(OUTPUT) 저장 } catch (JsonProcessingException e) { - log.error("Task 결과 JSON 파싱 실패: TaskRunId={}", taskRun.getId(), e); + workflowLogger.error("Task 결과 JSON 파싱 실패"); taskRun.finish("FAILED", "결과 JSON 파싱 실패"); taskRunMapper.update(taskRun); + mdcManager.setJobContext(jobRun.getId()); // Job 컨텍스트로 복원 return false; } - log.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); + + workflowLogger.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); + + // 다시 Job 컨텍스트로 복원 + mdcManager.setJobContext(jobRun.getId()); } return true; }