Skip to content

design: Quarkus Flow integration — async dispatch, callback state, and sub-workflow context (request for Quarkus Flow team input) #213

@mdproctor

Description

@mdproctor

casehub ↔ Quarkus Flow integration — complete implementation specification

This issue documents everything needed to implement the casehub FlowWorker integration with Quarkus Flow. All APIs confirmed from source: serverlessworkflow-impl-core 7.17.1.Final and quarkus-flow core/runtime.


Key finding: no blocking, no framework changes needed

The original concerns about blocking were unfounded. Quarkus Flow already ships Uni2CompletableFuture and Multi2CompletableFuture as DataTypeConverter service providers (core/runtime/.../converters/). Any function bean returning Uni<T> or Multi<T> is automatically converted to CompletableFuture<T> and awaited asynchronously by the Serverless Workflow engine. No thread is blocked. No callback state complexity. No virtual threads required.


1. WorkflowExecutionListener — registration and full API

Registration (CDI, automatic)

@ApplicationScoped
public class CasehubWorkflowExecutionListener implements WorkflowExecutionListener {
    // picked up automatically by WorkflowApplicationRecorder:
    // container.select(WorkflowExecutionListener.class, Any.Literal.INSTANCE)
    //          .stream().forEach(listener -> builder.withListener(listener));
}

No ServiceLoader, no configuration. Any CDI bean implementing WorkflowExecutionListener is bound automatically.

Full interface (source confirmed, 7.17.1.Final)

public interface WorkflowExecutionListener extends AutoCloseable, ServicePriority {
    default void onWorkflowStarted(WorkflowStartedEvent event);
    default void onWorkflowSuspended(WorkflowSuspendedEvent event);
    default void onWorkflowResumed(WorkflowResumedEvent event);
    default void onWorkflowCompleted(WorkflowCompletedEvent event);
    default void onWorkflowFailed(WorkflowFailedEvent event);
    default void onWorkflowCancelled(WorkflowCancelledEvent event);
    default void onTaskStarted(TaskStartedEvent event);
    default void onTaskCompleted(TaskCompletedEvent event);
    default void onTaskFailed(TaskFailedEvent event);
    default void onTaskSuspended(TaskSuspendedEvent event);
    default void onTaskResumed(TaskResumedEvent event);
    default void onTaskRetried(TaskRetriedEvent event);
    default void onWorkflowStatusChanged(WorkflowStatusEvent event);
}

All methods are default (no-op) — implement only what's needed.


2. WorkflowContext is mutable — context injection confirmed

WorkflowContext (the concrete class implementing WorkflowContextData) has a setter:

public class WorkflowContext implements WorkflowContextData {
    private WorkflowModel context;                  // mutable field
    public WorkflowModel context() { ... }          // getter (on interface)
    public void context(WorkflowModel model) { ... } // setter (on concrete class)
    public WorkflowMutableInstance instance() { ... }
}

In onWorkflowStarted, casehub can cast event.workflowContext() to WorkflowContext and inject keys:

@Override
public void onWorkflowStarted(WorkflowStartedEvent event) {
    WorkflowContext ctx = (WorkflowContext) event.workflowContext();

    // Augment initial context with casehub propagation metadata
    Map<String, Object> current = new HashMap<>(
        ctx.context().asMap().orElse(Map.of()));
    current.put("__casehub.traceId", propagationContext.traceId());
    current.put("__casehub.causedBy", propagationContext.causedByEntryId().toString());
    ctx.context(modelFactory.from(current));  // inject before first task runs
}

WorkflowModelFactory.from(Map<String, Object>) creates a new WorkflowModel from a map. Inject via CDI.


3. Parent detection — TaskContext.parentContext

The parent reference is on TaskContext, not WorkflowInstanceData:

public class TaskContext implements TaskContextData {
    private final Optional<TaskContext> parentContext;  // present when nested
    private final String taskName;
    private final WorkflowPosition position;
    ...
}

In onTaskStarted, event.taskContext().parentContext() is present when the task is executing inside a sub-workflow or nested context. This tells casehub whether this workflow start is a top-level execution or a sub-workflow call, without needing a separate parent-workflow-id field.


4. Casehub dispatch function — non-blocking, works today

// CDI function bean — register with workflow via FuncDSL or YAML function reference
@ApplicationScoped
public class CasehubDispatchFunction {

    @Inject WorkOrchestrator workOrchestrator;
    // CaseInstance injected via CDI request scope or passed via workflow context

    public Uni<Map<String, Object>> dispatch(String capability, Map<String, Object> input) {
        return Uni.createFrom()
            .completionStage(() -> workOrchestrator.submit(caseInstance,
                WorkRequest.of(capability, input)))
            .map(WorkResult::output);
        // Uni2CompletableFuture converts this automatically
        // Engine awaits CompletableFuture asynchronously — no thread blocked
    }
}

5. Complete CasehubWorkflowExecutionListener implementation sketch

@ApplicationScoped
public class CasehubWorkflowExecutionListener implements WorkflowExecutionListener {

    @Inject WorkflowModelFactory modelFactory;
    @Inject EventLogRepository eventLogRepository;
    @Inject CompletionTracker completionTracker;  // casehub-engine epic #204
    @Inject PropagationContextHolder propagationContextHolder;

    @Override
    public void onWorkflowStarted(WorkflowStartedEvent event) {
        WorkflowContext ctx = (WorkflowContext) event.workflowContext();
        String instanceId = ctx.instanceData().id();

        // 1. Inject propagation metadata into workflow initial context
        Map<String, Object> augmented = new HashMap<>(
            ctx.context().asMap().orElse(Map.of()));
        augmented.put("__casehub.traceId", propagationContextHolder.current().traceId());
        augmented.put("__casehub.causedBy",
            propagationContextHolder.current().causedByEntryId().toString());
        ctx.context(modelFactory.from(augmented));

        // 2. Emit SPAWN_STARTED ledger entry (spawnType=WORKFLOW_STEP)
        //    causedByEntryId from propagationContextHolder
        eventLogRepository.append(buildSpawnStarted(instanceId, SpawnType.WORKFLOW_STEP));
    }

    @Override
    public void onWorkflowCompleted(WorkflowCompletedEvent event) {
        String instanceId = event.workflowContext().instanceData().id();
        Map<String, Object> output = event.workflowContext().instanceData()
            .output().asMap().orElse(Map.of());

        // Route output via CompletionTracker (handles returnTo routing automatically)
        completionTracker.recordCompletion(instanceId, output);
        // If sub-workflow: output routes to parent WorkflowContext (not root CaseContext)
        // If top-level FlowWorker: output routes to CaseContext via outputSchema JQ
    }

    @Override
    public void onWorkflowFailed(WorkflowFailedEvent event) {
        String instanceId = event.workflowContext().instanceData().id();
        completionTracker.recordFailure(instanceId, event.workflowContext().instanceData().status());
    }

    @Override
    public void onTaskStarted(TaskStartedEvent event) {
        String taskName = event.taskContext().taskName();
        boolean isNested = event.taskContext().parentContext().isPresent();

        // Emit per-task SPAWN_STARTED for observability (optional, configurable)
        // spawnType=AGENT_INVOCATION if langchain4j step, WORKFLOW_STEP otherwise
    }

    @Override
    public void onTaskCompleted(TaskCompletedEvent event) {
        String taskName = event.taskContext().taskName();
        Map<String, Object> output = event.taskContext().output().asMap().orElse(Map.of());

        // Emit SPAWN_COMPLETED ledger entry per task step (lineage epic #205)
        // This provides per-agent-invocation observability without a write-level hook
    }
}

6. Sub-workflow context propagation — fully solved

CaseContext (parent case)
  ── inputSchema JQ ──▶ WorkflowContext (FlowWorker)
     ← onWorkflowStarted: inject __casehub.traceId, __casehub.causedBy
       ── sub-workflow starts ──▶ WorkflowContext (sub-workflow)
          ← onWorkflowStarted: inject same metadata (inherited from parent via inputExpressions or augmented)
          ← onWorkflowCompleted: CompletionTracker.recordCompletion() routes to PARENT WorkflowContext
       ── sub-workflow completes ──▶ output merged to parent WorkflowContext
     ← onWorkflowCompleted: CompletionTracker.recordCompletion() routes to CaseContext
  ◀── outputSchema JQ ──

7. Durability (JVM restart)

quarkus-flow ships JPA (/persistence/jpa/) and Redis (/persistence/redis/) persistence modules. WorkflowInstanceData.id() is the correlation key. When the JVM restarts:

  • If persistence is active: workflow resumes from last persisted state when casehub's WorkerExecutionRecoveryService re-fires the WorkerCompleted completion event on startup
  • If no persistence: workflow must replay from the beginning; casehub's recovery fires the event and the workflow re-executes

No changes to casehub or Quarkus Flow needed for basic restart recovery.


8. Outstanding question (one only)

Is the cast (WorkflowContext) event.workflowContext() safe in onWorkflowStarted?

WorkflowStartedEvent.workflowContext() returns WorkflowContextData (interface). The concrete class is WorkflowContext which has the mutable context(WorkflowModel) setter. If Quarkus Flow guarantees that the concrete type is always WorkflowContext at this point, the cast is safe. If sub-workflow invocations use a different concrete type, we need confirmation.


9. Related epics

Metadata

Metadata

Assignees

Labels

enhancementNew feature or requestmigrationCapability existing in casehub-poc, or planned in its design/architecture docs

Type

No type

Projects

Status

Todo

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions