Add a watcher actor object, that supports send with await handler #36
awesome work @sparachi, we might need a couple of rounds of review here to narrow in on the right internal impl and api, but i really like where it's heading
    );
  }
  if (req.condition === "result" && !req.resultKey) {
    throw new restate.TerminalError(
perhaps add {errorCode: 400} for these types of validation errors?
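A minimal sketch of what that could look like. The `TerminalError` class below is a local stand-in that only models the `(message, { errorCode })` constructor shape, so the snippet runs without the SDK; `validateWatchRequest` and the reduced `WatchRequest` type are hypothetical names.

```typescript
// Stand-in for restate.TerminalError; only the (message, { errorCode }) shape is modelled.
class TerminalError extends Error {
  readonly code: number;
  constructor(message: string, options?: { errorCode?: number }) {
    super(message);
    this.name = "TerminalError";
    // Defaulting to 500 is a choice made for this sketch.
    this.code = options?.errorCode ?? 500;
  }
}

// Hypothetical request shape, reduced to the fields visible in the diff.
type WatchRequest = {
  condition: string;
  observeTag?: string;
  resultKey?: string;
};

// Reject malformed requests with 400 so callers can tell them apart from server faults.
function validateWatchRequest(req: WatchRequest): void {
  if (req.condition === "result" && !req.resultKey) {
    throw new TerminalError("resultKey is required when condition is 'result'", {
      errorCode: 400,
    });
  }
}
```

With the real class the change would only be the second constructor argument on the existing `throw` statements.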
    );

  const until = req.condition;
  const tag = req.observeTag as string;
observeTag might be undefined if condition is final or result?
const machineCurrentStatus = async () => {
  // Get the current state of the machine
  const hasTag = await selfClient.hasTag({ tag });
these two calls (hasTag and snapshot) may hit the state machine at two different states, i think, so they'd be inconsistent with each other. can we do it in one call? maybe just a function over the snapshot data that is returned, or an extra field returned by snapshot if the snapshot data isn't sufficient now? Is hasTag as a separate method definitely needed?
I kept each function/handler separate so that each does one thing. But I could rename it appropriately and return the flag and the snapshot together.
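One way to picture the single-call option is a pure function over whatever the snapshot handler returns, so every flag describes the same machine state. The `MachineSnapshot` shape and the name `statusFromSnapshot` are assumptions for illustration; the real snapshot may carry different fields.

```typescript
// Hypothetical serialized snapshot; the real snapshot handler may return a different shape.
type MachineSnapshot = {
  status: "active" | "done" | "error";
  tags: string[];
  output?: unknown;
};

type MachineStatus = {
  hasTag: boolean;
  isFinal: boolean;
  snapshot: MachineSnapshot;
};

// Derive every flag from one snapshot so they cannot disagree with each other.
function statusFromSnapshot(snapshot: MachineSnapshot, tag?: string): MachineStatus {
  return {
    // No tag requested (e.g. a "final" or "result" watch) means hasTag is simply false.
    hasTag: tag !== undefined && snapshot.tags.indexOf(tag) !== -1,
    isFinal: snapshot.status === "done",
    snapshot,
  };
}
```

This also sidesteps the `req.observeTag as string` cast, since an undefined tag is handled explicitly.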
let tagWasObserved = false;

while (true) {
  if (Date.now() - start > timeoutMs) {
i think this check leads to a non-determinism issue. let's say we have a journal with 20 'sleep' entries because it has been waiting a while, and then the timeout is reached. now on replay (note if this code runs in lambda, every loop iteration leads to a replay), this condition will fire the first time it is checked, before the first sleep entry, because Date.now() always returns the current time, even in replay
two options:
- use ctx.date.now() in this condition, so that the time goes into the journal. however this leads to another suspension, at least in the lambda case
- have the snapshot handler return Date.now() as part of its response object. then this is part of the journal and we basically are assessing whether the 'snapshot time' is later than the timeout, something that will not change on replay
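A small model of the second option, assuming the snapshot response carries a `now` field (a hypothetical name). The `journal` array stands in for the recorded results of successive snapshot calls; nothing here uses the Restate SDK. The wall-clock variant is included only to show how replay diverges.

```typescript
// Hypothetical snapshot response that also carries the handler's clock reading.
type TimedSnapshot = { matched: boolean; now: number };
type WaitResult = { outcome: "matched" | "timeout" | "pending"; polls: number };

// Decides using journaled values only, so a replay reaches the same decision.
function waitUsingJournal(start: number, timeoutMs: number, journal: TimedSnapshot[]): WaitResult {
  let polls = 0;
  for (const entry of journal) {
    polls++;
    if (entry.matched) return { outcome: "matched", polls };
    if (entry.now - start > timeoutMs) return { outcome: "timeout", polls };
  }
  // Journal exhausted without a decision; a real handler would sleep and poll again.
  return { outcome: "pending", polls };
}

// Problematic variant for contrast: reads a live clock before each poll.
function waitUsingWallClock(
  start: number,
  timeoutMs: number,
  journal: TimedSnapshot[],
  wallClock: () => number,
): WaitResult {
  let polls = 0;
  for (const entry of journal) {
    if (wallClock() - start > timeoutMs) return { outcome: "timeout", polls };
    polls++;
    if (entry.matched) return { outcome: "matched", polls };
  }
  return { outcome: "pending", polls };
}
```

On replay with a late wall clock, the second function times out before consuming any journal entry, while the first walks the same entries it saw originally.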
// Track if we've seen the tag before
let tagWasObserved = false;

while (true) {
open question: polling may be the right choice here, but did you consider an awakeable approach too? I think it could be simpler, faster to react, and handle timeouts nicely. here's how it might look:
- this handler creates an awakeable id, then calls into a non-shared vo handler which writes the watch condition to state along with that id
- this handler then awaits on the awakeable, racing it against a timeout (.orTimeout())
- on every state machine transition, the send handler evaluates if a watch condition has fired. if it has, it resolves the awakeable and removes the condition from state.
- then this handler will be awoken when the condition is met, or the timeout is reached
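The steps above can be modelled in memory roughly as follows. This is a sketch of the control flow only: `WatchedMachine`, `registerWatch`, and the `resolved` record are hypothetical stand-ins for virtual object state and awakeable resolution, and the timeout race is not modelled.

```typescript
type Snapshot = { tags: string[] };
type Watch = { awakeableId: string; tag: string };

// In-memory stand-in for the virtual object's state plus the awakeables it resolves.
class WatchedMachine {
  private watches: Watch[] = [];
  // Records which awakeable ids were resolved, and with which snapshot.
  readonly resolved: Record<string, Snapshot> = {};

  // The waiting handler stores its condition together with the awakeable id.
  registerWatch(watch: Watch): void {
    this.watches.push(watch);
  }

  // Every transition re-evaluates pending watches and resolves the ones that fired.
  send(next: Snapshot): void {
    const stillPending: Watch[] = [];
    for (const w of this.watches) {
      if (next.tags.indexOf(w.tag) !== -1) {
        this.resolved[w.awakeableId] = next; // resolve, then drop the condition
      } else {
        stillPending.push(w);
      }
    }
    this.watches = stillPending;
  }

  pendingCount(): number {
    return this.watches.length;
  }
}
```

Because the condition lives in the same state that `send` mutates, the watcher reacts on the transition itself rather than on the next poll.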
if we want to enable long polling (ie, wait calls with timeout, that you just call again after the timeout) without the risk of dropping an event in between those calls, we may need something a little more complex:
- a watch condition is added to state against some id, and the id returned to the caller
- then you can wait on that id, and this involves creating an awakeable, putting it in state against the watch id, then awaiting on it with a timeout
- once the watch condition fires, you resolve all awakeables you currently have in state against that id, and you note in state that it fired, so any new calls that come in after for that watch id, can return immediately.
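A rough in-memory model of that long-poll variant, under the same caveats: `LongPollWatches` and its method names are hypothetical, awakeables are represented by plain string ids, and timeouts are left out.

```typescript
type WatchState = {
  tag: string;
  fired: boolean;
  waiters: string[]; // awakeable ids currently parked on this watch
};

class LongPollWatches {
  private watches: Record<string, WatchState> = {};
  private nextId = 0;
  readonly resolved: string[] = [];

  // Register the condition once; the caller keeps the returned id across wait calls.
  addWatch(tag: string): string {
    const id = `watch-${this.nextId++}`;
    this.watches[id] = { tag, fired: false, waiters: [] };
    return id;
  }

  // Returns true when the watch already fired, so the caller can return immediately.
  wait(watchId: string, awakeableId: string): boolean {
    const w = this.watches[watchId];
    if (!w) throw new Error(`unknown watch id: ${watchId}`);
    if (w.fired) return true;
    w.waiters.push(awakeableId);
    return false;
  }

  // Called on each transition with the tags of the new state.
  onTransition(tags: string[]): void {
    for (const id of Object.keys(this.watches)) {
      const w = this.watches[id];
      if (w.fired || tags.indexOf(w.tag) === -1) continue;
      w.fired = true; // remembered so later wait calls do not miss the event
      for (const a of w.waiters) this.resolved.push(a);
      w.waiters = [];
    }
  }
}
```

The `fired` flag is what closes the gap between two consecutive wait calls: an event that lands while nobody is waiting is still visible to the next caller.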
I think the awakeable approach might work. Let me look into it.
return restate.object({
  name: watcherName,
  handlers: {
    sendWithAwait: restate.handlers.object.exclusive(
could it be a shared handler on the main VO?
I tried two approaches before creating a watcher: a) a handler in actorObject with an exclusive object context, but it would lock the object, and other async tasks such as invoke promise calls were queued until the handler exited at the timeout and released the lock, which is not what we want; b) a handler in actorObject with a shared context, but this approach only has access to a point-in-time snapshot taken when the handler was entered, so even after the states transitioned, the handler would not see the updates.
ctx.console.log(
  `Sleeping for ${intervalMs}ms to allow event to materialize in machine: ${originalMachineName}, with key: ${ctx.key}`,
);
await ctx.sleep(intervalMs);
i think we cannot guarantee that the send has been processed after any amount of time, as it's an exclusive-mode handler and waitFor is a shared handler, and the vo could be processing a big queue of events before it gets to your send. imo the right solution here is that waitFor must work properly even if called before the send is processed. i think that the awakeable approach i mentioned above is one way to achieve this, as you could perhaps register your watch condition + awakeable id in the same call as your send, so you know they take effect at the same time. if we stick with a poll approach, we might need a counter in the VO to track whether the event that caused the watch to fire was before or after the sent event was processed
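For the poll-based fallback, one possible reading of the counter idea is sketched below. It assumes each sent event carries a caller-chosen id (an assumption, not something in the PR), and `SequencedMachine` with its methods is a hypothetical in-memory stand-in for the virtual object.

```typescript
// In-memory stand-in for the virtual object; `seq` counts processed events.
class SequencedMachine {
  private seq = 0;
  private processedAt: Record<string, number> = {};
  private lastTagSeq: Record<string, number> = {};

  // Processes one event, recording when it ran and which tags the new state has.
  send(eventId: string, tagsAfter: string[]): void {
    this.seq++;
    this.processedAt[eventId] = this.seq;
    for (const t of tagsAfter) this.lastTagSeq[t] = this.seq;
  }

  // True only if the tag was present at or after the point our own event was processed.
  matchedAfter(eventId: string, tag: string): boolean {
    const sentSeq = this.processedAt[eventId];
    if (sentSeq === undefined) return false; // our send is still queued
    const tagSeq = this.lastTagSeq[tag];
    return tagSeq !== undefined && tagSeq >= sentSeq;
  }
}
```

A poll that arrives before the send is processed then reports "not yet" instead of matching on a tag left over from an earlier event.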
    : never;
};

export type WatchableXStateApi = Pick<
is there a need for this trimmed down api vs just using the normal xstateapi type?
  "tagCleared",
  "result"
}
export type WatchEvent = {
i wonder if this should be a discriminated union with the different condition strings being different interfaces
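A sketch of how such a union could look. The condition strings "final", "tagCleared", and "result" appear in this thread; "tagObserved" is a placeholder name for the tag-appears case, and `describeWatch` is hypothetical.

```typescript
// Each variant carries only the fields that make sense for its condition.
type WatchEvent =
  | { condition: "final" }
  | { condition: "tagObserved"; observeTag: string } // placeholder variant name
  | { condition: "tagCleared"; observeTag: string }
  | { condition: "result"; resultKey: string };

// Narrowing on `condition` gives typed access to the variant's fields.
function describeWatch(w: WatchEvent): string {
  switch (w.condition) {
    case "final":
      return "wait for final state";
    case "tagObserved":
      return `wait for tag ${w.observeTag}`;
    case "tagCleared":
      return `wait until tag ${w.observeTag} clears`;
    case "result":
      return `wait for result key ${w.resultKey}`;
  }
}
```

With this shape, the runtime checks for a missing `resultKey` or `observeTag` become compile-time errors for typed callers, though requests arriving over the wire would still need validation.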
      ctx.set("disposed", true);
    },
  ),
  hasTag: async (
as mentioned elsewhere, if we could just answer hasTag questions using the snapshot handler, i think that would be better
observeTag?: string;
resultKey?: string;
intervalMs?: number;
timeoutMs?: number;
is the idea that if i reach the timeout, i might call again? like, longpoll style? the concern there would be that you can miss an event in the meantime. or is the timeout more of a failure condition where you set it high enough that you'd always expect it to fire within the timeout?
timeoutMs lets the client pass how long they're willing to wait for a response from the watcher
but if the timeout is reached, what will the client do?
}

const machineCurrentStatus = async () => {
  // Get the current state of the machine
i think we won't observe any intermediary states of the machine, if it synchronously moves over 10 states that might have matched the watch condition. i think we need to evaluate the condition in a watch callback from xstate in the send handler as the transitions are processed
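To illustrate the difference, here is a toy model with no XState calls: `path` is the list of states a single send passes through, and both function names are hypothetical. How the real send handler hooks into each transition is left open.

```typescript
type State = { name: string; tags: string[] };

// Polling view: only the state left after the whole batch is visible.
function pollSeesTag(path: State[], tag: string): boolean {
  const last = path[path.length - 1];
  return last !== undefined && last.tags.indexOf(tag) !== -1;
}

// Callback view: the condition is evaluated after every transition in the batch.
function callbackSeesTag(path: State[], tag: string): boolean {
  let fired = false;
  const onTransition = (s: State): void => {
    if (s.tags.indexOf(tag) !== -1) fired = true;
  };
  for (const s of path) onTransition(s);
  return fired;
}
```

For a path that passes through a tagged state and leaves it again within one send, the polling view never sees the tag while the callback view does.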
Provide synchronous support to clients interacting with back-end state machines. actorWatcherObject provides a non-blocking way to wait for a response from the original state machine by tracking its state/tags/context.