Skip to content

Commit

Permalink
fix(fs): WriteStream pending write fastpath (#16856)
Browse files Browse the repository at this point in the history
Co-authored-by: Jarred Sumner <[email protected]>
  • Loading branch information
dylan-conway and Jarred-Sumner authored Jan 31, 2025
1 parent 212944a commit b098c9e
Show file tree
Hide file tree
Showing 147 changed files with 7,037 additions and 247 deletions.
4 changes: 2 additions & 2 deletions src/baby_list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub fn BabyList(comptime Type: type) type {
return this.len - initial;
}

pub fn writeLatin1(this: *@This(), allocator: std.mem.Allocator, str: []const u8) !u32 {
pub fn writeLatin1(this: *@This(), allocator: std.mem.Allocator, str: []const u8) OOM!u32 {
if (comptime Type != u8)
@compileError("Unsupported for type " ++ @typeName(Type));
const initial = this.len;
Expand All @@ -352,7 +352,7 @@ pub fn BabyList(comptime Type: type) type {
return this.len - initial;
}

pub fn writeUTF16(this: *@This(), allocator: std.mem.Allocator, str: []const u16) !u32 {
pub fn writeUTF16(this: *@This(), allocator: std.mem.Allocator, str: []const u16) OOM!u32 {
if (comptime Type != u8)
@compileError("Unsupported for type " ++ @typeName(Type));

Expand Down
3 changes: 3 additions & 0 deletions src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4378,6 +4378,9 @@ pub fn jsCreateSocketPair(global: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JS
return global.throwValue(err.toJSC(global));
}

_ = bun.sys.setNonblocking(bun.toFD(fds_[0]));
_ = bun.sys.setNonblocking(bun.toFD(fds_[1]));

const array = JSC.JSValue.createEmptyArray(global, 2);
array.putIndex(global, 0, JSC.jsNumber(fds_[0]));
array.putIndex(global, 1, JSC.jsNumber(fds_[1]));
Expand Down
43 changes: 30 additions & 13 deletions src/bun.js/bindings/BunProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2018,10 +2018,14 @@ static JSValue constructProcessHrtimeObject(VM& vm, JSObject* processObject)

return hrtime;
}
enum class BunProcessStdinFdType : int32_t {
file = 0,
pipe = 1,
socket = 2,
};
extern "C" BunProcessStdinFdType Bun__Process__getStdinFdType(void*, int fd);

#if OS(WINDOWS)
extern "C" void Bun__ForceFileSinkToBeSynchronousOnWindows(JSC::JSGlobalObject*, JSC::EncodedJSValue);
#endif
extern "C" void Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio(JSC::JSGlobalObject*, JSC::EncodedJSValue);
static JSValue constructStdioWriteStream(JSC::JSGlobalObject* globalObject, int fd)
{
auto& vm = JSC::getVM(globalObject);
Expand All @@ -2030,6 +2034,9 @@ static JSValue constructStdioWriteStream(JSC::JSGlobalObject* globalObject, int
JSC::JSFunction* getStdioWriteStream = JSC::JSFunction::create(vm, globalObject, processObjectInternalsGetStdioWriteStreamCodeGenerator(vm), globalObject);
JSC::MarkedArgumentBuffer args;
args.append(JSC::jsNumber(fd));
args.append(jsBoolean(bun_stdio_tty[fd]));
BunProcessStdinFdType fdType = Bun__Process__getStdinFdType(Bun::vm(vm), fd);
args.append(jsNumber(static_cast<int32_t>(fdType)));

JSC::CallData callData = JSC::getCallData(getStdioWriteStream);

Expand All @@ -2049,15 +2056,26 @@ static JSValue constructStdioWriteStream(JSC::JSGlobalObject* globalObject, int
ASSERT_WITH_MESSAGE(JSC::isJSArray(result), "Expected an array from getStdioWriteStream");
JSC::JSArray* resultObject = JSC::jsCast<JSC::JSArray*>(result);

// process.stdout and process.stderr differ from other Node.js streams in important ways:
// 1. They are used internally by console.log() and console.error(), respectively.
// 2. Writes may be synchronous depending on what the stream is connected to and whether the system is Windows or POSIX:
// Files: synchronous on Windows and POSIX
// TTYs (Terminals): asynchronous on Windows, synchronous on POSIX
// Pipes (and sockets): synchronous on Windows, asynchronous on POSIX
bool forceSync = false;
#if OS(WINDOWS)
Zig::GlobalObject* globalThis = jsCast<Zig::GlobalObject*>(globalObject);
// Node.js docs - https://nodejs.org/api/process.html#a-note-on-process-io
// > Files: synchronous on Windows and POSIX
// > TTYs (Terminals): asynchronous on Windows, synchronous on POSIX
// > Pipes (and sockets): synchronous on Windows, asynchronous on POSIX
// > Synchronous writes avoid problems such as output written with console.log() or console.error() being unexpectedly interleaved, or not written at all if process.exit() is called before an asynchronous write completes. See process.exit() for more information.
Bun__ForceFileSinkToBeSynchronousOnWindows(globalThis, JSValue::encode(resultObject->getIndex(globalObject, 1)));
forceSync = fdType == BunProcessStdinFdType::file || fdType == BunProcessStdinFdType::pipe;
#else
// Note: files are always sync anyway.
// forceSync = fdType == BunProcessStdinFdType::file || bun_stdio_tty[fd];

// TDOO: once console.* is wired up to write/read through the same buffering mechanism as FileSink for process.stdout, process.stderr, we can make this non-blocking for sockets on POSIX.
// Until then, we have to force it to be sync EVEN for sockets or else console.log() may flush at a different time than process.stdout.write.
forceSync = true;
#endif
if (forceSync) {
Bun__ForceFileSinkToBeSynchronousForProcessObjectStdio(globalObject, JSValue::encode(resultObject->getIndex(globalObject, 1)));
}

return resultObject->getIndex(globalObject, 0);
}
Expand All @@ -2076,8 +2094,6 @@ static JSValue constructStderr(VM& vm, JSObject* processObject)
#define STDIN_FILENO 0
#endif

extern "C" int32_t Bun__Process__getStdinFdType(void*);

static JSValue constructStdin(VM& vm, JSObject* processObject)
{
auto* globalObject = processObject->globalObject();
Expand All @@ -2086,7 +2102,8 @@ static JSValue constructStdin(VM& vm, JSObject* processObject)
JSC::MarkedArgumentBuffer args;
args.append(JSC::jsNumber(STDIN_FILENO));
args.append(jsBoolean(bun_stdio_tty[STDIN_FILENO]));
args.append(jsNumber(Bun__Process__getStdinFdType(Bun::vm(vm))));
BunProcessStdinFdType fdType = Bun__Process__getStdinFdType(Bun::vm(vm), STDIN_FILENO);
args.append(jsNumber(static_cast<int32_t>(fdType)));
JSC::CallData callData = JSC::getCallData(getStdioWriteStream);

NakedPtr<JSC::Exception> returnedException = nullptr;
Expand Down
21 changes: 20 additions & 1 deletion src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ const ProcessMiniEventLoopWaiterThreadTask = if (Environment.isPosix) bun.spawn.
const ShellAsyncSubprocessDone = bun.shell.Interpreter.Cmd.ShellAsyncSubprocessDone;
const RuntimeTranspilerStore = JSC.RuntimeTranspilerStore;
const ServerAllConnectionsClosedTask = @import("./api/server.zig").ServerAllConnectionsClosedTask;
const FlushPendingFileSinkTask = JSC.WebCore.FlushPendingFileSinkTask;

// Task.get(ReadFileTask) -> ?ReadFileTask
pub const Task = TaggedPointerUnion(.{
Expand Down Expand Up @@ -510,6 +511,7 @@ pub const Task = TaggedPointerUnion(.{
ReadFileTask,
Readlink,
Readv,
FlushPendingFileSinkTask,
Realpath,
RealpathNonNative,
Rename,
Expand Down Expand Up @@ -895,7 +897,7 @@ pub const EventLoop = struct {

defer this.debug.exit();

if (count == 1) {
if (count == 1 and !this.virtual_machine.is_inside_deferred_task_queue) {
this.drainMicrotasksWithGlobal(this.global, this.virtual_machine.jsc);
}

Expand Down Expand Up @@ -925,7 +927,10 @@ pub const EventLoop = struct {

jsc_vm.releaseWeakRefs();
JSC__JSGlobalObject__drainMicrotasks(globalObject);

this.virtual_machine.is_inside_deferred_task_queue = true;
this.deferred_tasks.run();
this.virtual_machine.is_inside_deferred_task_queue = false;

if (comptime bun.Environment.isDebug) {
this.debug.drain_microtasks_count_outside_tick_queue += @as(usize, @intFromBool(!this.debug.is_inside_tick_queue));
Expand Down Expand Up @@ -1054,6 +1059,7 @@ pub const EventLoop = struct {
var shell_rm_task: *ShellRmDirTask = task.get(ShellRmDirTask).?;
shell_rm_task.runFromMainThread();
},

@field(Task.Tag, typeBaseName(@typeName(ShellGlobTask))) => {
var shell_glob_task: *ShellGlobTask = task.get(ShellGlobTask).?;
shell_glob_task.runFromMainThread();
Expand Down Expand Up @@ -1349,6 +1355,11 @@ pub const EventLoop = struct {
any.runFromJSThread();
},

@field(Task.Tag, typeBaseName(@typeName(FlushPendingFileSinkTask))) => {
var any: *FlushPendingFileSinkTask = task.get(FlushPendingFileSinkTask).?;
any.runFromJSThread();
},

else => {
bun.Output.panic("Unexpected tag: {s}", .{@tagName(task.tag())});
},
Expand Down Expand Up @@ -2254,6 +2265,14 @@ pub const EventLoopHandle = union(enum) {
};
}

pub fn bunVM(this: EventLoopHandle) ?*JSC.VirtualMachine {
if (this == .js) {
return this.js.virtual_machine;
}

return null;
}

pub fn stderr(this: EventLoopHandle) *JSC.WebCore.Blob.Store {
return switch (this) {
.js => this.js.virtual_machine.rareData().stderr(),
Expand Down
2 changes: 2 additions & 0 deletions src/bun.js/javascript.zig
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,8 @@ pub const VirtualMachine = struct {

body_value_hive_allocator: BodyValueHiveAllocator = undefined,

is_inside_deferred_task_queue: bool = false,

pub const OnUnhandledRejection = fn (*VirtualMachine, globalObject: *JSC.JSGlobalObject, JSC.JSValue) void;

pub const OnException = fn (*ZigException) void;
Expand Down
11 changes: 8 additions & 3 deletions src/bun.js/rare_data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,16 @@ const StdinFdType = enum(i32) {
socket = 2,
};

pub export fn Bun__Process__getStdinFdType(vm: *JSC.VirtualMachine) StdinFdType {
const mode = vm.rareData().stdin().data.file.mode;
pub export fn Bun__Process__getStdinFdType(vm: *JSC.VirtualMachine, fd: i32) StdinFdType {
const mode = switch (fd) {
0 => vm.rareData().stdin().data.file.mode,
1 => vm.rareData().stdout().data.file.mode,
2 => vm.rareData().stderr().data.file.mode,
else => unreachable,
};
if (bun.S.ISFIFO(mode)) {
return .pipe;
} else if (bun.S.ISSOCK(mode) or bun.S.ISCHR(mode)) {
} else if (bun.S.ISSOCK(mode)) {
return .socket;
} else {
return .file;
Expand Down
Loading

0 comments on commit b098c9e

Please sign in to comment.