diff --git a/examples/workflow_example.ts b/examples/workflow_example.ts index 1d573403..fbd061ae 100644 --- a/examples/workflow_example.ts +++ b/examples/workflow_example.ts @@ -42,7 +42,7 @@ const myworkflow = restate.workflow.workflow("acme.myworkflow", { // to listen to signals, also use promises const signal = ctx.promise("thesignal"); - const message = await signal.promise(); + const message = await signal; const result = `${message} my dear ${params.name}`; ctx.console.log(">>>>>>>>>>> Finishing workflow with: " + result); @@ -66,7 +66,7 @@ const myworkflow = restate.workflow.workflow("acme.myworkflow", { }, awaitName: async (ctx: restate.workflow.SharedWfContext): Promise => { - return ctx.promise("name_promise").promise(); + return ctx.promise("name_promise"); }, }); diff --git a/src/clients/workflow_client.ts b/src/clients/workflow_client.ts index d42e4dea..e044e95b 100644 --- a/src/clients/workflow_client.ts +++ b/src/clients/workflow_client.ts @@ -128,7 +128,7 @@ export function connect(restateUri: string): RestateClient { let result: restate.workflow.WorkflowStartResult; try { - result = await makeCall(restateUri, path, "start", workflowId, params); + result = await makeCall(restateUri, path, "submit", workflowId, params); } catch (err) { const error = ensureError(err); throw new Error("Cannot start workflow: " + error.message, { diff --git a/src/context.ts b/src/context.ts index e2ea5b44..e135d47d 100644 --- a/src/context.ts +++ b/src/context.ts @@ -186,7 +186,7 @@ export interface Context { * // The sleeping service should have sent the awakeableIdentifier string to this service. * ctx.resolveAwakeable(awakeableIdentifier, "hello"); */ - resolveAwakeable(id: string, payload: T): void; + resolveAwakeable(id: string, payload?: T): void; /** * Reject an awakeable of another service. When rejecting, the service waiting on this awakeable will be woken up with a terminal error with the provided reason. diff --git a/src/context_impl.ts b/src/context_impl.ts index f55c020b..e15ea0c7 100644 --- a/src/context_impl.ts +++ b/src/context_impl.ts @@ -491,10 +491,13 @@ export class ContextImpl implements KeyedContext, RestateGrpcChannel { }; } - public resolveAwakeable(id: string, payload: T): void { + public resolveAwakeable(id: string, payload?: T): void { + // We coerce undefined to null as null can be stringified by JSON.stringify + const payloadToWrite = payload === undefined ? null : payload; + this.checkState("resolveAwakeable"); this.completeAwakeable(id, { - value: Buffer.from(JSON.stringify(payload)), + value: Buffer.from(JSON.stringify(payloadToWrite)), }); } diff --git a/src/workflows/workflow.ts b/src/workflows/workflow.ts index 22661c3a..24bc089d 100644 --- a/src/workflows/workflow.ts +++ b/src/workflows/workflow.ts @@ -99,13 +99,12 @@ export interface WorkflowServices extends restate.ServiceBundle { // workflow-specific types (promises, contexts) // ---------------------------------------------------------------------------- -export interface DurablePromise { - promise(): Promise; +export type DurablePromise = restate.CombineablePromise & { peek(): Promise; resolve(value?: T): void; fail(errorMsg: string): void; -} +}; /** * The context for the workflow's interaction methods, which are all methods @@ -142,12 +141,6 @@ export enum WorkflowStartResult { ALREADY_FINISHED = "ALREADY_FINISHED", } -export type StatusMessage = { - sequenceNum: number; - message: string; - timestamp: Date; -}; - // ---------------------------------------------------------------------------- // types and signatures for typed clients // ---------------------------------------------------------------------------- diff --git a/src/workflows/workflow_state_service.ts b/src/workflows/workflow_state_service.ts index 63e7e72c..7893d132 100644 --- a/src/workflows/workflow_state_service.ts +++ b/src/workflows/workflow_state_service.ts @@ -10,31 +10,14 @@ */ import * as restate from "../public_api"; -import { - LifecycleStatus, - StatusMessage, - WorkflowStartResult, -} from "./workflow"; +import { LifecycleStatus, WorkflowStartResult } from "./workflow"; const LIFECYCLE_STATUS_STATE_NAME = "status"; const RESULT_STATE_NAME = "result"; const RESULT_LISTENERS_NAME = "result_listeners"; -const STATUS_MESSAGES_STATE_NAME = "messages"; -const STATUS_MESSAGE_LISTENERS = "message_listeners"; const PROMISE_STATE_PREFIX = "prom_s_"; +const USER_STATE_PREFIX = "state_"; const PROMISE_AWAKEABLE_PREFIX = "prom_l_"; -const ALL_NAMES_STATE_NAME = "all_state_names"; - -const RESERVED_STATE_NAMES = [ - LIFECYCLE_STATUS_STATE_NAME, - RESULT_STATE_NAME, - RESULT_LISTENERS_NAME, - ALL_NAMES_STATE_NAME, -]; -const RESERVED_STATE_PREFIXES = [ - PROMISE_STATE_PREFIX, - PROMISE_AWAKEABLE_PREFIX, -]; export type ValueOrError = { value?: T; @@ -168,7 +151,7 @@ export const workflowStateService = restate.keyedRouter({ _workflowId: string, stateName: string ): Promise => { - return ctx.get(stateName); + return ctx.get(USER_STATE_PREFIX + stateName); }, setState: async ( @@ -190,26 +173,9 @@ export const workflowStateService = restate.keyedRouter({ return; } - const stateName = request.stateName; - - // guard against overwriting built-in states - for (const reservedStateName of RESERVED_STATE_NAMES) { - if (stateName === reservedStateName) { - throw new restate.TerminalError( - "State name is reserved: " + reservedStateName - ); - } - } - for (const reservedStatePrefix of RESERVED_STATE_PREFIXES) { - if (stateName.startsWith(reservedStatePrefix)) { - throw new restate.TerminalError( - "State prefix is reserved: " + reservedStatePrefix - ); - } - } + const stateName = USER_STATE_PREFIX + request.stateName; ctx.set(stateName, request.value); - await rememberNewStateName(ctx, stateName); }, clearState: async ( @@ -217,80 +183,26 @@ export const workflowStateService = restate.keyedRouter({ _workflowId: string, stateName: string ): Promise => { - ctx.clear(stateName); + ctx.clear(USER_STATE_PREFIX + stateName); }, stateKeys: async (ctx: restate.KeyedContext): Promise> => { - return (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; + return (await ctx.stateKeys()).filter((name) => + name.startsWith(USER_STATE_PREFIX) + ); }, clearAllState: async (ctx: restate.KeyedContext): Promise => { - const stateNames = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; + const stateNames = (await ctx.stateKeys()).filter((name) => + name.startsWith(USER_STATE_PREFIX) + ); for (const stateName of stateNames) { ctx.clear(stateName); } }, - publishMessage: async ( - ctx: restate.KeyedContext, - _workflowId: string, - msg: { message: string; timestamp: Date } - ): Promise => { - // append message - const msgs = - (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; - msgs.push({ sequenceNum: msgs.length, ...msg }); - ctx.set(STATUS_MESSAGES_STATE_NAME, msgs); - - // wake up all listeners - const listeners = (await ctx.get(STATUS_MESSAGE_LISTENERS)) ?? []; - for (const awkId of listeners) { - ctx.resolveAwakeable(awkId, {}); - } - ctx.clear(STATUS_MESSAGE_LISTENERS); - }, - - getLatestMessage: async ( - ctx: restate.KeyedContext - ): Promise => { - const msgs = - (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; - if (msgs.length === 0) { - return null; - } else { - return msgs[msgs.length - 1]; - } - }, - - pollNextMessages: async ( - ctx: restate.KeyedContext, - _workflowId: string, - req: { from: number; awakId: string } - ): Promise => { - const msgs = - (await ctx.get(STATUS_MESSAGES_STATE_NAME)) ?? []; - if (msgs.length > req.from) { - return msgs.slice(req.from); - } - - // not yet available, register a listener to be woken up when more is available - const listeners = (await ctx.get(STATUS_MESSAGE_LISTENERS)) ?? []; - listeners.push(req.awakId); - ctx.set(STATUS_MESSAGE_LISTENERS, listeners); - return null; - }, - dispose: async (ctx: restate.KeyedContext): Promise => { - const stateNames = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; - for (const stateName of stateNames) { - ctx.clear(stateName); - } - ctx.clear(ALL_NAMES_STATE_NAME); - ctx.clear(STATUS_MESSAGE_LISTENERS); - ctx.clear(STATUS_MESSAGES_STATE_NAME); - ctx.clear(RESULT_LISTENERS_NAME); - ctx.clear(RESULT_STATE_NAME); - ctx.clear(LIFECYCLE_STATUS_STATE_NAME); + ctx.clearAll(); }, }); @@ -325,7 +237,6 @@ async function completePromise( // first completor // (a) set state ctx.set(stateName, completion); - await rememberNewStateName(ctx, stateName); // (b) complete awaiting awakeables const listeners = (await ctx.get(awakeableStateName)) ?? []; @@ -370,9 +281,6 @@ async function subscribePromise( } const listeners = (await ctx.get(awakeableStateName)) ?? []; - if (listeners.length === 0) { - await rememberNewStateName(ctx, awakeableStateName); - } listeners.push(awakeableId); ctx.set(awakeableStateName, listeners); return null; @@ -385,15 +293,6 @@ async function peekPromise( return ctx.get>(stateName); } -async function rememberNewStateName( - ctx: restate.KeyedContext, - stateName: string -) { - const names = (await ctx.get(ALL_NAMES_STATE_NAME)) ?? []; - names.push(stateName); - ctx.set(ALL_NAMES_STATE_NAME, names); -} - async function checkIfRunning(ctx: restate.KeyedContext): Promise { const status = await ctx.get(LIFECYCLE_STATUS_STATE_NAME); return status === LifecycleStatus.RUNNING; diff --git a/src/workflows/workflow_wrapper_service.ts b/src/workflows/workflow_wrapper_service.ts index d38d3afd..a8365724 100644 --- a/src/workflows/workflow_wrapper_service.ts +++ b/src/workflows/workflow_wrapper_service.ts @@ -19,54 +19,6 @@ const DEFAULT_RETENTION_PERIOD = 7 * 24 * 60 * 60 * 1000; // 1 week // Workflow Context Implementations // ---------------------------------------------------------------------------- -class SharedPromiseImpl implements wf.DurablePromise { - constructor( - private readonly workflowId: string, - private readonly promiseName: string, - private readonly ctx: restate.Context, - private readonly stateServiceApi: restate.ServiceApi - ) {} - - promise(): Promise { - const awk = this.ctx.awakeable(); - - this.ctx.send(this.stateServiceApi).subscribePromise(this.workflowId, { - promiseName: this.promiseName, - awkId: awk.id, - }); - - return awk.promise; - } - - async peek(): Promise { - const result = await this.ctx - .rpc(this.stateServiceApi) - .peekPromise(this.workflowId, { promiseName: this.promiseName }); - - if (result === null) { - return null; - } - if (result.error !== undefined) { - return Promise.reject(new Error(result.error)); - } - return Promise.resolve(result.value as T); - } - - resolve(value?: T): void { - this.ctx.send(this.stateServiceApi).completePromise(this.workflowId, { - promiseName: this.promiseName, - completion: { value }, - }); - } - - fail(errorMsg: string): void { - this.ctx.send(this.stateServiceApi).completePromise(this.workflowId, { - promiseName: this.promiseName, - completion: { error: errorMsg }, - }); - } -} - class SharedContextImpl implements wf.SharedWfContext { constructor( protected readonly ctx: restate.Context, @@ -85,12 +37,56 @@ class SharedContextImpl implements wf.SharedWfContext { } promise(name: string): wf.DurablePromise { - return new SharedPromiseImpl( - this.wfId, - name, - this.ctx, - this.stateServiceApi - ); + // Create the awakeable to complete + const awk = this.ctx.awakeable(); + this.ctx.send(this.stateServiceApi).subscribePromise(this.wfId, { + promiseName: name, + awkId: awk.id, + }); + + // Prepare implementation of DurablePromise + + const peek = async (): Promise => { + const result = await this.ctx + .rpc(this.stateServiceApi) + .peekPromise(this.wfId, { promiseName: name }); + + if (result === null) { + return null; + } + if (result.error !== undefined) { + return Promise.reject(new Error(result.error)); + } + return Promise.resolve(result.value as T); + }; + + const resolve = (value: T) => { + const currentValue = value === undefined ? null : value; + + this.ctx.send(this.stateServiceApi).completePromise(this.wfId, { + promiseName: name, + completion: { value: currentValue }, + }); + }; + + const fail = (errorMsg: string) => { + this.ctx.send(this.stateServiceApi).completePromise(this.wfId, { + promiseName: name, + completion: { error: errorMsg }, + }); + }; + + return Object.defineProperties(awk.promise, { + peek: { + value: peek.bind(this), + }, + resolve: { + value: resolve.bind(this), + }, + fail: { + value: fail.bind(this), + }, + }) as wf.DurablePromise; } } @@ -112,17 +108,15 @@ class ExclusiveContextImpl extends SharedContextImpl implements wf.WfContext { this.console = ctx.console; } - publishMessage(message: string): void { - this.ctx - .send(this.stateServiceApi) - .publishMessage(this.wfId, { message, timestamp: new Date() }); - } - grpcChannel(): restate.RestateGrpcChannel { return this.ctx.grpcChannel(); } set(stateName: string, value: T): void { + if (value === undefined || value === null) { + throw new restate.TerminalError("Cannot set state to null or undefined"); + } + this.ctx .send(this.stateServiceApi) .setState(this.wfId, { stateName, value }); @@ -185,7 +179,7 @@ export function createWrapperService( stateServiceApi: restate.ServiceApi ) { const wrapperService = { - start: async ( + submit: async ( ctx: restate.Context, request: wf.WorkflowRequest ): Promise => { @@ -251,37 +245,6 @@ export function createWrapperService( checkRequestAndWorkflowId(request); return ctx.rpc(stateServiceApi).getStatus(request.workflowId); }, - - getLatestMessage: async ( - ctx: restate.Context, - request: wf.WorkflowRequest - ): Promise => { - checkRequestAndWorkflowId(request); - return ctx.rpc(stateServiceApi).getLatestMessage(request.workflowId); - }, - - pollNextMessages: async ( - ctx: restate.Context, - request: wf.WorkflowRequest<{ from: number }> - ): Promise => { - checkRequestAndWorkflowId(request); - - // eslint-disable-next-line no-constant-condition - while (true) { - const awk = ctx.awakeable(); - const messages = await ctx - .rpc(stateServiceApi) - .pollNextMessages(request.workflowId, { - from: request.from, - awakId: awk.id, - }); - if (messages !== undefined && messages !== null) { - return messages; - } - - await awk.promise; - } - }, }; // add all the interaction methods to the wrapper service diff --git a/test/awakeable.test.ts b/test/awakeable.test.ts index e5982dd2..3bafcb79 100644 --- a/test/awakeable.test.ts +++ b/test/awakeable.test.ts @@ -163,3 +163,33 @@ describe("AwakeableGreeter", () => { checkJournalMismatchError(result[0]); }); }); + +class AwakeableNull implements TestGreeter { + async greet(): Promise { + const ctx = restate.useContext(this); + + const awakeable = ctx.awakeable(); + + await awakeable.promise; + + return TestResponse.create({ + greeting: `Hello for ${awakeable.id}`, + }); + } +} + +describe("AwakeableNull", () => { + it("handles completion with null value", async () => { + const result = await new TestDriver(new AwakeableNull(), [ + startMessage(), + inputMessage(greetRequest("Till")), + completionMessage(1, JSON.stringify(null)), + ]).run(); + + expect(result).toStrictEqual([ + awakeableMessage(), + outputMessage(greetResponse(`Hello for ${getAwakeableId(1)}`)), + END_MESSAGE, + ]); + }); +}); diff --git a/test/complete_awakeable.test.ts b/test/complete_awakeable.test.ts index 37ea159b..620e718b 100644 --- a/test/complete_awakeable.test.ts +++ b/test/complete_awakeable.test.ts @@ -28,7 +28,7 @@ import { describe, expect } from "@jest/globals"; import { TestDriver } from "./testdriver"; class ResolveAwakeableGreeter implements TestGreeter { - constructor(readonly payload: string) {} + constructor(readonly payload: string | undefined) {} async greet(): Promise { const ctx = restate.useContext(this); @@ -54,6 +54,19 @@ describe("ResolveAwakeableGreeter", () => { ]); }); + it("resolve with undefined value", async () => { + const result = await new TestDriver( + new ResolveAwakeableGreeter(undefined), + [startMessage(), inputMessage(greetRequest("Till"))] + ).run(); + + expect(result).toStrictEqual([ + resolveAwakeableMessage(getAwakeableId(1), null), + outputMessage(greetResponse("Hello")), + END_MESSAGE, + ]); + }); + it("sends message to runtime for empty string", async () => { const result = await new TestDriver(new ResolveAwakeableGreeter(""), [ startMessage(),