Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Mark invoke return promise as combineable.
Browse files Browse the repository at this point in the history
Re-implement rpc/send/sendDelayed using private methods.
slinkydeveloper committed Jan 24, 2024
1 parent ce5f7fc commit b18d775
Showing 2 changed files with 54 additions and 54 deletions.
104 changes: 52 additions & 52 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
@@ -76,9 +76,10 @@ export interface CallContext {
delay?: number;
}

export type InternalCombineablePromise<T> = CombineablePromise<T> & {
journalIndex: number;
};
export type InternalCombineablePromise<T> = CombineablePromise<T> &
WrappedPromise<T> & {
journalIndex: number;
};

export class RestateContextImpl implements RestateGrpcContext, RpcContext {
// here, we capture the context information for actions on the Restate context that
@@ -156,7 +157,12 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
data: Uint8Array
): Promise<Uint8Array> {
if (this.isInOneWayCall()) {
return this.invokeOneWay(service, method, data);
return this.invokeOneWay(
service,
method,
data,
this.getOneWayCallDelay()
);
} else {
return this.invoke(service, method, data);
}
@@ -167,26 +173,29 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
service: string,
method: string,
data: Uint8Array
): Promise<Uint8Array> {
): InternalCombineablePromise<Uint8Array> {
this.checkState("invoke");

const msg = InvokeEntryMessage.create({
serviceName: service,
methodName: method,
parameter: Buffer.from(data),
});
return this.stateMachine
.handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg)
.transform((v) => v as Uint8Array);
return this.markCombineablePromise(
this.stateMachine
.handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg)
.transform((v) => v as Uint8Array)
);
}

private async invokeOneWay(
service: string,
method: string,
data: Uint8Array
data: Uint8Array,
delay?: number
): Promise<Uint8Array> {
const delay = this.getOneWayCallDelay();
const invokeTime = delay > 0 ? Date.now() + delay : undefined;
const actualDelay = delay || 0;
const invokeTime = actualDelay > 0 ? Date.now() + actualDelay : undefined;
const msg = BackgroundInvokeEntryMessage.create({
serviceName: service,
methodName: method,
@@ -229,25 +238,21 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
);
}

public rpc<M>({ path }: ServiceApi): Client<M> {
rpc<M>({ path }: ServiceApi<M>): Client<M> {
const clientProxy = new Proxy(
{},
{
get: (_target, prop) => {
const route = prop as string;
return async (...args: unknown[]) => {
const request = requestFromArgs(args);
const requestBytes = RpcRequest.encode(request).finish();
const responseBytes = await this.request(
path,
route,
requestBytes
);
const response = RpcResponse.decode(responseBytes);
return response.response;
};
},
}
{},
{
get: (_target, prop) => {
const route = prop as string;
return async (...args: unknown[]) => {
const request = requestFromArgs(args);
const requestBytes = RpcRequest.encode(request).finish();
return this.invoke(path, route, requestBytes).transform(
(responseBytes) => RpcResponse.decode(responseBytes).response
);
};
},
}
);

return clientProxy as Client<M>;
@@ -258,26 +263,21 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
}

public sendDelayed<M>(
{ path }: ServiceApi,
delayMillis: number
{ path }: ServiceApi,
delayMillis: number
): SendClient<M> {
const clientProxy = new Proxy(
{},
{
get: (_target, prop) => {
const route = prop as string;
return (...args: unknown[]) => {
const request = requestFromArgs(args);
const requestBytes = RpcRequest.encode(request).finish();
const sender = () => this.request(path, route, requestBytes);
if (delayMillis === undefined || delayMillis === 0) {
this.oneWayCall(sender);
} else {
this.delayedCall(sender, delayMillis);
}
};
},
}
{},
{
get: (_target, prop) => {
const route = prop as string;
return (...args: unknown[]) => {
const request = requestFromArgs(args);
const requestBytes = RpcRequest.encode(request).finish();
this.invokeOneWay(path, route, requestBytes, delayMillis);
};
},
}
);

return clientProxy as SendClient<M>;
@@ -405,7 +405,7 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
return this.markCombineablePromise(this.sleepInternal(millis));
}

private sleepInternal(millis: number): Promise<void> {
private sleepInternal(millis: number): WrappedPromise<void> {
return this.stateMachine.handleUserCodeMessage<void>(
SLEEP_ENTRY_MESSAGE_TYPE,
SleepEntryMessage.create({ wakeUpTime: Date.now() + millis })
@@ -510,9 +510,9 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
return context?.type === CallContexType.OneWayCall;
}

private getOneWayCallDelay(): number {
const context = RestateGrpcContextImpl.callContext.getStore();
return context?.delay || 0;
private getOneWayCallDelay(): number | undefined {
const context = RestateContextImpl.callContext.getStore();
return context?.delay;
}

private checkNotExecutingSideEffect() {
@@ -550,7 +550,7 @@ export class RestateContextImpl implements RestateGrpcContext, RpcContext {
}

private markCombineablePromise<T>(
p: Promise<T>
p: WrappedPromise<T>
): InternalCombineablePromise<T> {
return Object.defineProperties(p, {
__restate_context: {
4 changes: 2 additions & 2 deletions src/utils/promises.ts
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ export type WrappedPromise<T> = Promise<T> & {
| ((reason: any) => TResult2 | PromiseLike<TResult2>)
| null
| undefined
) => Promise<TResult1 | TResult2>;
) => WrappedPromise<TResult1 | TResult2>;
};

export function wrapDeeply<T>(
@@ -41,7 +41,7 @@ export function wrapDeeply<T>(
| ((reason: any) => TResult2 | PromiseLike<TResult2>)
| null
| undefined
) => Promise<TResult1 | TResult2>;
) => WrappedPromise<TResult1 | TResult2>;
if (Object.hasOwn(promise, "transform")) {
const wrappedPromise = promise as WrappedPromise<T>;
transform = (onfulfilled, onrejected) =>

0 comments on commit b18d775

Please sign in to comment.