Skip to content

Commit

Permalink
Add failure variant for GetState, PollInputStream and SleepEntryMessage
Browse files Browse the repository at this point in the history
This commit updates the SDK to the latest service protocol version. Part
of it is to enable failure variants for the GetState, PollInputStream and
SleepEntryMessages so that the runtime can cancel these entries. This
commit also adds tests for verifying the changes.

This fixes #210.
  • Loading branch information
tillrohrmann committed Dec 18, 2023
1 parent ae50697 commit b5e9da3
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 35 deletions.
41 changes: 29 additions & 12 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,18 @@ message ErrorMessage {

// ------ Input and output ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0400 + 0
message PollInputStreamEntryMessage {
bytes value = 14;
oneof result {
bytes value = 14;
Failure failure = 15;
}
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0400 + 1
message OutputStreamEntryMessage {
oneof result {
Expand All @@ -113,43 +118,52 @@ message OutputStreamEntryMessage {

// ------ State access ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0800 + 0
message GetStateEntryMessage {
bytes key = 1;

oneof result {
google.protobuf.Empty empty = 13;
bytes value = 14;
Failure failure = 15;
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0800 + 1
message SetStateEntryMessage {
bytes key = 1;
bytes value = 3;
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0800 + 2
message ClearStateEntryMessage {
bytes key = 1;
}

// ------ Syscalls ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0C00 + 0
message SleepEntryMessage {
// Wake up time.
// The time is set as duration since UNIX Epoch.
uint64 wake_up_time = 1;

google.protobuf.Empty result = 13;
oneof result {
google.protobuf.Empty empty = 13;
Failure failure = 15;
}
}

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: Yes
// Type: 0x0C00 + 1
message InvokeEntryMessage {
string service_name = 1;
Expand All @@ -163,7 +177,8 @@ message InvokeEntryMessage {
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 2
message BackgroundInvokeEntryMessage {
string service_name = 1;
Expand All @@ -178,7 +193,8 @@ message BackgroundInvokeEntryMessage {
uint64 invoke_time = 4;
}

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0C00 + 3
// Awakeables are addressed by an identifier exposed to the user. See the spec for more details.
message AwakeableEntryMessage {
Expand All @@ -188,7 +204,8 @@ message AwakeableEntryMessage {
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 4
message CompleteAwakeableEntryMessage {
// Identifier of the awakeable. See the spec for more details.
Expand Down
37 changes: 31 additions & 6 deletions src/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import { Message } from "./types/types";
import { HostedGrpcServiceMethod } from "./types/grpc";
import {
Failure,
PollInputStreamEntryMessage,
StartMessage,
} from "./generated/proto/protocol";
Expand All @@ -37,6 +38,10 @@ enum State {
Complete = 3,
}

type InvocationValue =
| { kind: "value"; value: Buffer }
| { kind: "failure"; failure: Failure };

export class InvocationBuilder<I, O> implements RestateStreamConsumer {
private readonly complete = new CompletablePromise<void>();

Expand All @@ -46,7 +51,7 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
private replayEntries = new Map<number, Message>();
private id?: Buffer = undefined;
private debugId?: string = undefined;
private invocationValue?: Buffer = undefined;
private invocationValue?: InvocationValue = undefined;
private nbEntriesToReplay?: number = undefined;
private localStateStore?: LocalStateStore;

Expand All @@ -67,6 +72,8 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
m
);

this.handlePollInputStreamEntry(m);
this.addReplayEntry(m);
break;

Expand Down Expand Up @@ -100,6 +107,28 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}
}

private handlePollInputStreamEntry(m: Message) {
const pollInputStreamMessage = m.message as PollInputStreamEntryMessage;

if (pollInputStreamMessage.value !== undefined) {
this.invocationValue = {
kind: "value",
value: pollInputStreamMessage.value,
};
} else if (pollInputStreamMessage.failure !== undefined) {
this.invocationValue = {
kind: "failure",
failure: pollInputStreamMessage.failure,
};
} else {
throw new Error(
`PollInputStreamEntry neither contains value nor failure: ${printMessageAsJson(
m
)}`
);
}
}

public handleStreamError(e: Error): void {
this.complete.reject(e);
}
Expand All @@ -120,10 +149,6 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}

private addReplayEntry(m: Message): InvocationBuilder<I, O> {
if (m.messageType === POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE) {
this.invocationValue = (m.message as PollInputStreamEntryMessage).value;
}

// Will be retrieved when the user code reaches this point
this.replayEntries.set(this.runtimeReplayIndex, m);
this.incrementRuntimeReplayIndex();
Expand Down Expand Up @@ -164,7 +189,7 @@ export class Invocation<I, O> {
public readonly debugId: string,
public readonly nbEntriesToReplay: number,
public readonly replayEntries: Map<number, Message>,
public readonly invocationValue: Buffer,
public readonly invocationValue: InvocationValue,
public readonly localStateStore: LocalStateStore
) {
this.logPrefix = `[${makeFqServiceName(
Expand Down
10 changes: 8 additions & 2 deletions src/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ export class Journal<I, O> {
this.resolveResult(
journalIndex,
journalEntry,
getStateMsg.value || getStateMsg.empty
getStateMsg.value || getStateMsg.empty,
getStateMsg.failure
);
break;
}
Expand All @@ -276,7 +277,12 @@ export class Journal<I, O> {
}
case SLEEP_ENTRY_MESSAGE_TYPE: {
const sleepMsg = replayMessage.message as SleepEntryMessage;
this.resolveResult(journalIndex, journalEntry, sleepMsg.result);
this.resolveResult(
journalIndex,
journalEntry,
sleepMsg.empty,
sleepMsg.failure
);
break;
}
case AWAKEABLE_ENTRY_MESSAGE_TYPE: {
Expand Down
20 changes: 16 additions & 4 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
TerminalError,
RetryableError,
errorToErrorMessage,
failureToTerminalError,
} from "./types/errors";
import { LocalStateStore } from "./local_state_store";

Expand Down Expand Up @@ -211,10 +212,21 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
rlog.debugInvokeMessage(this.invocation.logPrefix, "Invoking function.");
}

const resultBytes: Promise<Uint8Array> = this.invocation.method.invoke(
this.restateContext,
this.invocation.invocationValue
);
let resultBytes: Promise<Uint8Array>;

switch (this.invocation.invocationValue.kind) {
case "value":
resultBytes = this.invocation.method.invoke(
this.restateContext,
this.invocation.invocationValue.value
);
break;
case "failure":
resultBytes = Promise.reject(
failureToTerminalError(this.invocation.invocationValue.failure)
);
break;
}

resultBytes
.then((bytes) => {
Expand Down
4 changes: 4 additions & 0 deletions src/types/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ export function errorToFailureWithTerminal(err: Error): FailureWithTerminal {
});
}

export function failureToTerminalError(failure: Failure): TerminalError {
return failureToError(failure, true) as TerminalError;
}

export function failureToError(
failure: Failure,
terminalError: boolean
Expand Down
25 changes: 25 additions & 0 deletions test/get_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import * as restate from "../src/public_api";
import { TestDriver } from "./testdriver";
import {
checkJournalMismatchError,
checkTerminalError,
completionMessage,
failure,
getStateMessage,
greetRequest,
greetResponse,
Expand Down Expand Up @@ -106,6 +108,18 @@ describe("GetStringStateGreeter", () => {
]);
});

it("handles completion with failure", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
completionMessage(1, undefined, undefined, failure("Canceled")),
]).run();

expect(result.length).toStrictEqual(2);
expect(result[0]).toStrictEqual(getStateMessage("STATE"));
checkTerminalError(result[1], "Canceled");
});

it("handles replay with value", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
Expand All @@ -129,6 +143,17 @@ describe("GetStringStateGreeter", () => {
outputMessage(greetResponse("Hello nobody")),
]);
});

it("handles replay with failure", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
getStateMessage("STATE", undefined, undefined, failure("Canceled")),
]).run();

expect(result.length).toStrictEqual(1);
checkTerminalError(result[0], "Canceled");
});
});

class GetNumberStateGreeter implements TestGreeter {
Expand Down
54 changes: 43 additions & 11 deletions test/protoutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,24 @@ export function toStateEntries(entries: Buffer[][]) {
);
}

export function inputMessage(value: Uint8Array): Message {
return new Message(
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PollInputStreamEntryMessage.create({
value: Buffer.from(value),
})
);
export function inputMessage(value?: Uint8Array, failure?: Failure): Message {
if (failure !== undefined) {
return new Message(
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PollInputStreamEntryMessage.create({
failure: failure,
})
);
} else if (value !== undefined) {
return new Message(
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PollInputStreamEntryMessage.create({
value: Buffer.from(value),
})
);
} else {
throw new Error("Input message needs either a value or a failure set.");
}
}

export function outputMessage(value?: Uint8Array, failure?: Failure): Message {
Expand Down Expand Up @@ -130,7 +141,8 @@ export function outputMessage(value?: Uint8Array, failure?: Failure): Message {
export function getStateMessage<T>(
key: string,
value?: T,
empty?: boolean
empty?: boolean,
failure?: Failure
): Message {
if (empty === true) {
return new Message(
Expand All @@ -148,6 +160,14 @@ export function getStateMessage<T>(
value: Buffer.from(jsonSerialize(value)),
})
);
} else if (failure !== undefined) {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage.create({
key: Buffer.from(key),
failure: failure,
})
);
} else {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -177,13 +197,25 @@ export function clearStateMessage(key: string): Message {
);
}

export function sleepMessage(wakeupTime: number, result?: Empty): Message {
if (result !== undefined) {
export function sleepMessage(
wakeupTime: number,
empty?: Empty,
failure?: Failure
): Message {
if (empty !== undefined) {
return new Message(
SLEEP_ENTRY_MESSAGE_TYPE,
SleepEntryMessage.create({
wakeUpTime: wakeupTime,
result: result,
empty: empty,
})
);
} else if (failure !== undefined) {
return new Message(
SLEEP_ENTRY_MESSAGE_TYPE,
SleepEntryMessage.create({
wakeUpTime: wakeupTime,
failure: failure,
})
);
} else {
Expand Down
Loading

0 comments on commit b5e9da3

Please sign in to comment.