Skip to content

Commit ad46484

Browse files
Progress
1 parent 2a84430 commit ad46484

File tree

3 files changed

+88
-20
lines changed

3 files changed

+88
-20
lines changed

packages/libs/backend/src/services.ts

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ import {
4949
WorkflowRunSchema,
5050
} from "@workflow/world";
5151
import {
52-
DEFAULT_RESOLVE_DATA_OPTION,
5352
filterEventData,
5453
filterHookData,
5554
filterRunData,
@@ -412,7 +411,10 @@ export const workflow = object({
412411
ctx.objectSendClient(keyValue, data.correlationId).append(ctx.key);
413412
}
414413

415-
return filterEventData(result, data.resolveData);
414+
return filterEventData(
415+
result,
416+
/* There is an issue with zod schemas here, if none is used then the schema won't allow to parse the output here because eventData is not optional! */ "all"
417+
);
416418
}
417419
),
418420

@@ -450,7 +452,12 @@ export const workflow = object({
450452
(a.createdAt.getTime() - b.createdAt.getTime()) * orderingSign
451453
)
452454
// Filter data
453-
.map((event) => filterEventData(event, params.resolveData))
455+
.map((event) =>
456+
filterEventData(
457+
event,
458+
/* There is an issue with zod schemas here, if none is used then the schema won't allow to parse the output here because eventData is not optional! */ "all"
459+
)
460+
)
454461
);
455462
}
456463
),
@@ -635,7 +642,10 @@ export const index = service({
635642
for (const runId of runIds || []) {
636643
const events = await ctx
637644
.objectClient(workflow, runId)
638-
.listEvents({ resolveData: param.resolveData }, rpc.opts({output: serde.zod(EventSchema.array())}));
645+
.listEvents(
646+
{ resolveData: param.resolveData },
647+
rpc.opts({ output: serde.zod(EventSchema.array()) })
648+
);
639649

640650
matchingEvents.push(
641651
...events.filter(
@@ -662,10 +672,13 @@ export const index = service({
662672
if (runId === undefined || runId.length === 0) {
663673
throw new TerminalError("No hooks found", { errorCode: 404 });
664674
}
665-
return await ctx.objectClient(workflow, runId[0]!).getHook({
666-
hookId: param.hookId,
667-
resolveData: param.resolveData,
668-
}, rpc.opts({output: serde.zod(HookSchema)}));
675+
return await ctx.objectClient(workflow, runId[0]!).getHook(
676+
{
677+
hookId: param.hookId,
678+
resolveData: param.resolveData,
679+
},
680+
rpc.opts({ output: serde.zod(HookSchema) })
681+
);
669682
}
670683
),
671684

@@ -686,10 +699,13 @@ export const index = service({
686699
}
687700
return await ctx
688701
.objectClient(workflow, runIdAndHookId[0]!.runId)
689-
.getHook({
690-
hookId: runIdAndHookId[0]!.hookId,
691-
resolveData: param.resolveData,
692-
}, rpc.opts({output: serde.zod(HookSchema)}));
702+
.getHook(
703+
{
704+
hookId: runIdAndHookId[0]!.hookId,
705+
resolveData: param.resolveData,
706+
},
707+
rpc.opts({ output: serde.zod(HookSchema) })
708+
);
693709
}
694710
),
695711

@@ -710,9 +726,12 @@ export const index = service({
710726
(
711727
await RestatePromise.all(
712728
runIds.map((runId) =>
713-
ctx.objectClient(workflow, runId).getRun({
714-
resolveData: params.resolveData,
715-
}, rpc.opts({output: serde.zod(WorkflowRunSchema)}))
729+
ctx.objectClient(workflow, runId).getRun(
730+
{
731+
resolveData: params.resolveData,
732+
},
733+
rpc.opts({ output: serde.zod(WorkflowRunSchema) })
734+
)
716735
)
717736
)
718737
)
@@ -733,6 +752,43 @@ export const index = service({
733752
);
734753
}
735754
),
755+
756+
listHooks: createServiceHandler(
757+
{
758+
output: serde.zod(HookSchema.array()),
759+
},
760+
async (
761+
ctx: Context,
762+
params: Omit<ListHooksParams, "runId"> // This API is for the aggregated search of hooks
763+
): Promise<Hook[]> => {
764+
const runIds = (await ctx
765+
.objectClient(keyValue, "workflows")
766+
.get()) as string[];
767+
768+
const orderingSign = params.pagination?.sortOrder === "desc" ? -1 : 1;
769+
return (
770+
(
771+
await RestatePromise.all(
772+
runIds.map((runId) =>
773+
ctx.objectClient(workflow, runId).listHooks(
774+
{
775+
resolveData: params.resolveData,
776+
pagination: params.pagination,
777+
},
778+
rpc.opts({ output: serde.zod(HookSchema.array()) })
779+
)
780+
)
781+
)
782+
)
783+
.flatMap((l) => l)
784+
// Sort as requested
785+
.sort(
786+
(a, b) =>
787+
(a.createdAt.getTime() - b.createdAt.getTime()) * orderingSign
788+
)
789+
);
790+
}
791+
),
736792
},
737793
options: {
738794
// No need to have journal retention for this service, as it just performs reads

packages/libs/backend/src/utils.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import {
66
} from "@restatedev/restate-sdk";
77
import { JsonTransport } from "@vercel/queue";
88

9-
export const DEFAULT_RESOLVE_DATA_OPTION = "all";
10-
119
// Helper functions to filter data based on resolveData setting
1210
export function filterRunData(
1311
run: WorkflowRun,

packages/libs/world/src/storage.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,23 @@ const createStorage = (client: Ingress): Storage => {
361361
params: ListHooksParams
362362
): Promise<PaginatedResponse<Hook>> {
363363
if (!params.runId) {
364-
throw new WorkflowAPIError("Unsupported list hooks without runId", {
365-
status: 501,
366-
});
364+
try {
365+
return {
366+
data: await client
367+
.serviceClient<IndexApi>({ name: "index" })
368+
.listHooks(
369+
{
370+
pagination: params.pagination,
371+
resolveData: params.resolveData,
372+
},
373+
rpc.opts({ output: serde.zod(HookSchema.array()) })
374+
),
375+
hasMore: false,
376+
cursor: null,
377+
};
378+
} catch (e) {
379+
throwVercelError(e);
380+
}
367381
}
368382
try {
369383
const res = await client

0 commit comments

Comments
 (0)