diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java new file mode 100644 index 00000000..5a36afa3 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java @@ -0,0 +1,34 @@ +package site.icebang.domain.workflow.runner; + +import org.springframework.http.HttpMethod; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.execution.model.TaskRun; +import site.icebang.domain.workflow.model.Task; +import site.icebang.external.fastapi.adapter.FastApiAdapter; + +@Component("fastapiTaskRunner") +@RequiredArgsConstructor +public class FastApiTaskRunner implements TaskRunner { + + private final FastApiAdapter fastApiAdapter; + + @Override + public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) { + JsonNode params = task.getParameters(); + String endpoint = params.path("endpoint").asText(); + HttpMethod method = HttpMethod.valueOf(params.path("method").asText("POST").toUpperCase()); + + String responseBody = fastApiAdapter.call(endpoint, method, requestBody.toString()); + + if (responseBody == null) { + return TaskExecutionResult.failure("FastApiAdapter 호출에 실패했습니다."); + } + return TaskExecutionResult.success(responseBody); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java deleted file mode 100644 index 861edd5a..00000000 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java +++ /dev/null @@ -1,67 +0,0 @@ -package site.icebang.domain.workflow.runner; - -import org.springframework.http.*; -import org.springframework.stereotype.Component; -import org.springframework.web.client.RestClientException; -import org.springframework.web.client.RestTemplate; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import site.icebang.domain.execution.model.TaskRun; -import site.icebang.domain.workflow.model.Task; - -@Slf4j -@Component("httpTaskRunner") // "httpTaskRunner"라는 이름의 Bean으로 등록 -@RequiredArgsConstructor -public class HttpTaskRunner implements TaskRunner { - - private final RestTemplate restTemplate; - - // private final TaskIoDataRepository taskIoDataRepository; // TODO: 입출력 저장을 위해 주입 - - @Override - public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) { - JsonNode params = task.getParameters(); - if (params == null) { - return TaskExecutionResult.failure("Task에 파라미터가 정의되지 않았습니다."); - } - - String url = params.path("url").asText(); - String method = params.path("method").asText("POST"); // 기본값 POST - - if (url.isEmpty()) { - return TaskExecutionResult.failure("Task 파라미터에 'url'이 없습니다."); - } - - try { - // 1. HTTP 헤더 설정 - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - - // 2. HTTP 요청 엔티티 생성 (헤더 + 동적 Body) - HttpEntity requestEntity = new HttpEntity<>(requestBody.toString(), headers); - - log.debug("HTTP Task 요청: URL={}, Method={}, Body={}", url, method, requestBody.toString()); - - // 3. RestTemplate으로 API 호출 - ResponseEntity responseEntity = - restTemplate.exchange( - url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class); - - String responseBody = responseEntity.getBody(); - log.debug("HTTP Task 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); - - // TODO: taskIoDataRepository를 사용하여 requestBody와 responseBody를 DB에 저장 - - return TaskExecutionResult.success(responseBody); - - } catch (RestClientException e) { - log.error("HTTP Task 실행 중 에러 발생: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage()); - return TaskExecutionResult.failure(e.getMessage()); - } - } -} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java new file mode 100644 index 00000000..3385a8ed --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java @@ -0,0 +1,36 @@ +package site.icebang.domain.workflow.runner.body; + +import java.util.Map; +import java.util.Set; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class EmptyBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final Set SUPPORTED_TASKS = + Set.of("상품 유사도 분석 태스크", "상품 정보 크롤링 태스크", "블로그 RAG 생성 태스크", "블로그 발행 태스크"); + + @Override + public boolean supports(String taskName) { + return SUPPORTED_TASKS.contains(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + // 이 Task들은 Body가 필요 없으므로 빈 객체를 반환합니다. + // TODO: 나중에 이 Task들이 이전 단계의 결과값을 필요로 하게 되면, + // 다른 빌더들처럼 workflowContext에서 데이터를 꺼내 Body를 구성하도록 수정합니다. + return objectMapper.createObjectNode(); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java new file mode 100644 index 00000000..f1bd5509 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java @@ -0,0 +1,32 @@ +package site.icebang.domain.workflow.runner.body; + +import java.util.Map; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class KeywordSearchBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "키워드 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + // 이 Task는 항상 정적인 Body를 가집니다. + return objectMapper.createObjectNode().put("tag", "naver"); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java new file mode 100644 index 00000000..610334cf --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java @@ -0,0 +1,46 @@ +package site.icebang.domain.workflow.runner.body; + +import java.util.Map; +import java.util.Optional; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class ProductMatchBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 매칭 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + // 상품 검색 결과 정보 가져오기 + Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) + .map(node -> node.path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java new file mode 100644 index 00000000..2dd3fcb6 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java @@ -0,0 +1,34 @@ +package site.icebang.domain.workflow.runner.body; + +import java.util.Map; + +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import lombok.RequiredArgsConstructor; + +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class ProductSearchBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 검색 태스크"; + private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME); + String keyword = sourceResult != null ? sourceResult.path("keyword").asText("") : ""; + return objectMapper.createObjectNode().put("keyword", keyword); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java new file mode 100644 index 00000000..0d807d86 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java @@ -0,0 +1,28 @@ +package site.icebang.domain.workflow.runner.body; + +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import site.icebang.domain.workflow.model.Task; + +public interface TaskBodyBuilder { + + /** + * 이 빌더가 어떤 Task를 지원하는지 식별합니다. + * + * @param taskName Task의 고유한 이름 + * @return 지원하면 true, 아니면 false + */ + boolean supports(String taskName); + + /** + * 실제 API 요청에 사용될 Body를 생성합니다. + * + * @param task DB에 저장된 Task의 원본 정의 + * @param workflowContext 이전 Task들의 결과가 담긴 컨텍스트 + * @return 생성된 JSON Body + */ + ObjectNode build(Task task, Map workflowContext); +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 0f0e316d..d142b630 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -26,6 +27,7 @@ import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.runner.TaskRunner; +import site.icebang.domain.workflow.runner.body.TaskBodyBuilder; @Slf4j @Service @@ -37,7 +39,8 @@ public class WorkflowExecutionService { private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; private final Map taskRunners; - private final ObjectMapper objectMapper; // 📌 JSON 처리를 위해 ObjectMapper 주입 + private final ObjectMapper objectMapper; + private final List bodyBuilders; @Transactional public void executeWorkflow(Long workflowId) { @@ -45,9 +48,7 @@ public void executeWorkflow(Long workflowId) { WorkflowRun workflowRun = WorkflowRun.start(workflowId); workflowRunMapper.insert(workflowRun); - // 📌 1. 워크플로우 전체 실행 동안 데이터를 공유할 컨텍스트 생성 Map workflowContext = new HashMap<>(); - List jobs = jobMapper.findJobsByWorkflowId(workflowId); log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); @@ -57,7 +58,6 @@ public void executeWorkflow(Long workflowId) { log.info( "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - // 📌 2. Job 내의 Task들을 실행하고, 컨텍스트를 전달하여 데이터 파이프라이닝 수행 boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); @@ -77,16 +77,13 @@ public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); } - /** - * 특정 Job에 속한 Task들을 순차적으로 실행합니다. - * - * @param jobRun 실행중인 Job의 기록 객체 - * @return 모든 Task가 성공하면 true, 하나라도 실패하면 false - */ private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { - // TaskDto를 조회하고 Task로 변환 + // 📌 Mapper로부터 TaskDto 리스트를 조회합니다. List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - List tasks = taskDtos.stream().map(this::convertToTask).toList(); + + // 📌 convertToTask 메소드를 사용하여 Task 모델 리스트로 변환합니다. + List tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList()); + log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); for (Task task : tasks) { @@ -104,10 +101,13 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return false; } - // 📌 3. Task 실행 전, 컨텍스트를 이용해 동적으로 Request Body를 생성 - ObjectNode requestBody = prepareRequestBody(task, workflowContext); + ObjectNode requestBody = + bodyBuilders.stream() + .filter(builder -> builder.supports(task.getName())) + .findFirst() + .map(builder -> builder.build(task, workflowContext)) + .orElse(objectMapper.createObjectNode()); - // 📌 4. 동적으로 생성된 Request Body를 전달하여 Task 실행 TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); taskRunMapper.update(taskRun); @@ -117,7 +117,6 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return false; } - // 📌 5. 성공한 Task의 결과를 다음 Task가 사용할 수 있도록 컨텍스트에 저장 try { JsonNode resultJson = objectMapper.readTree(result.message()); workflowContext.put(task.getName(), resultJson); @@ -133,82 +132,7 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return true; } - /** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */ - private ObjectNode prepareRequestBody(Task task, Map context) { - ObjectNode requestBody = objectMapper.createObjectNode(); - JsonNode params = task.getParameters(); - if (params == null) return requestBody; - - JsonNode mappingRules = params.get("input_mapping"); - JsonNode staticBody = params.get("body"); - - // 정적 body가 있으면 우선적으로 복사 - if (staticBody != null && staticBody.isObject()) { - requestBody.setAll((ObjectNode) staticBody); - } - - // 📌 디버깅용: 현재 컨텍스트 출력 - log.debug("=== 워크플로우 컨텍스트 확인 ==="); - for (Map.Entry entry : context.entrySet()) { - log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString()); - } - - // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 - if (mappingRules != null && mappingRules.isObject()) { - mappingRules - .fields() - .forEachRemaining( - entry -> { - String targetField = entry.getKey(); // 예: "product_url" - String sourcePath = - entry - .getValue() - .asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url" - - log.debug("=== input_mapping 처리 ==="); - log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath); - - String[] parts = sourcePath.split("\\.", 2); - if (parts.length == 2) { - String sourceTaskName = parts[0]; - String sourceFieldPath = parts[1]; - - log.debug( - "sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath); - - JsonNode sourceData = context.get(sourceTaskName); - log.debug("sourceData found: {}", sourceData != null); - - if (sourceData != null) { - log.debug("sourceData content: {}", sourceData.toString()); - - String jsonPath = "/" + sourceFieldPath.replace('.', '/'); - log.debug("jsonPath: {}", jsonPath); - - JsonNode valueToSet = sourceData.at(jsonPath); - log.debug( - "valueToSet found: {}, isMissing: {}", - valueToSet, - valueToSet.isMissingNode()); - - if (!valueToSet.isMissingNode()) { - log.debug("설정할 값: {}", valueToSet.toString()); - requestBody.set(targetField, valueToSet); - } else { - log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath); - } - } else { - log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName); - } - } - }); - } - - log.debug("최종 requestBody: {}", requestBody.toString()); - return requestBody; - } - - /** TaskDto를 Task 모델로 변환합니다. 비즈니스 로직 실행에 필요한 필드만 복사합니다. */ + /** TaskDto를 Task 모델로 변환합니다. 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. */ private Task convertToTask(TaskDto taskDto) { Task task = new Task(); try { @@ -231,7 +155,6 @@ private Task convertToTask(TaskDto taskDto) { } catch (Exception e) { throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e); } - return task; } } diff --git a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java new file mode 100644 index 00000000..2a5bd001 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java @@ -0,0 +1,40 @@ +package site.icebang.external.fastapi.adapter; + +import org.springframework.http.*; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import site.icebang.global.config.properties.FastApiProperties; + +@Slf4j +@Component +@RequiredArgsConstructor +public class FastApiAdapter { + + private final RestTemplate restTemplate; + private final FastApiProperties properties; + + // 📌 Task나 context에 대한 의존성이 완전히 사라짐 + public String call(String endpoint, HttpMethod method, String requestBody) { + String fullUrl = properties.getUrl() + endpoint; + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity requestEntity = new HttpEntity<>(requestBody, headers); + + try { + log.debug("FastAPI 요청: URL={}, Method={}, Body={}", fullUrl, method, requestBody); + ResponseEntity responseEntity = + restTemplate.exchange(fullUrl, method, requestEntity, String.class); + String responseBody = responseEntity.getBody(); + log.debug("FastAPI 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); + return responseBody; + } catch (RestClientException e) { + log.error("FastAPI 호출 실패: URL={}, Error={}", fullUrl, e.getMessage()); + return null; + } + } +} diff --git a/apps/user-service/src/main/resources/sql/01-schema-h2.sql b/apps/user-service/src/main/resources/sql/01-schema-h2.sql deleted file mode 100644 index 018ebb1d..00000000 --- a/apps/user-service/src/main/resources/sql/01-schema-h2.sql +++ /dev/null @@ -1,328 +0,0 @@ --- H2 데이터베이스 호환 스키마 (테스트용) --- MySQL의 unsigned, AFTER 절 등을 H2 호환으로 변경 - -CREATE TABLE `permission` ( - `id` int NOT NULL AUTO_INCREMENT, - `resource` varchar(100) NULL, - `description` varchar(255) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `is_active` boolean DEFAULT TRUE, - `updated_by` bigint NULL, - `created_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `organization` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(150) NULL, - `domain_name` varchar(100) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `role` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `organization_id` bigint NULL, - `name` varchar(100) NULL, - `description` varchar(500) NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `user` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(50) NULL, - `email` varchar(100) NULL, - `password` varchar(255) NULL, - `status` varchar(20) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `department` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `organization_id` bigint NOT NULL, - `name` varchar(100) NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `position` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `organization_id` bigint NOT NULL, - `title` varchar(100) NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `user_organization` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `user_id` bigint NOT NULL, - `organization_id` bigint NOT NULL, - `position_id` bigint NOT NULL, - `department_id` bigint NOT NULL, - `employee_number` varchar(50) NULL, - `status` varchar(20) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `role_permission` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `role_id` bigint NOT NULL, - `permission_id` int NOT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_role_permission` (`role_id`, `permission_id`) -); - -CREATE TABLE `user_role` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `role_id` bigint NOT NULL, - `user_organization_id` bigint NOT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_user_role` (`role_id`, `user_organization_id`) -); - --- 성능 최적화를 위한 인덱스 -CREATE INDEX `idx_user_email` ON `user` (`email`); -CREATE INDEX `idx_user_status` ON `user` (`status`); -CREATE INDEX `idx_user_organization_user` ON `user_organization` (`user_id`); -CREATE INDEX `idx_user_organization_org` ON `user_organization` (`organization_id`); -CREATE INDEX `idx_user_organization_status` ON `user_organization` (`status`); -CREATE INDEX `idx_role_org` ON `role` (`organization_id`); -CREATE INDEX `idx_permission_resource` ON `permission` (`resource`); -CREATE INDEX `idx_permission_active` ON `permission` (`is_active`); - -CREATE TABLE `workflow` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NOT NULL UNIQUE, - `description` text NULL, - `is_enabled` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `updated_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `schedule` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_id` bigint NOT NULL, - `cron_expression` varchar(50) NULL, - `parameters` json NULL, - `is_active` boolean DEFAULT TRUE, - `last_run_status` varchar(20) NULL, - `last_run_at` timestamp NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `updated_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `job` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NOT NULL UNIQUE, - `description` text NULL, - `is_enabled` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `updated_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `task` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NOT NULL UNIQUE, - `type` varchar(50) NULL, - `parameters` json NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `workflow_job` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_id` bigint NOT NULL, - `job_id` bigint NOT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_workflow_job` (`workflow_id`, `job_id`) -); - -CREATE TABLE `job_task` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `job_id` bigint NOT NULL, - `task_id` bigint NOT NULL, - `execution_order` int NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_job_task` (`job_id`, `task_id`) -); - -CREATE TABLE `execution_log` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `execution_type` varchar(20) NULL COMMENT 'task, schedule, job, workflow', - `source_id` bigint NULL COMMENT '모든 데이터에 대한 ID ex: job_id, schedule_id, task_id, ...', - `log_level` varchar(20) NULL, - `executed_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `log_message` text NULL, - `trace_id` char(36) NULL, - `config_snapshot` json NULL, - PRIMARY KEY (`id`), - INDEX `idx_source_id_type` (`source_id`, `execution_type`) -); - -CREATE TABLE `task_io_data` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `task_run_id` bigint NOT NULL, - `io_type` varchar(10) NOT NULL COMMENT 'INPUT, OUTPUT', - `name` varchar(100) NOT NULL COMMENT '파라미터/변수 이름', - `data_type` varchar(50) NOT NULL COMMENT 'string, number, json, file, etc', - `data_value` json NULL COMMENT '실제 데이터 값', - `data_size` bigint NULL COMMENT '데이터 크기 (bytes)', - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - INDEX `idx_task_io_task_run_id` (`task_run_id`), - INDEX `idx_task_io_type` (`io_type`), - INDEX `idx_task_io_name` (`name`) -); - -CREATE TABLE `config` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `target_type` varchar(50) NULL COMMENT 'user, job, workflow', - `target_id` bigint NULL, - `version` int NULL, - `json` json NULL, - `is_active` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_config_target` (`target_type`, `target_id`) -); - -CREATE TABLE `category` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NULL, - `description` text NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `user_config` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `user_id` bigint NOT NULL, - `type` varchar(50) NULL, - `name` varchar(100) NULL, - `json` json NULL, - `is_active` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - --- 인덱스 추가 (성능 최적화) -CREATE INDEX `idx_schedule_workflow` ON `schedule` (`workflow_id`); -CREATE INDEX `idx_job_enabled` ON `job` (`is_enabled`); -CREATE INDEX `idx_task_type` ON `task` (`type`); -CREATE INDEX `idx_workflow_enabled` ON `workflow` (`is_enabled`); -CREATE UNIQUE INDEX `uk_schedule_workflow` ON `schedule` (`workflow_id`); -CREATE UNIQUE INDEX `uk_job_name` ON `job` (`name`); -CREATE UNIQUE INDEX `uk_task_name` ON `task` (`name`); -CREATE UNIQUE INDEX `uk_workflow_name` ON `workflow` (`name`); -CREATE INDEX `idx_user_config_user` ON `user_config` (`user_id`); - --- 워크플로우 실행 테이블 -CREATE TABLE `workflow_run` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_id` bigint NOT NULL, - `trace_id` char(36) NOT NULL, - `run_number` varchar(20) NULL, - `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled', - `trigger_type` varchar(20) NULL COMMENT 'manual, schedule, push, pull_request', - `started_at` timestamp NULL, - `finished_at` timestamp NULL, - `created_by` bigint NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_workflow_run_trace` (`trace_id`), - INDEX `idx_workflow_run_status` (`status`), - INDEX `idx_workflow_run_workflow_id` (`workflow_id`), - INDEX `idx_workflow_run_created_at` (`created_at`) -); - --- Job 실행 테이블 -CREATE TABLE `job_run` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_run_id` bigint NOT NULL, - `job_id` bigint NOT NULL, - `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled, skipped', - `started_at` timestamp NULL, - `finished_at` timestamp NULL, - `execution_order` int NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - INDEX `idx_job_run_workflow_run_id` (`workflow_run_id`), - INDEX `idx_job_run_status` (`status`), - INDEX `idx_job_run_job_id` (`job_id`) -); - --- Task 실행 테이블 -CREATE TABLE `task_run` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `job_run_id` bigint NOT NULL, - `task_id` bigint NOT NULL, - `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled, skipped', - `started_at` timestamp NULL, - `finished_at` timestamp NULL, - `execution_order` int NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - INDEX `idx_task_run_job_run_id` (`job_run_id`), - INDEX `idx_task_run_status` (`status`), - INDEX `idx_task_run_task_id` (`task_id`) -); - --- v0.0.3 - H2 호환 버전 -DROP TABLE IF EXISTS `config`; - --- H2에서는 한 번에 하나씩 컬럼 추가 -ALTER TABLE `workflow_job` ADD COLUMN `execution_order` INT NULL; - -ALTER TABLE `schedule` ADD COLUMN `schedule_text` varchar(20) NULL; - -ALTER TABLE `workflow` ADD COLUMN `default_config` json NULL; - -ALTER TABLE `user` ADD COLUMN `joined_at` timestamp NULL; - -ALTER TABLE `department` ADD COLUMN `description` varchar(100) NULL; - --- v0.4 - H2 호환 버전 (AFTER 절 제거, unsigned 제거, 개별 ALTER 구문으로 분리) --- execution_log 테이블 컬럼 추가 (H2 호환) -ALTER TABLE `execution_log` ADD COLUMN `run_id` bigint NULL; -ALTER TABLE `execution_log` ADD COLUMN `status` varchar(20) NULL; -ALTER TABLE `execution_log` ADD COLUMN `duration_ms` int NULL; -ALTER TABLE `execution_log` ADD COLUMN `error_code` varchar(50) NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved1` varchar(100) NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved2` varchar(100) NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved3` int NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved4` json NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved5` timestamp NULL; - --- 기존 컬럼 수정 (H2 호환) -ALTER TABLE `execution_log` ALTER COLUMN `log_message` varchar(500) NOT NULL; -ALTER TABLE `execution_log` ALTER COLUMN `executed_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP; - --- 기존 불필요한 컬럼 제거 -ALTER TABLE `execution_log` DROP COLUMN IF EXISTS `config_snapshot`; - --- 새로운 인덱스 추가 -CREATE INDEX `idx_run_id` ON `execution_log` (`run_id`); -CREATE INDEX `idx_log_level_status` ON `execution_log` (`log_level`, `status`); -CREATE INDEX `idx_error_code` ON `execution_log` (`error_code`); -CREATE INDEX `idx_duration` ON `execution_log` (`duration_ms`); - --- 기존 인덱스 수정 -DROP INDEX IF EXISTS `idx_source_id_type`; -CREATE INDEX `idx_execution_type_source` ON `execution_log` (`execution_type`, `source_id`); diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index 8e902745..9b6db4c0 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -1,89 +1,90 @@ --- 기존 워크플로우 관련 데이터 삭제 +-- =================================================================== +-- 워크플로우 관련 데이터 초기화 +-- =================================================================== +-- 참조 관계 역순으로 데이터 삭제 +DELETE FROM `schedule`; DELETE FROM `job_task`; DELETE FROM `workflow_job`; DELETE FROM `task`; DELETE FROM `job`; DELETE FROM `workflow`; -DELETE FROM `schedule`; + +-- =================================================================== +-- 워크플로우 정적 데이터 삽입 +-- =================================================================== -- 워크플로우 생성 (ID: 1) -INSERT INTO `workflow` (`id`, `name`, `description`) VALUES - (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스'); +INSERT INTO `workflow` (`id`, `name`, `description`, `created_by`) VALUES + (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스', 1) + ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW(); -- Job 생성 (ID: 1, 2) -INSERT INTO `job` (`id`, `name`, `description`) VALUES - (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업'), - (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업'); +INSERT INTO `job` (`id`, `name`, `description`, `created_by`) VALUES + (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업', 1), + (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업', 1) + ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW(); --- Task 생성 (ID: 1 ~ 7) +-- Task 생성 (ID: 1 ~ 7) - FastAPI Request Body 스키마 반영 INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES --- Job 1의 Task들 - (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/keywords/search', - 'method', 'POST', - 'body', JSON_OBJECT('tag', 'naver') - )), - (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/products/search', - 'method', 'POST', - 'input_mapping', JSON_OBJECT( - 'keyword', '키워드 검색 태스크.data.keyword' - ) - )), - (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/products/match', - 'method', 'POST', - 'input_mapping', JSON_OBJECT( - 'keyword', '키워드 검색 태스크.data.keyword', - 'search_results', '상품 검색 태스크.data.search_results' - ) - )), - (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/products/similarity', - 'method', 'POST', - 'input_mapping', JSON_OBJECT( - 'keyword', '키워드 검색 태스크.data.keyword', - 'matched_products', '상품 매칭 태스크.data.matched_products' - ) + (1, '키워드 검색 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/keywords/search', 'method', 'POST', + 'body', JSON_OBJECT('tag', 'String') -- { "tag": str } )), - (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/products/crawl', - 'method', 'POST', - 'input_mapping', JSON_OBJECT( - 'product_url', '상품 유사도 분석 태스크.data.selected_product.url' - ) - )), - - -- Job 2의 Task들 - (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/blogs/rag/create', - 'method', 'POST', - 'input_mapping', JSON_OBJECT( - 'keyword', '키워드 검색 태스크.data.keyword', - 'product_info', '상품 정보 크롤링 태스크.data.product_detail' - ))), - --- Task 7 설정 확인 필요 - (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/blogs/publish', - 'method', 'POST', - 'body', JSON_OBJECT('tag', 'tistory', 'blog_id', 'test', 'blog_pw', 'test'), - 'input_mapping', JSON_OBJECT( - 'post_title', '블로그 RAG 생성 태스크.data.title', - 'post_content', '블로그 RAG 생성 태스크.data.content', - 'post_tags', '블로그 RAG 생성 태스크.data.tags' - ))); + (2, '상품 검색 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/search', 'method', 'POST', + 'body', JSON_OBJECT('keyword', 'String') -- { "keyword": str } + )), + (3, '상품 매칭 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/match', 'method', 'POST', + 'body', JSON_OBJECT( -- { keyword: str, search_results: List } + 'keyword', 'String', + 'search_results', 'List' + ) + )), + (4, '상품 유사도 분석 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/similarity', 'method', 'POST', + 'body', JSON_OBJECT( -- { keyword: str, matched_products: List, search_results: List } + 'keyword', 'String', + 'matched_products', 'List', + 'search_results', 'List' + ) + )), + (5, '상품 정보 크롤링 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/crawl', 'method', 'POST', + 'body', JSON_OBJECT('product_url', 'String') -- { "product_url": str } + )), + -- RAG관련 request body는 추후에 결정될 예정 + (6, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), + (7, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/blogs/publish', 'method', 'POST', + 'body', JSON_OBJECT( -- { tag: str, blog_id: str, ... } + 'tag', 'String', + 'blog_id', 'String', + 'blog_pw', 'String', + 'blog_name', 'String', + 'post_title', 'String', + 'post_content', 'String', + 'post_tags', 'List' + ) + )) + ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parameters = VALUES(parameters), updated_at = NOW(); +-- =================================================================== +-- 워크플로우 구조 및 스케줄 데이터 삽입 +-- =================================================================== -- 워크플로우-Job 연결 INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES (1, 1, 1), - (1, 2, 2); + (1, 2, 2) + ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order); -- Job-Task 연결 INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), - (2, 6, 1), (2, 7, 2); + (2, 6, 1), (2, 7, 2) + ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order); --- 스케줄 설정 (매분 0초마다 실행) -INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`) VALUES - (1, '0 * * * * ?', TRUE); \ No newline at end of file +-- 스케줄 설정 (매일 오전 8시) +INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`, `created_by`) VALUES + (1, '0 0 8 * * ?', TRUE, 1) + ON DUPLICATE KEY UPDATE cron_expression = VALUES(cron_expression), is_active = VALUES(is_active), updated_at = NOW(); \ No newline at end of file diff --git a/docker/production-fastapi/docker-compose.yml b/docker/production-fastapi/docker-compose.yml index f0803385..8dd5dbb8 100644 --- a/docker/production-fastapi/docker-compose.yml +++ b/docker/production-fastapi/docker-compose.yml @@ -8,7 +8,7 @@ services: ports: - "80:8000" volumes: - - /app/models:/app/models + - ~/app/models:/app/models - logs_volume:/logs depends_on: - promtail