From 9170021b77cae866996d16cda36e98a4b7ed6529 Mon Sep 17 00:00:00 2001 From: Matteo Ferrando Date: Mon, 25 Nov 2024 21:27:27 -0400 Subject: [PATCH] feat: automatically do multi-part upload on 90+ MB files (#111) * feat: introduce fal cdn v3 with storage API * feat: automatically do multi-part upload on 90+ MB files * fix: use new API from CDN * alpha * fix: include dependency * chore: bump alpha * refactor: remove semaphore * refactor: remove console --- libs/client/package.json | 2 +- libs/client/src/storage.ts | 120 +++++++++++++++++++++++++++++++++++-- 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/libs/client/package.json b/libs/client/package.json index 5b13b60..b197974 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/client", "description": "The fal.ai client for JavaScript and TypeScript", - "version": "1.1.3", + "version": "1.2.0-alpha.5", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/storage.ts b/libs/client/src/storage.ts index ebb4186..5bd86a7 100644 --- a/libs/client/src/storage.ts +++ b/libs/client/src/storage.ts @@ -1,7 +1,6 @@ import { getRestApiUrl, RequiredConfig } from "./config"; import { dispatchRequest } from "./request"; import { isPlainObject } from "./utils"; - /** * File support for the client. This interface establishes the contract for * uploading files to the server and transforming the input to replace file @@ -53,17 +52,15 @@ function getExtensionFromContentType(contentType: string): string { /** * Initiate the upload of a file to the server. This returns the URL to upload * the file to and the URL of the file once it is uploaded. - * - * @param file the file to upload - * @returns the URL to upload the file to and the URL of the file once it is uploaded. */ async function initiateUpload( file: Blob, config: RequiredConfig, + contentType: string, ): Promise { - const contentType = file.type || "application/octet-stream"; const filename = file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`; + return await dispatchRequest({ method: "POST", // NOTE: We want to test V3 without making it the default at the API level @@ -76,6 +73,111 @@ async function initiateUpload( }); } +/** + * Initiate the multipart upload of a file to the server. This returns the URL to upload + * the file to and the URL of the file once it is uploaded. + */ +async function initiateMultipartUpload( + file: Blob, + config: RequiredConfig, + contentType: string, +): Promise { + const filename = + file.name || `${Date.now()}.${getExtensionFromContentType(contentType)}`; + + return await dispatchRequest({ + method: "POST", + targetUrl: `${getRestApiUrl()}/storage/upload/initiate-multipart?storage_type=fal-cdn-v3`, + input: { + content_type: contentType, + file_name: filename, + }, + config, + }); +} + +type MultipartObject = { + partNumber: number; + etag: string; +}; + +async function partUploadRetries( + uploadUrl: string, + chunk: Blob, + config: RequiredConfig, + tries: number = 3, +): Promise { + if (tries === 0) { + throw new Error("Part upload failed, retries exhausted"); + } + + const { fetch, responseHandler } = config; + + try { + const response = await fetch(uploadUrl, { + method: "PUT", + body: chunk, + }); + + return (await responseHandler(response)) as MultipartObject; + } catch (error) { + return await partUploadRetries(uploadUrl, chunk, config, tries - 1); + } +} + +async function multipartUpload( + file: Blob, + config: RequiredConfig, +): Promise { + const { fetch, responseHandler } = config; + const contentType = file.type || "application/octet-stream"; + const { upload_url: uploadUrl, file_url: url } = + await initiateMultipartUpload(file, config, contentType); + + // Break the file into 10MB chunks + const chunkSize = 10 * 1024 * 1024; + const chunks = Math.ceil(file.size / chunkSize); + + const parsedUrl = new URL(uploadUrl); + + const responses: MultipartObject[] = []; + + try { + for (let i = 0; i < chunks; i++) { + const start = i * chunkSize; + const end = Math.min(start + chunkSize, file.size); + + const chunk = file.slice(start, end); + + const partNumber = i + 1; + // {uploadUrl}/{part_number}?uploadUrlParams=... + const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; + + responses.push(await partUploadRetries(partUploadUrl, chunk, config)); + } + } catch (error) { + throw error; + } + + // Complete the upload + const completeUrl = `${parsedUrl.origin}${parsedUrl.pathname}/complete${parsedUrl.search}`; + const response = await fetch(completeUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + parts: responses.map((mpart) => ({ + partNumber: mpart.partNumber, + etag: mpart.etag, + })), + }), + }); + await responseHandler(response); + + return url; +} + // eslint-disable-next-line @typescript-eslint/no-explicit-any type KeyValuePair = [string, any]; @@ -88,10 +190,18 @@ export function createStorageClient({ }: StorageClientDependencies): StorageClient { const ref: StorageClient = { upload: async (file: Blob) => { + // Check for 90+ MB file size to do multipart upload + if (file.size > 90 * 1024 * 1024) { + return await multipartUpload(file, config); + } + + const contentType = file.type || "application/octet-stream"; + const { fetch, responseHandler } = config; const { upload_url: uploadUrl, file_url: url } = await initiateUpload( file, config, + contentType, ); const response = await fetch(uploadUrl, { method: "PUT",