From 7383651bf2ae3b804abe5f9f47304fd711d2632f Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Fri, 27 Dec 2024 22:57:37 -0800 Subject: [PATCH] fix part start --- src/s3.zig | 14 ++++++++------ test/js/bun/s3/bun-write-leak-fixture.js | 1 - test/js/bun/s3/s3-stream-leak-fixture.js | 1 - test/js/bun/s3/s3-writer-leak-fixture.js | 1 - test/js/bun/s3/s3.test.ts | 7 ++----- 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/s3.zig b/src/s3.zig index 374d51bba3774d..982242a7506fb2 100644 --- a/src/s3.zig +++ b/src/s3.zig @@ -1813,7 +1813,7 @@ pub const MultiPartUpload = struct { }, .{ .part = @ptrCast(&onPartResponse) }, this); } pub fn start(this: *@This()) void { - if (this.state != .pending) return; + if (this.state != .pending or this.ctx.state != .multipart_completed) return; this.ctx.ref(); this.state = .started; this.perform(); @@ -1925,10 +1925,12 @@ pub const MultiPartUpload = struct { fn drainEnqueuedParts(this: *@This()) void { // check pending to start or transformed buffered ones into tasks - for (this.queue.items) |*part| { - if (part.state == .pending) { - // lets start the part request - part.start(); + if (this.state == .multipart_completed) { + for (this.queue.items) |*part| { + if (part.state == .pending) { + // lets start the part request + part.start(); + } } } const partSize = this.partSizeInBytes(); @@ -2099,7 +2101,7 @@ pub const MultiPartUpload = struct { .search_params = "?uploads=", .content_type = this.content_type, }, .{ .download = @ptrCast(&startMultiPartRequestResult) }, this); - } else { + } else if (this.state == .multipart_completed) { part.start(); } return true; diff --git a/test/js/bun/s3/bun-write-leak-fixture.js b/test/js/bun/s3/bun-write-leak-fixture.js index 13ec13534edab9..a00a94fa963cfe 100644 --- a/test/js/bun/s3/bun-write-leak-fixture.js +++ b/test/js/bun/s3/bun-write-leak-fixture.js @@ -18,7 +18,6 @@ async function run(inputType) { MAX_ALLOWED_MEMORY_USAGE = ((process.memoryUsage.rss() / 1024 / 1024) | 0) + MAX_ALLOWED_MEMORY_USAGE_INCREMENT; } const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; - console.log("Memory usage:", rss, "MB"); if (rss > MAX_ALLOWED_MEMORY_USAGE) { await s3file.unlink(); throw new Error("Memory usage is too high"); diff --git a/test/js/bun/s3/s3-stream-leak-fixture.js b/test/js/bun/s3/s3-stream-leak-fixture.js index 315f6d76948aba..9f2ee258d4072c 100644 --- a/test/js/bun/s3/s3-stream-leak-fixture.js +++ b/test/js/bun/s3/s3-stream-leak-fixture.js @@ -26,7 +26,6 @@ async function run(inputType) { MAX_ALLOWED_MEMORY_USAGE = ((process.memoryUsage.rss() / 1024 / 1024) | 0) + MAX_ALLOWED_MEMORY_USAGE_INCREMENT; } const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; - console.log("Memory usage:", rss, "MB"); if (rss > MAX_ALLOWED_MEMORY_USAGE) { await s3file.unlink(); throw new Error("Memory usage is too high"); diff --git a/test/js/bun/s3/s3-writer-leak-fixture.js b/test/js/bun/s3/s3-writer-leak-fixture.js index a047ea086931cb..49f52b9676814e 100644 --- a/test/js/bun/s3/s3-writer-leak-fixture.js +++ b/test/js/bun/s3/s3-writer-leak-fixture.js @@ -23,7 +23,6 @@ async function run(inputType) { MAX_ALLOWED_MEMORY_USAGE = ((process.memoryUsage.rss() / 1024 / 1024) | 0) + MAX_ALLOWED_MEMORY_USAGE_INCREMENT; } const rss = (process.memoryUsage.rss() / 1024 / 1024) | 0; - console.log("Memory usage:", rss, "MB"); if (rss > MAX_ALLOWED_MEMORY_USAGE) { await s3file.unlink(); throw new Error("Memory usage is too high"); diff --git a/test/js/bun/s3/s3.test.ts b/test/js/bun/s3/s3.test.ts index 566f7baad5aaab..e33272fad36236 100644 --- a/test/js/bun/s3/s3.test.ts +++ b/test/js/bun/s3/s3.test.ts @@ -109,7 +109,7 @@ describe.skipIf(!s3Options.accessKeyId)("s3", () => { // 15 MiB big enough to Multipart upload in more than one part const buffer = Buffer.alloc(1 * 1024 * 1024, "a"); { - await fetch(tmp_filename + "-large", { + await fetch(tmp_filename, { method: "PUT", body: async function* () { for (let i = 0; i < 15; i++) { @@ -120,12 +120,9 @@ describe.skipIf(!s3Options.accessKeyId)("s3", () => { s3: options, }).then(res => res.text()); - const result = await fetch(tmp_filename + "-large", { method: "HEAD", s3: options }); - + const result = await fetch(tmp_filename, { method: "HEAD", s3: options }); expect(result.status).toBe(200); expect(result.headers.get("content-length")).toBe("15728640"); - - await fetch(tmp_filename + "-large", { method: "DELETE", s3: options }); } }, 10_000); });