Skip to content

Commit 4c9f679

Browse files
Introduce orTimeout(millis) API, to easily combine a CombineablePromise with a sleep to implement timeouts.
1 parent 2fc193d commit 4c9f679

File tree

4 files changed

+103
-3
lines changed

4 files changed

+103
-3
lines changed

src/restate_context.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,18 @@ import { Client, SendClient } from "./types/router";
1717
/**
1818
* A promise that can be combined using Promise combinators in RestateContext.
1919
*/
20-
export type CombineablePromise<T> = Promise<T> & { __combineable: void };
20+
export type CombineablePromise<T> = Promise<T> & {
21+
__combineable: void;
22+
23+
/**
24+
* Creates a promise that awaits for the current promise up to the specified timeout duration.
25+
* If the timeout is fired, this Promise will be rejected with a {@link TimeoutError}.
26+
*
27+
* @param millis duration of the sleep in millis.
28+
* This is a lower-bound.
29+
*/
30+
orTimeout(millis: number): Promise<T>;
31+
};
2132

2233
/**
2334
* Base Restate context, which contains all operations that are the same in the gRPC-based API

src/restate_context_impl.ts

+26-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
TerminalError,
4949
ensureError,
5050
errorToFailureWithTerminal,
51+
TimeoutError,
5152
} from "./types/errors";
5253
import { jsonSerialize, jsonDeserialize } from "./utils/utils";
5354
import { Empty } from "./generated/google/protobuf/empty";
@@ -66,6 +67,7 @@ import {
6667
newJournalEntryPromiseId,
6768
PromiseId,
6869
} from "./promise_combinator_tracker";
70+
import { WrappedPromise } from "./utils/promises";
6971

7072
export enum CallContexType {
7173
None,
@@ -345,7 +347,7 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
345347
return this.markCombineablePromise(this.sleepInternal(millis));
346348
}
347349

348-
private sleepInternal(millis: number): Promise<void> {
350+
private sleepInternal(millis: number): WrappedPromise<void> {
349351
return this.stateMachine.handleUserCodeMessage<void>(
350352
SLEEP_ENTRY_MESSAGE_TYPE,
351353
SleepEntryMessage.create({ wakeUpTime: Date.now() + millis })
@@ -519,12 +521,34 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
519521
private markCombineablePromise<T>(
520522
p: Promise<T>
521523
): InternalCombineablePromise<T> {
524+
const journalIndex = this.stateMachine.getUserCodeJournalIndex();
525+
const orTimeout = (millis: number): Promise<T> => {
526+
const sleepPromise = this.sleepInternal(millis).transform(() => {
527+
throw new TimeoutError();
528+
});
529+
const sleepPromiseIndex = this.stateMachine.getUserCodeJournalIndex();
530+
531+
return this.stateMachine.createCombinator(Promise.race.bind(Promise), [
532+
{
533+
id: newJournalEntryPromiseId(journalIndex),
534+
promise: p,
535+
},
536+
{
537+
id: newJournalEntryPromiseId(sleepPromiseIndex),
538+
promise: sleepPromise,
539+
},
540+
]);
541+
};
542+
522543
return Object.defineProperties(p, {
523544
__combineable: {
524545
value: undefined,
525546
},
526547
journalIndex: {
527-
value: this.stateMachine.getUserCodeJournalIndex(),
548+
value: journalIndex,
549+
},
550+
orTimeout: {
551+
value: orTimeout.bind(this),
528552
},
529553
}) as InternalCombineablePromise<T>;
530554
}

src/types/errors.ts

+6
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@ export class TerminalError extends RestateError {
202202
}
203203
}
204204

205+
export class TimeoutError extends TerminalError {
206+
constructor() {
207+
super("Timeout occurred", { errorCode: ErrorCodes.DEADLINE_EXCEEDED });
208+
}
209+
}
210+
205211
// Leads to Restate retries
206212
export class RetryableError extends RestateError {
207213
constructor(message: string, options?: { errorCode?: number; cause?: any }) {

test/promise_combinators.test.ts

+59
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
} from "./protoutils";
3131
import { TestGreeter, TestResponse } from "../src/generated/proto/test";
3232
import { SLEEP_ENTRY_MESSAGE_TYPE } from "../src/types/protocol";
33+
import { TimeoutError } from "../src/types/errors";
3334

3435
class AwakeableSleepRaceGreeter implements TestGreeter {
3536
async greet(): Promise<TestResponse> {
@@ -236,3 +237,61 @@ describe("AwakeableSleepRaceInterleavedWithSideEffectGreeter", () => {
236237
]);
237238
});
238239
});
240+
241+
class AwakeableOrTimeoutGreeter implements TestGreeter {
242+
async greet(): Promise<TestResponse> {
243+
const ctx = restate.useContext(this);
244+
245+
const { promise } = ctx.awakeable<string>();
246+
try {
247+
const result = await promise.orTimeout(100);
248+
return TestResponse.create({
249+
greeting: `Hello ${result}`,
250+
});
251+
} catch (e) {
252+
if (e instanceof TimeoutError) {
253+
return TestResponse.create({
254+
greeting: `Hello timed-out`,
255+
});
256+
}
257+
}
258+
259+
throw new Error("Unexpected result");
260+
}
261+
}
262+
263+
describe("AwakeableOrTimeoutGreeter", () => {
264+
it("handles completion of awakeable", async () => {
265+
const result = await new TestDriver(new AwakeableOrTimeoutGreeter(), [
266+
startMessage(),
267+
inputMessage(greetRequest("Till")),
268+
completionMessage(1, JSON.stringify("Francesco")),
269+
]).run();
270+
271+
expect(result.length).toStrictEqual(5);
272+
expect(result[0]).toStrictEqual(awakeableMessage());
273+
expect(result[1].messageType).toStrictEqual(SLEEP_ENTRY_MESSAGE_TYPE);
274+
expect(result.slice(2)).toStrictEqual([
275+
combinatorEntryMessage(0, [1]),
276+
outputMessage(greetResponse(`Hello Francesco`)),
277+
END_MESSAGE,
278+
]);
279+
});
280+
281+
it("handles completion of sleep", async () => {
282+
const result = await new TestDriver(new AwakeableOrTimeoutGreeter(), [
283+
startMessage(),
284+
inputMessage(greetRequest("Till")),
285+
completionMessage(2, undefined, true),
286+
]).run();
287+
288+
expect(result.length).toStrictEqual(5);
289+
expect(result[0]).toStrictEqual(awakeableMessage());
290+
expect(result[1].messageType).toStrictEqual(SLEEP_ENTRY_MESSAGE_TYPE);
291+
expect(result.slice(2)).toStrictEqual([
292+
combinatorEntryMessage(0, [2]),
293+
outputMessage(greetResponse(`Hello timed-out`)),
294+
END_MESSAGE,
295+
]);
296+
});
297+
});

0 commit comments

Comments
 (0)