diff --git a/packages/examples/package.json b/packages/examples/package.json index 74c7c58..0f25c97 100644 --- a/packages/examples/package.json +++ b/packages/examples/package.json @@ -10,7 +10,7 @@ "clean": "rm -rf dist && rm tsconfig.tsbuildinfo || true", "build": "tsc -b", "lint": "eslint", - "dev": "tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/auth/app.ts & tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/payment/app.ts & tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/versioning/app.ts" + "dev": "tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/auth/app.ts & tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/payment/app.ts & tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/versioning/app.ts & tsx watch --include ../restate-xstate/src --tsconfig ./tsconfig.dev.json ./src/syncWorkflows/app.ts" }, "dependencies": { "@restatedev/restate-sdk": "catalog:", diff --git a/packages/examples/src/syncWorkflows/app.ts b/packages/examples/src/syncWorkflows/app.ts new file mode 100644 index 0000000..e956029 --- /dev/null +++ b/packages/examples/src/syncWorkflows/app.ts @@ -0,0 +1,262 @@ +import * as restate from "@restatedev/restate-sdk"; +import { xstate } from "@restatedev/xstate"; +import { fromPromise } from "@restatedev/xstate/promise"; +import { createMachine, assign } from "xstate"; + +// Define the credit card charge state machine +const creditCardChargeMachine = createMachine( + { + id: "creditCardCharge", + initial: "idle", + types: {} as { + context: { + amount: number; + error: string | null; + authStatus?: string; + notifyStatus?: string; + }; + events: { type: "START"; amount: number } | { type: "RETRY" }; + }, + context: { + amount: 0, + error: null as string | null, + authStatus: undefined, + notifyStatus: undefined, + }, + states: { + idle: { + on: { + START: { + target: "authorizing", + // actions: assign({ amount: (_, event) => event.amount }), + }, + }, + }, + authorizing: { + invoke: { + src: "authorizeCard", + input: ({ event }) => (event as any).input, + onDone: { + target: "notifyUser", + actions: assign({ + authStatus: ({ event }) => event.output.status, + }), + }, + onError: { + target: "failed", + // actions: assign({ error: (_, event) => event.data }), + }, + }, + }, + notifyUser: { + invoke: { + src: "notifyUser", + onDone: { + target: "success", + actions: assign({ + notifyStatus: ({ event }) => event.output.status, + }), + }, + onError: { + target: "failed", + // actions: assign({ error: (_, event) => event.data }), + }, + }, + }, + success: { + type: "final", + }, + failed: { + type: "final", + }, + }, + }, + { + actors: { + authorizeCard: fromPromise(async ({ input }) => { + // Simulate authorization logic + // if (context.amount > 0) return true; + // throw "Authorization failed"; + console.log("Authorizing card for amount:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "auth-success" }; + }), + notifyUser: fromPromise(async ({ input }) => { + // Simulate charging logic + // if (input.amount < 10000) return true; + // throw "Charge declined"; + console.log("Notifying user:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "notify-success" }; + }), + }, + }, +); + +const transactionMachine = createMachine( + { + id: "transaction", + initial: "idle", + types: {} as { + context: { + error: string | null; + transactionStatus?: string; + }; + }, + context: { + error: null as string | null, + transactionStatus: undefined, + }, + states: { + idle: { + on: { + START: { + target: "commitTransaction", + }, + }, + }, + commitTransaction: { + tags: ["transactionCommit"], + initial: "step1", + states: { + step1: { + invoke: { + src: "commitStep1", + onDone: { + target: "step2", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + step2: { + tags: ["transactionStep2Commit"], + invoke: { + src: "commitStep2", + onDone: { + target: "step3", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + step3: { + invoke: { + src: "commitStep3", + onDone: { + target: "step4", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + step4: { + invoke: { + src: "commitStep4", + onDone: { + target: "#transaction.recordTransaction", + }, + onError: { + target: "#transaction.failed", + }, + }, + } + }, + }, + recordTransaction: { + invoke: { + src: "recordTransaction", + onDone: { + target: "success", + actions: assign({ + transactionStatus: ({ event }) => event.output.status, + }), + }, + onError: { + target: "failed", + }, + }, + }, + success: { + type: "final", + }, + failed: { + type: "final", + }, + }, + }, + { + actors: { + commitStep1: fromPromise(async ({ input }) => { + // Simulate Step1 async logic + console.log("Committing Step 1:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step1-success" }; + }), + commitStep2: fromPromise(async ({ input }) => { + // Simulate Step2 async logic + console.log("Committing Step 2:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step2-success" }; + }), + commitStep3: fromPromise(async ({ input }) => { + // Simulate Step3 async logic + console.log("Committing Step 3:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step3-success" }; + }), + commitStep4: fromPromise(async ({ input }) => { + // Simulate Step4 async logic + console.log("Committing Step 4:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step4-success" }; + }), + recordTransaction: fromPromise(async ({ input }) => { + // Simulate recording transaction logic + console.log("Recording transaction:", input); + await new Promise((resolve) => setTimeout(resolve, 4000)); + return { status: "transaction-recorded" }; + }), + }, + }, +); + +const creditCardChargeWithSync = xstate( + "creditCardCharge", + creditCardChargeMachine, + { + watcher: { + events: [{ event: "START", condition: "result", resultKey: "notifyStatus" }], + }, + }, +) as any; + +const transactionWithSync = xstate( + "transaction", + transactionMachine, + { + watcher: { + // events: [{ event: "START", condition: "tagCleared", observeTag: "transactionCommit" }, { event: "START", condition: "tagObserved", observeTag: "transactionStep2Commit" }], // Event response will be sent when the tag is cleared or observed based on client request + // events: [{ event: "START", condition: "tagCleared", observeTag: "transactionCommit" }], // Event response will be sent when the tag is cleared + // events: [{ event: "START", condition: "tagObserved", observedTag: "transactionStep2Commit" }], // Event response will be sent when the tag is observed + events: [{ event: "START", condition: "final" }], // Event response will be sent when final state is reached + + + // You can also use 'final' as the condition + // condition: "final", // Will wait for the machine to reach a final state + // condition: "tagObserved" or "tagCleared", // Will wait for the value in observedTag to be observed/cleared + // condition: "result", resultKey: "transactionStatus", // Will return the value of this key from the state machine context + // You can also define multiple events with different until conditions + + }, + }, +) as any; + +// Register as a restate xstate service +await restate.serve({ + services: [creditCardChargeWithSync, creditCardChargeWithSync.watcher!, transactionWithSync, transactionWithSync.watcher!], + port: 9083, +}); diff --git a/packages/restate-xstate/src/index.ts b/packages/restate-xstate/src/index.ts index 1de096e..81e8522 100644 --- a/packages/restate-xstate/src/index.ts +++ b/packages/restate-xstate/src/index.ts @@ -16,6 +16,10 @@ export type { XStateApi, ActorObject, ActorObjectHandlers, + WatcherDefaults, + XStateWatcherApi, + WatchEvent, + WatchCondition, } from "./lib/types.js"; /** * @deprecated Please import from `@restatedev/xstate/promise` diff --git a/packages/restate-xstate/src/lib/actorObject.ts b/packages/restate-xstate/src/lib/actorObject.ts index b37e106..0c2ff50 100644 --- a/packages/restate-xstate/src/lib/actorObject.ts +++ b/packages/restate-xstate/src/lib/actorObject.ts @@ -19,7 +19,10 @@ import type { SerialisableScheduledEvent, XStateApi, ActorObjectHandlers, + WaitForRequest, + WatchResult, } from "./types.js"; +import { ValidWatchCondition } from "./types.js"; import { resolveReferencedActor } from "./utils.js"; import { createActor } from "./createActor.js"; import { createScheduledEventId, type RestateActorSystem } from "./system.js"; @@ -46,6 +49,23 @@ async function getOrSetVersion< return version; } +async function getVersion< + LatestVersion extends string, + PreviousVersion extends string, +>( + ctx: restate.ObjectSharedContext, + latestVersion: LatestVersion, +): Promise { + let version = (await ctx.get("version")) as + | LatestVersion + | PreviousVersion + | null; + if (version == null) { + version = latestVersion; + } + return version; +} + function getLogic< LatestStateMachine extends AnyStateMachine, PreviousStateMachine extends AnyStateMachine, @@ -135,6 +155,7 @@ export function actorObject< await validateStateMachineIsNotDisposed(ctx); const systemName = ctx.key; + if (!request) { throw new restate.TerminalError("Must provide a request"); } @@ -368,6 +389,192 @@ export function actorObject< ctx.set("disposed", true); }, ), + hasTag: async ( + ctx: restate.ObjectContext, + req?: { tag: string }, + ) => { + await validateStateMachineIsNotDisposed(ctx); + const systemName = ctx.key; + + // no need to set the version here if we are just getting a snapshot + let version = await ctx.get("version"); + if (version == null) { + version = latestLogic.id; + } + const logic = getLogic(latestLogic, versions, version); + + const root = await createActor< + LatestStateMachine | PreviousStateMachine + >(ctx, api, systemName, version, logic); + + await validateStateMachineIsNotDisposed(ctx); + + const liveState = root.getSnapshot(); + const tag = req?.tag; + + return "hasTag" in liveState && (liveState as any).hasTag(tag); + }, + waitFor: restate.handlers.object.shared( + async ( + ctx: restate.ObjectSharedContext, + req: WaitForRequest, + ): Promise => { + await validateStateMachineIsNotDisposed(ctx); + const systemName = ctx.key; + const version = await getVersion(ctx, latestLogic.id); + const start = Date.now(); + + if ( + !req.condition || + !Object.values(ValidWatchCondition).includes(req.condition) + ) { + throw new restate.TerminalError( + "Invalid request: 'condition' must be one of ValidWatchCondition values", + ); + } + if (req.condition === "result" && !req.resultKey) { + throw new restate.TerminalError( + "Invalid request: 'resultKey' must be provided when 'condition' is 'result'", + ); + } + if ( + (req.condition === "tagObserved" || req.condition === "tagCleared") && + !req.observeTag + ) { + throw new restate.TerminalError( + "Invalid request: 'tag' must be provided when 'condition' is 'tagObserved' or 'tagCleared'", + ); + } + + ctx.console.log( + `Waiting for condition ${req.condition} with tag ${req.observeTag} or resultKey ${req.resultKey} in objectId ${systemName}`, + ); + + const until = req.condition; + const tag = req.observeTag as string; + const awaitResultKey = req.resultKey; + const intervalMs = req.intervalMs || 1000; + const timeoutMs = req.timeoutMs || 60000; + + const selfClient = ctx.objectClient< + ActorObjectHandlers + >(api, systemName); + if (!selfClient) { + throw new restate.TerminalError( + `Actor object ${systemName} not found`, + ); + } + + const machineCurrentStatus = async () => { + // Get the current state of the machine + const hasTag = await selfClient.hasTag({ tag }); + const snapshot = await selfClient.snapshot() as any; + const isFinal = snapshot.status === 'done'; + let awaitResultValue; + if (awaitResultKey && snapshot && "context" in snapshot) { + awaitResultValue = snapshot.context[awaitResultKey]; + } + + return { + isFinal, + hasTag, + snapshot, + awaitResultValue, + }; + }; + + // Check if tag exists in machine definition (for tagObserved and tagCleared) + if ((until === "tagObserved" || until === "tagCleared") && tag) { + const logic = getLogic(latestLogic, versions, version); + let tagExists = false; + + // Check if the tag is defined in any state + if (logic.config && logic.config.states) { + // Recursively check states for tags + const checkStateForTag = (states: Record) => { + for (const stateName in states) { + const state = states[stateName]; + // Check if this state has the tag + if (state.tags && state.tags.includes(tag)) { + return true; + } + // Check nested states + if (state.states && checkStateForTag(state.states)) { + return true; + } + } + return false; + }; + + tagExists = checkStateForTag(logic.config.states); + } + + if (!tagExists) { + return { + timedOut: false, + waitedMs: Date.now() - start, + error: new Error(`Tag "${tag}" is not defined in any state of the machine ${systemName}`), + }; + } + } + + // Track if we've seen the tag before + let tagWasObserved = false; + + while (true) { + if (Date.now() - start > timeoutMs) { + return { + timedOut: true, + waitedMs: Date.now() - start, + error: new Error(`Timeout after ${timeoutMs}ms waiting for ${until} condition on machine ${systemName}`), + }; + } + const { isFinal, hasTag, snapshot, awaitResultValue } = + await machineCurrentStatus(); + + // Update tag observation state + if (hasTag) { + tagWasObserved = true; + } + + if (until === "final" && isFinal) { + return { + timedOut: false, + waitedMs: Date.now() - start, + result: snapshot, + }; + } + if (until === "tagObserved" && hasTag) { + return { + timedOut: false, + waitedMs: Date.now() - start, + result: awaitResultKey ? awaitResultValue : snapshot, + }; + } + if (until === "tagCleared" && !hasTag && tagWasObserved) { + return { + timedOut: false, + waitedMs: Date.now() - start, + result: awaitResultKey ? awaitResultValue : snapshot, + }; + } + if (until === "result" && awaitResultValue) { + return { + timedOut: false, + waitedMs: Date.now() - start, + result: + awaitResultKey && awaitResultValue + ? awaitResultValue + : snapshot, + }; + } + ctx.console.log( + `Condition ${until} not met yet, waiting for ${intervalMs}ms...`, + ); + await ctx.sleep(intervalMs); + } + }, + ), }, }); } diff --git a/packages/restate-xstate/src/lib/actorWatcherObject.ts b/packages/restate-xstate/src/lib/actorWatcherObject.ts new file mode 100644 index 0000000..3ad573c --- /dev/null +++ b/packages/restate-xstate/src/lib/actorWatcherObject.ts @@ -0,0 +1,142 @@ +/** + * @license + * This object will provide a side car handlers (sendWithAwait) to state machines represented in actorObject. + * These handlers will be used to send events to the state machine, on behalf of the clients that await for the response to the events. + */ + +import * as restate from "@restatedev/restate-sdk"; +import { + ValidWatchCondition, + type WatchableXStateApi, + type WatcherDefaults, + type WatchRequest, + type WatchResult, +} from "./types.js"; + +export function actorWatcherObject( + watcherName: string, + watcherDefaults?: WatcherDefaults, +) { + return restate.object({ + name: watcherName, + handlers: { + sendWithAwait: restate.handlers.object.exclusive( + async ( + ctx: restate.ObjectContext, + req: WatchRequest, + ): Promise => { + + // Start the timer to observe transitions + const startTime = Date.now(); + if (!ctx.key || !req.event || typeof req.event !== "object") { + throw new restate.TerminalError( + "Invalid request: key, event are required", + ); + } + + const originalMachineName = watcherName.replace(/\.watcher/g, ""); + const eventWatchUntils = watcherDefaults?.events?.find( + (definedEvent) => definedEvent.event === req.event.type && definedEvent.condition === req.until?.condition, + ); + if (!eventWatchUntils) { + throw new restate.TerminalError( + `Event ${req.event.type} is not defined while defining watcher defaults`, + ); + } + + if ( + !eventWatchUntils?.condition || + !Object.values(ValidWatchCondition).includes(eventWatchUntils.condition) + ) { + throw new restate.TerminalError( + "Invalid event request: watcher 'condition' must be one of ValidWatchCondition values", + ); + } + + if ( + eventWatchUntils?.condition === "result" && + !eventWatchUntils.resultKey + ) { + throw new restate.TerminalError( + "Invalid request: 'resultKey' must be provided when 'condition' is 'result'", + ); + } + if ( + (eventWatchUntils?.condition === "tagObserved" || + eventWatchUntils?.condition === "tagCleared") && + !eventWatchUntils.observeTag + ) { + throw new restate.TerminalError( + "Invalid request: 'observeTag' must be provided when 'condition' is 'tagObserved' or 'tagCleared'", + ); + } + + const condition = eventWatchUntils?.condition; + const resultKey = eventWatchUntils?.resultKey; + const observeTag = eventWatchUntils?.observeTag; + const intervalMs = + req.intervalMs || watcherDefaults?.intervalMs || 500; + const timeoutMs = + req.timeoutMs || watcherDefaults?.timeoutMs || 60000; + + // Send event to the machine object + const machineClient = ctx.objectClient( + { + name: originalMachineName, + } as unknown as restate.VirtualObjectDefinition< + string, + WatchableXStateApi + >, + ctx.key, + ); + if (!machineClient) { + throw new restate.TerminalError( + `Machine object ${originalMachineName} not found`, + ); + } + + + + ctx.console.log( + `Sending event ${JSON.stringify(req.event)} to machine:${originalMachineName}, with key:${ctx.key}`, + ); + + // Send the event to the machine instance + try { + (machineClient as any).send({ + event: req.event, + source: "actorWatcherObject/sendWithAwait", + }); + } catch (error) { + throw new restate.TerminalError( + `Error sending event to machine ${originalMachineName}: ${error}`, + ); + } + + + + // Sleep for the initial interval for send event to materialize + ctx.console.log( + `Sleeping for ${intervalMs}ms to allow event to materialize in machine: ${originalMachineName}, with key: ${ctx.key}`, + ); + await ctx.sleep(intervalMs); + + let result: WatchResult = (await (machineClient as any).waitFor({ + condition, + observeTag, + resultKey, + intervalMs, + timeoutMs, + })) as WatchResult; + + ctx.console.log( + `Received response from machine: ${originalMachineName}, with key:${ctx.key}}`, + ); + result.waitedMs = Date.now() - startTime; + return result; + + }, + ), + }, + }); +} diff --git a/packages/restate-xstate/src/lib/types.ts b/packages/restate-xstate/src/lib/types.ts index e75964b..aeebc84 100644 --- a/packages/restate-xstate/src/lib/types.ts +++ b/packages/restate-xstate/src/lib/types.ts @@ -64,6 +64,7 @@ export interface XStateOptions { * @default Infinity * */ finalStateTTL?: number; + watcher?: WatcherDefaults; } export type ActorObjectHandlers = { @@ -93,6 +94,10 @@ export type ActorObjectHandlers = { version?: string; }, ) => Promise; + hasTag: ( + ctx: ObjectContext, + request: { tag: string }, + ) => Promise; }; export type ActorObject< @@ -109,3 +114,72 @@ export type XStateApi< P extends string, LatestStateMachine extends AnyStateMachine, > = ReturnType>; + +export type XStateWatcherApi< + P extends string, + LatestStateMachine extends AnyStateMachine, +> = XStateApi & { + watcher: VirtualObjectDefinition; +}; + +export type NoContextActorObjectHandlers = { + [Key in keyof ActorObjectHandler]: ActorObjectHandler[Key] extends ( + ctx: any, + req: infer R, + ) => Promise + ? (req: R) => Promise + : ActorObjectHandler[Key] extends (ctx: any) => Promise + ? () => Promise + : never; +}; + +export type WatchableXStateApi = Pick< + NoContextActorObjectHandlers>, + "send" | "snapshot" | "hasTag" +>; + +export type WatchCondition = "final" | "tagObserved" | "tagCleared" | "result"; +export enum ValidWatchCondition { + "final", + "tagObserved", + "tagCleared", + "result" +} +export type WatchEvent = { + event: string; + condition?: WatchCondition; + observeTag?: string; + resultKey?: string; +}; + +export type WatcherDefaults = { + events?: WatchEvent[]; + intervalMs?: number; + timeoutMs?: number; +}; + +export type WatchRequest = { + objectName: string; + objectId: string; + event: { + type: string; + }; + until?: WatchEvent; + intervalMs?: number; + timeoutMs?: number; +}; + +export type WaitForRequest = { + condition: WatchCondition; + observeTag?: string; + resultKey?: string; + intervalMs?: number; + timeoutMs?: number; +}; + +export type WatchResult = { + timedOut: boolean; + waitedMs: number; + result?: unknown; + error?: Error; +}; diff --git a/packages/restate-xstate/src/lib/xstate.ts b/packages/restate-xstate/src/lib/xstate.ts index 3597035..9f797ec 100644 --- a/packages/restate-xstate/src/lib/xstate.ts +++ b/packages/restate-xstate/src/lib/xstate.ts @@ -1,6 +1,7 @@ import type { AnyStateMachine } from "xstate"; -import type { XStateApi, XStateOptions } from "./types.js"; +import type { XStateApi, XStateOptions, XStateWatcherApi } from "./types.js"; import { actorObject } from "./actorObject.js"; +import { actorWatcherObject } from "./actorWatcherObject.js"; export const xstate = < P extends string, @@ -10,7 +11,7 @@ export const xstate = < path: P, logic: LatestStateMachine, options?: XStateOptions, -): XStateApi => { +): XStateApi | XStateWatcherApi => { if (options?.versions) { const idsSet = new Set(); for (const version of options.versions) { @@ -26,5 +27,15 @@ export const xstate = < } } - return actorObject(path, logic, options); + const originalActor = actorObject(path, logic, options); + + if (options?.watcher && options?.watcher.events != undefined) { + const finalActor = originalActor as XStateWatcherApi; + // Note: '/' is not allowed in object names + // Create a corresponding watcher object for the original actor + finalActor.watcher = actorWatcherObject(`${path}.watcher`, options.watcher); + } + + + return originalActor; }; diff --git a/packages/tests/src/lib/workflowWithSyncSupport.test.ts b/packages/tests/src/lib/workflowWithSyncSupport.test.ts new file mode 100644 index 0000000..504f84d --- /dev/null +++ b/packages/tests/src/lib/workflowWithSyncSupport.test.ts @@ -0,0 +1,163 @@ +import { xstate } from "@restatedev/xstate"; +import { describe, it } from "vitest"; +import { createRestateTestActor } from "@restatedev/xstate-test"; +import { fromPromise } from "@restatedev/xstate/promise"; +import { setup, assign, type SnapshotFrom } from "xstate"; +import { eventually } from "./eventually.js"; +import { error } from "console"; + +const workflow = setup({ + actors: { + commitStep1: fromPromise(async ({ input }) => { + // Simulate Step1 async logic + console.log("Committing Step 1:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step1-success" }; + }), + commitStep2: fromPromise(async ({ input }) => { + // Simulate Step2 async logic + console.log("Committing Step 2:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step2-success" }; + }), + commitStep3: fromPromise(async ({ input }) => { + // Simulate Step3 async logic + console.log("Committing Step 3:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step3-success" }; + }), + commitStep4: fromPromise(async ({ input }) => { + // Simulate Step4 async logic + console.log("Committing Step 4:", input); + await new Promise((resolve) => setTimeout(resolve, 3000)); + return { status: "step4-success" }; + }), + recordTransaction: fromPromise(async ({ input }) => { + // Simulate recording transaction logic + console.log("Recording transaction:", input); + await new Promise((resolve) => setTimeout(resolve, 4000)); + return { status: "transaction-recorded" }; + }), + }, +}).createMachine({ + id: "transaction", + context: { + error: null as string | null, + transactionStatus: undefined, + senderUserID: "", + recipientUserID: "", + amount: 0, + }, + states: { + idle: { + on: { + START: { + target: "commitTransaction", + }, + }, + }, + commitTransaction: { + tags: ["transactionCommit"], + initial: "step1", + states: { + step1: { + invoke: { + src: "commitStep1", + onDone: { + target: "step2", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + step2: { + tags: ["transactionStep2Commit"], + invoke: { + src: "commitStep2", + onDone: { + target: "step3", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + step3: { + invoke: { + src: "commitStep3", + onDone: { + target: "step4", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + step4: { + invoke: { + src: "commitStep4", + onDone: { + target: "#transaction.recordTransaction", + }, + onError: { + target: "#transaction.failed", + }, + }, + }, + }, + }, + recordTransaction: { + invoke: { + src: "recordTransaction", + onDone: { + target: "success", + actions: assign({ + transactionStatus: ({ event }) => event.output.status, + }), + }, + onError: { + target: "failed", + }, + }, + }, + success: { + type: "final", + }, + failed: { + type: "final", + }, + }, +}); + +describe("An event based workflow with sync support", () => { + it("Will complete successfully", { timeout: 20_000 }, async () => { + const wf = xstate("workflow", workflow); + + using actor = await createRestateTestActor>({ + machine: wf, + input: { + senderUserID: "user1", + recipientUserID: "user2", + amount: 100, + }, + }); + + await actor.send({ + type: "START", + }); + + await eventually(() => actor.snapshot()).toMatchObject({ + status: "done", + value: "success", + context: { + error: null, + senderUserID: "user1", + recipientUserID: "user2", + amount: 100, + transactionStatus: "transaction-recorded", + } + }); + + }); +});