Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,39 @@
import site.icebang.domain.schedule.model.Schedule;
import site.icebang.domain.workflow.scheduler.WorkflowTriggerJob;

/**
* Spring Quartz 스케줄러의 Job과 Trigger를 동적으로 관리하는 서비스 클래스입니다.
*
* <p>이 서비스는 데이터베이스에 정의된 {@code Schedule} 정보를 바탕으로,
* Quartz 엔진에 실제 실행 가능한 작업을 등록, 수정, 삭제하는 역할을 담당합니다.
*
* <h2>주요 기능:</h2>
* <ul>
* <li>DB의 스케줄 정보를 바탕으로 Quartz Job 및 Trigger 생성 또는 업데이트</li>
* <li>기존에 등록된 Quartz 스케줄 삭제</li>
* </ul>
*
* @author [email protected]
* @since v0.1.0
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class QuartzScheduleService {

/** Quartz 스케줄러의 메인 인스턴스 */
private final Scheduler scheduler;

/**
* DB에 정의된 Schedule 객체를 기반으로 Quartz에 스케줄을 등록하거나 업데이트합니다.
*
* <p>지정된 워크플로우 ID에 해당하는 Job이 이미 존재할 경우, 기존 Job과 Trigger를 삭제하고
* 새로운 정보로 다시 생성하여 스케줄을 업데이트합니다. {@code JobDataMap}을 통해
* 실행될 Job에게 어떤 워크플로우를 실행해야 하는지 ID를 전달합니다.
*
* @param schedule Quartz에 등록할 스케줄 정보를 담은 도메인 모델 객체
* @since v0.1.0
*/
public void addOrUpdateSchedule(Schedule schedule) {
try {
JobKey jobKey = JobKey.jobKey("workflow-" + schedule.getWorkflowId());
Expand Down Expand Up @@ -40,6 +67,12 @@ public void addOrUpdateSchedule(Schedule schedule) {
}
}

/**
* 지정된 워크플로우 ID와 연결된 Quartz 스케줄을 삭제합니다.
*
* @param workflowId 삭제할 스케줄에 연결된 워크플로우의 ID
* @since v0.1.0
*/
public void deleteSchedule(Long workflowId) {
try {
JobKey jobKey = JobKey.jobKey("workflow-" + workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,53 @@
import site.icebang.domain.workflow.model.Task;
import site.icebang.domain.workflow.model.TaskRun;

/** 워크플로우의 개별 Task를 실행하는 모든 Runner가 구현해야 할 인터페이스 */
/**
* 워크플로우 내 개별 Task의 실행을 담당하는 모든 Runner 객체가 구현해야 할 공통 인터페이스입니다.
*
* <p>이 인터페이스는 전략 패턴(Strategy Pattern)의 '전략(Strategy)' 역할을 수행합니다. {@code WorkflowExecutionService}는
* 이 인터페이스에 의존하여, Task의 타입('FastAPI' 등)에 따라 적절한 Runner 구현체를 선택하고 실행 로직을 위임합니다.
*
* <h2>주요 구성 요소:</h2>
*
* <ul>
* <li><b>TaskExecutionResult</b>: 모든 Task 실행 결과가 따라야 할 표준 응답 형식을 정의하는 내부 Record
* <li><b>execute</b>: Task 실행을 위한 단일 추상 메소드
* </ul>
*
* @author [email protected]
* @since v0.1.0
*/
public interface TaskRunner {

/** Task 실행 결과를 담는 Record. status: SUCCESS 또는 FAILED message: 실행 결과 또는 에러 메시지 */
/**
* Task 실행 결과를 담는 불변(Immutable) 데이터 객체(Record)입니다.
*
* <p>실행의 성공/실패 여부(status)와 결과 메시지(message)를 표준화된 방식으로 반환합니다.
*
* @param status 실행 상태 ("SUCCESS" 또는 "FAILED")
* @param message 실행 결과 (성공 시 응답 Body, 실패 시 에러 메시지)
* @since v0.1.0
*/
record TaskExecutionResult(String status, String message) {
public static TaskExecutionResult success(String message) {
return new TaskExecutionResult("SUCCESS", message);
}

/**
* 실패 결과를 생성하는 정적 팩토리 메소드입니다.
*
* @param message 실패 원인 메시지
* @return status가 "FAILED"로 설정된 결과 객체
*/
public static TaskExecutionResult failure(String message) {
return new TaskExecutionResult("FAILED", message);
}

/**
* 해당 결과가 실패했는지 여부를 반환합니다.
*
* @return 실패했다면 true, 아니면 false
*/
public boolean isFailure() {
return "FAILED".equals(this.status);
}
Expand All @@ -26,10 +60,11 @@ public boolean isFailure() {
/**
* 특정 Task를 실행합니다.
*
* @param task 실행할 Task의 정적 정의
* @param taskRun 현재 실행에 대한 기록 객체
* @param requestBody 동적으로 생성된 요청 데이터
* @return Task 실행 결과
* @param task 실행할 Task의 정적 정의 (이름, 타입, 파라미터 등)
* @param taskRun 현재 실행에 대한 DB 기록 객체 (ID 추적 등에 사용)
* @param requestBody {@code TaskBodyBuilder}에 의해 동적으로 생성된 최종 요청 Body
* @return Task 실행 결과를 담은 {@code TaskExecutionResult} 객체
* @since v0.1.0
*/
TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,43 @@
import site.icebang.domain.workflow.runner.TaskRunner;
import site.icebang.external.fastapi.adapter.FastApiAdapter;

/**
* FastAPI 서버와 통신하는 Task를 실행하는 구체적인 Runner 구현체입니다.
*
* <p>이 클래스는 {@code TaskRunner} 인터페이스를 구현하며, Task의 타입이 'FastAPI'일 때 선택됩니다. 실제 HTTP 통신은 {@code
* FastApiAdapter}에 위임하고, 이 클래스는 워크플로우의 {@code Task} 객체를 {@code FastApiAdapter}가 이해할 수 있는 호출 형식으로
* 변환하는 **어댑터(Adapter)** 역할을 수행합니다.
*
* <h2>주요 기능:</h2>
*
* <ul>
* <li>Task 파라미터에서 endpoint와 method 정보 파싱
* <li>사전에 생성된 Request Body를 {@code FastApiAdapter}에 전달하여 실행 위임
* <li>어댑터의 실행 결과를 {@code TaskExecutionResult} 형식으로 변환하여 반환
* </ul>
*
* @author [email protected]
* @since v0.1.0
*/
@Component("fastapiTaskRunner")
@RequiredArgsConstructor
public class FastApiTaskRunner implements TaskRunner {

/** FastAPI 서버와의 통신을 전담하는 어댑터 */
private final FastApiAdapter fastApiAdapter;

/**
* FastAPI 타입의 Task를 실행합니다.
*
* <p>Task의 파라미터에서 엔드포인트와 HTTP 메소드를 추출하고, {@code WorkflowExecutionService}로부터 전달받은 동적 Request
* Body를 사용하여 {@code FastApiAdapter}를 호출합니다.
*
* @param task 실행할 Task의 정적 정의
* @param taskRun 현재 실행에 대한 기록 객체
* @param requestBody {@code TaskBodyBuilder}에 의해 동적으로 생성된 최종 요청 Body
* @return {@code FastApiAdapter}의 호출 결과를 담은 {@code TaskExecutionResult} 객체
* @since v0.1.0
*/
@Override
public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) {
JsonNode params = task.getParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,38 @@

import site.icebang.domain.workflow.service.WorkflowExecutionService;

/**
* Spring Quartz 스케줄러에 의해 실행되는 실제 작업(Job) 클래스입니다.
*
* <p>이 클래스는 Quartz의 스케줄링 세계와 애플리케이션의 비즈니스 로직을 연결하는 **브릿지(Bridge)** 역할을 수행합니다. Quartz의 Trigger가 정해진
* 시간에 발동하면, Quartz 엔진은 이 Job을 인스턴스화하고 {@code executeInternal} 메소드를 호출합니다.
*
* <h2>주요 기능:</h2>
*
* <ul>
* <li>스케줄 실행 시점에 {@code JobDataMap}에서 실행할 워크플로우 ID를 추출
* <li>추출된 ID를 사용하여 {@code WorkflowExecutionService}를 호출하여 실제 워크플로우 실행을 위임
* </ul>
*
* @author [email protected]
* @since v0.1.0
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class WorkflowTriggerJob extends QuartzJobBean {
private final WorkflowExecutionService workflowExecutionService;

/**
* Quartz 스케줄러에 의해 트리거가 발동될 때 호출되는 메인 실행 메소드입니다.
*
* <p>이 메소드는 실행 컨텍스트({@code JobExecutionContext})에서 {@code JobDataMap}을 통해 스케줄 등록 시점에 저장된
* 'workflowId'를 추출합니다. 그 후, 해당 ID를 파라미터로 하여 {@code WorkflowExecutionService}의 {@code
* executeWorkflow} 메소드를 호출하여 실제 비즈니스 로직의 실행을 시작합니다.
*
* @param context Quartz가 제공하는 현재 실행에 대한 런타임 정보. JobDetail과 Trigger 정보를 포함합니다.
* @since v0.1.0
*/
@Override
protected void executeInternal(JobExecutionContext context) {
Long workflowId = context.getJobDetail().getJobDataMap().getLong("workflowId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,50 @@
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import site.icebang.domain.workflow.model.Task;
import site.icebang.domain.workflow.model.TaskRun;
import site.icebang.domain.workflow.runner.TaskRunner;

@Slf4j
/**
* 워크플로우 내 개별 Task의 실행과 재시도 정책을 전담하는 서비스입니다.
*
* <p>이 클래스는 {@code WorkflowExecutionService}로부터 Task 실행 책임을 위임받습니다. Spring AOP의 '자기
* 호출(Self-invocation)' 문제를 회피하고, 재시도 로직을 비즈니스 흐름과 분리하기 위해 별도의 서비스로 구현되었습니다.
*
* <h2>주요 기능:</h2>
*
* <ul>
* <li>{@code @Retryable} 어노테이션을 통한 선언적 재시도 처리
* <li>{@code @Recover} 어노테이션을 이용한 최종 실패 시 복구 로직 수행
* <li>Task 타입에 맞는 적절한 {@code TaskRunner} 선택 및 실행
* </ul>
*
* @author [email protected]
* @since v0.1.0
*/
@Service
@RequiredArgsConstructor
public class TaskExecutionService { // 📌 클래스 이름 변경
public class TaskExecutionService {
/** 워크플로우 실행 이력 전용 로거 */
private static final Logger workflowLogger = LoggerFactory.getLogger("WORKFLOW_HISTORY");

private final Map<String, TaskRunner> taskRunners;

/** RestClientException 발생 시, 5초 간격으로 최대 3번 재시도합니다. */
/**
* 지정된 Task를 재시도 정책을 적용하여 실행합니다.
*
* <p>HTTP 통신 오류 등 {@code RestClientException} 발생 시, 5초의 고정된 간격({@code Backoff})으로 최대 3회({@code
* maxAttempts})까지 실행을 재시도합니다. 지원하지 않는 Task 타입의 경우 재시도 없이 즉시 {@code IllegalArgumentException}을
* 발생시킵니다.
*
* @param task 실행할 Task의 도메인 모델
* @param taskRun 현재 실행에 대한 기록 객체
* @param requestBody 동적으로 생성된 요청 Body
* @return Task 실행 결과
* @throws IllegalArgumentException 지원하지 않는 Task 타입일 경우
* @since v0.1.0
*/
@Retryable(
value = {RestClientException.class},
maxAttempts = 3,
Expand All @@ -45,7 +75,19 @@ public TaskRunner.TaskExecutionResult executeWithRetry(
return runner.execute(task, taskRun, requestBody);
}

/** 모든 재시도가 실패했을 때 마지막으로 호출될 복구 메소드입니다. */
/**
* {@code @Retryable} 재시도가 모두 실패했을 때 호출되는 복구 메소드입니다.
*
* <p>이 메소드는 {@code executeWithRetry} 메소드와 동일한 파라미터 시그니처를 가지며, 발생한 예외를 첫 번째 파라미터로 추가로 받습니다. 최종 실패
* 상태를 기록하고 실패 결과를 반환하는 역할을 합니다.
*
* @param e 재시도를 유발한 마지막 예외 객체
* @param task 실패한 Task의 도메인 모델
* @param taskRun 실패한 실행의 기록 객체
* @param requestBody 실패 당시 사용된 요청 Body
* @return 최종 실패를 나타내는 Task 실행 결과
* @since v0.1.0
*/
@Recover
public TaskRunner.TaskExecutionResult recover(
RestClientException e, Task task, TaskRun taskRun, ObjectNode requestBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,44 @@
import site.icebang.common.dto.PageParams;
import site.icebang.common.dto.PageResult;
import site.icebang.common.service.PageableService;
import site.icebang.domain.workflow.dto.*;
import site.icebang.domain.workflow.dto.JobRunDto;
import site.icebang.domain.workflow.dto.TaskRunDto;
import site.icebang.domain.workflow.dto.WorkflowHistoryDTO;
import site.icebang.domain.workflow.dto.WorkflowRunDetailResponse;
import site.icebang.domain.workflow.dto.WorkflowRunDto;
import site.icebang.domain.workflow.dto.WorkflowRunLogsResponse;
import site.icebang.domain.workflow.mapper.WorkflowHistoryMapper;

/**
* 워크플로우 실행 이력(History) 조회 관련 비즈니스 로직을 처리하는 서비스 클래스입니다.
*
* <p>이 서비스는 워크플로우 실행 목록의 페이징 처리, 특정 실행 건의 상세 정보 조회 등 읽기 전용(Read-Only) 기능에 집중합니다.
*
* <h2>주요 기능:</h2>
*
* <ul>
* <li>워크플로우 실행 이력 목록 페이징 조회
* <li>워크플로우 실행 상세 정보 조회 (Job 및 Task 실행 이력 포함)
* </ul>
*
* @author [email protected]
* @since v0.1.0
*/
@Service
@RequiredArgsConstructor
public class WorkflowHistoryService implements PageableService<WorkflowHistoryDTO> {
private final WorkflowHistoryMapper workflowHistoryMapper;

/**
* 워크플로우 런 조회
* 워크플로우 실행 이력 목록을 페이징 처리하여 조회합니다.
*
* @param pageParams pageParams
* @return PageResult
* <p>이 메소드는 {@code PageableService} 인터페이스를 구현하며, {@code PageResult} 유틸리티를 사용하여 전체 카운트 쿼리와 목록 조회
* 쿼리를 실행하고 페이징 결과를 생성합니다.
*
* @param pageParams 페이징 처리에 필요한 파라미터 (페이지 번호, 페이지 크기 등)
* @return 페이징 처리된 워크플로우 실행 이력 목록
* @see PageResult
* @since v0.1.0
*/
@Override
@Transactional(readOnly = true)
Expand All @@ -35,10 +60,14 @@ public PageResult<WorkflowHistoryDTO> getPagedResult(PageParams pageParams) {
}

/**
* 워크플로우 실행 상세 조회
* 특정 워크플로우 실행 건의 상세 정보를 조회합니다.
*
* <p>지정된 실행 ID(`runId`)에 해당하는 워크플로우 실행 정보와, 그에 속한 모든 Job 실행 정보, 그리고 각 Job에 속한 모든 Task 실행 정보를
* 계층적으로 조회하여 반환합니다.
*
* @param runId workflow_run.id
* @return WorkflowRunDetailResponse
* @param runId 조회할 워크플로우 실행의 ID (`workflow_run.id`)
* @return 워크플로우, Job, Task 실행 정보를 포함하는 상세 응답 객체
* @since v0.1.0
*/
@Transactional(readOnly = true)
public WorkflowRunDetailResponse getWorkflowRunDetail(Long runId) {
Expand Down Expand Up @@ -69,21 +98,23 @@ public WorkflowRunDetailResponse getWorkflowRunDetail(Long runId) {
}

/**
* 워크플로우 실행 로그 조회
* 특정 워크플로우 실행과 관련된 모든 로그를 조회합니다.
*
* @param runId workflow_run.id
* @return WorkflowRunLogsResponse
* @param runId 조회할 워크플로우 실행의 ID (`workflow_run.id`)
* @return 워크플로우 실행 로그 응답 객체
* @since v0.1.0
*/
public WorkflowRunLogsResponse getWorkflowRunLogs(Long runId) {
// TODO: 구현 예정
return null;
}

/**
* TraceId로 워크플로우 실행 조회
* Trace ID를 사용하여 특정 워크플로우 실행 정보를 조회합니다.
*
* @param traceId workflow_run.trace_id
* @return WorkflowRunDetailResponse
* @param traceId 조회할 워크플로우 실행의 Trace ID (`workflow_run.trace_id`)
* @return 워크플로우 실행 상세 응답 객체
* @since v0.1.0
*/
public WorkflowRunDetailResponse getWorkflowRunByTraceId(String traceId) {
// TODO: 구현 예정
Expand Down
Loading
Loading