Skip to content

Commit d9c3b26

Browse files
committed
Add 'waitFor' handler
1 parent eab3889 commit d9c3b26

File tree

5 files changed

+286
-31
lines changed

5 files changed

+286
-31
lines changed

packages/restate-xstate/README.md

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ To try out this example:
2626
# start a local Restate instance
2727
restate-server
2828
# start the service
29-
npm run auth-example
29+
npm run examples
3030
# register the state machine service against restate
3131
restate dep register http://localhost:9080
3232

@@ -57,9 +57,9 @@ In [`examples/versioning/app.ts`](../examples/src/versioning/app.ts) there is an
5757
# start a local Restate instance
5858
restate-server
5959
# start the service
60-
npm run versioning-example
60+
npm run examples
6161
# register the state machine service against restate
62-
restate dep register http://localhost:9080
62+
restate dep register http://localhost:9082
6363

6464
# create a state machine
6565
curl http://localhost:8080/counter/myMachine/create
@@ -88,3 +88,41 @@ restate sql "with keys as
8888
(select service_key from state where key = 'snapshot' and json_get_str(value_utf8, 'status') != 'done')
8989
select state.service_key, state.value_utf8 from keys right join state where keys.service_key = state.service_key and key = 'version'"
9090
```
91+
92+
## Subscribing to changes
93+
94+
Calls to `create` or `send` always return immediately with the results of any synchronous transitions that were triggered.
95+
The state machine may later transition due to delayed transitions or promise actors.
96+
It is helpful to be able to subscribe to the state machine to wait for relevant changes.
97+
In native xstate this would be done with the `subscribe` method and the `waitFor` function.
98+
In the Restate integration we expose a similar mechanism via the `waitFor` handler.
99+
100+
`waitFor` accepts two parameters, `condition`, which describes what you're waiting for, and an optional `timeout` parameter which is how many milliseconds to wait before returning an error (HTTP 408) to the caller.
101+
The `condition` parameter currently accepts either `done`, which is met if the state machine enters a state with `type: "final"`, or `hasTag:${tag}`, which is met if the state machine enters a state with that tag.
102+
When the condition is met, the waitFor request returns with the snapshot of the state machine that met the condition.
103+
If the state machine completes or enters an error state without the condition being met, `waitFor` returns an error (HTTP 412).
104+
105+
To safely watch for a change from HTTP clients, its best to use idempotent invocations.
106+
These allow for interrupted HTTP requests to `waitFor` to be resumed by simply making the request again with the same idempotency key, without having to initiate a new `waitFor` invocation (in which case, you might miss a state change in the gap between the two requests).
107+
This means that even if your wait time exceeds HTTP response timeouts, you can safely keep long-polling for completion.
108+
109+
For example:
110+
111+
```bash
112+
# start a local Restate instance
113+
restate-server
114+
# start the service
115+
npm run examples
116+
# register the state machine service against restate
117+
restate dep register http://localhost:9080
118+
119+
# create a state machine
120+
curl http://localhost:8080/auth/myMachine/create
121+
# create a waitFor invocation which waits for the machine to complete
122+
curl http://localhost:8080/auth/myMachine/waitFor --json '{"condition": "done"}' -H "idempotency-key: my-key"
123+
# kick off the machine in another window
124+
curl http://localhost:8080/auth/myMachine/send --json '{"event": {"type": "AUTH"}}'
125+
# and watch the waitFor call eventually complete!
126+
# you can even call it again afterwards; the original result will be cached for the idempotency retention period
127+
curl http://localhost:8080/auth/myMachine/waitFor --json '{"condition": "done"}' -H "idempotency-key: my-key"
128+
```

packages/restate-xstate/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ export type {
1616
XStateApi,
1717
ActorObject,
1818
ActorObjectHandlers,
19+
SnapshotWithTags,
20+
Subscription,
1921
} from "./lib/types.js";
2022
/**
2123
* @deprecated Please import from `@restatedev/xstate/promise`

packages/restate-xstate/src/lib/actorObject.ts

Lines changed: 215 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import type {
2+
Actor,
23
ActorSystemInfo,
4+
AnyActorLogic,
35
AnyEventObject,
6+
AnyMachineSnapshot,
47
AnyStateMachine,
58
InputFrom,
9+
Observer,
610
PromiseActorLogic,
711
Snapshot,
812
} from "xstate";
@@ -19,6 +23,9 @@ import type {
1923
SerialisableScheduledEvent,
2024
XStateApi,
2125
ActorObjectHandlers,
26+
Condition,
27+
Subscription,
28+
SnapshotWithTags,
2229
} from "./types.js";
2330
import { resolveReferencedActor } from "./utils.js";
2431
import { createActor } from "./createActor.js";
@@ -95,6 +102,7 @@ export function actorObject<
95102
ctx.clear("events");
96103
ctx.clear("children");
97104
ctx.clear("disposed");
105+
ctx.clear("subscriptions");
98106

99107
const version = await getOrSetVersion(ctx, latestLogic.id);
100108
const logic = getLogic(
@@ -103,16 +111,15 @@ export function actorObject<
103111
version,
104112
) as LatestStateMachine;
105113

106-
const root = (
107-
await createActor(ctx, api, systemName, version, logic, {
108-
input: {
109-
...(request?.input ?? {}),
110-
} as InputFrom<LatestStateMachine>,
111-
})
112-
).start();
114+
const root = await createActor(ctx, api, systemName, version, logic, {
115+
input: {
116+
...(request?.input ?? {}),
117+
} as InputFrom<LatestStateMachine>,
118+
});
113119

114-
const snapshot = root.getPersistedSnapshot();
115-
ctx.set("snapshot", snapshot);
120+
root.start();
121+
122+
ctx.set("snapshot", root.getPersistedSnapshot());
116123

117124
await checkIfStateMachineShouldBeDisposed(
118125
ctx,
@@ -121,7 +128,7 @@ export function actorObject<
121128
options?.finalStateTTL,
122129
);
123130

124-
return snapshot;
131+
return persistedSnapshotWithTags(root);
125132
},
126133
send: async (
127134
ctx: restate.ObjectContext<State>,
@@ -171,15 +178,19 @@ export function actorObject<
171178
ctx.set("events", events);
172179
}
173180

174-
const root = (
175-
await createActor<PreviousStateMachine | LatestStateMachine>(
181+
const root = await createActor<
182+
PreviousStateMachine | LatestStateMachine
183+
>(ctx, api, systemName, version, logic);
184+
185+
root.subscribe(
186+
new ConditionObserver(
176187
ctx,
177-
api,
178-
systemName,
179-
version,
180-
logic,
181-
)
182-
).start();
188+
root,
189+
(await ctx.get("subscriptions")) ?? {},
190+
),
191+
);
192+
193+
root.start();
183194

184195
let actor;
185196
if (request.target) {
@@ -201,8 +212,7 @@ export function actorObject<
201212
request.event,
202213
);
203214

204-
const nextSnapshot = root.getPersistedSnapshot();
205-
ctx.set("snapshot", nextSnapshot);
215+
ctx.set("snapshot", root.getPersistedSnapshot());
206216

207217
await checkIfStateMachineShouldBeDisposed(
208218
ctx,
@@ -211,11 +221,93 @@ export function actorObject<
211221
options?.finalStateTTL,
212222
);
213223

214-
return nextSnapshot;
224+
return persistedSnapshotWithTags(root);
225+
},
226+
subscribe: async (
227+
ctx: restate.ObjectContext<State>,
228+
request: { condition: string; awakeableId: string },
229+
): Promise<void> => {
230+
await validateStateMachineIsNotDisposed(ctx);
231+
validateCondition(request.condition);
232+
233+
const systemName = ctx.key;
234+
235+
// no need to set the version here if we are just getting a snapshot
236+
let version = await ctx.get("version");
237+
if (version == null) {
238+
version = latestLogic.id;
239+
}
240+
const logic = getLogic(latestLogic, versions, version);
241+
242+
const root = await createActor<
243+
LatestStateMachine | PreviousStateMachine
244+
>(ctx, api, systemName, version, logic);
245+
246+
if (
247+
evaluateCondition(ctx, root, request.condition, [request.awakeableId])
248+
) {
249+
// the condition is already met
250+
return;
251+
}
252+
253+
const subscriptions = (await ctx.get("subscriptions")) ?? {};
254+
255+
if (subscriptions[request.condition]) {
256+
subscriptions[request.condition]?.awakeables.push(
257+
request.awakeableId,
258+
);
259+
} else {
260+
subscriptions[request.condition] = {
261+
awakeables: [request.awakeableId],
262+
};
263+
}
264+
265+
ctx.set("subscriptions", subscriptions);
215266
},
267+
waitFor: restate.handlers.object.shared(
268+
async (
269+
ctx: restate.ObjectSharedContext<State>,
270+
request: { condition: Condition; timeout?: number },
271+
) => {
272+
await validateStateMachineIsNotDisposed(ctx);
273+
const systemName = ctx.key;
274+
275+
const { id, promise } = ctx.awakeable<Snapshot<unknown>>();
276+
277+
ctx
278+
.objectSendClient<
279+
ActorObjectHandlers<LatestStateMachine>
280+
>(api, systemName)
281+
.subscribe({
282+
condition: request.condition,
283+
awakeableId: id,
284+
});
285+
286+
try {
287+
if (request.timeout !== undefined) {
288+
return await promise.orTimeout(request.timeout);
289+
} else {
290+
return await promise;
291+
}
292+
} catch (e) {
293+
if (!(e instanceof restate.TerminalError)) {
294+
// pass through transient errors
295+
throw e;
296+
}
297+
298+
if (e.code != 500) {
299+
// errors that aren't from the awakeable being rejected, eg cancellation, timeout
300+
throw e;
301+
}
302+
303+
// awakeable rejection, return http 412 so that clients know this is non-transient
304+
throw new restate.TerminalError(e.message, { errorCode: 412 });
305+
}
306+
},
307+
),
216308
snapshot: async (
217309
ctx: restate.ObjectContext<State>,
218-
): Promise<Snapshot<unknown>> => {
310+
): Promise<SnapshotWithTags> => {
219311
await validateStateMachineIsNotDisposed(ctx);
220312
const systemName = ctx.key;
221313

@@ -230,7 +322,7 @@ export function actorObject<
230322
LatestStateMachine | PreviousStateMachine
231323
>(ctx, api, systemName, version, logic);
232324

233-
return root.getPersistedSnapshot();
325+
return persistedSnapshotWithTags(root);
234326
},
235327
invokePromise: restate.handlers.object.shared(
236328
async (
@@ -371,3 +463,103 @@ export function actorObject<
371463
},
372464
});
373465
}
466+
467+
function persistedSnapshotWithTags(
468+
actor: Actor<AnyActorLogic>,
469+
): SnapshotWithTags {
470+
const snapshot = actor.getPersistedSnapshot();
471+
const tags = [...(actor.getSnapshot() as AnyMachineSnapshot).tags];
472+
tags.sort();
473+
474+
return {
475+
...snapshot,
476+
tags,
477+
};
478+
}
479+
480+
function validateCondition(condition: string): asserts condition is Condition {
481+
if (condition === "done") return;
482+
if (condition.startsWith("hasTag:")) return;
483+
throw new restate.TerminalError("Invalid subscription condition", {
484+
errorCode: 400,
485+
});
486+
}
487+
488+
function evaluateCondition(
489+
ctx: restate.ObjectContext<State>,
490+
actor: Actor<AnyActorLogic>,
491+
condition: Condition,
492+
awakeables: string[],
493+
): boolean {
494+
const snapshot = actor.getSnapshot() as AnyMachineSnapshot;
495+
496+
if (snapshot.status === "error") {
497+
awakeables.forEach((awakeable) => {
498+
ctx.rejectAwakeable(awakeable, `State machine returned an error`);
499+
});
500+
return true;
501+
}
502+
503+
if (snapshot.status === "done") {
504+
if (condition === "done") {
505+
awakeables.forEach((awakeable) => {
506+
ctx.resolveAwakeable(awakeable, persistedSnapshotWithTags(actor));
507+
});
508+
} else {
509+
awakeables.forEach((awakeable) => {
510+
ctx.rejectAwakeable(
511+
awakeable,
512+
`State machine completed without the condition being met`,
513+
);
514+
});
515+
}
516+
517+
return true;
518+
}
519+
520+
if (condition.startsWith("hasTag:") && snapshot.hasTag(condition.slice(7))) {
521+
awakeables.forEach((awakeable) => {
522+
ctx.resolveAwakeable(awakeable, persistedSnapshotWithTags(actor));
523+
});
524+
return true;
525+
}
526+
527+
return false;
528+
}
529+
530+
class ConditionObserver implements Observer<AnyMachineSnapshot> {
531+
constructor(
532+
private readonly ctx: restate.ObjectContext<State>,
533+
private readonly actor: Actor<AnyActorLogic>,
534+
private readonly subscriptions: {
535+
[condition: string]: Subscription;
536+
},
537+
) {}
538+
539+
next() {
540+
this.evaluate();
541+
}
542+
543+
error() {
544+
this.evaluate();
545+
}
546+
547+
evaluate() {
548+
for (const [condition, subscription] of Object.entries(
549+
this.subscriptions,
550+
)) {
551+
if (
552+
evaluateCondition(
553+
this.ctx,
554+
this.actor,
555+
condition as Condition,
556+
subscription.awakeables,
557+
)
558+
) {
559+
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
560+
delete this.subscriptions[condition];
561+
this.ctx.set("subscriptions", this.subscriptions);
562+
}
563+
}
564+
}
565+
}

packages/restate-xstate/src/lib/system.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ export async function createSystem<T extends ActorSystemInfo>(
9898
event,
9999
delay,
100100
id,
101-
startedAt: Date.now(),
102101
uuid: ctx.rand.uuidv4(),
103102
};
104103
const scheduledEventId = createScheduledEventId(source, id);

0 commit comments

Comments
 (0)