Skip to content

Commit cf2767e

Browse files
Getting there with hooks + update both restate sdk and vercel workflow
1 parent b83d447 commit cf2767e

File tree

12 files changed

+694
-692
lines changed

12 files changed

+694
-692
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ dist
55
.DS_Store
66
.next
77
.swc
8-
.well-known
8+
.well-known
9+
.restate

package.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@
3434
"devDependencies": {
3535
"@arethetypeswrong/cli": "^0.18.2",
3636
"@changesets/cli": "^2.29.7",
37-
"@eslint/js": "^9.37.0",
37+
"@eslint/js": "^9.38.0",
3838
"@eslint/json": "^0.13.2",
39-
"@microsoft/api-extractor": "^7.53.1",
40-
"@types/node": "^24.7.2",
41-
"eslint": "^9.37.0",
39+
"@microsoft/api-extractor": "^7.53.3",
40+
"@types/node": "^24.9.1",
41+
"eslint": "^9.38.0",
4242
"globals": "^16.4.0",
4343
"plop": "^4.0.4",
4444
"prettier": "^3.6.2",
45-
"tsdown": "^0.15.6",
45+
"tsdown": "^0.15.10",
4646
"tsx": "^4.20.6",
4747
"turbo": "^2.5.8",
4848
"typescript": "^5.9.3",
49-
"typescript-eslint": "^8.46.0",
49+
"typescript-eslint": "^8.46.2",
5050
"vitest": "^3.2.4"
5151
},
5252
"packageManager": "[email protected]",

packages/examples/workflow/src/workflows/user-signup.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* eslint-disable @typescript-eslint/require-await */
2-
import { sleep } from "workflow";
2+
import { createWebhook, sleep } from "workflow";
33
import { FatalError } from "workflow";
44

55
export async function handleUserSignup(email: string) {
@@ -9,7 +9,8 @@ export async function handleUserSignup(email: string) {
99

1010
await sendWelcomeEmail(user);
1111
await sleep("30s");
12-
await sendOnboardingEmail(user);
12+
const webhook = createWebhook();
13+
await sendOnboardingEmail(user, webhook.url);
1314

1415
return { userId: user.id, status: "onboarded" };
1516
}
@@ -33,7 +34,10 @@ async function sendWelcomeEmail(user: { id: string; email: string }) {
3334
}
3435
}
3536

36-
async function sendOnboardingEmail(user: { id: string; email: string }) {
37+
async function sendOnboardingEmail(
38+
user: { id: string; email: string },
39+
url: string
40+
) {
3741
"use step";
3842

3943
if (!user.email.includes("@")) {
@@ -42,4 +46,5 @@ async function sendOnboardingEmail(user: { id: string; email: string }) {
4246
}
4347

4448
console.log(`Sending onboarding email to user: ${user.id}`);
49+
console.log(`Complete it with webhook: ${url}`);
4550
}

packages/libs/backend/src/services.ts

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,40 +23,41 @@ import {
2323

2424
import {
2525
type CancelWorkflowRunParams,
26+
type CreateEventRequest,
27+
type CreateHookRequest,
28+
type CreateStepRequest,
2629
type CreateWorkflowRunRequest,
30+
type Event,
31+
EventSchema,
32+
type GetHookParams,
33+
type GetStepParams,
2734
type GetWorkflowRunParams,
35+
type Hook,
36+
HookSchema,
37+
ListEventsByCorrelationIdParams,
38+
type ListEventsParams,
2839
type ListWorkflowRunsParams,
40+
ListWorkflowRunStepsParams,
2941
type PauseWorkflowRunParams,
3042
type ResumeWorkflowRunParams,
31-
type UpdateWorkflowRunRequest,
32-
type WorkflowRun,
3343
type Step,
34-
type CreateStepRequest,
35-
type GetStepParams,
44+
StepSchema,
3645
type UpdateStepRequest,
37-
type Hook,
38-
type CreateHookRequest,
39-
type GetHookParams,
40-
type Event,
41-
type CreateEventRequest,
42-
type ListEventsParams,
43-
ListWorkflowRunStepsParams,
44-
ListEventsByCorrelationIdParams,
46+
type UpdateWorkflowRunRequest,
47+
type WorkflowRun,
4548
WorkflowRunSchema,
46-
StepSchema,
47-
EventSchema,
48-
HookSchema,
4949
} from "@workflow/world";
5050
import {
5151
DEFAULT_RESOLVE_DATA_OPTION,
52+
filterEventData,
53+
filterHookData,
5254
filterRunData,
5355
filterStepData,
54-
filterHookData,
55-
filterEventData,
5656
} from "./utils.js";
5757

5858
import { JsonTransport } from "@vercel/queue";
59-
import { QueueParamsSchema, serde } from "@restatedev/common";
59+
import { serde } from "@restatedev/restate-sdk-zod";
60+
import { schemas } from "@restatedev/common";
6061

6162
// key by runId
6263

@@ -591,14 +592,12 @@ export const indexService = service({
591592
async (ctx: Context, param: { token: string }): Promise<Hook> => {
592593
const hookTokens = (await ctx
593594
.objectClient(keyValue, param.token)
594-
.get()) as { runId: string; hookId: string }[];
595+
.get()) as string[];
595596

596597
if (hookTokens === undefined || hookTokens.length === 0) {
597598
throw new TerminalError("No hooks found", { errorCode: 404 });
598599
}
599-
const theHook = hookTokens[0];
600-
const hook = await ctx.objectClient(hooksApi, theHook!.hookId).get({});
601-
return hook;
600+
return await ctx.objectClient(hooksApi, hookTokens[0]!).get({});
602601
}
603602
),
604603

@@ -626,7 +625,7 @@ export const queue = service({
626625
onMaxAttempts: "kill",
627626
},
628627

629-
input: serde.zod(QueueParamsSchema),
628+
input: serde.zod(schemas.QueueParamsSchema),
630629
},
631630
async (ctx: Context, params) => {
632631
let pathname: string;
@@ -650,7 +649,7 @@ export const queue = service({
650649
headers: {
651650
"x-vqs-queue-name": params.queueName,
652651
"x-vqs-message-id": messageId,
653-
"x-vqs-message-attempt": String(1), // TODO: fix this.
652+
"x-vqs-message-attempt": String(params.attempt),
654653
},
655654
}
656655
);
@@ -661,11 +660,14 @@ export const queue = service({
661660
if (response.status === 503) {
662661
const { retryIn } = (await response.json()) as { retryIn: number };
663662

663+
// Increment attempt count
664+
params.attempt += 1;
665+
664666
ctx.serviceSendClient(queue).queue(
665667
params,
666668
rpc.sendOpts({
667669
delay: { seconds: retryIn },
668-
input: serde.zod(QueueParamsSchema),
670+
input: serde.zod(schemas.QueueParamsSchema),
669671
})
670672
);
671673

packages/libs/common/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
"lint": "eslint ."
2929
},
3030
"dependencies": {
31-
"@restatedev/restate-sdk-core": "catalog:",
3231
"@workflow/world": "catalog:",
3332
"zod": "catalog:"
3433
},

packages/libs/common/src/index.ts

Lines changed: 16 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -9,79 +9,24 @@
99
* TODO: Add repository URL
1010
*/
1111

12-
import type { Serde } from "@restatedev/restate-sdk-core";
1312
import { QueuePayloadSchema } from "@workflow/world";
1413

1514
import * as z4 from "zod/v4";
1615

17-
export type { Serde } from "@restatedev/restate-sdk-core";
18-
19-
class ZodSerde<T extends z4.ZodType> implements Serde<z4.infer<T>> {
20-
contentType? = "application/json";
21-
jsonSchema?: object | undefined;
22-
23-
constructor(private readonly schema: T) {
24-
if ("_zod" in schema) {
25-
this.jsonSchema = z4.toJSONSchema(schema, {
26-
unrepresentable: "any",
27-
});
28-
}
29-
if (schema instanceof z4.ZodVoid || schema instanceof z4.ZodUndefined) {
30-
this.contentType = undefined;
31-
}
32-
}
33-
34-
serialize(value: z4.infer<T>): Uint8Array {
35-
if (value === undefined) {
36-
return new Uint8Array(0);
37-
}
38-
return new TextEncoder().encode(JSON.stringify(value));
39-
}
40-
41-
deserialize(data: Uint8Array): z4.infer<T> {
42-
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
43-
const js =
44-
data.length === 0
45-
? undefined
46-
: JSON.parse(new TextDecoder().decode(data));
47-
if (
48-
"safeParse" in this.schema &&
49-
typeof this.schema.safeParse === "function"
50-
) {
51-
const res = this.schema.safeParse(js);
52-
if (res.success) {
53-
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
54-
return res.data;
55-
}
56-
throw res.error;
57-
} else {
58-
throw new TypeError("Unsupported data type. Expected 'safeParse'.");
59-
}
60-
}
16+
/** Bunch of zod schemas we need **/
17+
export namespace schemas {
18+
export const QueueParamsSchema = z4.object({
19+
deliverTo: z4.string(),
20+
queueName: z4.string(),
21+
attempt: z4.int(),
22+
message: QueuePayloadSchema,
23+
opts: z4
24+
.object({
25+
deploymentId: z4.string().optional(),
26+
idempotencyKey: z4.string().optional(),
27+
})
28+
.optional(),
29+
});
30+
31+
export type QueueParams = z4.infer<typeof QueueParamsSchema>;
6132
}
62-
63-
export namespace serde {
64-
/**
65-
* A Zod-based serde.
66-
*
67-
* @param zodType the zod type
68-
* @returns a serde that will validate the data with the zod schema
69-
*/
70-
export const zod = <T extends z4.ZodType>(zodType: T): Serde<z4.infer<T>> => {
71-
return new ZodSerde(zodType);
72-
};
73-
}
74-
75-
export const QueueParamsSchema = z4.object({
76-
deliverTo: z4.string(),
77-
queueName: z4.string(),
78-
message: QueuePayloadSchema,
79-
opts: z4
80-
.object({
81-
deploymentId: z4.string().optional(),
82-
idempotencyKey: z4.string().optional(),
83-
})
84-
.optional(),
85-
});
86-
87-
export type QueueParams = z4.infer<typeof QueueParamsSchema>;

packages/libs/world/src/auth.ts

Lines changed: 0 additions & 20 deletions
This file was deleted.

packages/libs/world/src/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { World } from "@workflow/world";
1414
import { createQueue } from "./queue.js";
1515
import { createStorage } from "./storage.js";
1616
import { createStreamer } from "./streamer.js";
17-
import { auth } from "./auth.js";
1817

1918
export function createWorld(args?: {
2019
opts: ConnectionOpts;
@@ -28,7 +27,6 @@ export function createWorld(args?: {
2827
...createQueue(client, deliverTo),
2928
...createStorage(client),
3029
...createStreamer(client),
31-
...auth,
3230
start: async () => {
3331
// TODO: verify subscription
3432
},

packages/libs/world/src/queue.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import { MessageId, Queue, ValidQueueName } from "@workflow/world";
22
import { JsonTransport } from "@vercel/queue";
33
import z from "zod/v4";
44
import { Ingress, rpc } from "@restatedev/restate-sdk-clients";
5+
import { serde } from "@restatedev/restate-sdk-zod";
56
import { QueueService } from "@restatedev/backend";
6-
import { QueueParamsSchema, serde } from "@restatedev/common";
7+
import { schemas } from "@restatedev/common";
78

89
const HeaderParser = z.object({
910
"x-vqs-queue-name": ValidQueueName,
@@ -22,8 +23,12 @@ export function createQueue(client: Ingress, deliverTo: string): Queue {
2223
deliverTo,
2324
queueName: name,
2425
message: body,
26+
attempt: 1,
2527
},
26-
rpc.sendOpts({ idempotencyKey, input: serde.zod(QueueParamsSchema) })
28+
rpc.sendOpts({
29+
idempotencyKey,
30+
input: serde.zod(schemas.QueueParamsSchema),
31+
})
2732
);
2833

2934
const messageId = res.invocationId as MessageId;
@@ -34,7 +39,6 @@ export function createQueue(client: Ingress, deliverTo: string): Queue {
3439
const createQueueHandler: Queue["createQueueHandler"] = (prefix, handler) => {
3540
return async (req) => {
3641
const headers = HeaderParser.safeParse(Object.fromEntries(req.headers));
37-
3842
if (!headers.success || !req.body) {
3943
return Response.json(
4044
{ error: "Missing required headers" },
@@ -52,16 +56,20 @@ export function createQueue(client: Ingress, deliverTo: string): Queue {
5256

5357
const body = await new JsonTransport().deserialize(req.body);
5458
try {
59+
// Run the queue handler
5560
const response = await handler(body, { attempt, queueName, messageId });
5661
const retryIn =
5762
typeof response === "undefined" ? null : response.timeoutSeconds;
5863

64+
// 503 is used to notify to retry later
5965
if (retryIn) {
6066
return Response.json({ retryIn }, { status: 503 });
6167
}
6268

69+
// Processed
6370
return Response.json({ ok: true });
6471
} catch (error) {
72+
// Some error happened
6573
return Response.json(String(error), { status: 500 });
6674
}
6775
};

0 commit comments

Comments
 (0)