Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure variant for GetState, PollInputStream and SleepEntryMessage #211

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,18 @@ message ErrorMessage {

// ------ Input and output ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0400 + 0
message PollInputStreamEntryMessage {
bytes value = 14;
oneof result {
bytes value = 14;
Failure failure = 15;
}
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0400 + 1
message OutputStreamEntryMessage {
oneof result {
Expand All @@ -113,43 +118,52 @@ message OutputStreamEntryMessage {

// ------ State access ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0800 + 0
message GetStateEntryMessage {
bytes key = 1;

oneof result {
google.protobuf.Empty empty = 13;
bytes value = 14;
Failure failure = 15;
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0800 + 1
message SetStateEntryMessage {
bytes key = 1;
bytes value = 3;
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: No
// Type: 0x0800 + 2
message ClearStateEntryMessage {
bytes key = 1;
}

// ------ Syscalls ------

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0C00 + 0
message SleepEntryMessage {
// Wake up time.
// The time is set as duration since UNIX Epoch.
uint64 wake_up_time = 1;

google.protobuf.Empty result = 13;
oneof result {
google.protobuf.Empty empty = 13;
Failure failure = 15;
}
}

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: Yes
// Type: 0x0C00 + 1
message InvokeEntryMessage {
string service_name = 1;
Expand All @@ -163,7 +177,8 @@ message InvokeEntryMessage {
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 2
message BackgroundInvokeEntryMessage {
string service_name = 1;
Expand All @@ -178,7 +193,8 @@ message BackgroundInvokeEntryMessage {
uint64 invoke_time = 4;
}

// Kind: Completable JournalEntry
// Completable: Yes
// Fallible: No
// Type: 0x0C00 + 3
// Awakeables are addressed by an identifier exposed to the user. See the spec for more details.
message AwakeableEntryMessage {
Expand All @@ -188,7 +204,8 @@ message AwakeableEntryMessage {
};
}

// Kind: Non-Completable JournalEntry
// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 4
message CompleteAwakeableEntryMessage {
// Identifier of the awakeable. See the spec for more details.
Expand Down
37 changes: 31 additions & 6 deletions src/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import { Message } from "./types/types";
import { HostedGrpcServiceMethod } from "./types/grpc";
import {
Failure,
PollInputStreamEntryMessage,
StartMessage,
} from "./generated/proto/protocol";
Expand All @@ -37,6 +38,10 @@ enum State {
Complete = 3,
}

type InvocationValue =
| { kind: "value"; value: Buffer }
| { kind: "failure"; failure: Failure };

export class InvocationBuilder<I, O> implements RestateStreamConsumer {
private readonly complete = new CompletablePromise<void>();

Expand All @@ -46,7 +51,7 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
private replayEntries = new Map<number, Message>();
private id?: Buffer = undefined;
private debugId?: string = undefined;
private invocationValue?: Buffer = undefined;
private invocationValue?: InvocationValue = undefined;
private nbEntriesToReplay?: number = undefined;
private localStateStore?: LocalStateStore;

Expand All @@ -67,6 +72,8 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
m
);

this.handlePollInputStreamEntry(m);
this.addReplayEntry(m);
break;

Expand Down Expand Up @@ -100,6 +107,28 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}
}

private handlePollInputStreamEntry(m: Message) {
const pollInputStreamMessage = m.message as PollInputStreamEntryMessage;

if (pollInputStreamMessage.value !== undefined) {
this.invocationValue = {
kind: "value",
value: pollInputStreamMessage.value,
};
} else if (pollInputStreamMessage.failure !== undefined) {
this.invocationValue = {
kind: "failure",
failure: pollInputStreamMessage.failure,
};
} else {
throw new Error(
`PollInputStreamEntry neither contains value nor failure: ${printMessageAsJson(
m
)}`
);
}
}

public handleStreamError(e: Error): void {
this.complete.reject(e);
}
Expand All @@ -120,10 +149,6 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
}

private addReplayEntry(m: Message): InvocationBuilder<I, O> {
if (m.messageType === POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE) {
this.invocationValue = (m.message as PollInputStreamEntryMessage).value;
}

// Will be retrieved when the user code reaches this point
this.replayEntries.set(this.runtimeReplayIndex, m);
this.incrementRuntimeReplayIndex();
Expand Down Expand Up @@ -164,7 +189,7 @@ export class Invocation<I, O> {
public readonly debugId: string,
public readonly nbEntriesToReplay: number,
public readonly replayEntries: Map<number, Message>,
public readonly invocationValue: Buffer,
public readonly invocationValue: InvocationValue,
public readonly localStateStore: LocalStateStore
) {
this.logPrefix = `[${makeFqServiceName(
Expand Down
10 changes: 8 additions & 2 deletions src/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ export class Journal<I, O> {
this.resolveResult(
journalIndex,
journalEntry,
getStateMsg.value || getStateMsg.empty
getStateMsg.value || getStateMsg.empty,
getStateMsg.failure
);
break;
}
Expand All @@ -276,7 +277,12 @@ export class Journal<I, O> {
}
case SLEEP_ENTRY_MESSAGE_TYPE: {
const sleepMsg = replayMessage.message as SleepEntryMessage;
this.resolveResult(journalIndex, journalEntry, sleepMsg.result);
this.resolveResult(
journalIndex,
journalEntry,
sleepMsg.empty,
sleepMsg.failure
);
break;
}
case AWAKEABLE_ENTRY_MESSAGE_TYPE: {
Expand Down
20 changes: 16 additions & 4 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
TerminalError,
RetryableError,
errorToErrorMessage,
failureToTerminalError,
} from "./types/errors";
import { LocalStateStore } from "./local_state_store";

Expand Down Expand Up @@ -211,10 +212,21 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
rlog.debugInvokeMessage(this.invocation.logPrefix, "Invoking function.");
}

const resultBytes: Promise<Uint8Array> = this.invocation.method.invoke(
this.restateContext,
this.invocation.invocationValue
);
let resultBytes: Promise<Uint8Array>;

switch (this.invocation.invocationValue.kind) {
case "value":
resultBytes = this.invocation.method.invoke(
this.restateContext,
this.invocation.invocationValue.value
);
break;
case "failure":
resultBytes = Promise.reject(
failureToTerminalError(this.invocation.invocationValue.failure)
);
break;
}

resultBytes
.then((bytes) => {
Expand Down
4 changes: 4 additions & 0 deletions src/types/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ export function errorToFailureWithTerminal(err: Error): FailureWithTerminal {
});
}

export function failureToTerminalError(failure: Failure): TerminalError {
return failureToError(failure, true) as TerminalError;
}

export function failureToError(
failure: Failure,
terminalError: boolean
Expand Down
25 changes: 25 additions & 0 deletions test/get_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import * as restate from "../src/public_api";
import { TestDriver } from "./testdriver";
import {
checkJournalMismatchError,
checkTerminalError,
completionMessage,
failure,
getStateMessage,
greetRequest,
greetResponse,
Expand Down Expand Up @@ -106,6 +108,18 @@ describe("GetStringStateGreeter", () => {
]);
});

it("handles completion with failure", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
completionMessage(1, undefined, undefined, failure("Canceled")),
]).run();

expect(result.length).toStrictEqual(2);
expect(result[0]).toStrictEqual(getStateMessage("STATE"));
checkTerminalError(result[1], "Canceled");
});

it("handles replay with value", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
Expand All @@ -129,6 +143,17 @@ describe("GetStringStateGreeter", () => {
outputMessage(greetResponse("Hello nobody")),
]);
});

it("handles replay with failure", async () => {
const result = await new TestDriver(new GetStringStateGreeter(), [
startMessage(),
inputMessage(greetRequest("Till")),
getStateMessage("STATE", undefined, undefined, failure("Canceled")),
]).run();

expect(result.length).toStrictEqual(1);
checkTerminalError(result[0], "Canceled");
});
});

class GetNumberStateGreeter implements TestGreeter {
Expand Down
54 changes: 43 additions & 11 deletions test/protoutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,24 @@ export function toStateEntries(entries: Buffer[][]) {
);
}

export function inputMessage(value: Uint8Array): Message {
return new Message(
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PollInputStreamEntryMessage.create({
value: Buffer.from(value),
})
);
export function inputMessage(value?: Uint8Array, failure?: Failure): Message {
if (failure !== undefined) {
return new Message(
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PollInputStreamEntryMessage.create({
failure: failure,
})
);
} else if (value !== undefined) {
return new Message(
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
PollInputStreamEntryMessage.create({
value: Buffer.from(value),
})
);
} else {
throw new Error("Input message needs either a value or a failure set.");
}
}

export function outputMessage(value?: Uint8Array, failure?: Failure): Message {
Expand Down Expand Up @@ -130,7 +141,8 @@ export function outputMessage(value?: Uint8Array, failure?: Failure): Message {
export function getStateMessage<T>(
key: string,
value?: T,
empty?: boolean
empty?: boolean,
failure?: Failure
): Message {
if (empty === true) {
return new Message(
Expand All @@ -148,6 +160,14 @@ export function getStateMessage<T>(
value: Buffer.from(jsonSerialize(value)),
})
);
} else if (failure !== undefined) {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
GetStateEntryMessage.create({
key: Buffer.from(key),
failure: failure,
})
);
} else {
return new Message(
GET_STATE_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -177,13 +197,25 @@ export function clearStateMessage(key: string): Message {
);
}

export function sleepMessage(wakeupTime: number, result?: Empty): Message {
if (result !== undefined) {
export function sleepMessage(
wakeupTime: number,
empty?: Empty,
failure?: Failure
): Message {
if (empty !== undefined) {
return new Message(
SLEEP_ENTRY_MESSAGE_TYPE,
SleepEntryMessage.create({
wakeUpTime: wakeupTime,
result: result,
empty: empty,
})
);
} else if (failure !== undefined) {
return new Message(
SLEEP_ENTRY_MESSAGE_TYPE,
SleepEntryMessage.create({
wakeUpTime: wakeupTime,
failure: failure,
})
);
} else {
Expand Down
Loading
Loading