Skip to content

Commit

Permalink
Introduce EndMessage (#216)
Browse files Browse the repository at this point in the history
* Update protocol.proto

* Introduce EndMessage

* Update tests
  • Loading branch information
slinkydeveloper authored Dec 21, 2023
1 parent 1785bd4 commit a5c8aaf
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 54 deletions.
15 changes: 10 additions & 5 deletions proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ message CompletionMessage {
};
}

// Type: 0x0000 + 4
message EntryAckMessage {
uint32 entry_index = 1;
}

// Type: 0x0000 + 2
// Implementations MUST send this message when suspending an invocation.
message SuspensionMessage {
Expand All @@ -83,6 +78,16 @@ message ErrorMessage {
string description = 3;
}

// Type: 0x0000 + 4
message EntryAckMessage {
uint32 entry_index = 1;
}

// Type: 0x0000 + 5
// Implementations MUST send this message when the invocation lifecycle ends.
message EndMessage {
}

// --- Journal Entries ---

// Every Completable JournalEntry has a result field, filled only and only if the entry is in DONE state.
Expand Down
14 changes: 11 additions & 3 deletions src/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { rlog } from "./utils/logger";
import { clearTimeout } from "timers";
import {
COMPLETION_MESSAGE_TYPE,
END_MESSAGE_TYPE,
EndMessage,
ENTRY_ACK_MESSAGE_TYPE,
ERROR_MESSAGE_TYPE,
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
Expand Down Expand Up @@ -282,6 +284,9 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
"Function completed successfully."
);

// Mark the end of the invocation
this.send(new Message(END_MESSAGE_TYPE, EndMessage.create()));

this.finish(value);
} catch (e) {
this.unhandledError(ensureError(e));
Expand All @@ -303,7 +308,7 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
"Function completed with an error: " + error.message
);

this.finishWithError(error);
this.sendErrorAndFinish(error);
} catch (ee) {
this.unhandledError(ensureError(ee));
}
Expand All @@ -314,7 +319,7 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
return this.invocationComplete.promise;
}

private async finishWithError(e: Error) {
private async sendErrorAndFinish(e: Error) {
if (e instanceof TerminalError) {
this.sendTerminalError(e);
} else {
Expand Down Expand Up @@ -354,6 +359,9 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
if (!this.journal.outputMsgWasReplayed()) {
this.send(msg);
}

// Mark the end of the invocation
this.send(new Message(END_MESSAGE_TYPE, EndMessage.create()));
}

private send(message: Message) {
Expand Down Expand Up @@ -464,7 +472,7 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
}

public async notifyHandlerExecutionError(e: RetryableError | TerminalError) {
await this.finishWithError(e);
await this.sendErrorAndFinish(e);
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/types/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
CompletionMessage,
EntryAckMessage,
ErrorMessage,
EndMessage,
GetStateEntryMessage,
InvokeEntryMessage,
OutputStreamEntryMessage,
Expand All @@ -36,6 +37,7 @@ export {
CompleteAwakeableEntryMessage,
CompletionMessage,
ErrorMessage,
EndMessage,
GetStateEntryMessage,
InvokeEntryMessage,
OutputStreamEntryMessage,
Expand All @@ -53,6 +55,7 @@ export const COMPLETION_MESSAGE_TYPE = 0x0001n;
export const SUSPENSION_MESSAGE_TYPE = 0x0002n;
export const ERROR_MESSAGE_TYPE = 0x0003n;
export const ENTRY_ACK_MESSAGE_TYPE = 0x0004n;
export const END_MESSAGE_TYPE = 0x0005n;
export const POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0400n;
export const OUTPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0401n;
export const GET_STATE_ENTRY_MESSAGE_TYPE = 0x0800n;
Expand All @@ -78,6 +81,7 @@ export const KNOWN_MESSAGE_TYPES = new Set([
SUSPENSION_MESSAGE_TYPE,
ERROR_MESSAGE_TYPE,
ENTRY_ACK_MESSAGE_TYPE,
END_MESSAGE_TYPE,
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
GET_STATE_ENTRY_MESSAGE_TYPE,
Expand All @@ -97,6 +101,7 @@ export const PROTOBUF_MESSAGE_NAME_BY_TYPE = new Map<bigint, string>([
[SUSPENSION_MESSAGE_TYPE, "SuspensionMessage"],
[ERROR_MESSAGE_TYPE, "ErrorMessage"],
[ENTRY_ACK_MESSAGE_TYPE, "EntryAckMessage"],
[END_MESSAGE_TYPE, "EndMessage"],
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, "PollInputStreamEntryMessage"],
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, "OutputStreamEntryMessage"],
[GET_STATE_ENTRY_MESSAGE_TYPE, "GetStateEntryMessage"],
Expand All @@ -117,6 +122,7 @@ const PROTOBUF_MESSAGES: Array<[bigint, any]> = [
[SUSPENSION_MESSAGE_TYPE, SuspensionMessage],
[ERROR_MESSAGE_TYPE, ErrorMessage],
[ENTRY_ACK_MESSAGE_TYPE, EntryAckMessage],
[END_MESSAGE_TYPE, EndMessage],
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, PollInputStreamEntryMessage],
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, OutputStreamEntryMessage],
[GET_STATE_ENTRY_MESSAGE_TYPE, GetStateEntryMessage],
Expand All @@ -138,6 +144,7 @@ export type ProtocolMessage =
| SuspensionMessage
| ErrorMessage
| EntryAckMessage
| EndMessage
| PollInputStreamEntryMessage
| OutputStreamEntryMessage
| GetStateEntryMessage
Expand Down
11 changes: 9 additions & 2 deletions test/awakeable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
outputMessage,
startMessage,
suspensionMessage,
END_MESSAGE,
} from "./protoutils";
import { TestGreeter, TestResponse } from "../src/generated/proto/test";
import { ProtocolMode } from "../src/generated/proto/discovery";
Expand Down Expand Up @@ -74,6 +75,7 @@ describe("AwakeableGreeter", () => {
expect(result).toStrictEqual([
awakeableMessage(),
outputMessage(greetResponse(`Hello Francesco for ${getAwakeableId(1)}`)),
END_MESSAGE,
]);
});

Expand All @@ -87,6 +89,7 @@ describe("AwakeableGreeter", () => {
expect(result).toStrictEqual([
awakeableMessage(),
outputMessage(greetResponse(`Hello for ${getAwakeableId(1)}`)),
END_MESSAGE,
]);
});

Expand All @@ -102,6 +105,7 @@ describe("AwakeableGreeter", () => {
outputMessage(
greetResponse(`Hello [object Object] for ${getAwakeableId(1)}`)
),
END_MESSAGE,
]);
});

Expand All @@ -117,9 +121,10 @@ describe("AwakeableGreeter", () => {
),
]).run();

expect(result.length).toStrictEqual(2);
expect(result.length).toStrictEqual(3);
expect(result[0]).toStrictEqual(awakeableMessage());
checkTerminalError(result[1], "Something went wrong");
expect(result[2]).toStrictEqual(END_MESSAGE);
});

it("handles replay with value", async () => {
Expand All @@ -131,6 +136,7 @@ describe("AwakeableGreeter", () => {

expect(result).toStrictEqual([
outputMessage(greetResponse(`Hello Francesco for ${getAwakeableId(1)}`)),
END_MESSAGE,
]);
});

Expand All @@ -141,8 +147,9 @@ describe("AwakeableGreeter", () => {
awakeableMessage(undefined, failure("Something went wrong")),
]).run();

expect(result.length).toStrictEqual(1);
expect(result.length).toStrictEqual(2);
checkTerminalError(result[0], "Something went wrong");
expect(result[1]).toStrictEqual(END_MESSAGE);
});

it("fails on journal mismatch. Completed with CompleteAwakeable during replay.", async () => {
Expand Down
14 changes: 12 additions & 2 deletions test/complete_awakeable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
outputMessage,
startMessage,
rejectAwakeableMessage,
END_MESSAGE,
} from "./protoutils";
import { describe, expect } from "@jest/globals";
import { TestDriver } from "./testdriver";
Expand Down Expand Up @@ -49,6 +50,7 @@ describe("ResolveAwakeableGreeter", () => {
expect(result).toStrictEqual([
resolveAwakeableMessage(getAwakeableId(1), "hello"),
outputMessage(greetResponse("Hello")),
END_MESSAGE,
]);
});

Expand All @@ -61,6 +63,7 @@ describe("ResolveAwakeableGreeter", () => {
expect(result).toStrictEqual([
resolveAwakeableMessage(getAwakeableId(1), ""),
outputMessage(greetResponse("Hello")),
END_MESSAGE,
]);
});

Expand All @@ -71,7 +74,10 @@ describe("ResolveAwakeableGreeter", () => {
resolveAwakeableMessage(getAwakeableId(1), "hello"),
]).run();

expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]);
expect(result).toStrictEqual([
outputMessage(greetResponse("Hello")),
END_MESSAGE,
]);
});

it("handles replay with value empty string", async () => {
Expand All @@ -81,7 +87,10 @@ describe("ResolveAwakeableGreeter", () => {
resolveAwakeableMessage(getAwakeableId(1), ""),
]).run();

expect(result).toStrictEqual([outputMessage(greetResponse("Hello"))]);
expect(result).toStrictEqual([
outputMessage(greetResponse("Hello")),
END_MESSAGE,
]);
});

it("fails on journal mismatch. Completed with invoke during replay.", async () => {
Expand Down Expand Up @@ -138,6 +147,7 @@ describe("RejectAwakeableGreeter", () => {
expect(result).toStrictEqual([
rejectAwakeableMessage(getAwakeableId(1), "my bad error"),
outputMessage(greetResponse("Hello")),
END_MESSAGE,
]);
});
});
15 changes: 14 additions & 1 deletion test/eager_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { TestDriver } from "./testdriver";
import {
clearStateMessage,
completionMessage,
END_MESSAGE,
getStateMessage,
greetRequest,
greetResponse,
Expand Down Expand Up @@ -55,6 +56,7 @@ describe("GetEmpty", () => {
expect(result).toStrictEqual([
getStateMessage("STATE", undefined, true),
outputMessage(greetResponse("true")),
END_MESSAGE,
]);
});

Expand All @@ -78,7 +80,10 @@ describe("GetEmpty", () => {
ProtocolMode.BIDI_STREAM
).run();

expect(result).toStrictEqual([outputMessage(greetResponse("true"))]);
expect(result).toStrictEqual([
outputMessage(greetResponse("true")),
END_MESSAGE,
]);
});
});

Expand All @@ -103,6 +108,7 @@ describe("Get", () => {
expect(result).toStrictEqual([
getStateMessage("STATE", "One"),
outputMessage(greetResponse("One")),
END_MESSAGE,
]);
});

Expand All @@ -116,6 +122,7 @@ describe("Get", () => {
expect(result).toStrictEqual([
getStateMessage("STATE", "One"),
outputMessage(greetResponse("One")),
END_MESSAGE,
]);
});

Expand Down Expand Up @@ -158,6 +165,7 @@ describe("GetAppendAndGet", () => {
setStateMessage("STATE", "OneTwo"),
getStateMessage("STATE", "OneTwo"),
outputMessage(greetResponse("OneTwo")),
END_MESSAGE,
]);
});

Expand All @@ -173,6 +181,7 @@ describe("GetAppendAndGet", () => {
setStateMessage("STATE", "OneTwo"),
getStateMessage("STATE", "OneTwo"),
outputMessage(greetResponse("OneTwo")),
END_MESSAGE,
]);
});
});
Expand Down Expand Up @@ -202,6 +211,7 @@ describe("GetClearAndGet", () => {
clearStateMessage("STATE"),
getStateMessage("STATE", undefined, true),
outputMessage(greetResponse("One-nothing")),
END_MESSAGE,
]);
});

Expand All @@ -217,6 +227,7 @@ describe("GetClearAndGet", () => {
clearStateMessage("STATE"),
getStateMessage("STATE", undefined, true),
outputMessage(greetResponse("One-nothing")),
END_MESSAGE,
]);
});
});
Expand Down Expand Up @@ -249,6 +260,7 @@ describe("MultipleGet", () => {
getStateMessage("STATE", "One"),
getStateMessage("STATE", "One"),
outputMessage(greetResponse("One - One - One")),
END_MESSAGE,
]);
});

Expand All @@ -264,6 +276,7 @@ describe("MultipleGet", () => {
getStateMessage("STATE", "One"),
getStateMessage("STATE", "One"),
outputMessage(greetResponse("One - One - One")),
END_MESSAGE,
]);
});
});
Loading

0 comments on commit a5c8aaf

Please sign in to comment.