From b18d775358f2a7379e0cd8c8062c6145d9f481f5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 24 Jan 2024 12:53:22 +0100 Subject: [PATCH] Mark invoke return promise as combineable. Re-implement rpc/send/sendDelayed using private methods. --- src/restate_context_impl.ts | 104 ++++++++++++++++++------------------ src/utils/promises.ts | 4 +- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/src/restate_context_impl.ts b/src/restate_context_impl.ts index 1e390009..d7e2ae6c 100644 --- a/src/restate_context_impl.ts +++ b/src/restate_context_impl.ts @@ -76,9 +76,10 @@ export interface CallContext { delay?: number; } -export type InternalCombineablePromise = CombineablePromise & { - journalIndex: number; -}; +export type InternalCombineablePromise = CombineablePromise & + WrappedPromise & { + journalIndex: number; + }; export class RestateContextImpl implements RestateGrpcContext, RpcContext { // here, we capture the context information for actions on the Restate context that @@ -156,7 +157,12 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { data: Uint8Array ): Promise { if (this.isInOneWayCall()) { - return this.invokeOneWay(service, method, data); + return this.invokeOneWay( + service, + method, + data, + this.getOneWayCallDelay() + ); } else { return this.invoke(service, method, data); } @@ -167,7 +173,7 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { service: string, method: string, data: Uint8Array - ): Promise { + ): InternalCombineablePromise { this.checkState("invoke"); const msg = InvokeEntryMessage.create({ @@ -175,18 +181,21 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { methodName: method, parameter: Buffer.from(data), }); - return this.stateMachine - .handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg) - .transform((v) => v as Uint8Array); + return this.markCombineablePromise( + this.stateMachine + .handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg) + .transform((v) => v as Uint8Array) + ); } private async invokeOneWay( service: string, method: string, - data: Uint8Array + data: Uint8Array, + delay?: number ): Promise { - const delay = this.getOneWayCallDelay(); - const invokeTime = delay > 0 ? Date.now() + delay : undefined; + const actualDelay = delay || 0; + const invokeTime = actualDelay > 0 ? Date.now() + actualDelay : undefined; const msg = BackgroundInvokeEntryMessage.create({ serviceName: service, methodName: method, @@ -229,25 +238,21 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { ); } - public rpc({ path }: ServiceApi): Client { + rpc({ path }: ServiceApi): Client { const clientProxy = new Proxy( - {}, - { - get: (_target, prop) => { - const route = prop as string; - return async (...args: unknown[]) => { - const request = requestFromArgs(args); - const requestBytes = RpcRequest.encode(request).finish(); - const responseBytes = await this.request( - path, - route, - requestBytes - ); - const response = RpcResponse.decode(responseBytes); - return response.response; - }; - }, - } + {}, + { + get: (_target, prop) => { + const route = prop as string; + return async (...args: unknown[]) => { + const request = requestFromArgs(args); + const requestBytes = RpcRequest.encode(request).finish(); + return this.invoke(path, route, requestBytes).transform( + (responseBytes) => RpcResponse.decode(responseBytes).response + ); + }; + }, + } ); return clientProxy as Client; @@ -258,26 +263,21 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { } public sendDelayed( - { path }: ServiceApi, - delayMillis: number + { path }: ServiceApi, + delayMillis: number ): SendClient { const clientProxy = new Proxy( - {}, - { - get: (_target, prop) => { - const route = prop as string; - return (...args: unknown[]) => { - const request = requestFromArgs(args); - const requestBytes = RpcRequest.encode(request).finish(); - const sender = () => this.request(path, route, requestBytes); - if (delayMillis === undefined || delayMillis === 0) { - this.oneWayCall(sender); - } else { - this.delayedCall(sender, delayMillis); - } - }; - }, - } + {}, + { + get: (_target, prop) => { + const route = prop as string; + return (...args: unknown[]) => { + const request = requestFromArgs(args); + const requestBytes = RpcRequest.encode(request).finish(); + this.invokeOneWay(path, route, requestBytes, delayMillis); + }; + }, + } ); return clientProxy as SendClient; @@ -405,7 +405,7 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { return this.markCombineablePromise(this.sleepInternal(millis)); } - private sleepInternal(millis: number): Promise { + private sleepInternal(millis: number): WrappedPromise { return this.stateMachine.handleUserCodeMessage( SLEEP_ENTRY_MESSAGE_TYPE, SleepEntryMessage.create({ wakeUpTime: Date.now() + millis }) @@ -510,9 +510,9 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { return context?.type === CallContexType.OneWayCall; } - private getOneWayCallDelay(): number { - const context = RestateGrpcContextImpl.callContext.getStore(); - return context?.delay || 0; + private getOneWayCallDelay(): number | undefined { + const context = RestateContextImpl.callContext.getStore(); + return context?.delay; } private checkNotExecutingSideEffect() { @@ -550,7 +550,7 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext { } private markCombineablePromise( - p: Promise + p: WrappedPromise ): InternalCombineablePromise { return Object.defineProperties(p, { __restate_context: { diff --git a/src/utils/promises.ts b/src/utils/promises.ts index 4a5670d4..d2469ae0 100644 --- a/src/utils/promises.ts +++ b/src/utils/promises.ts @@ -24,7 +24,7 @@ export type WrappedPromise = Promise & { | ((reason: any) => TResult2 | PromiseLike) | null | undefined - ) => Promise; + ) => WrappedPromise; }; export function wrapDeeply( @@ -41,7 +41,7 @@ export function wrapDeeply( | ((reason: any) => TResult2 | PromiseLike) | null | undefined - ) => Promise; + ) => WrappedPromise; if (Object.hasOwn(promise, "transform")) { const wrappedPromise = promise as WrappedPromise; transform = (onfulfilled, onrejected) =>