Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public class TaskDto {
private Long id;
private String name;
private String type;
private Integer executionOrder;
private JsonNode settings;
private JsonNode parameters;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;

private Integer executionOrder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class Task {
/** Task 실행에 필요한 파라미터 (JSON) 예: {"url": "http://...", "method": "POST", "body": {...}} */
private JsonNode parameters;

private JsonNode settings;

private LocalDateTime createdAt;

private LocalDateTime updatedAt;
Expand All @@ -32,6 +34,7 @@ public Task(TaskDto taskDto) {
this.id = taskDto.getId();
this.name = taskDto.getName();
this.type = taskDto.getType();
this.settings = taskDto.getSettings();
this.parameters = taskDto.getParameters();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
.filter(node -> !node.isMissingNode())
.ifPresent(tagsNode -> body.set("post_tags", tagsNode));
});

body.put("tag", "Blogger");
body.put("blog_id", "");
body.put("blog_pw", "");
String blog_name = task.getSettings().path("blog_name").asText("");
body.put("tag", task.getSettings().get("tag").asText());
body.put("blog_name", blog_name);
body.put("blog_id", task.getSettings().get("blog_id").asText());
body.put("blog_pw", task.getSettings().get("blog_pw").asText());

return body;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public boolean supports(String taskName) {
@Override
public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
// 이 Task는 항상 정적인 Body를 가집니다.
return objectMapper.createObjectNode().put("tag", "naver");
String tag = task.getSettings().get("tag").asText();
return objectMapper.createObjectNode().put("tag", tag);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package site.icebang.domain.workflow.service;

import java.math.BigInteger;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -11,6 +12,8 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -19,11 +22,9 @@

import site.icebang.domain.workflow.dto.JobDto;
import site.icebang.domain.workflow.dto.TaskDto;
import site.icebang.domain.workflow.dto.WorkflowDetailCardDto;
import site.icebang.domain.workflow.manager.ExecutionMdcManager;
import site.icebang.domain.workflow.mapper.JobMapper;
import site.icebang.domain.workflow.mapper.JobRunMapper;
import site.icebang.domain.workflow.mapper.TaskRunMapper;
import site.icebang.domain.workflow.mapper.WorkflowRunMapper;
import site.icebang.domain.workflow.mapper.*;
import site.icebang.domain.workflow.model.Job;
import site.icebang.domain.workflow.model.JobRun;
import site.icebang.domain.workflow.model.Task;
Expand All @@ -44,6 +45,7 @@ public class WorkflowExecutionService {
private final List<TaskBodyBuilder> bodyBuilders;
private final ExecutionMdcManager mdcManager;
private final TaskExecutionService taskExecutionService;
private final WorkflowMapper workflowMapper;

@Transactional
@Async("traceExecutor")
Expand All @@ -55,7 +57,9 @@ public void executeWorkflow(Long workflowId) {
workflowRunMapper.insert(workflowRun);

Map<String, JsonNode> workflowContext = new HashMap<>();

WorkflowDetailCardDto settings =
workflowMapper.selectWorkflowDetailById(BigInteger.valueOf(workflowId));
JsonNode setting = objectMapper.readTree(settings.getDefaultConfig());
// 📌 Mapper로부터 JobDto 리스트를 조회합니다.
List<JobDto> jobDtos = jobMapper.findJobsByWorkflowId(workflowId);
// 📌 JobDto를 execution_order 기준으로 정렬합니다.
Expand All @@ -78,7 +82,7 @@ public void executeWorkflow(Long workflowId) {
workflowLogger.info(
"---------- Job 실행 시작: JobId={}, JobRunId={} ----------", job.getId(), jobRun.getId());

boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext);
boolean jobSucceeded = executeTasksForJob(jobRun, workflowContext, setting);
jobRun.finish(jobSucceeded ? "SUCCESS" : "FAILED");
jobRunMapper.update(jobRun);

Expand All @@ -96,13 +100,25 @@ public void executeWorkflow(Long workflowId) {
"========== 워크플로우 실행 {} : WorkflowRunId={} ==========",
hasAnyJobFailed ? "실패" : "성공",
workflowRun.getId());
} catch (JsonMappingException e) {
throw new RuntimeException(e);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
mdcManager.clearExecutionContext();
}
}

private boolean executeTasksForJob(JobRun jobRun, Map<String, JsonNode> workflowContext) {
private boolean executeTasksForJob(
JobRun jobRun, Map<String, JsonNode> workflowContext, JsonNode setting) {
List<TaskDto> taskDtos = jobMapper.findTasksByJobId(jobRun.getJobId());
for (TaskDto taskDto : taskDtos) {
String taskId = taskDto.getId().toString();
JsonNode settingForTask = setting.get(taskId);
if (settingForTask != null) {
taskDto.setSettings(settingForTask);
}
}
taskDtos.sort(
Comparator.comparing(
TaskDto::getExecutionOrder, Comparator.nullsLast(Comparator.naturalOrder()))
Expand Down
2 changes: 2 additions & 0 deletions apps/user-service/src/main/resources/sql/01-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,5 @@ CREATE INDEX idx_log_level_status ON execution_log(log_level, status);
CREATE INDEX idx_error_code ON execution_log(error_code);
CREATE INDEX idx_duration ON execution_log(duration_ms);
CREATE INDEX idx_execution_type_source ON execution_log(execution_type, source_id);


Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ DELETE FROM `workflow`;
-- 워크플로우 생성 (ID: 1)
INSERT INTO `workflow` (`id`, `name`, `description`, `created_by`, `default_config`) VALUES
(1, '상품 분석 및 블로그 자동 발행', '키워드 검색부터 상품 분석 후 블로그 발행까지의 자동화 프로세스', 1,
JSON_OBJECT('keyword_search',json_object('tag','naver'),'blog_publish',json_object('tag','naver_blog','blog_id', 'wtecho331', 'blog_pw', 'testpass')))
JSON_OBJECT('1',json_object('tag','naver'),'8',json_object('tag','naver_blog','blog_id', 'wtecho331', 'blog_pw', 'testpass')))
ON DUPLICATE KEY UPDATE
name = VALUES(name),
description = VALUES(description),
Expand Down
Loading