Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fs): WriteStream pending write fastpath #16856

Merged
merged 41 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6ef67d3
dont write out of order
dylan-conway Jan 29, 2025
dc04802
update
dylan-conway Jan 29, 2025
21a4816
update
dylan-conway Jan 29, 2025
ce66af7
fixup
dylan-conway Jan 29, 2025
b8ff702
one more
dylan-conway Jan 29, 2025
135c13b
auto flush
dylan-conway Jan 29, 2025
07feacb
progress
dylan-conway Jan 29, 2025
9086433
buffered stream
dylan-conway Jan 30, 2025
08e8588
missing onWrite
dylan-conway Jan 30, 2025
e80764b
Merge branch 'main' into dylan/fix-pending-write
dylan-conway Jan 30, 2025
991871a
fix windows zig build
dylan-conway Jan 30, 2025
3ff9aa2
update ref
dylan-conway Jan 30, 2025
1242201
clear capacity
dylan-conway Jan 30, 2025
a21c44c
test
dylan-conway Jan 30, 2025
5c13864
fix windows build
dylan-conway Jan 30, 2025
7dce37a
Update src/bun.js/webcore/streams.zig
dylan-conway Jan 30, 2025
d33699b
runPending update
dylan-conway Jan 30, 2025
07b1dfb
mini eventloop
dylan-conway Jan 30, 2025
17b5d04
stderr
dylan-conway Jan 30, 2025
6c1520c
update
dylan-conway Jan 30, 2025
d34465b
Don't spin loop
Jarred-Sumner Jan 30, 2025
a707cd9
Update sys.zig
Jarred-Sumner Jan 30, 2025
3aa413c
Update streams.zig
Jarred-Sumner Jan 30, 2025
4040d9e
fix
Jarred-Sumner Jan 30, 2025
11e477c
Update sys.zig
Jarred-Sumner Jan 30, 2025
3d2bdcf
Update sys.zig
Jarred-Sumner Jan 30, 2025
ee815bd
Match node's behavior with force sync
Jarred-Sumner Jan 30, 2025
0f410dd
Update streams.zig
Jarred-Sumner Jan 30, 2025
d561ea3
Ensure we actually set blocking
Jarred-Sumner Jan 30, 2025
1a6181b
Workaround for zig std lib decision
Jarred-Sumner Jan 30, 2025
b397a20
Update PipeWriter.zig
Jarred-Sumner Jan 30, 2025
0f36adb
Delete the function that is labeled as not being correct
Jarred-Sumner Jan 30, 2025
4309c75
Update tty.ts
Jarred-Sumner Jan 30, 2025
c87cb34
update
dylan-conway Jan 30, 2025
5401b5f
update
dylan-conway Jan 31, 2025
f9d5d91
write fast
dylan-conway Jan 31, 2025
6bef911
oops
dylan-conway Jan 31, 2025
5690100
update
dylan-conway Jan 31, 2025
f6b96ce
Merge branch 'main' into dylan/fix-pending-write
dylan-conway Jan 31, 2025
fe2ae42
block
dylan-conway Jan 31, 2025
2de9cc2
fix it!!
dylan-conway Jan 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/baby_list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub fn BabyList(comptime Type: type) type {
return this.len - initial;
}

pub fn writeLatin1(this: *@This(), allocator: std.mem.Allocator, str: []const u8) !u32 {
pub fn writeLatin1(this: *@This(), allocator: std.mem.Allocator, str: []const u8) OOM!u32 {
if (comptime Type != u8)
@compileError("Unsupported for type " ++ @typeName(Type));
const initial = this.len;
Expand All @@ -352,7 +352,7 @@ pub fn BabyList(comptime Type: type) type {
return this.len - initial;
}

pub fn writeUTF16(this: *@This(), allocator: std.mem.Allocator, str: []const u16) !u32 {
pub fn writeUTF16(this: *@This(), allocator: std.mem.Allocator, str: []const u16) OOM!u32 {
if (comptime Type != u8)
@compileError("Unsupported for type " ++ @typeName(Type));

Expand Down
3 changes: 3 additions & 0 deletions src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4378,6 +4378,9 @@ pub fn jsCreateSocketPair(global: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JS
return global.throwValue(err.toJSC(global));
}

_ = bun.sys.setNonblocking(bun.toFD(fds_[0]));
_ = bun.sys.setNonblocking(bun.toFD(fds_[1]));

const array = JSC.JSValue.createEmptyArray(global, 2);
array.putIndex(global, 0, JSC.jsNumber(fds_[0]));
array.putIndex(global, 1, JSC.jsNumber(fds_[1]));
Expand Down
13 changes: 2 additions & 11 deletions src/bun.js/bindings/BunProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2019,9 +2019,7 @@ static JSValue constructProcessHrtimeObject(VM& vm, JSObject* processObject)
return hrtime;
}

#if OS(WINDOWS)
extern "C" void Bun__ForceFileSinkToBeSynchronousOnWindows(JSC::JSGlobalObject*, JSC::EncodedJSValue);
#endif
extern "C" void Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio(JSC::JSGlobalObject*, JSC::EncodedJSValue);
static JSValue constructStdioWriteStream(JSC::JSGlobalObject* globalObject, int fd)
{
auto& vm = JSC::getVM(globalObject);
Expand Down Expand Up @@ -2049,15 +2047,8 @@ static JSValue constructStdioWriteStream(JSC::JSGlobalObject* globalObject, int
ASSERT_WITH_MESSAGE(JSC::isJSArray(result), "Expected an array from getStdioWriteStream");
JSC::JSArray* resultObject = JSC::jsCast<JSC::JSArray*>(result);

#if OS(WINDOWS)
Zig::GlobalObject* globalThis = jsCast<Zig::GlobalObject*>(globalObject);
// Node.js docs - https://nodejs.org/api/process.html#a-note-on-process-io
// > Files: synchronous on Windows and POSIX
// > TTYs (Terminals): asynchronous on Windows, synchronous on POSIX
// > Pipes (and sockets): synchronous on Windows, asynchronous on POSIX
// > Synchronous writes avoid problems such as output written with console.log() or console.error() being unexpectedly interleaved, or not written at all if process.exit() is called before an asynchronous write completes. See process.exit() for more information.
Bun__ForceFileSinkToBeSynchronousOnWindows(globalThis, JSValue::encode(resultObject->getIndex(globalObject, 1)));
#endif
Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio(globalThis, JSValue::encode(resultObject->getIndex(globalObject, 1)));

return resultObject->getIndex(globalObject, 0);
}
Expand Down
134 changes: 94 additions & 40 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,7 @@ pub const ArrayBufferSink = struct {
pub const JSSink = NewJSSink(@This(), "ArrayBufferSink");
};

const AutoFlusher = struct {
pub const AutoFlusher = struct {
registered: bool = false,

pub fn registerDeferredMicrotaskWithType(comptime Type: type, this: *Type, vm: *JSC.VirtualMachine) void {
Expand Down Expand Up @@ -2894,7 +2894,8 @@ pub const NetworkSink = struct {
return .{ .owned = len };
}

this.buffer.writeLatin1(bytes) catch {
const check_ascii = false;
this.buffer.writeLatin1(bytes, check_ascii) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};

Expand All @@ -2905,15 +2906,18 @@ pub const NetworkSink = struct {
} else if (this.buffer.size() + len >= this.getHighWaterMark()) {
// kinda fast path:
// - combined chunk is large enough to flush automatically
this.buffer.writeLatin1(bytes) catch {

const check_ascii = true;
this.buffer.writeLatin1(bytes, check_ascii) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
_ = this.internalFlush() catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
} else {
this.buffer.writeLatin1(bytes) catch {
const check_ascii = true;
this.buffer.writeLatin1(bytes, check_ascii) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
}
Expand Down Expand Up @@ -3421,10 +3425,12 @@ pub const FileSink = struct {
// we should not duplicate these fields...
pollable: bool = false,
nonblocking: bool = false,
force_sync_on_windows: bool = false,
force_sync: bool = false,

is_socket: bool = false,
fd: bun.FileDescriptor = bun.invalid_fd,
has_js_called_unref: bool = false,

auto_flusher: AutoFlusher = .{},

const log = Output.scoped(.FileSink, false);

Expand All @@ -3438,17 +3444,16 @@ pub const FileSink = struct {
return this.writer.memoryCost();
}

fn Bun__ForceFileSinkToBeSynchronousOnWindows(globalObject: *JSC.JSGlobalObject, jsvalue: JSC.JSValue) callconv(.C) void {
comptime bun.assert(Environment.isWindows);

fn Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio(globalObject: *JSC.JSGlobalObject, jsvalue: JSC.JSValue) callconv(.C) void {
var this: *FileSink = @alignCast(@ptrCast(JSSink.fromJS(globalObject, jsvalue) orelse return));
this.force_sync_on_windows = true;
this.force_sync = true;
if (comptime !Environment.isWindows) {
this.writer.force_sync = true;
}
}

comptime {
if (Environment.isWindows) {
@export(Bun__ForceFileSinkToBeSynchronousOnWindows, .{ .name = "Bun__ForceFileSinkToBeSynchronousOnWindows" });
}
@export(Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio, .{ .name = "Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio" });
}

pub fn onAttachedProcessExit(this: *FileSink) void {
Expand Down Expand Up @@ -3488,6 +3493,10 @@ pub const FileSink = struct {
// If there's no pending write, no need to keep the event loop ref'd.
this.writer.updateRef(this.eventLoop(), has_pending_data);

if (has_pending_data) {
AutoFlusher.registerDeferredMicrotaskWithType(@This(), this, JSC.VirtualMachine.get());
}

// if we are not done yet and has pending data we just wait so we do not runPending twice
if (status == .pending and has_pending_data) {
if (this.pending.state == .pending) {
Expand Down Expand Up @@ -3589,22 +3598,36 @@ pub const FileSink = struct {
pub fn setup(this: *FileSink, options: *const StreamStart.FileSinkOptions) JSC.Maybe(void) {
// TODO: this should be concurrent.
var isatty: ?bool = null;
var is_nonblocking_tty = false;
var is_nonblocking = false;
const fd = switch (switch (options.input_path) {
.path => |path| bun.sys.openA(path.slice(), options.flags(), options.mode),
.path => |path| brk: {
is_nonblocking = true;
break :brk bun.sys.openA(path.slice(), options.flags(), options.mode);
},
.fd => |fd_| brk: {
if (comptime Environment.isPosix and FeatureFlags.nonblocking_stdout_and_stderr_on_posix) {
if (bun.FDTag.get(fd_) != .none) {
const rc = bun.C.open_as_nonblocking_tty(@intCast(fd_.cast()), bun.O.WRONLY);
if (rc > -1) {
isatty = true;
is_nonblocking_tty = true;
is_nonblocking = true;
break :brk JSC.Maybe(bun.FileDescriptor){ .result = bun.toFD(rc) };
}
}
}

break :brk bun.sys.dupWithFlags(fd_, if (bun.FDTag.get(fd_) == .none and !this.force_sync_on_windows) bun.O.NONBLOCK else 0);
const duped = bun.sys.dupWithFlags(fd_, 0);

if (comptime Environment.isPosix) {
if (bun.FDTag.get(fd_) == .none and !this.force_sync and duped == .result) {
is_nonblocking = switch (bun.sys.getFcntlFlags(duped.result)) {
.result => |flags| (flags & bun.O.NONBLOCK) != 0,
.err => false,
};
}
}

break :brk duped;
},
}) {
.err => |err| return .{ .err = err },
Expand All @@ -3630,21 +3653,21 @@ pub const FileSink = struct {

this.fd = fd;
this.is_socket = std.posix.S.ISSOCK(stat.mode);
this.nonblocking = is_nonblocking_tty or (this.pollable and switch (options.input_path) {
this.nonblocking = is_nonblocking and this.pollable and switch (options.input_path) {
.path => true,
.fd => |fd_| bun.FDTag.get(fd_) == .none,
});
};
},
}
} else if (comptime Environment.isWindows) {
this.pollable = (bun.windows.GetFileType(fd.cast()) & bun.windows.FILE_TYPE_PIPE) != 0 and !this.force_sync_on_windows;
this.pollable = (bun.windows.GetFileType(fd.cast()) & bun.windows.FILE_TYPE_PIPE) != 0 and !this.force_sync;
this.fd = fd;
} else {
@compileError("TODO: implement for this platform");
}

if (comptime Environment.isWindows) {
if (this.force_sync_on_windows) {
if (this.force_sync) {
switch (this.writer.startSync(
fd,
this.pollable,
Expand Down Expand Up @@ -3721,6 +3744,33 @@ pub const FileSink = struct {
return .{ .result = {} };
}

pub fn onAutoFlush(this: *FileSink) bool {
if (this.done or !this.writer.hasPendingData()) {
this.updateRef(false);
this.auto_flusher.registered = false;
return false;
}

const amt = switch (this.writer.flush()) {
.done, .wrote, .pending => |amt| amt,
else => 0,
};

if (amt == 0) {
this.updateRef(false);
} else {
this.runPending();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to run pending when:

  • done
  • If we wrote part of the data

Instead, we should call `runPending` when `!this.writer.hasPendingData()`.

If this leads to a timeout in tests, it's likely because microtasks are not being drained, since this is called after the microtask queue has finished running. If the developer immediately calls write every time the write Promise fulfills, that could potentially lead to an infinite loop, but we will have to see.

I think both of these branches — `if (amt == 0)` and this one — should be removed. And instead:

}

if (!this.writer.hasPendingData()) {
this.updateRef(false);
this.auto_flusher.registered = false;
return false;
}

return true;
}

/// No-op flush: the sink argument is ignored and the `.result` variant with a
/// void payload is always returned. No write is performed by this function.
pub fn flush(_: *FileSink) JSC.Maybe(void) {
return .{ .result = {} };
}
Expand Down Expand Up @@ -3841,6 +3891,7 @@ pub const FileSink = struct {
pub fn deinit(this: *FileSink) void {
this.pending.deinit();
this.writer.deinit();
AutoFlusher.unregisterDeferredMicrotaskWithType(@This(), this, JSC.VirtualMachine.get());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to not run when it's the shell

}

pub fn toJS(this: *FileSink, globalThis: *JSGlobalObject) JSValue {
Expand Down Expand Up @@ -3892,7 +3943,6 @@ pub const FileSink = struct {
}

pub fn updateRef(this: *FileSink, value: bool) void {
this.has_js_called_unref = !value;
if (value) {
this.writer.enableKeepingProcessAlive(this.event_loop_handle);
} else {
Expand Down Expand Up @@ -3961,7 +4011,6 @@ pub const FileReader = struct {
buffered: std.ArrayListUnmanaged(u8) = .{},
read_inside_on_pull: ReadDuringJSOnPullResult = .{ .none = {} },
highwater_mark: usize = 16384,
has_js_called_unref: bool = false,

pub const IOReader = bun.io.BufferedReader;
pub const Poll = IOReader;
Expand Down Expand Up @@ -3989,37 +4038,43 @@ pub const FileReader = struct {
pub fn openFileBlob(file: *Blob.FileStore) JSC.Maybe(OpenedFileBlob) {
var this = OpenedFileBlob{ .fd = bun.invalid_fd };
var file_buf: bun.PathBuffer = undefined;
var is_nonblocking_tty = false;
var is_nonblocking = false;

const fd = if (file.pathlike == .fd)
if (file.pathlike.fd.isStdio()) brk: {
if (comptime Environment.isPosix) {
if (comptime Environment.isPosix and FeatureFlags.nonblocking_stdout_and_stderr_on_posix) {
dylan-conway marked this conversation as resolved.
Show resolved Hide resolved
const rc = bun.C.open_as_nonblocking_tty(file.pathlike.fd.int(), bun.O.RDONLY);
if (rc > -1) {
is_nonblocking_tty = true;
is_nonblocking = true;
file.is_atty = true;
break :brk bun.toFD(rc);
}
}
break :brk file.pathlike.fd;
} else switch (Syscall.dupWithFlags(file.pathlike.fd, brk: {
} else brk: {
const duped = Syscall.dupWithFlags(file.pathlike.fd, 0);

if (duped != .result) {
return .{ .err = duped.err.withFd(file.pathlike.fd) };
}

const fd = duped.result;

if (comptime Environment.isPosix) {
if (bun.FDTag.get(file.pathlike.fd) == .none and !(file.is_atty orelse false)) {
break :brk bun.O.NONBLOCK;
if (bun.FDTag.get(fd) == .none) {
is_nonblocking = switch (bun.sys.getFcntlFlags(fd)) {
.result => |flags| (flags & bun.O.NONBLOCK) != 0,
.err => false,
};
}
}

break :brk 0;
})) {
.result => |fd| switch (bun.sys.toLibUVOwnedFD(fd, .dup, .close_on_fail)) {
break :brk switch (bun.sys.toLibUVOwnedFD(fd, .dup, .close_on_fail)) {
.result => |owned_fd| owned_fd,
.err => |err| {
return .{ .err = err };
},
},
.err => |err| {
return .{ .err = err.withFd(file.pathlike.fd) };
},
};
}
else switch (Syscall.open(file.pathlike.path.sliceZ(&file_buf), bun.O.RDONLY | bun.O.NONBLOCK | bun.O.CLOEXEC, 0)) {
.result => |fd| fd,
Expand Down Expand Up @@ -4055,7 +4110,7 @@ pub const FileReader = struct {
return .{ .err = Syscall.Error.fromCode(.ISDIR, .fstat) };
}

this.pollable = bun.sys.isPollable(stat.mode) or is_nonblocking_tty or (file.is_atty orelse false);
this.pollable = bun.sys.isPollable(stat.mode) or is_nonblocking or (file.is_atty orelse false);
this.file_type = if (bun.S.ISFIFO(stat.mode))
.pipe
else if (bun.S.ISSOCK(stat.mode))
Expand All @@ -4064,11 +4119,11 @@ pub const FileReader = struct {
.file;

// pretend it's a non-blocking pipe if it's a TTY
if (is_nonblocking_tty and this.file_type != .socket) {
if (is_nonblocking and this.file_type != .socket) {
this.file_type = .nonblocking_pipe;
}

this.nonblocking = is_nonblocking_tty or (this.pollable and !(file.is_atty orelse false));
this.nonblocking = is_nonblocking or (this.pollable and !(file.is_atty orelse false));

if (this.nonblocking and this.file_type == .pipe) {
this.file_type = .nonblocking_pipe;
Expand Down Expand Up @@ -4507,7 +4562,6 @@ pub const FileReader = struct {

pub fn setRefOrUnref(this: *FileReader, enable: bool) void {
if (this.done) return;
this.has_js_called_unref = !enable;
this.reader.updateRef(enable);
}

Expand Down
Loading