Skip to content

Commit

Permalink
Fix history stream
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Jul 17, 2023
1 parent 7a9b972 commit 49df211
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 55 deletions.
16 changes: 3 additions & 13 deletions backends/durableObjects/db.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import Emittery from "emittery";
import { PromiseOrValue } from "../../promise.ts";
import { HistoryEvent } from "../../runtime/core/events.ts";
import { secondsFromNow } from "../../utils.ts";
import {
DB,
Execution,
PaginationParams,
WorkflowExecution,
} from "../backend.ts";
import { secondsFromNow } from "../../utils.ts";

export interface GateOptions {
allowUnconfirmed?: boolean;
Expand Down Expand Up @@ -85,7 +84,6 @@ const sortHistoryEventByDate = (

export const durableExecution = (
db: DurableObjectTransaction | DurableObjectStorage,
historyStream: Emittery<{ "history": HistoryEvent[] }>,
gateOpts: GateOptions = { allowUnconfirmed: false },
) => {
const executions = useSingleton<WorkflowExecution>(
Expand All @@ -104,8 +102,7 @@ export const durableExecution = (
gateOpts,
);
return {
withGateOpts: (gateOpts: GateOptions) =>
durableExecution(db, historyStream, gateOpts),
withGateOpts: (gateOpts: GateOptions) => durableExecution(db, gateOpts),
get: executions.get.bind(executions),
create: executions.put.bind(executions),
update: executions.put.bind(executions),
Expand Down Expand Up @@ -194,12 +191,6 @@ export const durableExecution = (
},
history: {
...history,
add: async (...events: HistoryEvent[]) => {
return history.add(...events).then((r) => {
historyStream.emit("history", events);
return r;
});
},
get: async (pagination?: PaginationParams) => {
const reverse =
(pagination?.page ?? pagination?.pageSize) !== undefined;
Expand All @@ -225,7 +216,7 @@ export const durableExecution = (
if (!isDurableObjStorage(db)) {
throw new Error("cannot create inner transactions");
}
return await f(durableExecution(db, historyStream, gateOpts));
return await f(durableExecution(db, gateOpts));
},
};
};
Expand All @@ -243,7 +234,6 @@ export const dbFor = (
execution: (_executionId: string) => {
return durableExecution(
db,
new Emittery<{ "history": HistoryEvent[] }>(),
);
},
pendingExecutions: () => {
Expand Down
68 changes: 26 additions & 42 deletions src/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import Emittery from "emittery";
import { PaginationParams, WorkflowExecution } from "../backends/backend.ts";
import { durableExecution } from "../backends/durableObjects/db.ts";
import { PromiseOrValue } from "../promise.ts";
Expand Down Expand Up @@ -68,53 +67,41 @@ export const buildRoutes = (wkflow: Workflow): Routes => {
"GET": async (_req: Request) => {
const search = new URL(_req.url).searchParams;
if (search.has("stream")) {
const eventStream = wkflow.historyStream.anyEvent();
const sentEvents: Record<string, boolean> = {};
const eventStream = (async function* () {
while (true) {
if (_req.signal.aborted) {
break;
}
const [currentHistory, isCompleted] = await Promise.all([
wkflow.history(),
wkflow.isCompleted(),
]);
for (const event of currentHistory) {
const sent = sentEvents[event.id];
sentEvents[event.id] = true;
if (!sent) {
yield event;
}
}
if (isCompleted) {
break;
}
await new Promise((resolve) => setTimeout(resolve, 2000));
}
})();

if (!eventStream) {
return new Response(null, { status: 204 });
}
_req.signal.onabort = () => {
eventStream.return?.();
};
const { readable, writable } = new TransformStream();
(async () => {
const encoder = new TextEncoder();
const [currentHistory, isCompleted] = await Promise.all([
wkflow.history(),
wkflow.isCompleted(),
]);
const writer = writable.getWriter();
try {
if (currentHistory && currentHistory.length > 0) {
await writer.write(
encoder.encode(JSON.stringify(currentHistory)),
);
}
if (
isCompleted
) {
return;
}
let sentEvents: Record<string, boolean> = {};

for (const event of currentHistory) {
sentEvents[event.id] = true;
}
const withoutSentEvents = (event: HistoryEvent): boolean => {
const sent = sentEvents[event.id];
sentEvents[event.id] = true;
return !sent;
};

for await (const events of eventStream) {
for await (const event of eventStream) {
await writer.write(
encoder.encode(
JSON.stringify(events[1].filter(withoutSentEvents)),
JSON.stringify(event),
),
);
if (await wkflow.isCompleted()) {
return;
}
}
} finally {
try {
Expand Down Expand Up @@ -177,14 +164,12 @@ export class Workflow {
execution: ReturnType<typeof durableExecution>;
handler: (allowUnconfirmed?: boolean) => Promise<void>;
router: Handler;
historyStream: Emittery<{ "history": HistoryEvent[] }>;

constructor(state: DurableObjectState, env: Env) {
setFromString(env.WORKER_PUBLIC_KEY, env.WORKER_PRIVATE_KEY);
this.state = state;
this.historyStream = new Emittery<{ "history": HistoryEvent[] }>();
this.handler = async () => {};
this.execution = durableExecution(this.state.storage, this.historyStream);
this.execution = durableExecution(this.state.storage);
this.router = router(buildRoutes(this));
this.state.blockConcurrencyWhile(async () => {
const [registry] = await Promise.all([
Expand All @@ -195,7 +180,6 @@ export class Workflow {
return runWorkflow(
durableExecution(
this.state.storage,
this.historyStream,
{ allowUnconfirmed },
),
registry,
Expand Down

0 comments on commit 49df211

Please sign in to comment.