Skip to content

Commit 2a84430

Browse files
Fix
1 parent 66c2ab2 commit 2a84430

File tree

2 files changed

+57
-14
lines changed

2 files changed

+57
-14
lines changed

packages/libs/backend/src/queue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export const queue = service({
2222
input: serde.zod(schemas.QueueParamsSchema),
2323
},
2424
async (ctx: Context, params) => {
25-
// We use Restate invocation id as message id
25+
// We use Restate invocation id as the message id
2626
const messageId = ctx.request().id;
2727

2828
// Serialize using vercel transport

packages/libs/backend/src/services.ts

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
object,
1717
ObjectContext,
1818
RestatePromise,
19+
rpc,
1920
service,
2021
TerminalError,
2122
} from "@restatedev/restate-sdk";
@@ -113,12 +114,17 @@ export const workflow = object({
113114

114115
ctx.set("run", result, serde.zod(WorkflowRunSchema));
115116

117+
// Build index
118+
ctx.objectSendClient(keyValue, "workflows").append(runId);
119+
116120
return result;
117121
}
118122
),
119123

120124
getRun: createObjectHandler(
121125
{
126+
// No need to have journal retention for this handler, as it just performs reads
127+
journalRetention: { milliseconds: 0 },
122128
output: serde.zod(WorkflowRunSchema),
123129
},
124130
async (ctx: WorkflowContext, params: GetWorkflowRunParams) => {
@@ -287,6 +293,8 @@ export const workflow = object({
287293

288294
getStep: createObjectHandler(
289295
{
296+
// No need to have journal retention for this handler, as it just performs reads
297+
journalRetention: { milliseconds: 0 },
290298
output: serde.zod(StepSchema),
291299
},
292300
async (
@@ -339,6 +347,8 @@ export const workflow = object({
339347

340348
listSteps: createObjectHandler(
341349
{
350+
// No need to have journal retention for this handler, as it just performs reads
351+
journalRetention: { milliseconds: 0 },
342352
output: serde.zod(StepSchema.array()),
343353
},
344354
async (ctx: WorkflowContext, params: ListWorkflowRunStepsParams) => {
@@ -408,6 +418,8 @@ export const workflow = object({
408418

409419
listEvents: createObjectHandler(
410420
{
421+
// No need to have journal retention for this handler, as it just performs reads
422+
journalRetention: { milliseconds: 0 },
411423
output: serde.zod(EventSchema.array()),
412424
},
413425
async (
@@ -487,6 +499,8 @@ export const workflow = object({
487499

488500
getHook: createObjectHandler(
489501
{
502+
// No need to have journal retention for this handler, as it just performs reads
503+
journalRetention: { milliseconds: 0 },
490504
output: serde.zod(HookSchema),
491505
},
492506
async (
@@ -506,6 +520,8 @@ export const workflow = object({
506520

507521
listHooks: createObjectHandler(
508522
{
523+
// No need to have journal retention for this handler, as it just performs reads
524+
journalRetention: { milliseconds: 0 },
509525
output: serde.zod(HookSchema.array()),
510526
},
511527
async (
@@ -594,6 +610,10 @@ export const keyValue = object({
594610
ctx.set("value", newValue);
595611
},
596612
},
613+
options: {
614+
// No need to have journal retention for this service
615+
journalRetention: { milliseconds: 0 },
616+
},
597617
});
598618

599619
export const index = service({
@@ -615,7 +635,7 @@ export const index = service({
615635
for (const runId of runIds || []) {
616636
const events = await ctx
617637
.objectClient(workflow, runId)
618-
.listEvents({ resolveData: param.resolveData });
638+
.listEvents({ resolveData: param.resolveData }, rpc.opts({output: serde.zod(EventSchema.array())}));
619639

620640
matchingEvents.push(
621641
...events.filter(
@@ -645,7 +665,7 @@ export const index = service({
645665
return await ctx.objectClient(workflow, runId[0]!).getHook({
646666
hookId: param.hookId,
647667
resolveData: param.resolveData,
648-
});
668+
}, rpc.opts({output: serde.zod(HookSchema)}));
649669
}
650670
),
651671

@@ -669,25 +689,48 @@ export const index = service({
669689
.getHook({
670690
hookId: runIdAndHookId[0]!.hookId,
671691
resolveData: param.resolveData,
672-
});
692+
}, rpc.opts({output: serde.zod(HookSchema)}));
673693
}
674694
),
675695

676696
listRun: createServiceHandler(
677697
{
678698
output: serde.zod(WorkflowRunSchema.array()),
679699
},
680-
async (ctx: Context, params: ListWorkflowRunsParams) => {
681-
throw new TerminalError("Unimplemented yet", { errorCode: 501 });
682-
}
683-
),
700+
async (
701+
ctx: Context,
702+
params: ListWorkflowRunsParams
703+
): Promise<WorkflowRun[]> => {
704+
const runIds = (await ctx
705+
.objectClient(keyValue, "workflows")
706+
.get()) as string[];
684707

685-
listHooks: createServiceHandler(
686-
{
687-
output: serde.zod(WorkflowRunSchema.array()),
688-
},
689-
async (ctx: Context, params: ListWorkflowRunsParams) => {
690-
throw new TerminalError("Unimplemented yet", { errorCode: 501 });
708+
const orderingSign = params.pagination?.sortOrder === "desc" ? -1 : 1;
709+
return (
710+
(
711+
await RestatePromise.all(
712+
runIds.map((runId) =>
713+
ctx.objectClient(workflow, runId).getRun({
714+
resolveData: params.resolveData,
715+
}, rpc.opts({output: serde.zod(WorkflowRunSchema)}))
716+
)
717+
)
718+
)
719+
// Apply filters
720+
.filter((run) =>
721+
params.status !== undefined ? run.status === params.status : true
722+
)
723+
.filter((run) =>
724+
params.workflowName !== undefined
725+
? run.workflowName === params.workflowName
726+
: true
727+
)
728+
// Sort as requested
729+
.sort(
730+
(a, b) =>
731+
(a.createdAt.getTime() - b.createdAt.getTime()) * orderingSign
732+
)
733+
);
691734
}
692735
),
693736
},

0 commit comments

Comments
 (0)