Skip to content

Commit

Permalink
Deterministic promise combinators (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Jan 30, 2024
1 parent 9e0be25 commit 309ccc9
Show file tree
Hide file tree
Showing 17 changed files with 1,363 additions and 188 deletions.
7 changes: 7 additions & 0 deletions proto/javascript.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,11 @@ message SideEffectEntryMessage {
bytes value = 14;
FailureWithTerminal failure = 15;
};
}

// Type: 0xFC00 + 2
message CombinatorEntryMessage {
int32 combinator_id = 1;

repeated int32 journal_entries_order = 2;
}
3 changes: 2 additions & 1 deletion src/invocation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
PollInputStreamEntryMessage,
StartMessage,
} from "./generated/proto/protocol";
import { CompletablePromise, formatMessageAsJson } from "./utils/utils";
import { formatMessageAsJson } from "./utils/utils";
import {
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
START_MESSAGE_TYPE,
Expand All @@ -27,6 +27,7 @@ import { RestateStreamConsumer } from "./connection/connection";
import { LocalStateStore } from "./local_state_store";
import { ensureError } from "./types/errors";
import { LoggerContext } from "./logger";
import { CompletablePromise } from "./utils/promises";

enum State {
ExpectingStart = 0,
Expand Down
80 changes: 43 additions & 37 deletions src/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
AwakeableEntryMessage,
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
COMBINATOR_ENTRY_MESSAGE,
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
CompletionMessage,
EntryAckMessage,
Expand All @@ -39,6 +40,7 @@ import { Message } from "./types/types";
import { SideEffectEntryMessage } from "./generated/proto/javascript";
import { Invocation } from "./invocation";
import { failureToError, RetryableError } from "./types/errors";
import { CompletablePromise } from "./utils/promises";

const RESOLVED = Promise.resolve(undefined);

Expand Down Expand Up @@ -99,7 +101,7 @@ export class Journal<I, O> {

const journalEntry = new JournalEntry(messageType, message);
this.handleReplay(this.userCodeJournalIndex, replayEntry, journalEntry);
return journalEntry.promise;
return journalEntry.completablePromise.promise;
}
case NewExecutionState.PROCESSING: {
switch (messageType) {
Expand All @@ -109,7 +111,7 @@ export class Journal<I, O> {
messageType,
message as p.SuspensionMessage | p.OutputStreamEntryMessage
);
return Promise.resolve(undefined);
return RESOLVED;
}
case p.SET_STATE_ENTRY_MESSAGE_TYPE:
case p.CLEAR_STATE_ENTRY_MESSAGE_TYPE:
Expand All @@ -128,22 +130,11 @@ export class Journal<I, O> {
return Promise.resolve(getStateMsg.value || getStateMsg.empty);
} else {
// Need to retrieve state by going to the runtime.
const journalEntry = new JournalEntry(messageType, message);
this.pendingJournalEntries.set(
this.userCodeJournalIndex,
journalEntry
);
return journalEntry.promise;
return this.appendJournalEntry(messageType, message);
}
}
default: {
// Need completion
const journalEntry = new JournalEntry(messageType, message);
this.pendingJournalEntries.set(
this.userCodeJournalIndex,
journalEntry
);
return journalEntry.promise;
return this.appendJournalEntry(messageType, message);
}
}
}
Expand All @@ -153,7 +144,7 @@ export class Journal<I, O> {
// So no more user messages can come in...
// - Print warning log and continue...
//TODO received user-side message but state machine is closed
return Promise.resolve(undefined);
return RESOLVED;
}
default: {
throw RetryableError.protocolViolation(
Expand All @@ -178,18 +169,18 @@ export class Journal<I, O> {
}

if (m.value !== undefined) {
journalEntry.resolve(m.value);
journalEntry.completablePromise.resolve(m.value);
this.pendingJournalEntries.delete(m.entryIndex);
} else if (m.failure !== undefined) {
// we do all completions with Terminal Errors, because failures triggered by those exceptions
// when the bubble up would otherwise lead to re-tries, deterministic replay, re-throwing, and
// thus an infinite loop that keeps replay-ing but never makes progress
// these failures here consequently need to cause terminal failures, unless caught and handled
// by the handler code
journalEntry.reject(failureToError(m.failure, true));
journalEntry.completablePromise.reject(failureToError(m.failure, true));
this.pendingJournalEntries.delete(m.entryIndex);
} else if (m.empty !== undefined) {
journalEntry.resolve(m.empty);
journalEntry.completablePromise.resolve(m.empty);
this.pendingJournalEntries.delete(m.entryIndex);
} else {
//TODO completion message without a value/failure/empty
Expand All @@ -205,7 +196,7 @@ export class Journal<I, O> {
}

// Just needs an ack
journalEntry.resolve(undefined);
journalEntry.completablePromise.resolve(undefined);
this.pendingJournalEntries.delete(m.entryIndex);
}

Expand Down Expand Up @@ -314,17 +305,18 @@ export class Journal<I, O> {
} else {
// A side effect can have a void return type
// If it was replayed, then it is acked, so we should resolve it.
journalEntry.resolve(undefined);
journalEntry.completablePromise.resolve(undefined);
this.pendingJournalEntries.delete(journalIndex);
}
break;
}
case SET_STATE_ENTRY_MESSAGE_TYPE:
case CLEAR_STATE_ENTRY_MESSAGE_TYPE:
case COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE:
case BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE: {
case BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE:
case COMBINATOR_ENTRY_MESSAGE: {
// Do not need a completion. So if the match has passed then the entry can be deleted.
journalEntry.resolve(undefined);
journalEntry.completablePromise.resolve(undefined);
this.pendingJournalEntries.delete(journalIndex);
break;
}
Expand All @@ -342,11 +334,11 @@ export class Journal<I, O> {
failureWouldBeTerminal?: boolean
) {
if (value !== undefined) {
journalEntry.resolve(value);
journalEntry.completablePromise.resolve(value);
this.pendingJournalEntries.delete(journalIndex);
} else if (failure !== undefined) {
const error = failureToError(failure, failureWouldBeTerminal ?? true);
journalEntry.reject(error);
journalEntry.completablePromise.reject(error);
this.pendingJournalEntries.delete(journalIndex);
} else {
this.pendingJournalEntries.set(journalIndex, journalEntry);
Expand All @@ -369,7 +361,9 @@ export class Journal<I, O> {
}

this.pendingJournalEntries.delete(0);
rootJournalEntry.resolve(new Message(messageType, message));
rootJournalEntry.completablePromise.resolve(
new Message(messageType, message)
);
}

private checkJournalMatch(
Expand Down Expand Up @@ -412,7 +406,7 @@ export class Journal<I, O> {
}
}

private incrementUserCodeIndex() {
incrementUserCodeIndex() {
this.userCodeJournalIndex++;
if (
this.userCodeJournalIndex === this.invocation.nbEntriesToReplay &&
Expand All @@ -422,6 +416,26 @@ export class Journal<I, O> {
}
}

/**
* Read the next replay entry
*/
public readNextReplayEntry() {
this.incrementUserCodeIndex();
return this.invocation.replayEntries.get(this.userCodeJournalIndex);
}

/**
* Append journal entry. This won't increment the journal index.
*/
public appendJournalEntry(
messageType: bigint,
message: p.ProtocolMessage | Uint8Array
): Promise<unknown> {
const journalEntry = new JournalEntry(messageType, message);
this.pendingJournalEntries.set(this.userCodeJournalIndex, journalEntry);
return journalEntry.completablePromise.promise;
}

public isClosed(): boolean {
return this.state === NewExecutionState.CLOSED;
}
Expand Down Expand Up @@ -464,22 +478,14 @@ export class Journal<I, O> {
}

export class JournalEntry {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public promise: Promise<any>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public resolve!: (value: any) => void;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public reject!: (reason?: any) => void;
public completablePromise: CompletablePromise<unknown>;

constructor(
readonly messageType: bigint,
readonly message: p.ProtocolMessage | Uint8Array
) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.promise = new Promise<any>((res, rej) => {
this.resolve = res;
this.reject = rej;
});
this.completablePromise = new CompletablePromise<any>();
}
}

Expand Down
Loading

0 comments on commit 309ccc9

Please sign in to comment.