Skip to content

Commit

Permalink
Add guard for RestateContext.sideEffect await (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Jan 17, 2024
1 parent cce4fa0 commit 66e82fe
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 39 deletions.
97 changes: 66 additions & 31 deletions src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
// For example, this is illegal: 'ctx.sideEffect(() => {await ctx.get("my-state")})'
static callContext = new AsyncLocalStorage<CallContext>();

// This is used to guard users against calling ctx.sideEffect without awaiting it.
// See https://github.com/restatedev/sdk-typescript/issues/197 for more details.
private executingSideEffect = false;

constructor(
public readonly id: Buffer,
public readonly serviceName: string,
Expand All @@ -90,30 +94,35 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
public readonly rand: Rand = new RandImpl(id)
) {}

public async get<T>(name: string): Promise<T | null> {
// DON'T make this function async!!! see sideEffect comment for details.
public get<T>(name: string): Promise<T | null> {
// Check if this is a valid action
this.checkState("get state");

// Create the message and let the state machine process it
const msg = this.stateMachine.localStateStore.get(name);
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_ENTRY_MESSAGE_TYPE,
msg
);

// If the GetState message did not have a value or empty,
// then we went to the runtime to get the value.
// When we get the response, we set it in the localStateStore,
// to answer subsequent requests
if (msg.value === undefined && msg.empty === undefined) {
this.stateMachine.localStateStore.add(name, result as Buffer | Empty);
}
const getState = async (): Promise<T | null> => {
const result = await this.stateMachine.handleUserCodeMessage(
GET_STATE_ENTRY_MESSAGE_TYPE,
msg
);

if (!(result instanceof Buffer)) {
return null;
}
// If the GetState message did not have a value or empty,
// then we went to the runtime to get the value.
// When we get the response, we set it in the localStateStore,
// to answer subsequent requests
if (msg.value === undefined && msg.empty === undefined) {
this.stateMachine.localStateStore.add(name, result as Buffer | Empty);
}

return jsonDeserialize(result.toString());
if (!(result instanceof Buffer)) {
return null;
}

return jsonDeserialize(result.toString());
};
return getState();
}

public set<T>(name: string, value: T): void {
Expand Down Expand Up @@ -144,7 +153,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
}
}

private async invoke(
// DON'T make this function async!!! see sideEffect comment for details.
private invoke(
service: string,
method: string,
data: Uint8Array
Expand All @@ -156,11 +166,9 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
methodName: method,
parameter: Buffer.from(data),
});
const promise = this.stateMachine.handleUserCodeMessage(
INVOKE_ENTRY_MESSAGE_TYPE,
msg
);
return (await promise) as Uint8Array;
return this.stateMachine
.handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg)
.transform((v) => v as Uint8Array);
}

private async invokeOneWay(
Expand All @@ -184,33 +192,39 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
return new Uint8Array();
}

public async oneWayCall(
// DON'T make this function async!!! see sideEffect comment for details.
public oneWayCall(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
call: () => Promise<any>
): Promise<void> {
this.checkState("oneWayCall");

await RestateGrpcContextImpl.callContext.run(
return RestateGrpcContextImpl.callContext.run(
{ type: CallContexType.OneWayCall },
call
);
}

public async delayedCall(
// DON'T make this function async!!! see sideEffect comment for details.
public delayedCall(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
call: () => Promise<any>,
delayMillis?: number
): Promise<void> {
this.checkState("delayedCall");

// Delayed call is a one way call with a delay
await RestateGrpcContextImpl.callContext.run(
return RestateGrpcContextImpl.callContext.run(
{ type: CallContexType.OneWayCall, delay: delayMillis },
call
);
}

public async sideEffect<T>(
// DON'T make this function async!!!
// The reason is that we want the erros thrown by the initial checks to be propagated in the caller context,
// and not in the promise context. To understand the semantic difference, make this function async and run the
// UnawaitedSideEffectShouldFailSubsequentContextCall test.
public sideEffect<T>(
fn: () => Promise<T>,
retryPolicy: RetrySettings = DEFAULT_INFINITE_EXPONENTIAL_BACKOFF
): Promise<T> {
Expand All @@ -227,6 +241,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
{ errorCode: ErrorCodes.INTERNAL }
);
}
this.checkNotExecutingSideEffect();
this.executingSideEffect = true;

const executeAndLogSideEffect = async () => {
// in replay mode, we directly return the value from the log
Expand Down Expand Up @@ -301,17 +317,25 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
return sideEffectResult;
};

const sleep = (millis: number) => this.sleep(millis);
return executeWithRetries(retryPolicy, executeAndLogSideEffect, sleep);
const sleep = (millis: number) => this.sleepInternal(millis);
return executeWithRetries(
retryPolicy,
executeAndLogSideEffect,
sleep
).finally(() => {
this.executingSideEffect = false;
});
}

public sleep(millis: number): Promise<void> {
this.checkState("sleep");
return this.sleepInternal(millis);
}

const msg = SleepEntryMessage.create({ wakeUpTime: Date.now() + millis });
private sleepInternal(millis: number): Promise<void> {
return this.stateMachine.handleUserCodeMessage<void>(
SLEEP_ENTRY_MESSAGE_TYPE,
msg
SleepEntryMessage.create({ wakeUpTime: Date.now() + millis })
);
}

Expand Down Expand Up @@ -385,9 +409,20 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
return context?.delay || 0;
}

private checkNotExecutingSideEffect() {
if (this.executingSideEffect) {
throw new TerminalError(
`Invoked a RestateContext method while a side effect is still executing.
Make sure you await the ctx.sideEffect call before using any other RestateContext method.`,
{ errorCode: ErrorCodes.INTERNAL }
);
}
}

private checkState(callType: string): void {
const context = RestateGrpcContextImpl.callContext.getStore();
if (!context) {
this.checkNotExecutingSideEffect();
return;
}

Expand Down
10 changes: 2 additions & 8 deletions src/server/base_restate_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ export abstract class BaseRestateServer {
method
);
// note that this log will not print all the keys.
rlog.info(
`Binding: ${url} -> ${JSON.stringify(method, null, "\t")}`
);
rlog.info(`Binding: ${url} -> ${JSON.stringify(method, null, "\t")}`);
}
}

Expand Down Expand Up @@ -264,11 +262,7 @@ export abstract class BaseRestateServer {
) as HostedGrpcServiceMethod<unknown, unknown>;

rlog.info(
`Binding: ${url} -> ${JSON.stringify(
registration.method,
null,
"\t"
)}`
`Binding: ${url} -> ${JSON.stringify(registration.method, null, "\t")}`
);
}

Expand Down
97 changes: 97 additions & 0 deletions test/side_effect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,103 @@ describe("AwaitSideEffectService", () => {
});
});

export class UnawaitedSideEffectShouldFailSubsequentContextCallService
implements TestGreeter
{
constructor(
// eslint-disable-next-line @typescript-eslint/no-empty-function
private readonly next = (ctx: restate.RestateContext): void => {}

Check warning on line 919 in test/side_effect.test.ts

View workflow job for this annotation

GitHub Actions / build (19.x)

'ctx' is defined but never used
) {}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
async greet(request: TestRequest): Promise<TestResponse> {
const ctx = restate.useContext(this);

ctx.sideEffect<number>(async () => {
// eslint-disable-next-line @typescript-eslint/no-empty-function
return new Promise(() => {});
});
this.next(ctx);

throw new Error("code should not reach this point");
}
}

describe("UnawaitedSideEffectShouldFailSubsequentContextCall", () => {
const defineTestCase = (
contextMethodCall: string,
next: (ctx: restate.RestateContext) => void
): void => {
it(
"Not awaiting side effect should fail at next " + contextMethodCall,
async () => {
const result = await new TestDriver(
new UnawaitedSideEffectShouldFailSubsequentContextCallService(next),
[startMessage(), inputMessage(greetRequest("Till"))]
).run();

checkTerminalError(
result[0],
`Invoked a RestateContext method while a side effect is still executing.
Make sure you await the ctx.sideEffect call before using any other RestateContext method.`
);
expect(result.slice(1)).toStrictEqual([END_MESSAGE]);
}
);
};

defineTestCase("side effect", (ctx) =>
ctx.sideEffect<number>(async () => {
return 1;
})
);
defineTestCase("get", (ctx) => ctx.get<string>("123"));
defineTestCase("set", (ctx) => ctx.set("123", "abc"));
defineTestCase("call", (ctx) => {
const client = new TestGreeterClientImpl(ctx);
client.greet(TestRequest.create({ name: "Francesco" }));
});
defineTestCase("one way call", (ctx) => {
const client = new TestGreeterClientImpl(ctx);
ctx.oneWayCall(() =>
client.greet(TestRequest.create({ name: "Francesco" }))
);
});
});

export class UnawaitedSideEffectShouldFailSubsequentSetService
implements TestGreeter
{
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async greet(request: TestRequest): Promise<TestResponse> {
const ctx = restate.useContext(this);

ctx.sideEffect<number>(async () => {
// eslint-disable-next-line @typescript-eslint/no-empty-function
return new Promise(() => {});
});
ctx.set("123", "abc");

throw new Error("code should not reach this point");
}
}

describe("UnawaitedSideEffectShouldFailSubsequentSetService", () => {
it("Not awaiting side effects should fail", async () => {
const result = await new TestDriver(
new UnawaitedSideEffectShouldFailSubsequentSetService(),
[startMessage(), inputMessage(greetRequest("Till"))]
).run();

checkTerminalError(
result[0],
`Invoked a RestateContext method while a side effect is still executing.
Make sure you await the ctx.sideEffect call before using any other RestateContext method.`
);
expect(result.slice(1)).toStrictEqual([END_MESSAGE]);
});
});

export class TerminalErrorSideEffectService implements TestGreeter {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async greet(request: TestRequest): Promise<TestResponse> {
Expand Down

0 comments on commit 66e82fe

Please sign in to comment.