Skip to content

Commit

Permalink
code-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
antmendoza committed May 29, 2024
1 parent cd5b7c9 commit b2a4f67
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 68 deletions.
28 changes: 28 additions & 0 deletions core/src/main/java/io/temporal/samples/taskinteraction/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.temporal.samples.taskinteraction;

import java.util.Objects;

public class Task {

private String token;
Expand All @@ -39,6 +41,19 @@ public TaskTitle getTitle() {
return title;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof Task)) return false;
final Task task = (Task) o;
return Objects.equals(token, task.token) && Objects.equals(title, task.title);
}

@Override
public int hashCode() {
return Objects.hash(token, title);
}

@Override
public String toString() {
return "Task{" + "token='" + token + '\'' + ", title=" + title + '}';
Expand All @@ -62,6 +77,19 @@ public void setValue(final String value) {
this.value = value;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof TaskTitle)) return false;
final TaskTitle taskTitle = (TaskTitle) o;
return Objects.equals(value, taskTitle.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}

@Override
public String toString() {
return "TaskTitle{" + "value='" + value + '\'' + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,53 @@ public class TaskService<R> {
ActivityTask.class,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(5)).build());

private final Map<String, CompletablePromise<R>> pendingPromises = new HashMap<>();
private final Logger logger = Workflow.getLogger(TaskService.class);
private final TaskClient listener =
taskToken -> {
logger.info("Completing task with token: " + taskToken);
private TaskManager tasksManager = new TaskManager();

final CompletablePromise<R> completablePromise = pendingPromises.get(taskToken);
completablePromise.complete(null);
};
private final Logger logger = Workflow.getLogger(TaskService.class);

public TaskService() {
Workflow.registerListener(listener);

// This listener exposes a signal method that clients use to notify the task has been completed
Workflow.registerListener(
new TaskClient() {
@Override
public void completeTaskByToken(final String taskToken) {
logger.info("Completing task with token: " + taskToken);
tasksManager.completeTask(taskToken);
}
});
}

public void executeTask(Task task) {

logger.info("Before creating task : " + task);
final String token = task.getToken();

// Activity implementation is responsible for registering the task to the external service
// (which is responsible for managing the task life-cycle)
activity.createTask(task);

logger.info("Task created: " + task);

final CompletablePromise<R> promise = Workflow.newPromise();
pendingPromises.put(token, promise);

// Wait promise to complete or fail
promise.get();
tasksManager.waitForTaskCompletion(task);

logger.info("Task completed: " + task);
}

private class TaskManager {

private final Map<String, CompletablePromise<R>> tasks = new HashMap<>();

public void waitForTaskCompletion(final Task task) {
final CompletablePromise<R> promise = Workflow.newPromise();
tasks.put(task.getToken(), promise);
// Wait promise to complete
promise.get();
}

public void completeTask(final String taskToken) {

final CompletablePromise<R> completablePromise = tasks.get(taskToken);
completablePromise.complete(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface WorkflowTaskManager {
String WORKFLOW_ID = WorkflowTaskManager.class.getSimpleName();

@WorkflowMethod
void execute(List<Task> inputPendingTask, List<String> inputTaskToComplete);
void execute(final WorkflowTaskManagerImpl.PendingTasks pendingTasks);

@UpdateMethod
void createTask(Task task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,85 +20,94 @@
package io.temporal.samples.taskinteraction;

import io.temporal.workflow.Workflow;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.StringTokenizer;

public class WorkflowTaskManagerImpl implements WorkflowTaskManager {

private List<Task> pendingTask;

private List<String> tasksToComplete;
private PendingTasks pendingTasks = new PendingTasks();

@Override
public void execute(List<Task> inputPendingTask, List<String> inputTaskToComplete) {
initPendingTasks(inputPendingTask);
initTaskToComplete(inputTaskToComplete);

while (true) {

Workflow.await(
() ->
// Wait until there are pending task to complete
!tasksToComplete.isEmpty());
public void execute(final PendingTasks inputPendingTasks) {

final String taskToken = tasksToComplete.remove(0);
initTaskList(inputPendingTasks);

// Find the workflow id of the workflow we have to signal back
final String externalWorkflowId = new StringTokenizer(taskToken, "_").nextToken();
Workflow.await(() -> Workflow.getInfo().isContinueAsNewSuggested());

Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
.completeTaskByToken(taskToken);

final Task task = getPendingTaskWithToken(taskToken).get();
pendingTask.remove(task);

if (Workflow.getInfo().isContinueAsNewSuggested()) {
Workflow.newContinueAsNewStub(WorkflowTaskManager.class)
.execute(pendingTask, tasksToComplete);
}
}
Workflow.newContinueAsNewStub(WorkflowTaskManager.class).execute(this.pendingTasks);
}

@Override
public void createTask(Task task) {
initPendingTasks(new ArrayList<>());
pendingTask.add(task);
initTaskList(new PendingTasks());
pendingTasks.addTask(task);
}

@Override
public void completeTaskByToken(String taskToken) {

tasksToComplete.add(taskToken);
Task task = this.pendingTasks.filterTaskByToken(taskToken).get();

Workflow.await(
() -> {
final boolean taskCompleted =
getPendingTask().stream().noneMatch((t) -> Objects.equals(t.getToken(), taskToken));
final String externalWorkflowId = extractWorkflowIdFromTaskToken(taskToken);

return taskCompleted;
});
// Signal back to the workflow that started this task to notify that the task was completed
Workflow.newExternalWorkflowStub(TaskClient.class, externalWorkflowId)
.completeTaskByToken(taskToken);

this.pendingTasks.markTaskAsCompleted(task);
}

@Override
public List<Task> getPendingTask() {
return pendingTask;
return pendingTasks.getTasks();
}

private Optional<Task> getPendingTaskWithToken(final String taskToken) {
return pendingTask.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst();
}
private void initTaskList(final PendingTasks pendingTasks) {
this.pendingTasks = this.pendingTasks == null ? new PendingTasks() : this.pendingTasks;

private void initTaskToComplete(final List<String> tasks) {
if (tasksToComplete == null) {
tasksToComplete = new ArrayList<>();
// Update method addTask can be invoked before the main workflow method.
if (pendingTasks != null) {
this.pendingTasks.addAll(pendingTasks.getTasks());
}
tasksToComplete.addAll(tasks);
}

private void initPendingTasks(final List<Task> tasks) {
private String extractWorkflowIdFromTaskToken(final String taskToken) {
return new StringTokenizer(taskToken, "_").nextToken();
}

public static class PendingTasks {
private final List<Task> tasks;

public PendingTasks() {
this(new ArrayList<>());
}

public PendingTasks(final List<Task> tasks) {
this.tasks = tasks;
}

public void addTask(final Task task) {
this.tasks.add(task);
}

public void addAll(final List<Task> tasks) {
this.tasks.addAll(tasks);
}

public void markTaskAsCompleted(final Task task) {
// For the sake of simplicity, we delete the task if it is marked as completed.
// Nothing stops us from having a field to track the tasks' state
tasks.remove(task);
}

private Optional<Task> filterTaskByToken(final String taskToken) {
return tasks.stream().filter((t) -> t.getToken().equals(taskToken)).findFirst();
}

if (pendingTask == null) {
pendingTask = new ArrayList<>();
private List<Task> getTasks() {
return tasks;
}
pendingTask.addAll(tasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.temporal.client.WorkflowOptions;
import io.temporal.samples.taskinteraction.Task;
import io.temporal.samples.taskinteraction.WorkflowTaskManager;
import java.util.ArrayList;
import io.temporal.samples.taskinteraction.WorkflowTaskManagerImpl;

public class ActivityTaskImpl implements ActivityTask {

Expand All @@ -35,11 +35,13 @@ public ActivityTaskImpl(WorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}

// This activity is responsible for registering the task to the external service
@Override
public void createTask(Task task) {

final String taskQueue = Activity.getExecutionContext().getInfo().getActivityTaskQueue();

// In this case the service that manages the task life-cycle is another workflow.
final WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder()
.setWorkflowId(WorkflowTaskManager.WORKFLOW_ID)
Expand All @@ -49,13 +51,13 @@ public void createTask(Task task) {
final WorkflowTaskManager taskManager =
workflowClient.newWorkflowStub(WorkflowTaskManager.class, workflowOptions);
try {
WorkflowClient.start(taskManager::execute, new ArrayList<>(), new ArrayList<>());
WorkflowClient.start(taskManager::execute, new WorkflowTaskManagerImpl.PendingTasks());
} catch (WorkflowExecutionAlreadyStarted e) {
// expected exception if workflow was started by a previous activity execution.
// This will be handled differently once updateWithStart is implemented
}

// register the "task" to the external workflow that manages task lifecycle
// Register the "task" to the external workflow and return
taskManager.createTask(task);
}
}

0 comments on commit b2a4f67

Please sign in to comment.