From d6458d9a117003ba3b7a0f85bd682a64e46bc9c6 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 1 Nov 2024 13:21:32 +0000 Subject: [PATCH] Handle realtime with large payloads or outputs #1451 --- apps/webapp/app/routes/api.v1.packets.$.ts | 42 +++++++++---------- packages/core/src/v3/apiClient/index.ts | 3 ++ packages/core/src/v3/apiClient/runStream.ts | 5 ++- packages/core/src/v3/utils/ioSerialization.ts | 23 ++++++---- .../src/app/runs/[id]/ClientRunDetails.tsx | 4 +- .../src/components/RunDetails.tsx | 4 +- .../nextjs-realtime/src/trigger/example.ts | 5 ++- references/nextjs-realtime/trigger.config.ts | 2 +- 8 files changed, 52 insertions(+), 36 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.packets.$.ts b/apps/webapp/app/routes/api.v1.packets.$.ts index 9cb72d30eb..d88773d941 100644 --- a/apps/webapp/app/routes/api.v1.packets.$.ts +++ b/apps/webapp/app/routes/api.v1.packets.$.ts @@ -2,6 +2,7 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server"; import { generatePresignedUrl } from "~/v3/r2.server"; const ParamsSchema = z.object({ @@ -39,28 +40,27 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ presignedUrl }); } -export async function loader({ request, params }: ActionFunctionArgs) { - // Next authenticate the request - const authenticationResult = await authenticateApiRequest(request); +export const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + }, + async ({ params, authentication }) => { + const filename = params["*"]; - if (!authenticationResult) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } + const presignedUrl = await generatePresignedUrl( + authentication.environment.project.externalRef, + authentication.environment.slug, + filename, + "GET" + ); - const parsedParams = ParamsSchema.parse(params); - const filename = parsedParams["*"]; - - const presignedUrl = await generatePresignedUrl( - authenticationResult.environment.project.externalRef, - authenticationResult.environment.slug, - filename, - "GET" - ); + if (!presignedUrl) { + return json({ error: "Failed to generate presigned URL" }, { status: 500 }); + } - if (!presignedUrl) { - return json({ error: "Failed to generate presigned URL" }, { status: 500 }); + // Caller can now use this URL to fetch that object. + return json({ presignedUrl }); } - - // Caller can now use this URL to fetch that object. - return json({ presignedUrl }); -} +); diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index e0fd38e895..09275c0e1f 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -598,6 +598,7 @@ export class ApiClient { return runShapeStream(`${this.baseUrl}/realtime/v1/runs/${runId}`, { closeOnComplete: true, headers: this.#getRealtimeHeaders(), + client: this, }); } @@ -611,6 +612,7 @@ export class ApiClient { { closeOnComplete: false, headers: this.#getRealtimeHeaders(), + client: this, } ); } @@ -619,6 +621,7 @@ export class ApiClient { return runShapeStream(`${this.baseUrl}/realtime/v1/batches/${batchId}`, { closeOnComplete: false, headers: this.#getRealtimeHeaders(), + client: this, }); } diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 2ecbca9181..6769235b2e 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -8,6 +8,7 @@ import { IOPacket, parsePacket, } from "../utils/ioSerialization.js"; +import { ApiClient } from "./index.js"; import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js"; export type RunShape = TRunTypes extends AnyRunTypes @@ -50,6 +51,7 @@ export type RunShapeStreamOptions = { fetchClient?: typeof fetch; closeOnComplete?: boolean; signal?: AbortSignal; + client?: ApiClient; }; export type StreamPartResult> = { @@ -84,6 +86,7 @@ export function runShapeStream( signal: options?.signal, } ), + ...options, }; return new RunSubscription($options); @@ -306,7 +309,7 @@ export class RunSubscription { return cachedResult; } - const result = await conditionallyImportAndParsePacket(packet); + const result = await conditionallyImportAndParsePacket(packet, this.options.client); this.packetCache.set(`${row.friendlyId}/${key}`, result); return result; diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts index c16d07d61d..b1a8d4587f 100644 --- a/packages/core/src/v3/utils/ioSerialization.ts +++ b/packages/core/src/v3/utils/ioSerialization.ts @@ -7,6 +7,7 @@ import { apiClientManager } from "../apiClientManager-api.js"; import { zodfetch } from "../zodfetch.js"; import { z } from "zod"; import type { RetryOptions } from "../schemas/index.js"; +import { ApiClient } from "../apiClient/index.js"; export type IOPacket = { data?: string | undefined; @@ -36,8 +37,11 @@ export async function parsePacket(value: IOPacket): Promise { } } -export async function conditionallyImportAndParsePacket(value: IOPacket): Promise { - const importedPacket = await conditionallyImportPacket(value); +export async function conditionallyImportAndParsePacket( + value: IOPacket, + client?: ApiClient +): Promise { + const importedPacket = await conditionallyImportPacket(value, undefined, client); return await parsePacket(importedPacket); } @@ -159,19 +163,20 @@ async function exportPacket(packet: IOPacket, pathPrefix: string): Promise { if (packet.dataType !== "application/store") { return packet; } if (!tracer) { - return await importPacket(packet); + return await importPacket(packet, undefined, client); } else { const result = await tracer.startActiveSpan( "store.downloadPayload", async (span) => { - return await importPacket(packet, span); + return await importPacket(packet, span, client); }, { attributes: { @@ -209,16 +214,18 @@ export async function resolvePresignedPacketUrl( } } -async function importPacket(packet: IOPacket, span?: Span): Promise { +async function importPacket(packet: IOPacket, span?: Span, client?: ApiClient): Promise { if (!packet.data) { return packet; } - if (!apiClientManager.client) { + const $client = client ?? apiClientManager.client; + + if (!$client) { return packet; } - const presignedResponse = await apiClientManager.client.getPayloadUrl(packet.data); + const presignedResponse = await $client.getPayloadUrl(packet.data); const response = await zodfetch(z.any(), presignedResponse.presignedUrl, undefined, { retry: ioRetryOptions, diff --git a/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx b/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx index 486ca238e0..2ee2841cab 100644 --- a/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx +++ b/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx @@ -2,11 +2,11 @@ import RunDetails from "@/components/RunDetails"; import { Card, CardContent } from "@/components/ui/card"; -import { TriggerAuthContext, useRun } from "@trigger.dev/react-hooks"; +import { TriggerAuthContext, useRealtimeRun } from "@trigger.dev/react-hooks"; import type { exampleTask } from "@/trigger/example"; function RunDetailsWrapper({ runId }: { runId: string }) { - const { run, error } = useRun(runId, { refreshInterval: 1000 }); + const { run, error } = useRealtimeRun(runId); if (error) { return ( diff --git a/references/nextjs-realtime/src/components/RunDetails.tsx b/references/nextjs-realtime/src/components/RunDetails.tsx index 1f8b094865..64deb8940a 100644 --- a/references/nextjs-realtime/src/components/RunDetails.tsx +++ b/references/nextjs-realtime/src/components/RunDetails.tsx @@ -2,7 +2,7 @@ import { Badge } from "@/components/ui/badge"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { ScrollArea } from "@/components/ui/scroll-area"; import { exampleTask } from "@/trigger/example"; -import type { RetrieveRunResult } from "@trigger.dev/sdk/v3"; +import type { TaskRunShape } from "@trigger.dev/sdk/v3"; import { AlertTriangleIcon, CheckCheckIcon, XIcon } from "lucide-react"; function formatDate(date: Date | undefined) { @@ -17,7 +17,7 @@ function JsonDisplay({ data }: { data: any }) { ); } -export default function RunDetails({ record }: { record: RetrieveRunResult }) { +export default function RunDetails({ record }: { record: TaskRunShape }) { return ( diff --git a/references/nextjs-realtime/src/trigger/example.ts b/references/nextjs-realtime/src/trigger/example.ts index 90b8590570..031888d187 100644 --- a/references/nextjs-realtime/src/trigger/example.ts +++ b/references/nextjs-realtime/src/trigger/example.ts @@ -23,6 +23,9 @@ export const exampleTask = schemaTask({ metadata.set("status", { type: "finished", progress: 1.0 }); - return { message: "All good here!" }; + // Generate a return payload that is more than 128KB + const bigPayload = new Array(100000).fill("a".repeat(10)).join(""); + + return { message: bigPayload }; }, }); diff --git a/references/nextjs-realtime/trigger.config.ts b/references/nextjs-realtime/trigger.config.ts index 4dcd27b930..9820fb2223 100644 --- a/references/nextjs-realtime/trigger.config.ts +++ b/references/nextjs-realtime/trigger.config.ts @@ -1,6 +1,6 @@ import { defineConfig } from "@trigger.dev/sdk/v3"; export default defineConfig({ - project: "proj_xyxzzpnujsnhjiskihvs", + project: "proj_bzhdaqhlymtuhlrcgbqy", dirs: ["./src/trigger"], });