From 6b2fabd44682fe6a6965a191744d23536474f5a7 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 18 Sep 2025 14:16:53 +0900 Subject: [PATCH 01/14] =?UTF-8?q?refactor:=20workflow=20=EA=B4=80=EB=A0=A8?= =?UTF-8?q?=20insert=20SQL=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/resources/sql/03-insert-workflow.sql | 168 ++++++------------ 1 file changed, 53 insertions(+), 115 deletions(-) diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index dd2ddb15..9aff4b0d 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -1,120 +1,58 @@ --- 워크플로우 관련 데이터 삽입 - --- 카테고리 삽입 -INSERT INTO `category` (`name`, `description`) VALUES - ('마케팅', '마케팅 관련 자동화 워크플로우'), - ('콘텐츠', '콘텐츠 생성 및 관리'), - ('데이터 수집', '웹 크롤링 및 데이터 수집 관련'); - --- 워크플로우 생성 -INSERT INTO `workflow` (`name`, `description`, `is_enabled`, `created_by`) VALUES - ('트렌드_블로그_자동화', '트렌드 검색부터 블로그 글 작성까지 전체 자동화 프로세스', TRUE, 1); - --- Job 생성 -INSERT INTO `job` (`name`, `description`, `is_enabled`, `created_by`) VALUES - ('트렌드_검색_작업', '최신 트렌드 키워드 검색 및 분석', TRUE, 1), - ('싸다구_크롤링_작업', '싸다구 사이트에서 관련 상품 정보 크롤링', TRUE, 1), - ('블로그_글_작성_작업', '수집된 데이터를 바탕으로 블로그 글 자동 생성', TRUE, 1); - --- Task 생성 -INSERT INTO `task` (`name`, `type`, `parameters`) VALUES --- 트렌드 검색 관련 태스크 -('구글_트렌드_검색', 'API_CALL', JSON_OBJECT( - 'api_endpoint', 'https://trends.googleapis.com/trends/api', - 'search_region', 'KR', - 'timeframe', 'now 7-d', - 'category', '0' - )), -('네이버_트렌드_검색', 'API_CALL', JSON_OBJECT( - 'api_endpoint', 'https://datalab.naver.com/keyword/trendSearch.naver', - 'period', 'week', - 'device', 'pc' - )), -('키워드_분석_및_필터링', 'DATA_PROCESSING', JSON_OBJECT( - 'min_score', 50, - 'max_keywords', 10, - 'filter_rules', JSON_ARRAY('adult_content', 'spam_keywords') - )), - --- 싸다구 크롤링 관련 태스크 -('싸다구_상품_검색', 'WEB_SCRAPING', JSON_OBJECT( - 'base_url', 'https://www.ssg.com', - 'search_path', '/search.ssg', - 'max_pages', 3, - 'delay_ms', 2000 - )), -('상품_정보_추출', 'DATA_EXTRACTION', JSON_OBJECT( - 'extract_fields', JSON_ARRAY('title', 'price', 'rating', 'review_count', 'image_url'), - 'data_validation', true - )), -('가격_비교_분석', 'DATA_ANALYSIS', JSON_OBJECT( - 'comparison_sites', JSON_ARRAY('쿠팡', '11번가', '옥션'), - 'price_threshold', 0.1 - )), - --- 블로그 글 작성 관련 태스크 -('블로그_템플릿_선택', 'TEMPLATE_PROCESSING', JSON_OBJECT( - 'template_type', 'product_review', - 'style', 'conversational', - 'target_length', 1500 - )), -('AI_콘텐츠_생성', 'AI_GENERATION', JSON_OBJECT( - 'model', 'gpt-4', - 'temperature', 0.7, - 'max_tokens', 2000, - 'prompt_template', '트렌드 키워드와 상품 정보를 바탕으로 자연스러운 블로그 글을 작성해주세요.' - )), -('콘텐츠_검수_및_최적화', 'CONTENT_REVIEW', JSON_OBJECT( - 'seo_optimization', true, - 'readability_check', true, - 'plagiarism_check', true - )), -('블로그_플랫폼_발행', 'PUBLISHING', JSON_OBJECT( - 'platforms', JSON_ARRAY('네이버 블로그', '티스토리', '브런치'), - 'schedule_publish', false, - 'auto_tags', true - )); +-- 기존 워크플로우 관련 데이터 삭제 +DELETE FROM `job_task`; +DELETE FROM `workflow_job`; +DELETE FROM `task`; +DELETE FROM `job`; +DELETE FROM `workflow`; +DELETE FROM `schedule`; + +-- 워크플로우 생성 (ID: 1) +INSERT INTO `workflow` (`id`, `name`, `description`) VALUES + (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스'); + +-- Job 생성 (ID: 1, 2) +INSERT INTO `job` (`id`, `name`, `description`) VALUES + (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업'), + (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업'); + +-- Task 생성 (ID: 1 ~ 7) +INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES + -- Job 1의 Task들 + (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://pre-processing-service:8000/keywords/search', + 'method', 'POST', + 'body', JSON_OBJECT('tag', 'fashion') -- 초기 입력값은 정적으로 정의 + )), + (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://pre-processing-service:8000/products/search', + 'method', 'POST', + 'input_mapping', JSON_OBJECT('keyword', '키워드 검색 태스크.keyword') -- "키워드 검색 태스크"의 결과에서 "keyword" 필드를 가져와 매핑 + )), + (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( + 'url', 'http://pre-processing-service:8000/products/match', + 'method', 'POST', + 'input_mapping', JSON_OBJECT( + 'keyword', '키워드 검색 태스크.keyword', + 'search_results', '상품 검색 태스크.search_results' + ) + )), + (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/products/similarity', 'method', 'POST')), + (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/products/crawl', 'method', 'POST')), + + -- Job 2의 Task들 + (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/blogs/rag/create', 'method', 'POST')), + (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/blogs/publish', 'method', 'POST')); -- 워크플로우-Job 연결 -INSERT INTO `workflow_job` (`workflow_id`, `job_id`) VALUES - (1, 1), -- 트렌드_블로그_자동화 + 트렌드_검색_작업 - (1, 2), -- 트렌드_블로그_자동화 + 싸다구_크롤링_작업 - (1, 3); -- 트렌드_블로그_자동화 + 블로그_글_작성_작업 +INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES + (1, 1, 1), + (1, 2, 2); --- Job-Task 연결 (실행 순서 포함) --- 트렌드 검색 작업의 태스크들 +-- Job-Task 연결 INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - (1, 1, 1), -- 구글_트렌드_검색 - (1, 2, 2), -- 네이버_트렌드_검색 - (1, 3, 3); -- 키워드_분석_및_필터링 - --- 싸다구 크롤링 작업의 태스크들 -INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - (2, 4, 1), -- 싸다구_상품_검색 - (2, 5, 2), -- 상품_정보_추출 - (2, 6, 3); -- 가격_비교_분석 - --- 블로그 글 작성 작업의 태스크들 -INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES - (3, 7, 1), -- 블로그_템플릿_선택 - (3, 8, 2), -- AI_콘텐츠_생성 - (3, 9, 3), -- 콘텐츠_검수_및_최적화 - (3, 10, 4); -- 블로그_플랫폼_발행 - --- 스케줄 설정 (매일 오전 8시 실행) -INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `parameters`, `is_active`, `created_by`) VALUES - (1, '0 0 8 * * *', JSON_OBJECT( - 'timezone', 'Asia/Seoul', - 'retry_count', 3, - 'timeout_minutes', 60, - 'notification_email', 'admin@icebang.site' - ), TRUE, 1); + (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), + (2, 6, 1), (2, 7, 2); --- 사용자별 설정 (관리자용) -INSERT INTO `user_config` (`user_id`, `type`, `name`, `json`, `is_active`) VALUES - (1, 'workflow_preference', '트렌드_블로그_설정', JSON_OBJECT( - 'preferred_keywords', JSON_ARRAY('테크', 'IT', '트렌드', '리뷰'), - 'blog_style', 'casual', - 'auto_publish', false, - 'notification_enabled', true - ), TRUE); \ No newline at end of file +-- 스케줄 설정 (매일 오전 8시) +INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`) VALUES + (1, '0 0 8 * * ?', TRUE); \ No newline at end of file From 03fc6617b508a4290677960364bc429f686f64de Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 18 Sep 2025 14:17:21 +0900 Subject: [PATCH 02/14] =?UTF-8?q?refactor:=20Task=20=EA=B4=80=EB=A0=A8=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/runner/HttpTaskRunner.java | 52 +++++++---- .../domain/workflow/runner/TaskRunner.java | 17 +++- .../service/WorkflowExecutionService.java | 92 +++++++++++++++---- 3 files changed, 124 insertions(+), 37 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java index 9f497b97..861edd5a 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java @@ -6,6 +6,7 @@ import org.springframework.web.client.RestTemplate; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -14,35 +15,52 @@ import site.icebang.domain.workflow.model.Task; @Slf4j -@Component("httpTaskRunner") +@Component("httpTaskRunner") // "httpTaskRunner"라는 이름의 Bean으로 등록 @RequiredArgsConstructor public class HttpTaskRunner implements TaskRunner { + private final RestTemplate restTemplate; + // private final TaskIoDataRepository taskIoDataRepository; // TODO: 입출력 저장을 위해 주입 + @Override - public TaskExecutionResult execute(Task task, TaskRun taskRun) { + public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) { JsonNode params = task.getParameters(); - String url = params.get("url").asText(); - String method = params.get("method").asText(); - JsonNode body = params.get("body"); + if (params == null) { + return TaskExecutionResult.failure("Task에 파라미터가 정의되지 않았습니다."); + } + + String url = params.path("url").asText(); + String method = params.path("method").asText("POST"); // 기본값 POST + + if (url.isEmpty()) { + return TaskExecutionResult.failure("Task 파라미터에 'url'이 없습니다."); + } try { - HttpEntity requestEntity = - new HttpEntity<>( - body.toString(), - new HttpHeaders() { - { - setContentType(MediaType.APPLICATION_JSON); - } - }); - - ResponseEntity response = + // 1. HTTP 헤더 설정 + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + // 2. HTTP 요청 엔티티 생성 (헤더 + 동적 Body) + HttpEntity requestEntity = new HttpEntity<>(requestBody.toString(), headers); + + log.debug("HTTP Task 요청: URL={}, Method={}, Body={}", url, method, requestBody.toString()); + + // 3. RestTemplate으로 API 호출 + ResponseEntity responseEntity = restTemplate.exchange( url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class); - return TaskExecutionResult.success(response.getBody()); + String responseBody = responseEntity.getBody(); + log.debug("HTTP Task 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); + + // TODO: taskIoDataRepository를 사용하여 requestBody와 responseBody를 DB에 저장 + + return TaskExecutionResult.success(responseBody); + } catch (RestClientException e) { - log.error("HTTP Task 실행 실패: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage()); + log.error("HTTP Task 실행 중 에러 발생: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage()); return TaskExecutionResult.failure(e.getMessage()); } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java index a2b820bb..9c6ab224 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/TaskRunner.java @@ -1,9 +1,14 @@ package site.icebang.domain.workflow.runner; +import com.fasterxml.jackson.databind.node.ObjectNode; + import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.workflow.model.Task; +/** 워크플로우의 개별 Task를 실행하는 모든 Runner가 구현해야 할 인터페이스 */ public interface TaskRunner { + + /** Task 실행 결과를 담는 Record. status: SUCCESS 또는 FAILED message: 실행 결과 또는 에러 메시지 */ record TaskExecutionResult(String status, String message) { public static TaskExecutionResult success(String message) { return new TaskExecutionResult("SUCCESS", message); @@ -14,9 +19,17 @@ public static TaskExecutionResult failure(String message) { } public boolean isFailure() { - return "FAILED".equals(status); + return "FAILED".equals(this.status); } } - TaskExecutionResult execute(Task task, TaskRun taskRun); + /** + * 특정 Task를 실행합니다. + * + * @param task 실행할 Task의 정적 정의 + * @param taskRun 현재 실행에 대한 기록 객체 + * @param requestBody 동적으로 생성된 요청 데이터 + * @return Task 실행 결과 + */ + TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 086b00de..3d0ca0c5 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,11 +1,17 @@ package site.icebang.domain.workflow.service; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -30,18 +36,17 @@ public class WorkflowExecutionService { private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; private final Map taskRunners; + private final ObjectMapper objectMapper; // 📌 JSON 처리를 위해 ObjectMapper 주입 - /** - * 워크플로우 실행의 시작점. 전체 과정은 하나의 트랜잭션으로 묶입니다. - * - * @param workflowId 실행할 워크플로우의 ID - */ @Transactional public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 시작: WorkflowId={} ==========", workflowId); WorkflowRun workflowRun = WorkflowRun.start(workflowId); workflowRunMapper.insert(workflowRun); + // 📌 1. 워크플로우 전체 실행 동안 데이터를 공유할 컨텍스트 생성 + Map workflowContext = new HashMap<>(); + List jobs = jobMapper.findJobsByWorkflowId(workflowId); log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); @@ -51,7 +56,8 @@ public void executeWorkflow(Long workflowId) { log.info( "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - boolean jobSucceeded = executeTasksForJob(jobRun); + // 📌 2. Job 내의 Task들을 실행하고, 컨텍스트를 전달하여 데이터 파이프라이닝 수행 + boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); jobRunMapper.update(jobRun); @@ -60,7 +66,7 @@ public void executeWorkflow(Long workflowId) { workflowRun.finish("FAILED"); workflowRunMapper.update(workflowRun); log.error("Job 실패로 인해 워크플로우 실행을 중단합니다: WorkflowRunId={}", workflowRun.getId()); - return; // Job이 실패하면 전체 워크플로우를 중단 + return; } log.info("---------- Job 실행 성공: JobRunId={} ----------", jobRun.getId()); } @@ -70,13 +76,7 @@ public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); } - /** - * 특정 Job에 속한 Task들을 순차적으로 실행합니다. - * - * @param jobRun 실행중인 Job의 기록 객체 - * @return 모든 Task가 성공하면 true, 하나라도 실패하면 false - */ - private boolean executeTasksForJob(JobRun jobRun) { + private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { List tasks = jobMapper.findTasksByJobId(jobRun.getJobId()); log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); @@ -92,20 +92,76 @@ private boolean executeTasksForJob(JobRun jobRun) { taskRun.finish("FAILED", "지원하지 않는 Task 타입: " + task.getType()); taskRunMapper.update(taskRun); log.error("Task 실행 실패 (미지원 타입): TaskRunId={}, Type={}", taskRun.getId(), task.getType()); - return false; // 실행할 Runner가 없으므로 실패 + return false; } - TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun); + // 📌 3. Task 실행 전, 컨텍스트를 이용해 동적으로 Request Body를 생성 + ObjectNode requestBody = prepareRequestBody(task, workflowContext); + + // 📌 4. 동적으로 생성된 Request Body를 전달하여 Task 실행 + TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); taskRunMapper.update(taskRun); if (result.isFailure()) { log.error("Task 실행 실패: TaskRunId={}, Message={}", taskRun.getId(), result.message()); - return false; // Task가 실패하면 즉시 중단하고 실패 반환 + return false; + } + + // 📌 5. 성공한 Task의 결과를 다음 Task가 사용할 수 있도록 컨텍스트에 저장 + try { + JsonNode resultJson = objectMapper.readTree(result.message()); + workflowContext.put(task.getName(), resultJson); + // TODO: task_io_data 테이블에 requestBody(INPUT)와 resultJson(OUTPUT) 저장 + } catch (JsonProcessingException e) { + log.error("Task 결과 JSON 파싱 실패: TaskRunId={}", taskRun.getId(), e); + taskRun.finish("FAILED", "결과 JSON 파싱 실패"); + taskRunMapper.update(taskRun); + return false; } log.info("Task 실행 성공: TaskRunId={}", taskRun.getId()); } + return true; + } + + /** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */ + private ObjectNode prepareRequestBody(Task task, Map context) { + ObjectNode requestBody = objectMapper.createObjectNode(); + JsonNode params = task.getParameters(); + if (params == null) return requestBody; + + JsonNode mappingRules = params.get("input_mapping"); + JsonNode staticBody = params.get("body"); - return true; // 모든 Task가 성공적으로 완료됨 + // 정적 body가 있으면 우선적으로 복사 + if (staticBody != null && staticBody.isObject()) { + requestBody.setAll((ObjectNode) staticBody); + } + + // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 + if (mappingRules != null && mappingRules.isObject()) { + mappingRules + .fields() + .forEachRemaining( + entry -> { + String targetField = entry.getKey(); // 예: "keyword" + String sourcePath = entry.getValue().asText(); // 예: "키워드 검색 태스크.keyword" + + String[] parts = sourcePath.split("\\.", 2); + if (parts.length == 2) { + String sourceTaskName = parts[0]; + String sourceFieldPath = parts[1]; + + JsonNode sourceData = context.get(sourceTaskName); + if (sourceData != null) { + JsonNode valueToSet = sourceData.at("/" + sourceFieldPath.replace('.', '/')); + if (!valueToSet.isMissingNode()) { + requestBody.set(targetField, valueToSet); + } + } + } + }); + } + return requestBody; } } From 1f31d03a738d7ca8c79a7b6594b8fca5077940cd Mon Sep 17 00:00:00 2001 From: jihukimme Date: Thu, 18 Sep 2025 17:25:01 +0900 Subject: [PATCH 03/14] =?UTF-8?q?refactor:=20fast=20api=20url=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=20=EB=B0=8F=20workflow=20=EA=B4=80=EB=A0=A8=20insert?= =?UTF-8?q?=20SQL=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.yml | 2 +- .../main/resources/sql/03-insert-workflow.sql | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/user-service/src/main/resources/application.yml b/apps/user-service/src/main/resources/application.yml index 706eceea..7fd5893d 100644 --- a/apps/user-service/src/main/resources/application.yml +++ b/apps/user-service/src/main/resources/application.yml @@ -16,5 +16,5 @@ mybatis: # 외부 API 연동을 위한 설정 섹션 api: fastapi: - url: http://pre-processing-service:8000 # FastAPI 서버의 기본 URL + url: http://127.0.0.1:8000 # FastAPI 서버의 기본 URL timeout: 10000 # API 요청 타임아웃 (밀리초 단위) \ No newline at end of file diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index 9aff4b0d..bff80749 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -19,29 +19,29 @@ INSERT INTO `job` (`id`, `name`, `description`) VALUES INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES -- Job 1의 Task들 (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://pre-processing-service:8000/keywords/search', + 'url', 'http://127.0.0.1:8000/keywords/search', 'method', 'POST', - 'body', JSON_OBJECT('tag', 'fashion') -- 초기 입력값은 정적으로 정의 + 'body', JSON_OBJECT('tag', 'naver') -- 초기 입력값은 정적으로 정의 )), (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://pre-processing-service:8000/products/search', + 'url', 'http://127.0.0.1:8000/products/search', 'method', 'POST', 'input_mapping', JSON_OBJECT('keyword', '키워드 검색 태스크.keyword') -- "키워드 검색 태스크"의 결과에서 "keyword" 필드를 가져와 매핑 )), (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://pre-processing-service:8000/products/match', + 'url', 'http://127.0.0.1:8000/products/match', 'method', 'POST', 'input_mapping', JSON_OBJECT( 'keyword', '키워드 검색 태스크.keyword', 'search_results', '상품 검색 태스크.search_results' ) )), - (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/products/similarity', 'method', 'POST')), - (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/products/crawl', 'method', 'POST')), + (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/similarity', 'method', 'POST')), + (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/crawl', 'method', 'POST')), -- Job 2의 Task들 - (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/blogs/rag/create', 'method', 'POST')), - (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('url', 'http://pre-processing-service:8000/blogs/publish', 'method', 'POST')); + (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/rag/create', 'method', 'POST')), + (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/publish', 'method', 'POST')); -- 워크플로우-Job 연결 INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES From 97f525a752d78f0cdb3e4a792937067c3fd20235 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 13:47:29 +0900 Subject: [PATCH 04/14] =?UTF-8?q?refactor:=20WORKFLOW=20=EA=B4=80=EB=A0=A8?= =?UTF-8?q?=20INSERT=20SQL=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/resources/sql/03-insert-workflow.sql | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index bff80749..8cb09b7d 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -15,33 +15,34 @@ INSERT INTO `job` (`id`, `name`, `description`) VALUES (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업'), (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업'); --- Task 생성 (ID: 1 ~ 7) + + +-- Task 생성 (ID: 1 ~ 7) - parameters.body의 value를 'Java 타입'으로 변경 INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES -- Job 1의 Task들 (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/keywords/search', - 'method', 'POST', - 'body', JSON_OBJECT('tag', 'naver') -- 초기 입력값은 정적으로 정의 + 'endpoint', '/keywords/search', 'method', 'POST', + 'body', JSON_OBJECT('tag', 'String') )), (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/products/search', - 'method', 'POST', - 'input_mapping', JSON_OBJECT('keyword', '키워드 검색 태스크.keyword') -- "키워드 검색 태스크"의 결과에서 "keyword" 필드를 가져와 매핑 + 'endpoint', '/products/search', 'method', 'POST', + 'body', JSON_OBJECT('keyword', 'String') )), (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( - 'url', 'http://127.0.0.1:8000/products/match', - 'method', 'POST', - 'input_mapping', JSON_OBJECT( - 'keyword', '키워드 검색 태스크.keyword', - 'search_results', '상품 검색 태스크.search_results' - ) + 'endpoint', '/products/match', 'method', 'POST', + 'body', JSON_OBJECT( + 'keyword', 'String', + 'search_results', 'List' -- 또는 'JsonNode' 등 약속된 타입 + ) )), - (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/similarity', 'method', 'POST')), - (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/products/crawl', 'method', 'POST')), + -- Body가 필요 없는 Task들은 비워둠 + (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('endpoint', '/products/similarity', 'method', 'POST')), + (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('endpoint', '/products/crawl', 'method', 'POST')), + (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), + (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('endpoint', '/blogs/publish', 'method', 'POST')) + ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parameters = VALUES(parameters), updated_at = NOW(); + - -- Job 2의 Task들 - (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/rag/create', 'method', 'POST')), - (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('url', 'http://127.0.0.1:8000/blogs/publish', 'method', 'POST')); -- 워크플로우-Job 연결 INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES From e185b7c088b2f2c10044a9e8e3ea5caef23d2991 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 14:35:14 +0900 Subject: [PATCH 05/14] =?UTF-8?q?refactor:=20task=20type=20=EB=84=A4?= =?UTF-8?q?=EC=9D=B4=EB=B0=8D=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/sql/03-insert-workflow.sql | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index 8cb09b7d..8ea131f2 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -20,15 +20,15 @@ INSERT INTO `job` (`id`, `name`, `description`) VALUES -- Task 생성 (ID: 1 ~ 7) - parameters.body의 value를 'Java 타입'으로 변경 INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES -- Job 1의 Task들 - (1, '키워드 검색 태스크', 'HTTP', JSON_OBJECT( + (1, '키워드 검색 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/keywords/search', 'method', 'POST', 'body', JSON_OBJECT('tag', 'String') )), - (2, '상품 검색 태스크', 'HTTP', JSON_OBJECT( + (2, '상품 검색 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/products/search', 'method', 'POST', 'body', JSON_OBJECT('keyword', 'String') )), - (3, '상품 매칭 태스크', 'HTTP', JSON_OBJECT( + (3, '상품 매칭 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/products/match', 'method', 'POST', 'body', JSON_OBJECT( 'keyword', 'String', @@ -36,10 +36,10 @@ INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES ) )), -- Body가 필요 없는 Task들은 비워둠 - (4, '상품 유사도 분석 태스크', 'HTTP', JSON_OBJECT('endpoint', '/products/similarity', 'method', 'POST')), - (5, '상품 정보 크롤링 태스크', 'HTTP', JSON_OBJECT('endpoint', '/products/crawl', 'method', 'POST')), - (6, '블로그 RAG 생성 태스크', 'HTTP', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), - (7, '블로그 발행 태스크', 'HTTP', JSON_OBJECT('endpoint', '/blogs/publish', 'method', 'POST')) + (4, '상품 유사도 분석 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/products/similarity', 'method', 'POST')), + (5, '상품 정보 크롤링 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/products/crawl', 'method', 'POST')), + (6, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), + (7, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/publish', 'method', 'POST')) ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parameters = VALUES(parameters), updated_at = NOW(); From a81216653ea99c7c8667ea2248d50de260f1793b Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 14:48:50 +0900 Subject: [PATCH 06/14] =?UTF-8?q?refactor:=20FastAPI=20=ED=86=B5=EC=8B=A0?= =?UTF-8?q?=20=EB=A1=9C=EC=A7=81=EC=9D=84=20=EC=A0=84=EC=9A=A9=20Adapter?= =?UTF-8?q?=EB=A1=9C=20=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fastapi/adapter/FastApiAdapter.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java diff --git a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java new file mode 100644 index 00000000..257b98db --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java @@ -0,0 +1,52 @@ +package site.icebang.external.fastapi.adapter; + +import org.springframework.http.*; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import site.icebang.global.config.properties.FastApiProperties; + +@Slf4j +@Component +@RequiredArgsConstructor +public class FastApiAdapter { + + private final RestTemplate restTemplate; + private final FastApiProperties properties; + + /** + * FastAPI 서버에 API 요청을 보내는 범용 메소드 + * @param endpoint /keywords/search 와 같은 endpoint 경로 + * @param method HTTP 메소드 + * @param requestBody 요청 Body (JSON 문자열) + * @return 성공 시 응답 Body, 실패 시 null + */ + public String call(String endpoint, HttpMethod method, String requestBody) { + String fullUrl = properties.getUrl() + endpoint; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity requestEntity = new HttpEntity<>(requestBody, headers); + + try { + log.debug("FastAPI 요청: URL={}, Method={}, Body={}", fullUrl, method, requestBody); + + ResponseEntity responseEntity = restTemplate.exchange( + fullUrl, + method, + requestEntity, + String.class + ); + + String responseBody = responseEntity.getBody(); + log.debug("FastAPI 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); + return responseBody; + + } catch (RestClientException e) { + log.error("FastAPI 호출 실패: URL={}, Error={}", fullUrl, e.getMessage()); + return null; + } + } +} \ No newline at end of file From 03f594c9c2ed4c1a5fdd8382272cc0f204ac7ed7 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 16:24:05 +0900 Subject: [PATCH 07/14] =?UTF-8?q?refactor:=20=EC=82=AC=EC=9A=A9=ED=95=98?= =?UTF-8?q?=EC=A7=80=20=EC=95=8A=EB=8A=94=20Schema=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/sql/01-schema-h2.sql | 328 ------------------ 1 file changed, 328 deletions(-) delete mode 100644 apps/user-service/src/main/resources/sql/01-schema-h2.sql diff --git a/apps/user-service/src/main/resources/sql/01-schema-h2.sql b/apps/user-service/src/main/resources/sql/01-schema-h2.sql deleted file mode 100644 index 018ebb1d..00000000 --- a/apps/user-service/src/main/resources/sql/01-schema-h2.sql +++ /dev/null @@ -1,328 +0,0 @@ --- H2 데이터베이스 호환 스키마 (테스트용) --- MySQL의 unsigned, AFTER 절 등을 H2 호환으로 변경 - -CREATE TABLE `permission` ( - `id` int NOT NULL AUTO_INCREMENT, - `resource` varchar(100) NULL, - `description` varchar(255) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `is_active` boolean DEFAULT TRUE, - `updated_by` bigint NULL, - `created_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `organization` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(150) NULL, - `domain_name` varchar(100) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `role` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `organization_id` bigint NULL, - `name` varchar(100) NULL, - `description` varchar(500) NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `user` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(50) NULL, - `email` varchar(100) NULL, - `password` varchar(255) NULL, - `status` varchar(20) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `department` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `organization_id` bigint NOT NULL, - `name` varchar(100) NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `position` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `organization_id` bigint NOT NULL, - `title` varchar(100) NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `user_organization` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `user_id` bigint NOT NULL, - `organization_id` bigint NOT NULL, - `position_id` bigint NOT NULL, - `department_id` bigint NOT NULL, - `employee_number` varchar(50) NULL, - `status` varchar(20) NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `role_permission` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `role_id` bigint NOT NULL, - `permission_id` int NOT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_role_permission` (`role_id`, `permission_id`) -); - -CREATE TABLE `user_role` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `role_id` bigint NOT NULL, - `user_organization_id` bigint NOT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_user_role` (`role_id`, `user_organization_id`) -); - --- 성능 최적화를 위한 인덱스 -CREATE INDEX `idx_user_email` ON `user` (`email`); -CREATE INDEX `idx_user_status` ON `user` (`status`); -CREATE INDEX `idx_user_organization_user` ON `user_organization` (`user_id`); -CREATE INDEX `idx_user_organization_org` ON `user_organization` (`organization_id`); -CREATE INDEX `idx_user_organization_status` ON `user_organization` (`status`); -CREATE INDEX `idx_role_org` ON `role` (`organization_id`); -CREATE INDEX `idx_permission_resource` ON `permission` (`resource`); -CREATE INDEX `idx_permission_active` ON `permission` (`is_active`); - -CREATE TABLE `workflow` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NOT NULL UNIQUE, - `description` text NULL, - `is_enabled` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `updated_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `schedule` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_id` bigint NOT NULL, - `cron_expression` varchar(50) NULL, - `parameters` json NULL, - `is_active` boolean DEFAULT TRUE, - `last_run_status` varchar(20) NULL, - `last_run_at` timestamp NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `updated_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `job` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NOT NULL UNIQUE, - `description` text NULL, - `is_enabled` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - `updated_by` bigint NULL, - PRIMARY KEY (`id`) -); - -CREATE TABLE `task` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NOT NULL UNIQUE, - `type` varchar(50) NULL, - `parameters` json NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `workflow_job` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_id` bigint NOT NULL, - `job_id` bigint NOT NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_workflow_job` (`workflow_id`, `job_id`) -); - -CREATE TABLE `job_task` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `job_id` bigint NOT NULL, - `task_id` bigint NOT NULL, - `execution_order` int NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_job_task` (`job_id`, `task_id`) -); - -CREATE TABLE `execution_log` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `execution_type` varchar(20) NULL COMMENT 'task, schedule, job, workflow', - `source_id` bigint NULL COMMENT '모든 데이터에 대한 ID ex: job_id, schedule_id, task_id, ...', - `log_level` varchar(20) NULL, - `executed_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `log_message` text NULL, - `trace_id` char(36) NULL, - `config_snapshot` json NULL, - PRIMARY KEY (`id`), - INDEX `idx_source_id_type` (`source_id`, `execution_type`) -); - -CREATE TABLE `task_io_data` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `task_run_id` bigint NOT NULL, - `io_type` varchar(10) NOT NULL COMMENT 'INPUT, OUTPUT', - `name` varchar(100) NOT NULL COMMENT '파라미터/변수 이름', - `data_type` varchar(50) NOT NULL COMMENT 'string, number, json, file, etc', - `data_value` json NULL COMMENT '실제 데이터 값', - `data_size` bigint NULL COMMENT '데이터 크기 (bytes)', - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - INDEX `idx_task_io_task_run_id` (`task_run_id`), - INDEX `idx_task_io_type` (`io_type`), - INDEX `idx_task_io_name` (`name`) -); - -CREATE TABLE `config` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `target_type` varchar(50) NULL COMMENT 'user, job, workflow', - `target_id` bigint NULL, - `version` int NULL, - `json` json NULL, - `is_active` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `created_by` bigint NULL, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_config_target` (`target_type`, `target_id`) -); - -CREATE TABLE `category` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `name` varchar(100) NULL, - `description` text NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - -CREATE TABLE `user_config` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `user_id` bigint NOT NULL, - `type` varchar(50) NULL, - `name` varchar(100) NULL, - `json` json NULL, - `is_active` boolean DEFAULT TRUE, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - `updated_at` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`id`) -); - --- 인덱스 추가 (성능 최적화) -CREATE INDEX `idx_schedule_workflow` ON `schedule` (`workflow_id`); -CREATE INDEX `idx_job_enabled` ON `job` (`is_enabled`); -CREATE INDEX `idx_task_type` ON `task` (`type`); -CREATE INDEX `idx_workflow_enabled` ON `workflow` (`is_enabled`); -CREATE UNIQUE INDEX `uk_schedule_workflow` ON `schedule` (`workflow_id`); -CREATE UNIQUE INDEX `uk_job_name` ON `job` (`name`); -CREATE UNIQUE INDEX `uk_task_name` ON `task` (`name`); -CREATE UNIQUE INDEX `uk_workflow_name` ON `workflow` (`name`); -CREATE INDEX `idx_user_config_user` ON `user_config` (`user_id`); - --- 워크플로우 실행 테이블 -CREATE TABLE `workflow_run` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_id` bigint NOT NULL, - `trace_id` char(36) NOT NULL, - `run_number` varchar(20) NULL, - `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled', - `trigger_type` varchar(20) NULL COMMENT 'manual, schedule, push, pull_request', - `started_at` timestamp NULL, - `finished_at` timestamp NULL, - `created_by` bigint NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_workflow_run_trace` (`trace_id`), - INDEX `idx_workflow_run_status` (`status`), - INDEX `idx_workflow_run_workflow_id` (`workflow_id`), - INDEX `idx_workflow_run_created_at` (`created_at`) -); - --- Job 실행 테이블 -CREATE TABLE `job_run` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `workflow_run_id` bigint NOT NULL, - `job_id` bigint NOT NULL, - `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled, skipped', - `started_at` timestamp NULL, - `finished_at` timestamp NULL, - `execution_order` int NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - INDEX `idx_job_run_workflow_run_id` (`workflow_run_id`), - INDEX `idx_job_run_status` (`status`), - INDEX `idx_job_run_job_id` (`job_id`) -); - --- Task 실행 테이블 -CREATE TABLE `task_run` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `job_run_id` bigint NOT NULL, - `task_id` bigint NOT NULL, - `status` varchar(20) NULL COMMENT 'pending, running, success, failed, cancelled, skipped', - `started_at` timestamp NULL, - `finished_at` timestamp NULL, - `execution_order` int NULL, - `created_at` timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (`id`), - INDEX `idx_task_run_job_run_id` (`job_run_id`), - INDEX `idx_task_run_status` (`status`), - INDEX `idx_task_run_task_id` (`task_id`) -); - --- v0.0.3 - H2 호환 버전 -DROP TABLE IF EXISTS `config`; - --- H2에서는 한 번에 하나씩 컬럼 추가 -ALTER TABLE `workflow_job` ADD COLUMN `execution_order` INT NULL; - -ALTER TABLE `schedule` ADD COLUMN `schedule_text` varchar(20) NULL; - -ALTER TABLE `workflow` ADD COLUMN `default_config` json NULL; - -ALTER TABLE `user` ADD COLUMN `joined_at` timestamp NULL; - -ALTER TABLE `department` ADD COLUMN `description` varchar(100) NULL; - --- v0.4 - H2 호환 버전 (AFTER 절 제거, unsigned 제거, 개별 ALTER 구문으로 분리) --- execution_log 테이블 컬럼 추가 (H2 호환) -ALTER TABLE `execution_log` ADD COLUMN `run_id` bigint NULL; -ALTER TABLE `execution_log` ADD COLUMN `status` varchar(20) NULL; -ALTER TABLE `execution_log` ADD COLUMN `duration_ms` int NULL; -ALTER TABLE `execution_log` ADD COLUMN `error_code` varchar(50) NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved1` varchar(100) NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved2` varchar(100) NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved3` int NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved4` json NULL; -ALTER TABLE `execution_log` ADD COLUMN `reserved5` timestamp NULL; - --- 기존 컬럼 수정 (H2 호환) -ALTER TABLE `execution_log` ALTER COLUMN `log_message` varchar(500) NOT NULL; -ALTER TABLE `execution_log` ALTER COLUMN `executed_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP; - --- 기존 불필요한 컬럼 제거 -ALTER TABLE `execution_log` DROP COLUMN IF EXISTS `config_snapshot`; - --- 새로운 인덱스 추가 -CREATE INDEX `idx_run_id` ON `execution_log` (`run_id`); -CREATE INDEX `idx_log_level_status` ON `execution_log` (`log_level`, `status`); -CREATE INDEX `idx_error_code` ON `execution_log` (`error_code`); -CREATE INDEX `idx_duration` ON `execution_log` (`duration_ms`); - --- 기존 인덱스 수정 -DROP INDEX IF EXISTS `idx_source_id_type`; -CREATE INDEX `idx_execution_type_source` ON `execution_log` (`execution_type`, `source_id`); From 031c9e8f45d613cd31b4a495410d633c0e001b89 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 17:04:21 +0900 Subject: [PATCH 08/14] =?UTF-8?q?refactor:=20WORKFLOW=EA=B4=80=EB=A0=A8=20?= =?UTF-8?q?INSERT=20SQL=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/resources/sql/03-insert-workflow.sql | 87 +++++++++++++------ 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql index 8ea131f2..9b6db4c0 100644 --- a/apps/user-service/src/main/resources/sql/03-insert-workflow.sql +++ b/apps/user-service/src/main/resources/sql/03-insert-workflow.sql @@ -1,59 +1,90 @@ --- 기존 워크플로우 관련 데이터 삭제 +-- =================================================================== +-- 워크플로우 관련 데이터 초기화 +-- =================================================================== +-- 참조 관계 역순으로 데이터 삭제 +DELETE FROM `schedule`; DELETE FROM `job_task`; DELETE FROM `workflow_job`; DELETE FROM `task`; DELETE FROM `job`; DELETE FROM `workflow`; -DELETE FROM `schedule`; + +-- =================================================================== +-- 워크플로우 정적 데이터 삽입 +-- =================================================================== -- 워크플로우 생성 (ID: 1) -INSERT INTO `workflow` (`id`, `name`, `description`) VALUES - (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스'); +INSERT INTO `workflow` (`id`, `name`, `description`, `created_by`) VALUES + (1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스', 1) + ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW(); -- Job 생성 (ID: 1, 2) -INSERT INTO `job` (`id`, `name`, `description`) VALUES - (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업'), - (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업'); - - +INSERT INTO `job` (`id`, `name`, `description`, `created_by`) VALUES + (1, '상품 분석', '키워드 검색, 상품 크롤링 및 유사도 분석 작업', 1), + (2, '블로그 콘텐츠 생성', '분석 데이터를 기반으로 RAG 콘텐츠 생성 및 발행 작업', 1) + ON DUPLICATE KEY UPDATE name = VALUES(name), description = VALUES(description), updated_at = NOW(); --- Task 생성 (ID: 1 ~ 7) - parameters.body의 value를 'Java 타입'으로 변경 +-- Task 생성 (ID: 1 ~ 7) - FastAPI Request Body 스키마 반영 INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES - -- Job 1의 Task들 (1, '키워드 검색 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/keywords/search', 'method', 'POST', - 'body', JSON_OBJECT('tag', 'String') - )), + 'body', JSON_OBJECT('tag', 'String') -- { "tag": str } + )), (2, '상품 검색 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/products/search', 'method', 'POST', - 'body', JSON_OBJECT('keyword', 'String') - )), + 'body', JSON_OBJECT('keyword', 'String') -- { "keyword": str } + )), (3, '상품 매칭 태스크', 'FastAPI', JSON_OBJECT( 'endpoint', '/products/match', 'method', 'POST', - 'body', JSON_OBJECT( + 'body', JSON_OBJECT( -- { keyword: str, search_results: List } 'keyword', 'String', - 'search_results', 'List' -- 또는 'JsonNode' 등 약속된 타입 + 'search_results', 'List' ) - )), - -- Body가 필요 없는 Task들은 비워둠 - (4, '상품 유사도 분석 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/products/similarity', 'method', 'POST')), - (5, '상품 정보 크롤링 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/products/crawl', 'method', 'POST')), + )), + (4, '상품 유사도 분석 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/similarity', 'method', 'POST', + 'body', JSON_OBJECT( -- { keyword: str, matched_products: List, search_results: List } + 'keyword', 'String', + 'matched_products', 'List', + 'search_results', 'List' + ) + )), + (5, '상품 정보 크롤링 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/products/crawl', 'method', 'POST', + 'body', JSON_OBJECT('product_url', 'String') -- { "product_url": str } + )), + -- RAG관련 request body는 추후에 결정될 예정 (6, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')), - (7, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/publish', 'method', 'POST')) + (7, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT( + 'endpoint', '/blogs/publish', 'method', 'POST', + 'body', JSON_OBJECT( -- { tag: str, blog_id: str, ... } + 'tag', 'String', + 'blog_id', 'String', + 'blog_pw', 'String', + 'blog_name', 'String', + 'post_title', 'String', + 'post_content', 'String', + 'post_tags', 'List' + ) + )) ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parameters = VALUES(parameters), updated_at = NOW(); - - +-- =================================================================== +-- 워크플로우 구조 및 스케줄 데이터 삽입 +-- =================================================================== -- 워크플로우-Job 연결 INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES (1, 1, 1), - (1, 2, 2); + (1, 2, 2) + ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order); -- Job-Task 연결 INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), - (2, 6, 1), (2, 7, 2); + (2, 6, 1), (2, 7, 2) + ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order); -- 스케줄 설정 (매일 오전 8시) -INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`) VALUES - (1, '0 0 8 * * ?', TRUE); \ No newline at end of file +INSERT INTO `schedule` (`workflow_id`, `cron_expression`, `is_active`, `created_by`) VALUES + (1, '0 0 8 * * ?', TRUE, 1) + ON DUPLICATE KEY UPDATE cron_expression = VALUES(cron_expression), is_active = VALUES(is_active), updated_at = NOW(); \ No newline at end of file From 52f11f4ee33502595bb2067f24b22cb92a5eecd7 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 18:03:31 +0900 Subject: [PATCH 09/14] =?UTF-8?q?refactor:=20FastApi=20=ED=86=B5=EC=8B=A0?= =?UTF-8?q?=20=EA=B4=80=EB=A0=A8=20=EB=A1=9C=EC=A7=81=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/runner/FastApiTaskRunner.java | 31 +++++ .../workflow/runner/HttpTaskRunner.java | 67 --------- .../service/WorkflowExecutionService.java | 131 ++++-------------- .../fastapi/adapter/FastApiAdapter.java | 28 ++-- 4 files changed, 66 insertions(+), 191 deletions(-) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java delete mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java new file mode 100644 index 00000000..e6ab9001 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java @@ -0,0 +1,31 @@ +package site.icebang.domain.workflow.runner; + +import org.springframework.http.HttpMethod; +import org.springframework.stereotype.Component; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import site.icebang.domain.execution.model.TaskRun; +import site.icebang.domain.workflow.model.Task; +import site.icebang.external.fastapi.adapter.FastApiAdapter; // 📌 새로운 어댑터 import + +@Component("fastapiTaskRunner") +@RequiredArgsConstructor +public class FastApiTaskRunner implements TaskRunner { + + private final FastApiAdapter fastApiAdapter; + + @Override + public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) { + JsonNode params = task.getParameters(); + String endpoint = params.path("endpoint").asText(); + HttpMethod method = HttpMethod.valueOf(params.path("method").asText("POST").toUpperCase()); + + String responseBody = fastApiAdapter.call(endpoint, method, requestBody.toString()); + + if (responseBody == null) { + return TaskExecutionResult.failure("FastApiAdapter 호출에 실패했습니다."); + } + return TaskExecutionResult.success(responseBody); + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java deleted file mode 100644 index 861edd5a..00000000 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/HttpTaskRunner.java +++ /dev/null @@ -1,67 +0,0 @@ -package site.icebang.domain.workflow.runner; - -import org.springframework.http.*; -import org.springframework.stereotype.Component; -import org.springframework.web.client.RestClientException; -import org.springframework.web.client.RestTemplate; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import site.icebang.domain.execution.model.TaskRun; -import site.icebang.domain.workflow.model.Task; - -@Slf4j -@Component("httpTaskRunner") // "httpTaskRunner"라는 이름의 Bean으로 등록 -@RequiredArgsConstructor -public class HttpTaskRunner implements TaskRunner { - - private final RestTemplate restTemplate; - - // private final TaskIoDataRepository taskIoDataRepository; // TODO: 입출력 저장을 위해 주입 - - @Override - public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode requestBody) { - JsonNode params = task.getParameters(); - if (params == null) { - return TaskExecutionResult.failure("Task에 파라미터가 정의되지 않았습니다."); - } - - String url = params.path("url").asText(); - String method = params.path("method").asText("POST"); // 기본값 POST - - if (url.isEmpty()) { - return TaskExecutionResult.failure("Task 파라미터에 'url'이 없습니다."); - } - - try { - // 1. HTTP 헤더 설정 - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - - // 2. HTTP 요청 엔티티 생성 (헤더 + 동적 Body) - HttpEntity requestEntity = new HttpEntity<>(requestBody.toString(), headers); - - log.debug("HTTP Task 요청: URL={}, Method={}, Body={}", url, method, requestBody.toString()); - - // 3. RestTemplate으로 API 호출 - ResponseEntity responseEntity = - restTemplate.exchange( - url, HttpMethod.valueOf(method.toUpperCase()), requestEntity, String.class); - - String responseBody = responseEntity.getBody(); - log.debug("HTTP Task 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); - - // TODO: taskIoDataRepository를 사용하여 requestBody와 responseBody를 DB에 저장 - - return TaskExecutionResult.success(responseBody); - - } catch (RestClientException e) { - log.error("HTTP Task 실행 중 에러 발생: TaskRunId={}, Error={}", taskRun.getId(), e.getMessage()); - return TaskExecutionResult.failure(e.getMessage()); - } - } -} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 0f0e316d..2529d931 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,20 +1,13 @@ package site.icebang.domain.workflow.service; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; - +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import site.icebang.domain.execution.mapper.JobRunMapper; import site.icebang.domain.execution.mapper.TaskRunMapper; import site.icebang.domain.execution.mapper.WorkflowRunMapper; @@ -26,6 +19,12 @@ import site.icebang.domain.workflow.model.Job; import site.icebang.domain.workflow.model.Task; import site.icebang.domain.workflow.runner.TaskRunner; +import site.icebang.domain.workflow.runner.body.TaskBodyBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Slf4j @Service @@ -37,7 +36,8 @@ public class WorkflowExecutionService { private final JobRunMapper jobRunMapper; private final TaskRunMapper taskRunMapper; private final Map taskRunners; - private final ObjectMapper objectMapper; // 📌 JSON 처리를 위해 ObjectMapper 주입 + private final ObjectMapper objectMapper; + private final List bodyBuilders; @Transactional public void executeWorkflow(Long workflowId) { @@ -45,19 +45,15 @@ public void executeWorkflow(Long workflowId) { WorkflowRun workflowRun = WorkflowRun.start(workflowId); workflowRunMapper.insert(workflowRun); - // 📌 1. 워크플로우 전체 실행 동안 데이터를 공유할 컨텍스트 생성 Map workflowContext = new HashMap<>(); - List jobs = jobMapper.findJobsByWorkflowId(workflowId); log.info("총 {}개의 Job을 순차적으로 실행합니다.", jobs.size()); for (Job job : jobs) { JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - log.info( - "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + log.info("---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); - // 📌 2. Job 내의 Task들을 실행하고, 컨텍스트를 전달하여 데이터 파이프라이닝 수행 boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED"); @@ -77,16 +73,13 @@ public void executeWorkflow(Long workflowId) { log.info("========== 워크플로우 실행 성공: WorkflowRunId={} ==========", workflowRun.getId()); } - /** - * 특정 Job에 속한 Task들을 순차적으로 실행합니다. - * - * @param jobRun 실행중인 Job의 기록 객체 - * @return 모든 Task가 성공하면 true, 하나라도 실패하면 false - */ private boolean executeTasksForJob(JobRun jobRun, Map workflowContext) { - // TaskDto를 조회하고 Task로 변환 + // 📌 Mapper로부터 TaskDto 리스트를 조회합니다. List taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId()); - List tasks = taskDtos.stream().map(this::convertToTask).toList(); + + // 📌 convertToTask 메소드를 사용하여 Task 모델 리스트로 변환합니다. + List tasks = taskDtos.stream().map(this::convertToTask).collect(Collectors.toList()); + log.info("Job (JobRunId={}) 내 총 {}개의 Task를 실행합니다.", jobRun.getId(), tasks.size()); for (Task task : tasks) { @@ -104,10 +97,12 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return false; } - // 📌 3. Task 실행 전, 컨텍스트를 이용해 동적으로 Request Body를 생성 - ObjectNode requestBody = prepareRequestBody(task, workflowContext); + ObjectNode requestBody = bodyBuilders.stream() + .filter(builder -> builder.supports(task.getName())) + .findFirst() + .map(builder -> builder.build(task, workflowContext)) + .orElse(objectMapper.createObjectNode()); - // 📌 4. 동적으로 생성된 Request Body를 전달하여 Task 실행 TaskRunner.TaskExecutionResult result = runner.execute(task, taskRun, requestBody); taskRun.finish(result.status(), result.message()); taskRunMapper.update(taskRun); @@ -117,7 +112,6 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return false; } - // 📌 5. 성공한 Task의 결과를 다음 Task가 사용할 수 있도록 컨텍스트에 저장 try { JsonNode resultJson = objectMapper.readTree(result.message()); workflowContext.put(task.getName(), resultJson); @@ -133,82 +127,10 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return true; } - /** 워크플로우 컨텍스트와 Task의 input_mapping 설정을 기반으로 API 요청에 사용할 동적인 Request Body를 생성합니다. */ - private ObjectNode prepareRequestBody(Task task, Map context) { - ObjectNode requestBody = objectMapper.createObjectNode(); - JsonNode params = task.getParameters(); - if (params == null) return requestBody; - - JsonNode mappingRules = params.get("input_mapping"); - JsonNode staticBody = params.get("body"); - - // 정적 body가 있으면 우선적으로 복사 - if (staticBody != null && staticBody.isObject()) { - requestBody.setAll((ObjectNode) staticBody); - } - - // 📌 디버깅용: 현재 컨텍스트 출력 - log.debug("=== 워크플로우 컨텍스트 확인 ==="); - for (Map.Entry entry : context.entrySet()) { - log.debug("Task: {}, Data: {}", entry.getKey(), entry.getValue().toString()); - } - - // input_mapping 규칙에 따라 동적으로 값 덮어쓰기/추가 - if (mappingRules != null && mappingRules.isObject()) { - mappingRules - .fields() - .forEachRemaining( - entry -> { - String targetField = entry.getKey(); // 예: "product_url" - String sourcePath = - entry - .getValue() - .asText(); // 예: "상품 유사도 분석 태스크.data.selected_product.product_url" - - log.debug("=== input_mapping 처리 ==="); - log.debug("targetField: {}, sourcePath: {}", targetField, sourcePath); - - String[] parts = sourcePath.split("\\.", 2); - if (parts.length == 2) { - String sourceTaskName = parts[0]; - String sourceFieldPath = parts[1]; - - log.debug( - "sourceTaskName: {}, sourceFieldPath: {}", sourceTaskName, sourceFieldPath); - - JsonNode sourceData = context.get(sourceTaskName); - log.debug("sourceData found: {}", sourceData != null); - - if (sourceData != null) { - log.debug("sourceData content: {}", sourceData.toString()); - - String jsonPath = "/" + sourceFieldPath.replace('.', '/'); - log.debug("jsonPath: {}", jsonPath); - - JsonNode valueToSet = sourceData.at(jsonPath); - log.debug( - "valueToSet found: {}, isMissing: {}", - valueToSet, - valueToSet.isMissingNode()); - - if (!valueToSet.isMissingNode()) { - log.debug("설정할 값: {}", valueToSet.toString()); - requestBody.set(targetField, valueToSet); - } else { - log.warn("값을 찾을 수 없음: jsonPath={}", jsonPath); - } - } else { - log.warn("소스 태스크 데이터를 찾을 수 없음: {}", sourceTaskName); - } - } - }); - } - - log.debug("최종 requestBody: {}", requestBody.toString()); - return requestBody; - } - - /** TaskDto를 Task 모델로 변환합니다. 비즈니스 로직 실행에 필요한 필드만 복사합니다. */ + /** + * TaskDto를 Task 모델로 변환합니다. + * 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. + */ private Task convertToTask(TaskDto taskDto) { Task task = new Task(); try { @@ -231,7 +153,6 @@ private Task convertToTask(TaskDto taskDto) { } catch (Exception e) { throw new RuntimeException("TaskDto to Task 변환 중 오류 발생", e); } - return task; } -} +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java index 257b98db..2ecf46d1 100644 --- a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java +++ b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java @@ -1,12 +1,17 @@ package site.icebang.external.fastapi.adapter; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.*; import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import site.icebang.domain.workflow.model.Task; import site.icebang.global.config.properties.FastApiProperties; +import java.util.Map; @Slf4j @Component @@ -16,34 +21,19 @@ public class FastApiAdapter { private final RestTemplate restTemplate; private final FastApiProperties properties; - /** - * FastAPI 서버에 API 요청을 보내는 범용 메소드 - * @param endpoint /keywords/search 와 같은 endpoint 경로 - * @param method HTTP 메소드 - * @param requestBody 요청 Body (JSON 문자열) - * @return 성공 시 응답 Body, 실패 시 null - */ + // 📌 Task나 context에 대한 의존성이 완전히 사라짐 public String call(String endpoint, HttpMethod method, String requestBody) { String fullUrl = properties.getUrl() + endpoint; - HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity requestEntity = new HttpEntity<>(requestBody, headers); try { log.debug("FastAPI 요청: URL={}, Method={}, Body={}", fullUrl, method, requestBody); - - ResponseEntity responseEntity = restTemplate.exchange( - fullUrl, - method, - requestEntity, - String.class - ); - + ResponseEntity responseEntity = restTemplate.exchange(fullUrl, method, requestEntity, String.class); String responseBody = responseEntity.getBody(); log.debug("FastAPI 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); return responseBody; - } catch (RestClientException e) { log.error("FastAPI 호출 실패: URL={}, Error={}", fullUrl, e.getMessage()); return null; From eefb3d138bdf65e2e8f625785e7a80eb3906d5a8 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 18:05:29 +0900 Subject: [PATCH 10/14] =?UTF-8?q?refactor:=20FastApi=20=EC=B8=A1=20request?= =?UTF-8?q?Body=20=EA=B4=80=EB=A0=A8=20=EB=A1=9C=EC=A7=81=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runner/body/ProductSearchBodyBuilder.java | 30 +++++++++++++++++++ .../workflow/runner/body/TaskBodyBuilder.java | 11 +++++++ 2 files changed, 41 insertions(+) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java new file mode 100644 index 00000000..91fca5f0 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java @@ -0,0 +1,30 @@ +package site.icebang.domain.workflow.runner.body; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import java.util.Map; +import site.icebang.domain.workflow.model.Task; + +@Component +@RequiredArgsConstructor +public class ProductSearchBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 검색 태스크"; + private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME); + String keyword = sourceResult != null ? sourceResult.path("keyword").asText("") : ""; + return objectMapper.createObjectNode().put("keyword", keyword); + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java new file mode 100644 index 00000000..237e93fb --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java @@ -0,0 +1,11 @@ +package site.icebang.domain.workflow.runner.body; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Map; +import site.icebang.domain.workflow.model.Task; + +public interface TaskBodyBuilder { + boolean supports(String taskName); + ObjectNode build(Task task, Map workflowContext); +} \ No newline at end of file From 2baee968dd3c460ee4ea0130c8dd87f7ee92c321 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 18:10:23 +0900 Subject: [PATCH 11/14] refactor: Code Formatting --- .../workflow/runner/FastApiTaskRunner.java | 7 ++- .../runner/body/ProductSearchBodyBuilder.java | 40 ++++++++------- .../workflow/runner/body/TaskBodyBuilder.java | 11 ++-- .../service/WorkflowExecutionService.java | 30 +++++------ .../fastapi/adapter/FastApiAdapter.java | 50 +++++++++---------- 5 files changed, 74 insertions(+), 64 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java index e6ab9001..5a36afa3 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/FastApiTaskRunner.java @@ -2,12 +2,15 @@ import org.springframework.http.HttpMethod; import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; + import site.icebang.domain.execution.model.TaskRun; import site.icebang.domain.workflow.model.Task; -import site.icebang.external.fastapi.adapter.FastApiAdapter; // 📌 새로운 어댑터 import +import site.icebang.external.fastapi.adapter.FastApiAdapter; @Component("fastapiTaskRunner") @RequiredArgsConstructor @@ -28,4 +31,4 @@ public TaskExecutionResult execute(Task task, TaskRun taskRun, ObjectNode reques } return TaskExecutionResult.success(responseBody); } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java index 91fca5f0..2dd3fcb6 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductSearchBodyBuilder.java @@ -1,30 +1,34 @@ package site.icebang.domain.workflow.runner.body; +import java.util.Map; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; -import java.util.Map; + import site.icebang.domain.workflow.model.Task; @Component @RequiredArgsConstructor public class ProductSearchBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "상품 검색 태스크"; - private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME); - String keyword = sourceResult != null ? sourceResult.path("keyword").asText("") : ""; - return objectMapper.createObjectNode().put("keyword", keyword); - } -} \ No newline at end of file + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 검색 태스크"; + private static final String SOURCE_TASK_NAME = "키워드 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + JsonNode sourceResult = workflowContext.get(SOURCE_TASK_NAME); + String keyword = sourceResult != null ? sourceResult.path("keyword").asText("") : ""; + return objectMapper.createObjectNode().put("keyword", keyword); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java index 237e93fb..da6f1597 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java @@ -1,11 +1,14 @@ package site.icebang.domain.workflow.runner.body; +import java.util.Map; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.util.Map; + import site.icebang.domain.workflow.model.Task; public interface TaskBodyBuilder { - boolean supports(String taskName); - ObjectNode build(Task task, Map workflowContext); -} \ No newline at end of file + boolean supports(String taskName); + + ObjectNode build(Task task, Map workflowContext); +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java index 2529d931..d142b630 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -1,13 +1,21 @@ package site.icebang.domain.workflow.service; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; + import site.icebang.domain.execution.mapper.JobRunMapper; import site.icebang.domain.execution.mapper.TaskRunMapper; import site.icebang.domain.execution.mapper.WorkflowRunMapper; @@ -21,11 +29,6 @@ import site.icebang.domain.workflow.runner.TaskRunner; import site.icebang.domain.workflow.runner.body.TaskBodyBuilder; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - @Slf4j @Service @RequiredArgsConstructor @@ -52,7 +55,8 @@ public void executeWorkflow(Long workflowId) { for (Job job : jobs) { JobRun jobRun = JobRun.start(workflowRun.getId(), job.getId()); jobRunMapper.insert(jobRun); - log.info("---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); + log.info( + "---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId()); boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext); @@ -97,7 +101,8 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return false; } - ObjectNode requestBody = bodyBuilders.stream() + ObjectNode requestBody = + bodyBuilders.stream() .filter(builder -> builder.supports(task.getName())) .findFirst() .map(builder -> builder.build(task, workflowContext)) @@ -127,10 +132,7 @@ private boolean executeTasksForJob(JobRun jobRun, Map workflow return true; } - /** - * TaskDto를 Task 모델로 변환합니다. - * 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. - */ + /** TaskDto를 Task 모델로 변환합니다. 📌 주의: Reflection을 사용한 방식은 성능이 느리고 불안정하므로 권장되지 않습니다. */ private Task convertToTask(TaskDto taskDto) { Task task = new Task(); try { @@ -155,4 +157,4 @@ private Task convertToTask(TaskDto taskDto) { } return task; } -} \ No newline at end of file +} diff --git a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java index 2ecf46d1..2a5bd001 100644 --- a/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java +++ b/apps/user-service/src/main/java/site/icebang/external/fastapi/adapter/FastApiAdapter.java @@ -1,42 +1,40 @@ package site.icebang.external.fastapi.adapter; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.springframework.http.*; import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; -import site.icebang.domain.workflow.model.Task; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import site.icebang.global.config.properties.FastApiProperties; -import java.util.Map; @Slf4j @Component @RequiredArgsConstructor public class FastApiAdapter { - private final RestTemplate restTemplate; - private final FastApiProperties properties; + private final RestTemplate restTemplate; + private final FastApiProperties properties; - // 📌 Task나 context에 대한 의존성이 완전히 사라짐 - public String call(String endpoint, HttpMethod method, String requestBody) { - String fullUrl = properties.getUrl() + endpoint; - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - HttpEntity requestEntity = new HttpEntity<>(requestBody, headers); + // 📌 Task나 context에 대한 의존성이 완전히 사라짐 + public String call(String endpoint, HttpMethod method, String requestBody) { + String fullUrl = properties.getUrl() + endpoint; + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity requestEntity = new HttpEntity<>(requestBody, headers); - try { - log.debug("FastAPI 요청: URL={}, Method={}, Body={}", fullUrl, method, requestBody); - ResponseEntity responseEntity = restTemplate.exchange(fullUrl, method, requestEntity, String.class); - String responseBody = responseEntity.getBody(); - log.debug("FastAPI 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); - return responseBody; - } catch (RestClientException e) { - log.error("FastAPI 호출 실패: URL={}, Error={}", fullUrl, e.getMessage()); - return null; - } + try { + log.debug("FastAPI 요청: URL={}, Method={}, Body={}", fullUrl, method, requestBody); + ResponseEntity responseEntity = + restTemplate.exchange(fullUrl, method, requestEntity, String.class); + String responseBody = responseEntity.getBody(); + log.debug("FastAPI 응답: Status={}, Body={}", responseEntity.getStatusCode(), responseBody); + return responseBody; + } catch (RestClientException e) { + log.error("FastAPI 호출 실패: URL={}, Error={}", fullUrl, e.getMessage()); + return null; } -} \ No newline at end of file + } +} From c91ae4f1e296a019872a5d00a8d3e2678c5fefa4 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 18:57:59 +0900 Subject: [PATCH 12/14] =?UTF-8?q?refactor:=20BodyBuilder=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84=EC=B2=B4=20=EC=83=9D=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runner/body/EmptyBodyBuilder.java | 36 ++++++++++++++++ .../runner/body/KeywordSearchBodyBuilder.java | 28 +++++++++++++ .../runner/body/ProductMatchBodyBuilder.java | 42 +++++++++++++++++++ 3 files changed, 106 insertions(+) create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java create mode 100644 apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java new file mode 100644 index 00000000..d8891776 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java @@ -0,0 +1,36 @@ +package site.icebang.domain.workflow.runner.body; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.Task; +import java.util.Map; +import java.util.Set; + +@Component +@RequiredArgsConstructor +public class EmptyBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final Set SUPPORTED_TASKS = Set.of( + "상품 유사도 분석 태스크", + "상품 정보 크롤링 태스크", + "블로그 RAG 생성 태스크", + "블로그 발행 태스크" + ); + + @Override + public boolean supports(String taskName) { + return SUPPORTED_TASKS.contains(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + // 이 Task들은 Body가 필요 없으므로 빈 객체를 반환합니다. + // TODO: 나중에 이 Task들이 이전 단계의 결과값을 필요로 하게 되면, + // 다른 빌더들처럼 workflowContext에서 데이터를 꺼내 Body를 구성하도록 수정합니다. + return objectMapper.createObjectNode(); + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java new file mode 100644 index 00000000..d26d3730 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java @@ -0,0 +1,28 @@ +package site.icebang.domain.workflow.runner.body; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.Task; +import java.util.Map; + +@Component +@RequiredArgsConstructor +public class KeywordSearchBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "키워드 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + // 이 Task는 항상 정적인 Body를 가집니다. + return objectMapper.createObjectNode().put("tag", "naver"); + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java new file mode 100644 index 00000000..b634a383 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java @@ -0,0 +1,42 @@ +package site.icebang.domain.workflow.runner.body; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import site.icebang.domain.workflow.model.Task; +import java.util.Map; +import java.util.Optional; + +@Component +@RequiredArgsConstructor +public class ProductMatchBodyBuilder implements TaskBodyBuilder { + + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 매칭 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + // 상품 검색 결과 정보 가져오기 + Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) + .map(node -> node.path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + + return body; + } +} \ No newline at end of file From f4accaefa54d7cde6b6920a7878f15516f061db2 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 18:58:07 +0900 Subject: [PATCH 13/14] =?UTF-8?q?refactor:=20BodyBuilder=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84=EC=B2=B4=20=EC=83=9D=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/runner/body/TaskBodyBuilder.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java index da6f1597..920b89d3 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java @@ -1,14 +1,24 @@ package site.icebang.domain.workflow.runner.body; -import java.util.Map; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; - import site.icebang.domain.workflow.model.Task; +import java.util.Map; public interface TaskBodyBuilder { - boolean supports(String taskName); - ObjectNode build(Task task, Map workflowContext); -} + /** + * 이 빌더가 어떤 Task를 지원하는지 식별합니다. + * @param taskName Task의 고유한 이름 + * @return 지원하면 true, 아니면 false + */ + boolean supports(String taskName); + + /** + * 실제 API 요청에 사용될 Body를 생성합니다. + * @param task DB에 저장된 Task의 원본 정의 + * @param workflowContext 이전 Task들의 결과가 담긴 컨텍스트 + * @return 생성된 JSON Body + */ + ObjectNode build(Task task, Map workflowContext); +} \ No newline at end of file From 521e4e5ee68a37eb8694587eef7db90a2078ea36 Mon Sep 17 00:00:00 2001 From: jihukimme Date: Fri, 19 Sep 2025 18:59:23 +0900 Subject: [PATCH 14/14] refactor: Code Formatting --- .../runner/body/EmptyBodyBuilder.java | 48 +++++++------- .../runner/body/KeywordSearchBodyBuilder.java | 32 ++++++---- .../runner/body/ProductMatchBodyBuilder.java | 64 ++++++++++--------- .../workflow/runner/body/TaskBodyBuilder.java | 34 +++++----- 4 files changed, 95 insertions(+), 83 deletions(-) diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java index d8891776..3385a8ed 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/EmptyBodyBuilder.java @@ -1,36 +1,36 @@ package site.icebang.domain.workflow.runner.body; +import java.util.Map; +import java.util.Set; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.Task; -import java.util.Map; -import java.util.Set; @Component @RequiredArgsConstructor public class EmptyBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final Set SUPPORTED_TASKS = Set.of( - "상품 유사도 분석 태스크", - "상품 정보 크롤링 태스크", - "블로그 RAG 생성 태스크", - "블로그 발행 태스크" - ); - - @Override - public boolean supports(String taskName) { - return SUPPORTED_TASKS.contains(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - // 이 Task들은 Body가 필요 없으므로 빈 객체를 반환합니다. - // TODO: 나중에 이 Task들이 이전 단계의 결과값을 필요로 하게 되면, - // 다른 빌더들처럼 workflowContext에서 데이터를 꺼내 Body를 구성하도록 수정합니다. - return objectMapper.createObjectNode(); - } -} \ No newline at end of file + private final ObjectMapper objectMapper; + private static final Set SUPPORTED_TASKS = + Set.of("상품 유사도 분석 태스크", "상품 정보 크롤링 태스크", "블로그 RAG 생성 태스크", "블로그 발행 태스크"); + + @Override + public boolean supports(String taskName) { + return SUPPORTED_TASKS.contains(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + // 이 Task들은 Body가 필요 없으므로 빈 객체를 반환합니다. + // TODO: 나중에 이 Task들이 이전 단계의 결과값을 필요로 하게 되면, + // 다른 빌더들처럼 workflowContext에서 데이터를 꺼내 Body를 구성하도록 수정합니다. + return objectMapper.createObjectNode(); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java index d26d3730..f1bd5509 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/KeywordSearchBodyBuilder.java @@ -1,28 +1,32 @@ package site.icebang.domain.workflow.runner.body; +import java.util.Map; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.Task; -import java.util.Map; @Component @RequiredArgsConstructor public class KeywordSearchBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "키워드 검색 태스크"; + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "키워드 검색 태스크"; - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } - @Override - public ObjectNode build(Task task, Map workflowContext) { - // 이 Task는 항상 정적인 Body를 가집니다. - return objectMapper.createObjectNode().put("tag", "naver"); - } -} \ No newline at end of file + @Override + public ObjectNode build(Task task, Map workflowContext) { + // 이 Task는 항상 정적인 Body를 가집니다. + return objectMapper.createObjectNode().put("tag", "naver"); + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java index b634a383..610334cf 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/ProductMatchBodyBuilder.java @@ -1,42 +1,46 @@ package site.icebang.domain.workflow.runner.body; +import java.util.Map; +import java.util.Optional; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; + import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; + import site.icebang.domain.workflow.model.Task; -import java.util.Map; -import java.util.Optional; @Component @RequiredArgsConstructor public class ProductMatchBodyBuilder implements TaskBodyBuilder { - private final ObjectMapper objectMapper; - private static final String TASK_NAME = "상품 매칭 태스크"; - private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; - private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; - - @Override - public boolean supports(String taskName) { - return TASK_NAME.equals(taskName); - } - - @Override - public ObjectNode build(Task task, Map workflowContext) { - ObjectNode body = objectMapper.createObjectNode(); - - // 키워드 정보 가져오기 - Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) - .map(node -> node.path("keyword")) - .ifPresent(keywordNode -> body.set("keyword", keywordNode)); - - // 상품 검색 결과 정보 가져오기 - Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) - .map(node -> node.path("search_results")) - .ifPresent(resultsNode -> body.set("search_results", resultsNode)); - - return body; - } -} \ No newline at end of file + private final ObjectMapper objectMapper; + private static final String TASK_NAME = "상품 매칭 태스크"; + private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크"; + private static final String SEARCH_SOURCE_TASK = "상품 검색 태스크"; + + @Override + public boolean supports(String taskName) { + return TASK_NAME.equals(taskName); + } + + @Override + public ObjectNode build(Task task, Map workflowContext) { + ObjectNode body = objectMapper.createObjectNode(); + + // 키워드 정보 가져오기 + Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK)) + .map(node -> node.path("keyword")) + .ifPresent(keywordNode -> body.set("keyword", keywordNode)); + + // 상품 검색 결과 정보 가져오기 + Optional.ofNullable(workflowContext.get(SEARCH_SOURCE_TASK)) + .map(node -> node.path("search_results")) + .ifPresent(resultsNode -> body.set("search_results", resultsNode)); + + return body; + } +} diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java index 920b89d3..0d807d86 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/runner/body/TaskBodyBuilder.java @@ -1,24 +1,28 @@ package site.icebang.domain.workflow.runner.body; +import java.util.Map; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; + import site.icebang.domain.workflow.model.Task; -import java.util.Map; public interface TaskBodyBuilder { - /** - * 이 빌더가 어떤 Task를 지원하는지 식별합니다. - * @param taskName Task의 고유한 이름 - * @return 지원하면 true, 아니면 false - */ - boolean supports(String taskName); + /** + * 이 빌더가 어떤 Task를 지원하는지 식별합니다. + * + * @param taskName Task의 고유한 이름 + * @return 지원하면 true, 아니면 false + */ + boolean supports(String taskName); - /** - * 실제 API 요청에 사용될 Body를 생성합니다. - * @param task DB에 저장된 Task의 원본 정의 - * @param workflowContext 이전 Task들의 결과가 담긴 컨텍스트 - * @return 생성된 JSON Body - */ - ObjectNode build(Task task, Map workflowContext); -} \ No newline at end of file + /** + * 실제 API 요청에 사용될 Body를 생성합니다. + * + * @param task DB에 저장된 Task의 원본 정의 + * @param workflowContext 이전 Task들의 결과가 담긴 컨텍스트 + * @return 생성된 JSON Body + */ + ObjectNode build(Task task, Map workflowContext); +}