Commit 85daa4c

Fix unbounded memory growth in multipart upload (#214)

- Replace buffered chunk reading with streaming ReadableStream (64 KiB increments)
- Replace axios with fetch for native streaming support with duplex mode
- Throttle UI progress updates to 200ms intervals to prevent stdout buffer bloat
- Fix file descriptor leak by properly closing FsFile in all code paths
- Fetch fresh presigned URL on each retry attempt to handle expiration
- Add bail logic for non-retryable 4xx errors (except 408/429)
- Stop progress bar on error to restore terminal state

Memory usage now O(1) per concurrent part instead of O(part_size), enabling large file uploads on resource-constrained machines.
1 parent 8ee25b4 commit 85daa4c
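The commit message above describes streaming each part as a ReadableStream request body, produced in 64 KiB reads and sent with fetch in duplex mode, while the hunks shown below pass each part to fetch as a fully read buffer. As a rough, hypothetical sketch of that streaming approach (not code from this repository; uploadPartStreaming, presignedUrl, and onProgress are invented names for illustration):

// Hypothetical sketch (not part of this commit): stream one part of a file to a
// presigned URL in 64 KiB reads, keeping memory per in-flight part bounded.
async function uploadPartStreaming(
  presignedUrl: string, // assumed presigned PUT URL
  filePath: string,
  start: number, // byte offset of this part
  length: number, // part size in bytes
  onProgress?: (bytesRead: number) => void,
): Promise<void> {
  const file = await Deno.open(filePath, { read: true });
  await file.seek(start, Deno.SeekMode.Start);

  let remaining = length;
  let closed = false;
  const closeFile = () => {
    if (!closed) {
      closed = true;
      file.close();
    }
  };

  const body = new ReadableStream<Uint8Array>({
    async pull(controller) {
      if (remaining <= 0) {
        controller.close();
        closeFile();
        return;
      }
      const buf = new Uint8Array(Math.min(64 * 1024, remaining)); // 64 KiB reads
      const n = await file.read(buf);
      if (n === null) {
        // EOF before the expected length; end the stream and let the server reject the short part
        controller.close();
        closeFile();
        return;
      }
      remaining -= n;
      onProgress?.(n);
      controller.enqueue(buf.subarray(0, n));
    },
    cancel() {
      closeFile();
    },
  });

  try {
    const res = await fetch(presignedUrl, {
      method: "PUT",
      headers: { "Content-Type": "application/octet-stream" },
      body,
      // Some runtimes (e.g. Node's fetch) require half-duplex mode for streaming request bodies.
      duplex: "half",
    } as RequestInit);

    if (!res.ok) {
      throw new Error(`Part upload failed: ${res.status} ${res.statusText}`);
    }
  } finally {
    closeFile();
  }
}

Whether a presigned PUT accepts a streamed body without an explicit Content-Length depends on the object store, which is one reason to treat this as a sketch rather than a drop-in replacement for the buffered readChunk path in the diff.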

src/lib/vm/image/upload.ts

Lines changed: 128 additions & 142 deletions
@@ -7,44 +7,43 @@ import { clearInterval, setInterval } from "node:timers";
 import retry from "async-retry";
 import ora, { type Ora } from "ora";
 import cliSpinners from "npm:cli-spinners";
-import axios from "axios";
 import { apiClient } from "../../../apiClient.ts";
 
 async function readChunk(
   filePath: string,
   start: number,
-  chunkSize: number,
+  length: number,
+  onProgress?: (bytesRead: number) => void,
 ): Promise<Uint8Array> {
-  using file = await Deno.open(filePath, { read: true });
-  await file.seek(start, Deno.SeekMode.Start);
-
-  const buffer = new Uint8Array(chunkSize);
-  let totalBytesRead = 0;
-  let emptyReadCount = 0;
-  const maxEmptyReads = 100;
-
-  while (totalBytesRead < chunkSize) {
-    const bytesRead = await file.read(buffer.subarray(totalBytesRead));
-    if (bytesRead === null) {
-      // EOF reached
-      break;
-    }
-    if (bytesRead === 0) {
-      // No bytes read but not EOF, continue looping
-      emptyReadCount++;
-      if (emptyReadCount >= maxEmptyReads) {
-        throw new Error(
-          `Failed to read chunk: reached ${maxEmptyReads} consecutive empty reads without EOF`,
-        );
+  const file = await Deno.open(filePath, { read: true });
+  try {
+    await file.seek(start, Deno.SeekMode.Start);
+
+    const buffer = new Uint8Array(length);
+    let offset = 0;
+
+    while (offset < length) {
+      const bytesRead = await file.read(buffer.subarray(offset));
+      if (bytesRead === null) {
+        // EOF reached
+        break;
+      }
+      offset += bytesRead;
+      if (onProgress) {
+        onProgress(bytesRead);
       }
-      continue;
     }
-    // Non-empty read, reset counter
-    emptyReadCount = 0;
-    totalBytesRead += bytesRead;
-  }
 
-  return buffer.subarray(0, totalBytesRead);
+    if (offset !== length) {
+      throw new Error(
+        `Short read: expected ${length} bytes, got ${offset} bytes`,
+      );
+    }
+
+    return buffer;
+  } finally {
+    file.close();
+  }
 }
 
 const upload = new Command("upload")
@@ -67,6 +66,7 @@ const upload = new Command("upload")
     let preparingSpinner: Ora | undefined;
     let finalizingSpinner: Ora | undefined;
     let spinnerTimer: NodeJS.Timeout | undefined;
+    let progressBar: cliProgress.SingleBar | undefined;
 
     try {
       preparingSpinner = ora(`Preparing upload for ${name}...`).start();
@@ -95,14 +95,16 @@ const upload = new Command("upload")
       const fileSize = fileInfo.size;
 
       // Calculate parts for progress tracking
-      // These magic numbers are not the hard limits, but we don't trust R2 to document them.
-      const minChunk = 6 * 1024 * 1024; // 6 MiB
-      const maxParts = 100;
-      const chunkSize = Math.max(
-        minChunk,
-        Math.ceil(fileSize / maxParts),
-        250 * 1024 * 1024,
-      ); // 250 MiB
+      const minChunk = 5 * 1024 * 1024; // 5 MiB (minimum)
+      const defaultChunk = 64 * 1024 * 1024; // 64 MiB
+      const maxParts = 10000; // object storage supports up to 10k parts
+
+      // For files smaller than default chunk, use the whole file as one part
+      // Otherwise use default chunk size, but ensure we don't exceed maxParts
+      const chunkSize = fileSize <= defaultChunk
+        ? Math.max(fileSize, minChunk)
+        : Math.max(minChunk, Math.ceil(fileSize / maxParts), defaultChunk);
+
       const totalParts = Math.ceil(fileSize / chunkSize);
 
       // Calculate upload parts metadata
@@ -121,8 +123,6 @@ const upload = new Command("upload")
 
       // Create combined ora + progress bar with per-part progress tracking
      const startTime = Date.now();
-      let lastSpeed = "0 B/s";
-
      // Track progress per part to handle retries correctly
      const partProgress = new Map<number, number>(); // part -> bytes uploaded
 
@@ -139,13 +139,13 @@ const upload = new Command("upload")
       const spinner = cliSpinners.dots;
       let spinnerIndex = 0;
 
-      const progressBar = new cliProgress.SingleBar({
+      progressBar = new cliProgress.SingleBar({
         format:
           `{spinner} Uploading [{bar}] {percentage}% | {uploadedMB}/{totalMB} MB | {speed}`,
         barCompleteChar: "\u2588",
         barIncompleteChar: "\u2591",
         hideCursor: true,
-        forceRedraw: true,
+        forceRedraw: false,
       });
 
       progressBar.start(fileSize, 0, {
@@ -155,26 +155,14 @@ const upload = new Command("upload")
         totalMB: (fileSize / (1024 * 1024)).toFixed(1),
       });
 
-      // Create a timer to animate the spinner at the correct interval
-      spinnerTimer = setInterval(() => {
-        spinnerIndex++;
-        const totalBytesUploaded = getTotalBytesUploaded();
-        // Force a redraw to animate the spinner
-        progressBar.update(totalBytesUploaded, {
-          spinner: spinner.frames[spinnerIndex % spinner.frames.length],
-          speed: lastSpeed || "0 B/s",
-          uploadedMB: (totalBytesUploaded / (1024 * 1024)).toFixed(1),
-          totalMB: (fileSize / (1024 * 1024)).toFixed(1),
-        });
-      }, spinner.interval);
-
-      const updateProgress = (part: number, bytesUploaded: number) => {
-        const previousBytes = partProgress.get(part) || 0;
-        partProgress.set(part, previousBytes + bytesUploaded);
+      // Throttle UI updates to 200ms
+      const UI_UPDATE_INTERVAL_MS = 200;
+      let lastUIUpdate = 0;
 
+      const renderProgress = () => {
         const totalBytesUploaded = getTotalBytesUploaded();
-        const elapsedTime = (Date.now() - startTime) / 1000; // seconds
-        const speed = totalBytesUploaded / elapsedTime; // bytes per second
+        const elapsedTime = (Date.now() - startTime) / 1000;
+        const speed = totalBytesUploaded / elapsedTime;
 
         // Format speed
         let speedStr: string;
@@ -186,9 +174,6 @@ const upload = new Command("upload")
           speedStr = `${speed.toFixed(0)} B/s`;
         }
 
-        // Store values for spinner animation
-        lastSpeed = speedStr;
-
         progressBar.update(totalBytesUploaded, {
           spinner: spinner.frames[spinnerIndex % spinner.frames.length],
           speed: speedStr,
@@ -197,20 +182,23 @@ const upload = new Command("upload")
         });
       };
 
-      const resetPartProgress = (part: number) => {
-        const previousBytes = partProgress.get(part) || 0;
-        if (previousBytes > 0) {
-          partProgress.set(part, 0);
-
-          const totalBytesUploaded = getTotalBytesUploaded();
-          // Update progress bar to reflect the reset
-          progressBar.update(totalBytesUploaded, {
-            spinner: spinner.frames[spinnerIndex % spinner.frames.length],
-            speed: lastSpeed || "0 B/s",
-            uploadedMB: (totalBytesUploaded / (1024 * 1024)).toFixed(1),
-            totalMB: (fileSize / (1024 * 1024)).toFixed(1),
-          });
+      // Create a timer to animate the spinner and update progress
+      spinnerTimer = setInterval(() => {
+        spinnerIndex++;
+        const now = Date.now();
+        if (now - lastUIUpdate >= UI_UPDATE_INTERVAL_MS) {
+          renderProgress();
+          lastUIUpdate = now;
         }
+      }, spinner.interval);
+
+      const updateProgress = (part: number, bytesUploaded: number) => {
+        const previousBytes = partProgress.get(part) || 0;
+        partProgress.set(part, previousBytes + bytesUploaded);
+      };
+
+      const resetPartProgress = (part: number) => {
+        partProgress.set(part, 0);
       };
 
       // Upload parts concurrently with specified concurrency limit
@@ -223,9 +211,15 @@ const upload = new Command("upload")
      ) => {
        const chunkSize = end - start;
 
-        // Step 1: Fetch upload URL with retry
-        const url = await retry(
-          async () => {
+        // Upload the chunk with retry, fetching fresh URL each attempt
+        await retry(
+          async (bail: (e: Error) => void, attemptNumber: number) => {
+            // Reset progress for this part on retry (except first attempt)
+            if (attemptNumber > 1) {
+              resetPartProgress(part);
+            }
+
+            // Fetch fresh upload URL for this attempt
             const response = await client.POST(
               "/v1/vms/images/{image_id}/upload",
               {
@@ -241,81 +235,66 @@ const upload = new Command("upload")
             );
 
             if (!response.response.ok || !response.data) {
+              const status = response.response.status;
               const errorText = response.response.ok
                 ? "No data in response"
-                : await response.response.text();
+                : await response.response.text().catch(() => "");
+
+              // Bail on non-transient 4xx errors (except 408 Request Timeout and 429 Too Many Requests)
+              if (
+                status >= 400 && status < 500 && status !== 408 &&
+                status !== 429
+              ) {
+                bail(
+                  new Error(
+                    `Failed to get upload URL for part ${part}: ${status} ${response.response.statusText} - ${errorText}`,
+                  ),
+                );
+                return;
+              }
+
               throw new Error(
-                `Failed to get upload URL for part ${part}: ${response.response.status} ${response.response.statusText} - ${errorText}`,
+                `Failed to get upload URL for part ${part}: ${status} ${response.response.statusText} - ${errorText}`,
               );
             }
 
-            return response.data.upload_url;
-          },
-          {
-            retries: 3,
-            factor: 2,
-            randomize: true,
-          },
-        );
-
-        // Step 2: Upload the chunk with retry
-        await retry(
-          async (_: unknown, _attemptNumber: number) => {
-            // Reset progress for this part on retry (except first attempt)
-            if (_attemptNumber > 1) {
-              resetPartProgress(part);
-            }
-
-            const chunk = await readChunk(filePath, start, chunkSize);
-
-            // Track upload progress with axios
-            let lastUploadedBytes = 0;
+            const url = response.data.upload_url;
 
-            try {
-              const res = await axios.put(url, chunk, {
-                headers: {
-                  "Content-Type": "application/octet-stream",
-                  "Content-Length": chunk.length.toString(),
-                },
-                onUploadProgress: (progressEvent) => {
-                  const uploadedBytes = progressEvent.loaded || 0;
-                  const deltaBytes = uploadedBytes - lastUploadedBytes;
-
-                  if (deltaBytes > 0) {
-                    updateProgress(part, deltaBytes);
-                    lastUploadedBytes = uploadedBytes;
-                  }
-                },
-                maxRedirects: 0,
-              });
+            // Read chunk from disk with progress tracking
+            const payload = await readChunk(
+              filePath,
+              start,
+              chunkSize,
+              (bytesRead) => {
+                updateProgress(part, bytesRead);
+              },
+            );
 
-              if (res.status < 200 || res.status >= 300) {
-                throw new Error(
-                  `Part ${part} upload failed: ${res.status} ${res.statusText}`,
-                );
-              }
-            } catch (err) {
-              // Log Cloudflare/R2 specific errors
-              if (axios.isAxiosError(err)) {
-                const cfRay = err.response?.headers?.["cf-ray"];
-                const cfCacheStatus = err.response?.headers
-                  ?.["cf-cache-status"];
-                console.error(gray(`\nPart ${part} upload error:`));
-                console.error(
-                  gray(
-                    ` Status: ${err.response?.status} ${
-                      err.response?.statusText || ""
-                    }`,
+            const res = await fetch(url, {
+              method: "PUT",
+              headers: {
+                "Content-Type": "application/octet-stream",
+              },
+              body: payload,
+            });
+
+            if (!res.ok) {
+              // Bail on non-transient 4xx errors (except 408 and 429)
+              if (
+                res.status >= 400 && res.status < 500 && res.status !== 408 &&
+                res.status !== 429
+              ) {
+                bail(
+                  new Error(
+                    `Part ${part} upload failed: ${res.status} ${res.statusText}`,
                   ),
                 );
-                console.error(gray(` Error code: ${err.code || "unknown"}`));
-                if (cfRay) console.error(gray(` Cloudflare Ray ID: ${cfRay}`));
-                if (cfCacheStatus) {
-                  console.error(gray(` CF Cache Status: ${cfCacheStatus}`));
-                }
-                console.error(gray(` Message: ${err.message}`));
+                return;
              }
-              throw err;
+
+              throw new Error(
+                `Part ${part} upload failed: ${res.status} ${res.statusText}`,
+              );
            }
          },
          {
@@ -404,6 +383,13 @@ const upload = new Command("upload")
         spinnerTimer = undefined;
       }
 
+      // Stop progress bar
+      try {
+        progressBar?.stop();
+      } catch {
+        // Ignore if progress bar not started
+      }
+
       // Stop any running spinners on error
       if (preparingSpinner?.isSpinning) {
         preparingSpinner.fail(
