Skip to content

Commit

Permalink
Add failure variant for GetState, PollInputStream and SleepEntryMessage
Browse files Browse the repository at this point in the history
This commit updates the SDK to the latest service protocol version. Part
of it is to enable failure variants for the GetState, PollInputStream and
SleepEntryMessages so that the runtime can cancel these entries. This
commit also adds tests for verifying the changes.

This fixes #210.
  • Loading branch information
tillrohrmann committed Dec 17, 2023
1 parent c3d84ac commit 745971e
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 65 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@
"example": "examples",
"test": "test"
}
}
}
65 changes: 42 additions & 23 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/*
* Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate SDK for Node.js/TypeScript,
* which is released under the MIT license.
*
* You can find a copy of the license in file LICENSE in the root
* directory of this repository or package, or at
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/service-protocol/blob/main/LICENSE

syntax = "proto3";

Expand Down Expand Up @@ -44,7 +42,6 @@ message StartMessage {
}

// Type: 0x0000 + 1
// Note: Acks to custom messages will have the `empty` field filled.
message CompletionMessage {
uint32 entry_index = 1;

Expand All @@ -55,6 +52,11 @@ message CompletionMessage {
};
}

// Type: 0x0000 + 4
message EntryAckMessage {
uint32 entry_index = 1;
}

// Type: 0x0000 + 2
// Implementations MUST send this message when suspending an invocation.
message SuspensionMessage {
Expand Down Expand Up @@ -94,13 +96,18 @@ message ErrorMessage {

// ------ Input and output ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0400 + 0
message PollInputStreamEntryMessage {
bytes value = 14;
oneof result {
bytes value = 14;
Failure failure = 15;
}
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0400 + 1
message OutputStreamEntryMessage {
oneof result {
Expand All @@ -111,43 +118,52 @@ message OutputStreamEntryMessage {

// ------ State access ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0800 + 0
message GetStateEntryMessage {
bytes key = 1;

oneof result {
google.protobuf.Empty empty = 13;
bytes value = 14;
Failure failure = 15;
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0800 + 1
message SetStateEntryMessage {
bytes key = 1;
bytes value = 3;
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0800 + 2
message ClearStateEntryMessage {
bytes key = 1;
}

// ------ Syscalls ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0C00 + 0
message SleepEntryMessage {
// Wake up time.
// The time is set as duration since UNIX Epoch.
uint64 wake_up_time = 1;

google.protobuf.Empty result = 13;
oneof result {
google.protobuf.Empty empty = 13;
Failure failure = 15;
}
}

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: Yes
// Type: 0x0C00 + 1
message InvokeEntryMessage {
string service_name = 1;
Expand All @@ -161,7 +177,8 @@ message InvokeEntryMessage {
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 2
message BackgroundInvokeEntryMessage {
string service_name = 1;
Expand All @@ -176,7 +193,8 @@ message BackgroundInvokeEntryMessage {
uint64 invoke_time = 4;
}

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0C00 + 3
// Awakeables are addressed by an identifier exposed to the user. See the spec for more details.
message AwakeableEntryMessage {
Expand All @@ -186,7 +204,8 @@ message AwakeableEntryMessage {
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 4
message CompleteAwakeableEntryMessage {
// Identifier of the awakeable. See the spec for more details.
Expand Down
37 changes: 31 additions & 6 deletions src/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import { Message } from "./types/types";
import { HostedGrpcServiceMethod } from "./types/grpc";
import {
Failure,
PollInputStreamEntryMessage,
StartMessage,
} from "./generated/proto/protocol";
Expand All @@ -37,6 +38,10 @@ enum State {
Complete = 3,
}

type InvocationValue =
| { kind: "value"; value: Buffer }
| { kind: "failure"; failure: Failure };

export class InvocationBuilder<I, O> implements RestateStreamConsumer {
private readonly complete = new CompletablePromise<void>();

Expand All @@ -46,7 +51,7 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
private replayEntries = new Map<number, Message>();
private id?: Buffer = undefined;
private debugId?: string = undefined;
private invocationValue?: Buffer = undefined;
private invocationValue?: InvocationValue = undefined;
private nbEntriesToReplay?: number = undefined;
private localStateStore?: LocalStateStore;

Expand All @@ -67,6 +72,30 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
m
);

{
const pollInputStreamMessage =
m.message as PollInputStreamEntryMessage;

if (pollInputStreamMessage.value !== undefined) {
this.invocationValue = {
kind: "value",
value: pollInputStreamMessage.value,
};
} else if (pollInputStreamMessage.failure !== undefined) {
this.invocationValue = {
kind: "failure",
failure: pollInputStreamMessage.failure,
};
} else {
throw new Error(
`PollInputStreamEntry neither contains value nor failure: ${printMessageAsJson(
m
)}`
);
}
}

this.addReplayEntry(m);
break;

Expand Down Expand Up @@ -120,10 +149,6 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}

private addReplayEntry(m: Message): InvocationBuilder<I, O> {
if (m.messageType === POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE) {
this.invocationValue = (m.message as PollInputStreamEntryMessage).value;
}

// Will be retrieved when the user code reaches this point
this.replayEntries.set(this.runtimeReplayIndex, m);
this.incrementRuntimeReplayIndex();
Expand Down Expand Up @@ -164,7 +189,7 @@ export class Invocation<I, O> {
public readonly debugId: string,
public readonly nbEntriesToReplay: number,
public readonly replayEntries: Map<number, Message>,
public readonly invocationValue: Buffer,
public readonly invocationValue: InvocationValue,
public readonly localStateStore: LocalStateStore
) {
this.logPrefix = `[${makeFqServiceName(
Expand Down
10 changes: 8 additions & 2 deletions src/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ export class Journal<I, O> {
this.resolveResult(
journalIndex,
journalEntry,
getStateMsg.value || getStateMsg.empty
getStateMsg.value || getStateMsg.empty,
getStateMsg.failure,
);
break;
}
Expand All @@ -266,7 +267,12 @@ export class Journal<I, O> {
}
case SLEEP_ENTRY_MESSAGE_TYPE: {
const sleepMsg = replayMessage.message as SleepEntryMessage;
this.resolveResult(journalIndex, journalEntry, sleepMsg.result);
this.resolveResult(
journalIndex,
journalEntry,
sleepMsg.empty,
sleepMsg.failure
);
break;
}
case AWAKEABLE_ENTRY_MESSAGE_TYPE: {
Expand Down
5 changes: 4 additions & 1 deletion src/restate_context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,10 @@ async function executeWithRetries<T>(
// - they are not retried within the service, because they will never succeed within this service,
// but can only succeed within a new invocation going to service with fixed code
// we hence break the retries here similar to terminal errors
if (e instanceof RestateError && e.code == RestateErrorCodes.JOURNAL_MISMATCH) {
if (
e instanceof RestateError &&
e.code == RestateErrorCodes.JOURNAL_MISMATCH
) {
throw e;
}

Expand Down
20 changes: 16 additions & 4 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
TerminalError,
RetryableError,
errorToErrorMessage,
failureToError,
} from "./types/errors";
import { LocalStateStore } from "./local_state_store";

Expand Down Expand Up @@ -201,10 +202,21 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
rlog.debugInvokeMessage(this.invocation.logPrefix, "Invoking function.");
}

const resultBytes: Promise<Uint8Array> = this.invocation.method.invoke(
this.restateContext,
this.invocation.invocationValue
);
let resultBytes: Promise<Uint8Array>;

switch (this.invocation.invocationValue.kind) {
case "value":
resultBytes = this.invocation.method.invoke(
this.restateContext,
this.invocation.invocationValue.value
);
break;
case "failure":
resultBytes = Promise.reject(
failureToError(this.invocation.invocationValue.failure, true)
);
break;
}

resultBytes
.then((bytes) => {
Expand Down
4 changes: 3 additions & 1 deletion src/types/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ export class RetryableError extends RestateError {
- In the replayed messages: type: ${
replayMessage.messageType
}, message: ${printMessageAsJson(replayMessage.message)}`;
return new RetryableError(msg, { errorCode: RestateErrorCodes.JOURNAL_MISMATCH });
return new RetryableError(msg, {
errorCode: RestateErrorCodes.JOURNAL_MISMATCH,
});
}

public static protocolViolation(message: string) {
Expand Down
25 changes: 25 additions & 0 deletions test/get_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import * as restate from "../src/public_api";
import { TestDriver } from "./testdriver";
import {
checkJournalMismatchError,
checkTerminalError,
completionMessage,
failure,
getStateMessage,
greetRequest,
greetResponse,
Expand Down Expand Up @@ -106,6 +108,18 @@ describe("GetStringStateGreeter", () => {
]);
});

it("handles completion with failure", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
completionMessage(1, undefined, undefined, failure("Canceled")),
]).run();

expect(result.length).toStrictEqual(2);
expect(result[0]).toStrictEqual(getStateMessage("STATE"));
checkTerminalError(result[1], "Canceled");
});

it("handles replay with value", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
Expand All @@ -129,6 +143,17 @@ describe("GetStringStateGreeter", () => {
outputMessage(greetResponse("Hello nobody")),
]);
});

it("handles replay with failure", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
getStateMessage("STATE", undefined, undefined, failure("Canceled")),
]).run();

expect(result.length).toStrictEqual(1);
checkTerminalError(result[0], "Canceled");
});
});

class GetNumberStateGreeter implements TestGreeter {
Expand Down
Loading

0 comments on commit 745971e

Please sign in to comment.