fix(s3) fix queue and multipart flow #16890

Merged (16 commits) on Jan 31, 2025
198 changes: 156 additions & 42 deletions src/s3/multipart.zig
@@ -9,15 +9,104 @@ const MultiPartUploadOptions = @import("./multipart_options.zig").MultiPartUploa
const S3SimpleRequest = @import("./simple_request.zig");
const executeSimpleS3Request = S3SimpleRequest.executeSimpleS3Request;
const S3Error = @import("./error.zig").S3Error;

// When the upload starts, we buffer data until partSize is reached or the last chunk is received.
// If the buffer is smaller than partSize, it is sent as a single request. Otherwise, a multipart upload is initiated.
// A single request retries until the maximum retry count is reached. It does not increase the reference count of MultiPartUpload, as it is the final step.
// When sending a multipart upload, if there is space in the queue, the part is enqueued and the request starts immediately.
// If the queue is full, the data waits for the queue to drain before a new part request starts.
// Each part holds a reference to MultiPartUpload until completion.
// If a part is canceled or fails early, its allocated slice is freed and the reference is dropped. If a part completes successfully, an etag is received, the allocated slice is freed, and the etag is appended to multipart_etags. If a part request fails, it retries until the maximum retry count is reached; if it still fails, MultiPartUpload is marked as failed and its reference is dropped.
// If all parts succeed, a complete request is sent.
// If any part fails, a rollback request deletes the uploaded parts. Rollback and commit requests do not increase the reference count of MultiPartUpload, as they are the final step. Once commit or rollback finishes, the reference count is decremented and MultiPartUpload is freed. These requests retry up to the maximum retry count on a best-effort basis.

// Start Upload
// │
// ▼
// Buffer Incoming Data
// │
// │
// ┌────────────┴────────────────┐
// │ │
// ▼ ▼
// Buffer < PartSize Buffer >= PartSize
// and is Last Chunk │
// │ │
// │ │
// │ │
// │ │
// │ ▼
// │ Start Multipart Upload
// │ │
// │ Initialize Parts Queue
// │ │
// │ Process Upload Parts
// │ │
// │ ┌──────────┴──────────┐
// │ │ │
// │ ▼ ▼
// │ Queue Has Space Queue Full
// │ │ │
// │ │ ▼
// │ │ Wait for Queue
// │ │ │
// │ └──────────┬──────────┘
// │ │
// │ ▼
// │ Start Part Upload
// │ (Reference MultiPartUpload)
// │ │
// │ ┌─────────┼─────────┐
// │ │ │ │
// │ ▼ ▼ ▼
// │ Part Success Failure
// │ Canceled │ │
// │ │ │ Retry Part
// │ │ │ │
// │ Free Free Max Retries?
// │ Slice Slice │ │
// │ │ │ No Yes
// │ Deref Add eTag │ │
// │ MPU to Array │ Fail MPU
// │ │ │ │ │
// │ │ │ │ Deref MPU
// │ └─────────┼──────┘ │
// │ │ │
// │ ▼ │
// │ All Parts Complete? │
// │ │ │
// │ ┌───────┴───────┐ │
// │ │ │ │
// │ ▼ ▼ │
// │ All Success Some Failed │
// │ │ │ │
// │ ▼ ▼ │
// │ Send Commit Send Rollback │
// │ (No Ref Inc) (No Ref Inc) │
// │ │ │ │
// │ └───────┬───────┘ │
// │ │ │
// │ ▼ │
// │ Retry if Failed │
// │ (Best Effort Only) │
// │ │ │
// │ ▼ │
// │ Deref Final MPU │
// │ │ │
// ▼ │ │
// Single Upload Request │ │
// │ │ │
// └────────────────────────────┴───────────────┘
// │
// ▼
// End
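// Illustrative sketch, not part of this diff: once the stream has ended, the
// single-vs-multipart decision above reduces to a single comparison, mirroring
// the `ended` / `partSizeInBytes()` check in processBuffered below.
fn wantsSingleUpload(buffered_len: usize, part_size: usize, ended: bool) bool {
    // a stream that ends before filling one full part goes out as a single PUT
    return ended and buffered_len < part_size;
}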
pub const MultiPartUpload = struct {
const OneMiB: usize = MultiPartUploadOptions.OneMiB;
const MAX_SINGLE_UPLOAD_SIZE: usize = MultiPartUploadOptions.MAX_SINGLE_UPLOAD_SIZE; // we limit to 5 GiB
const MIN_SINGLE_UPLOAD_SIZE: usize = MultiPartUploadOptions.MIN_SINGLE_UPLOAD_SIZE;
const DefaultPartSize = MultiPartUploadOptions.DefaultPartSize;
const MAX_QUEUE_SIZE = MultiPartUploadOptions.MAX_QUEUE_SIZE;
const AWS = S3Credentials;
queue: std.ArrayListUnmanaged(UploadPart) = .{},
queue: ?[]UploadPart = null,
available: bun.bit_set.IntegerBitSet(MAX_QUEUE_SIZE) = bun.bit_set.IntegerBitSet(MAX_QUEUE_SIZE).initFull(),
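// `available` tracks free slots in `queue`: bit i set means slot i is free. Parts
// acquire a slot via findFirstSet() + unset() and release it with set(); once the
// mask is full again and `ended` is set, no parts remain in flight.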

currentPartNumber: u16 = 1,
@@ -64,11 +153,12 @@ pub const MultiPartUpload = struct {
data: []const u8,
ctx: *MultiPartUpload,
allocated_size: usize,
state: enum {
pending,
started,
completed,
canceled,
state: enum(u8) {
not_assigned = 0,
pending = 1,
started = 2,
completed = 3,
canceled = 4,
},
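// `not_assigned` is the zero value: freshly allocated queue slots start out in
// this state, fail() skips them when canceling, and drainEnqueuedParts() only
// starts parts that are still .pending.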
partNumber: u16, // max is 10,000
retry: u8, // auto-retry budget; decremented until 0, after which the part fails
@@ -78,6 +168,7 @@
number: u16,
etag: []const u8,
};

fn sortEtags(_: *MultiPartUpload, a: UploadPart.UploadPartResult, b: UploadPart.UploadPartResult) bool {
return a.number < b.number;
}
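// Parts can finish out of order, but S3's CompleteMultipartUpload body must list
// <Part> entries in ascending PartNumber order, hence this comparator (used in done()).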
@@ -117,6 +208,7 @@
this.perform();
return;
} else {
this.state = .not_assigned;
log("onPartResponse {} failed", .{this.partNumber});
this.freeAllocatedSlice();
defer this.ctx.deref();
@@ -131,7 +223,7 @@
.number = this.partNumber,
.etag = bun.default_allocator.dupe(u8, etag) catch bun.outOfMemory(),
}) catch bun.outOfMemory();

this.state = .not_assigned;
defer this.ctx.deref();
// mark as available
this.ctx.available.set(this.index);
@@ -156,7 +248,7 @@
}, .{ .part = @ptrCast(&onPartResponse) }, this);
}
pub fn start(this: *@This()) void {
if (this.state != .pending or this.ctx.state != .multipart_completed or this.ctx.state == .finished) return;
if (this.state != .pending or this.ctx.state != .multipart_completed) return;
this.ctx.ref();
this.state = .started;
this.perform();
@@ -177,8 +269,10 @@

fn deinit(this: *@This()) void {
log("deinit", .{});
if (this.queue.capacity > 0)
this.queue.deinit(bun.default_allocator);
if (this.queue) |queue| {
this.queue = null;
bun.default_allocator.free(queue);
}
this.poll_ref.unref(this.vm);
bun.default_allocator.free(this.path);
if (this.proxy.len > 0) {
@@ -202,15 +296,12 @@
}

pub fn singleSendUploadResponse(result: S3SimpleRequest.S3UploadResult, this: *@This()) void {
defer this.deref();
if (this.state == .finished) return;
switch (result) {
.failure => |err| {
if (this.options.retry > 0) {
log("singleSendUploadResponse {} retry", .{this.options.retry});
this.options.retry -= 1;
this.ref();
// retry failed
executeSimpleS3Request(this.credentials, .{
.path = this.path,
.method = .PUT,
@@ -234,33 +325,40 @@
}
}

/// This is the only place where the queue and its parts are allocated; it controls the flow of parts and the maximum allowed concurrency
fn getCreatePart(this: *@This(), chunk: []const u8, allocated_size: usize, needs_clone: bool) ?*UploadPart {
const index = this.available.findFirstSet() orelse {
// this means that the queue is full and we cannot flush it
return null;
};

if (index >= this.options.queueSize) {
const queueSize = this.options.queueSize;
if (index >= queueSize) {
// oops, too much concurrency; wait for the queue to drain
return null;
}
this.available.unset(index);
defer this.currentPartNumber += 1;
const data = if (needs_clone) bun.default_allocator.dupe(u8, chunk) catch bun.outOfMemory() else chunk;
const allocated_len = if (needs_clone) data.len else allocated_size;
if (this.queue.items.len <= index) {
this.queue.append(bun.default_allocator, .{
.data = data,
.allocated_size = allocated_len,
.partNumber = this.currentPartNumber,
if (this.queue == null) {
// queueSize will never change and is small (max 255)
const queue = bun.default_allocator.alloc(UploadPart, queueSize) catch bun.outOfMemory();
// pre-fill every slot so no field is ever read while undefined
@memset(queue, UploadPart{
.data = "",
.allocated_size = 0,
.partNumber = 0,
.ctx = this,
.index = @truncate(index),
.retry = this.options.retry,
.state = .pending,
}) catch bun.outOfMemory();
return &this.queue.items[index];
.index = 0,
.retry = 0,
.state = .not_assigned,
});
this.queue = queue;
}
this.queue.items[index] = .{
const data = if (needs_clone) bun.default_allocator.dupe(u8, chunk) catch bun.outOfMemory() else chunk;
const allocated_len = if (needs_clone) data.len else allocated_size;

const queue_item = &this.queue.?[index];
// always set all struct fields to avoid undefined behavior
queue_item.* = .{
.data = data,
.allocated_size = allocated_len,
.partNumber = this.currentPartNumber,
Expand All @@ -269,19 +367,22 @@ pub const MultiPartUpload = struct {
.retry = this.options.retry,
.state = .pending,
};
return &this.queue.items[index];
return queue_item;
}
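// Illustrative sketch, not part of this diff: the slot-acquisition logic above,
// isolated. `available` is the same bit set declared on MultiPartUpload.
fn acquireSlotSketch(available: *bun.bit_set.IntegerBitSet(MAX_QUEUE_SIZE), queue_size: usize) ?usize {
    const index = available.findFirstSet() orelse return null; // every slot is busy
    if (index >= queue_size) return null; // respect the configured queueSize
    available.unset(index); // mark the slot as in use; set(index) releases it
    return index;
}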

/// Drain the parts; this starts pending part requests and processes the buffered data
fn drainEnqueuedParts(this: *@This()) void {
if (this.state == .finished) {
if (this.state == .finished or this.state == .singlefile_started) {
return;
}
// check pending to start or transformed buffered ones into tasks
if (this.state == .multipart_completed) {
for (this.queue.items) |*part| {
if (part.state == .pending) {
// lets start the part request
part.start();
if (this.queue) |queue| {
for (queue) |*part| {
if (part.state == .pending) {
// let's start the part request
part.start();
}
}
}
}
@@ -291,35 +392,44 @@
}

if (this.ended and this.available.mask == std.bit_set.IntegerBitSet(MAX_QUEUE_SIZE).initFull().mask) {
// we are done
// we are done and no more parts are running
this.done();
}
}
/// Finalize the upload with a failure
pub fn fail(this: *@This(), _err: S3Error) void {
log("fail {s}:{s}", .{ _err.code, _err.message });
this.ended = true;
for (this.queue.items) |*task| {
task.cancel();
if (this.queue) |queue| {
for (queue) |*task| {
if (task.state != .not_assigned) {
task.cancel();
}
}
}
if (this.state != .finished) {
const old_state = this.state;
this.state = .finished;
this.callback(.{ .failure = _err }, this.callback_context);

if (old_state == .multipart_completed) {
// we are a multipart upload so we need to rollback
// will deref after rollback
this.rollbackMultiPartRequest();
} else {
// single-file upload, no rollback needed
this.deref();
}
}
}

/// Finalize the upload after success
fn done(this: *@This()) void {
if (this.state == .multipart_completed) {
// we are a multipart upload so we need to send the etags and commit
this.state = .finished;

// sort the etags
std.sort.block(UploadPart.UploadPartResult, this.multipart_etags.items, this, UploadPart.sortEtags);
// start the multipart upload list
this.multipart_upload_list.append(bun.default_allocator, "<?xml version=\"1.0\" encoding=\"UTF-8\"?><CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">") catch bun.outOfMemory();
for (this.multipart_etags.items) |tag| {
this.multipart_upload_list.appendFmt(bun.default_allocator, "<Part><PartNumber>{}</PartNumber><ETag>{s}</ETag></Part>", .{ tag.number, tag.etag }) catch bun.outOfMemory();
@@ -332,11 +442,14 @@
// will deref and end after commit
this.commitMultiPartRequest();
} else {
// single-file upload, no commit needed
this.callback(.{ .success = {} }, this.callback_context);
this.state = .finished;
this.deref();
}
}
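// For reference (illustrative), the commit body assembled above has the shape:
//   <?xml version="1.0" encoding="UTF-8"?>
//   <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
//     <Part><PartNumber>1</PartNumber><ETag>"etag-1"</ETag></Part>
//     ...
//   </CompleteMultipartUpload>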

/// Result of the multipart initiation request; after this we can start draining the parts
pub fn startMultiPartRequestResult(result: S3SimpleRequest.S3DownloadResult, this: *@This()) void {
defer this.deref();
if (this.state == .finished) return;
@@ -365,6 +478,7 @@
}
log("startMultiPartRequestResult {s} success id: {s}", .{ this.path, this.upload_id });
this.state = .multipart_completed;
// start draining the parts
this.drainEnqueuedParts();
},
// this is "unreachable", but we handle it in case AWS returns 404
@@ -375,6 +489,7 @@
}
}

/// We make a best effort to commit the multipart upload; if it fails we retry, and if it still fails we fail the upload
pub fn onCommitMultiPartRequest(result: S3SimpleRequest.S3CommitResult, this: *@This()) void {
log("onCommitMultiPartRequest {s}", .{this.upload_id});

Expand All @@ -396,7 +511,7 @@ pub const MultiPartUpload = struct {
},
}
}

/// We make a best effort to roll back the multipart upload; if it fails we retry, and if it still fails we just deinit the upload
pub fn onRollbackMultiPartRequest(result: S3SimpleRequest.S3UploadResult, this: *@This()) void {
log("onRollbackMultiPartRequest {s}", .{this.upload_id});
switch (result) {
@@ -527,7 +642,6 @@ pub const MultiPartUpload = struct {
if (this.ended and this.buffered.items.len < this.partSizeInBytes() and this.state == .not_started) {
log("processBuffered {s} singlefile_started", .{this.path});
this.state = .singlefile_started;
this.ref();
// we can do only 1 request
executeSimpleS3Request(this.credentials, .{
.path = this.path,
15 changes: 15 additions & 0 deletions test/js/bun/s3/s3.test.ts
@@ -322,6 +322,21 @@ for (let credentials of allCredentials) {
expect(await s3File.text()).toBe(bigPayload);
}
}, 10_000);

it("should be able to upload large files using writer() in multiple parts", async () => {
{
const s3File = bucket.file(tmp_filename, options);
const writer = s3File.writer({
queueSize: 10,
});
for (let i = 0; i < 10; i++) {
writer.write(mediumPayload);
}
await writer.end();
const stat = await s3File.stat();
expect(stat.size).toBe(Buffer.byteLength(mediumPayload) * 10);
}
}, 30_000);
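// Note (illustrative): each write() above buffers until a full part is available,
// and awaiting end() flushes the remainder and resolves only once the upload has
// completed, so the stat() call observes the final object size.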
});
});
