Skip to content

Commit 00927cc

Browse files
committed
Added a service class to abstract time to improve testing
1 parent ca49d63 commit 00927cc

32 files changed

+195
-113
lines changed

core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.netflix.conductor.metrics.Monitors;
4848
import com.netflix.conductor.model.TaskModel;
4949
import com.netflix.conductor.model.WorkflowModel;
50+
import com.netflix.conductor.service.TimeService;
5051

5152
import com.fasterxml.jackson.core.JsonProcessingException;
5253
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -285,9 +286,9 @@ private void externalizeWorkflowData(WorkflowModel workflowModel) {
285286
* @return the id of the updated workflow
286287
*/
287288
public String updateWorkflow(WorkflowModel workflowModel) {
288-
workflowModel.setUpdatedTime(System.currentTimeMillis());
289+
workflowModel.setUpdatedTime(TimeService.currentTimeMillis());
289290
if (workflowModel.getStatus().isTerminal()) {
290-
workflowModel.setEndTime(System.currentTimeMillis());
291+
workflowModel.setEndTime(TimeService.currentTimeMillis());
291292
}
292293
externalizeWorkflowData(workflowModel);
293294
executionDAO.updateWorkflow(workflowModel);
@@ -498,10 +499,10 @@ public void updateTask(TaskModel taskModel) {
498499
if (taskModel.getStatus() != null) {
499500
if (!taskModel.getStatus().isTerminal()
500501
|| (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) {
501-
taskModel.setUpdateTime(System.currentTimeMillis());
502+
taskModel.setUpdateTime(TimeService.currentTimeMillis());
502503
}
503504
if (taskModel.getStatus().isTerminal() && taskModel.getEndTime() == 0) {
504-
taskModel.setEndTime(System.currentTimeMillis());
505+
taskModel.setEndTime(TimeService.currentTimeMillis());
505506
}
506507
}
507508
externalizeTaskData(taskModel);
@@ -561,7 +562,7 @@ private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean arc
561562
}
562563

563564
public void extendLease(TaskModel taskModel) {
564-
taskModel.setUpdateTime(System.currentTimeMillis());
565+
taskModel.setUpdateTime(TimeService.currentTimeMillis());
565566
executionDAO.updateTask(taskModel);
566567
}
567568

core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.netflix.conductor.metrics.Monitors;
4545
import com.netflix.conductor.service.ExecutionService;
4646
import com.netflix.conductor.service.MetadataService;
47+
import com.netflix.conductor.service.TimeService;
4748

4849
import com.fasterxml.jackson.databind.ObjectMapper;
4950
import com.spotify.futures.CompletableFutures;
@@ -181,7 +182,7 @@ protected List<EventExecution> executeEvent(String event, Message msg) throws Ex
181182
if (!success) {
182183
String id = msg.getId() + "_" + 0;
183184
EventExecution eventExecution = new EventExecution(id, msg.getId());
184-
eventExecution.setCreated(System.currentTimeMillis());
185+
eventExecution.setCreated(TimeService.currentTimeMillis());
185186
eventExecution.setEvent(eventHandler.getEvent());
186187
eventExecution.setName(eventHandler.getName());
187188
eventExecution.setStatus(Status.SKIPPED);
@@ -240,7 +241,7 @@ protected CompletableFuture<List<EventExecution>> executeActionsForEventHandler(
240241
for (Action action : eventHandler.getActions()) {
241242
String id = msg.getId() + "_" + i++;
242243
EventExecution eventExecution = new EventExecution(id, msg.getId());
243-
eventExecution.setCreated(System.currentTimeMillis());
244+
eventExecution.setCreated(TimeService.currentTimeMillis());
244245
eventExecution.setEvent(eventHandler.getEvent());
245246
eventExecution.setName(eventHandler.getName());
246247
eventExecution.setAction(action.getAction());

core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.netflix.conductor.metrics.Monitors;
2626
import com.netflix.conductor.model.TaskModel;
2727
import com.netflix.conductor.model.WorkflowModel;
28+
import com.netflix.conductor.service.TimeService;
2829

2930
@Component
3031
public class AsyncSystemTaskExecutor {
@@ -147,7 +148,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
147148
}
148149

149150
if (task.getStatus() == TaskModel.Status.SCHEDULED) {
150-
task.setStartTime(System.currentTimeMillis());
151+
task.setStartTime(TimeService.currentTimeMillis());
151152
Monitors.recordQueueWaitTime(task.getTaskType(), task.getQueueWaitTime());
152153
systemTask.start(workflow, task, workflowExecutor);
153154
} else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) {
@@ -160,7 +161,7 @@ public void execute(WorkflowSystemTask systemTask, String taskId) {
160161
shouldRemoveTaskFromQueue = true;
161162
hasTaskExecutionCompleted = true;
162163
} else if (task.getStatus().isTerminal()) {
163-
task.setEndTime(System.currentTimeMillis());
164+
task.setEndTime(TimeService.currentTimeMillis());
164165
shouldRemoveTaskFromQueue = true;
165166
hasTaskExecutionCompleted = true;
166167
} else {

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.netflix.conductor.model.TaskModel;
5353
import com.netflix.conductor.model.WorkflowModel;
5454
import com.netflix.conductor.service.ExecutionLockService;
55+
import com.netflix.conductor.service.TimeService;
5556

5657
import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;
5758
import static com.netflix.conductor.model.TaskModel.Status.*;
@@ -87,7 +88,7 @@ public class WorkflowExecutorOps implements WorkflowExecutor {
8788
private final Predicate<PollData> validateLastPolledTime =
8889
pollData ->
8990
pollData.getLastPollTime()
90-
> System.currentTimeMillis() - activeWorkerLastPollMs;
91+
> TimeService.currentTimeMillis() - activeWorkerLastPollMs;
9192

9293
public WorkflowExecutorOps(
9394
DeciderService deciderService,
@@ -227,7 +228,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) {
227228
workflow.getTasks().clear();
228229
workflow.setReasonForIncompletion(null);
229230
workflow.setFailedTaskId(null);
230-
workflow.setCreateTime(System.currentTimeMillis());
231+
workflow.setCreateTime(TimeService.currentTimeMillis());
231232
workflow.setEndTime(0);
232233
workflow.setLastRetriedTime(0);
233234
// Change the status to running
@@ -320,7 +321,7 @@ private void updateAndPushParents(WorkflowModel workflow, String operation) {
320321
WorkflowModel parentWorkflow =
321322
executionDAOFacade.getWorkflowModel(parentWorkflowId, true);
322323
parentWorkflow.setStatus(WorkflowModel.Status.RUNNING);
323-
parentWorkflow.setLastRetriedTime(System.currentTimeMillis());
324+
parentWorkflow.setLastRetriedTime(TimeService.currentTimeMillis());
324325
executionDAOFacade.updateWorkflow(parentWorkflow);
325326

326327
try {
@@ -394,7 +395,7 @@ private void retry(WorkflowModel workflow) {
394395
// Update Workflow with new status.
395396
// This should load Workflow from archive, if archived.
396397
workflow.setStatus(WorkflowModel.Status.RUNNING);
397-
workflow.setLastRetriedTime(System.currentTimeMillis());
398+
workflow.setLastRetriedTime(TimeService.currentTimeMillis());
398399
String lastReasonForIncompletion = workflow.getReasonForIncompletion();
399400
workflow.setReasonForIncompletion(null);
400401
// Add to decider queue
@@ -816,7 +817,7 @@ public TaskModel updateTask(TaskResult taskResult) {
816817
}
817818

818819
if (task.getStatus().isTerminal()) {
819-
task.setEndTime(System.currentTimeMillis());
820+
task.setEndTime(TimeService.currentTimeMillis());
820821
}
821822

822823
// Update message in Task queue based on Task status
@@ -1327,7 +1328,7 @@ public void resumeWorkflow(String workflowId) {
13271328
+ workflow.getStatus().name());
13281329
}
13291330
workflow.setStatus(WorkflowModel.Status.RUNNING);
1330-
workflow.setLastRetriedTime(System.currentTimeMillis());
1331+
workflow.setLastRetriedTime(TimeService.currentTimeMillis());
13311332
// Add to decider queue
13321333
queueDAO.push(
13331334
DECIDER_QUEUE,
@@ -1392,7 +1393,7 @@ public void skipTaskFromWorkflow(
13921393
taskToBeSkipped.setWorkflowInstanceId(workflowId);
13931394
taskToBeSkipped.setWorkflowPriority(workflow.getPriority());
13941395
taskToBeSkipped.setStatus(SKIPPED);
1395-
taskToBeSkipped.setEndTime(System.currentTimeMillis());
1396+
taskToBeSkipped.setEndTime(TimeService.currentTimeMillis());
13961397
taskToBeSkipped.setTaskType(workflowTask.getName());
13971398
taskToBeSkipped.setCorrelationId(workflow.getCorrelationId());
13981399
if (skipTaskRequest != null) {
@@ -1552,7 +1553,7 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
15521553
if (task.getStatus() != null
15531554
&& !task.getStatus().isTerminal()
15541555
&& task.getStartTime() == 0) {
1555-
task.setStartTime(System.currentTimeMillis());
1556+
task.setStartTime(TimeService.currentTimeMillis());
15561557
}
15571558
if (!workflowSystemTask.isAsync()) {
15581559
try {
@@ -1754,7 +1755,7 @@ private boolean rerunWF(
17541755
}
17551756
workflow.setTasks(filteredTasks);
17561757
// reset fields before restarting the task
1757-
rerunFromTask.setScheduledTime(System.currentTimeMillis());
1758+
rerunFromTask.setScheduledTime(TimeService.currentTimeMillis());
17581759
rerunFromTask.setStartTime(0);
17591760
rerunFromTask.setUpdateTime(0);
17601761
rerunFromTask.setEndTime(0);
@@ -1764,7 +1765,7 @@ private boolean rerunWF(
17641765
if (rerunFromTask.getTaskType().equalsIgnoreCase(TaskType.TASK_TYPE_SUB_WORKFLOW)) {
17651766
// if task is sub workflow set task as IN_PROGRESS and reset start time
17661767
rerunFromTask.setStatus(IN_PROGRESS);
1767-
rerunFromTask.setStartTime(System.currentTimeMillis());
1768+
rerunFromTask.setStartTime(TimeService.currentTimeMillis());
17681769
} else {
17691770
if (taskInput != null) {
17701771
rerunFromTask.setInputData(taskInput);
@@ -1902,7 +1903,7 @@ public String startWorkflow(StartWorkflowInput input) {
19021903
workflow.setParentWorkflowId(input.getParentWorkflowId());
19031904
workflow.setParentWorkflowTaskId(input.getParentWorkflowTaskId());
19041905
workflow.setOwnerApp(WorkflowContext.get().getClientApp());
1905-
workflow.setCreateTime(System.currentTimeMillis());
1906+
workflow.setCreateTime(TimeService.currentTimeMillis());
19061907
workflow.setUpdatedBy(null);
19071908
workflow.setUpdatedTime(null);
19081909
workflow.setEvent(input.getEvent());

core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.netflix.conductor.core.exception.TerminateWorkflowException;
3333
import com.netflix.conductor.model.TaskModel;
3434
import com.netflix.conductor.model.WorkflowModel;
35+
import com.netflix.conductor.service.TimeService;
3536

3637
/**
3738
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
@@ -88,7 +89,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
8889
decisionTask.setTaskDefName(TaskType.TASK_TYPE_DECISION);
8990
decisionTask.addInput("case", caseValue);
9091
decisionTask.addOutput("caseOutput", Collections.singletonList(caseValue));
91-
decisionTask.setStartTime(System.currentTimeMillis());
92+
decisionTask.setStartTime(TimeService.currentTimeMillis());
9293
decisionTask.setStatus(TaskModel.Status.IN_PROGRESS);
9394
tasksToBeScheduled.add(decisionTask);
9495

core/src/main/java/com/netflix/conductor/core/execution/mapper/DoWhileTaskMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.netflix.conductor.dao.MetadataDAO;
2929
import com.netflix.conductor.model.TaskModel;
3030
import com.netflix.conductor.model.WorkflowModel;
31+
import com.netflix.conductor.service.TimeService;
3132

3233
/**
3334
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
@@ -85,7 +86,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
8586
TaskModel doWhileTask = taskMapperContext.createTaskModel();
8687
doWhileTask.setTaskType(TaskType.TASK_TYPE_DO_WHILE);
8788
doWhileTask.setStatus(TaskModel.Status.IN_PROGRESS);
88-
doWhileTask.setStartTime(System.currentTimeMillis());
89+
doWhileTask.setStartTime(TimeService.currentTimeMillis());
8990
doWhileTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
9091
doWhileTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
9192
doWhileTask.setRetryCount(taskMapperContext.getRetryCount());

core/src/main/java/com/netflix/conductor/core/execution/mapper/ExclusiveJoinTaskMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.netflix.conductor.common.metadata.tasks.TaskType;
2424
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
2525
import com.netflix.conductor.model.TaskModel;
26+
import com.netflix.conductor.service.TimeService;
2627

2728
@Component
2829
public class ExclusiveJoinTaskMapper implements TaskMapper {
@@ -51,7 +52,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
5152
TaskModel joinTask = taskMapperContext.createTaskModel();
5253
joinTask.setTaskType(TaskType.TASK_TYPE_EXCLUSIVE_JOIN);
5354
joinTask.setTaskDefName(TaskType.TASK_TYPE_EXCLUSIVE_JOIN);
54-
joinTask.setStartTime(System.currentTimeMillis());
55+
joinTask.setStartTime(TimeService.currentTimeMillis());
5556
joinTask.setInputData(joinInput);
5657
joinTask.setStatus(TaskModel.Status.IN_PROGRESS);
5758

core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.netflix.conductor.dao.MetadataDAO;
4141
import com.netflix.conductor.model.TaskModel;
4242
import com.netflix.conductor.model.WorkflowModel;
43+
import com.netflix.conductor.service.TimeService;
4344

4445
import com.fasterxml.jackson.core.type.TypeReference;
4546
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -289,8 +290,8 @@ TaskModel createDynamicForkTask(
289290
TaskModel forkDynamicTask = taskMapperContext.createTaskModel();
290291
forkDynamicTask.setTaskType(TaskType.TASK_TYPE_FORK);
291292
forkDynamicTask.setTaskDefName(TaskType.TASK_TYPE_FORK);
292-
forkDynamicTask.setStartTime(System.currentTimeMillis());
293-
forkDynamicTask.setEndTime(System.currentTimeMillis());
293+
forkDynamicTask.setStartTime(TimeService.currentTimeMillis());
294+
forkDynamicTask.setEndTime(TimeService.currentTimeMillis());
294295
List<String> forkedTaskNames =
295296
dynForkTasks.stream()
296297
.map(WorkflowTask::getTaskReferenceName)
@@ -329,8 +330,8 @@ TaskModel createJoinTask(
329330
joinTask.setWorkflowInstanceId(workflowModel.getWorkflowId());
330331
joinTask.setWorkflowType(workflowModel.getWorkflowName());
331332
joinTask.setCorrelationId(workflowModel.getCorrelationId());
332-
joinTask.setScheduledTime(System.currentTimeMillis());
333-
joinTask.setStartTime(System.currentTimeMillis());
333+
joinTask.setScheduledTime(TimeService.currentTimeMillis());
334+
joinTask.setStartTime(TimeService.currentTimeMillis());
334335
joinTask.setInputData(joinInput);
335336
joinTask.setTaskId(idGenerator.generate());
336337
joinTask.setStatus(TaskModel.Status.IN_PROGRESS);

core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinTaskMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.netflix.conductor.core.exception.TerminateWorkflowException;
2828
import com.netflix.conductor.model.TaskModel;
2929
import com.netflix.conductor.model.WorkflowModel;
30+
import com.netflix.conductor.service.TimeService;
3031

3132
/**
3233
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
@@ -74,7 +75,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
7475
TaskModel forkTask = taskMapperContext.createTaskModel();
7576
forkTask.setTaskType(TaskType.TASK_TYPE_FORK);
7677
forkTask.setTaskDefName(TaskType.TASK_TYPE_FORK);
77-
long epochMillis = System.currentTimeMillis();
78+
long epochMillis = TimeService.currentTimeMillis();
7879
forkTask.setStartTime(epochMillis);
7980
forkTask.setEndTime(epochMillis);
8081
forkTask.setInputData(taskInput);

core/src/main/java/com/netflix/conductor/core/execution/mapper/HumanTaskMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.netflix.conductor.core.utils.ParametersUtils;
2626
import com.netflix.conductor.model.TaskModel;
2727
import com.netflix.conductor.model.WorkflowModel;
28+
import com.netflix.conductor.service.TimeService;
2829

2930
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_HUMAN;
3031

@@ -66,7 +67,7 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
6667
TaskModel humanTask = taskMapperContext.createTaskModel();
6768
humanTask.setTaskType(TASK_TYPE_HUMAN);
6869
humanTask.setInputData(humanTaskInput);
69-
humanTask.setStartTime(System.currentTimeMillis());
70+
humanTask.setStartTime(TimeService.currentTimeMillis());
7071
humanTask.setStatus(TaskModel.Status.IN_PROGRESS);
7172
return List.of(humanTask);
7273
}

0 commit comments

Comments
 (0)