9 changes: 6 additions & 3 deletions lib/std/Io.zig
@@ -734,16 +734,19 @@ pub const Clock = enum {
     /// A settable system-wide clock that measures real (i.e. wall-clock)
     /// time. This clock is affected by discontinuous jumps in the system
     /// time (e.g., if the system administrator manually changes the
-    /// clock), and by frequency adjust‐ ments performed by NTP and similar
+    /// clock), and by frequency adjustments performed by NTP and similar
     /// applications.
     ///
     /// This clock normally counts the number of seconds since 1970-01-01
     /// 00:00:00 Coordinated Universal Time (UTC) except that it ignores
     /// leap seconds; near a leap second it is typically adjusted by NTP to
     /// stay roughly in sync with UTC.
     ///
-    /// The epoch is implementation-defined. For example NTFS/Windows uses
-    /// 1601-01-01.
+    /// Timestamps returned by implementations of this clock represent time
+    /// elapsed since 1970-01-01T00:00:00Z, the POSIX/Unix epoch, ignoring
+    /// leap seconds. This is colloquially known as "Unix time". If the
+    /// underlying OS uses a different epoch for native timestamps (e.g.,
+    /// Windows, which uses 1601-01-01) they are translated accordingly.
     real,
     /// A nonsettable system-wide clock that represents time since some
     /// unspecified point in the past.
194 changes: 108 additions & 86 deletions lib/std/Io/Threaded.zig
@@ -387,8 +387,9 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     reset_event: ResetEvent,
     select_condition: ?*ResetEvent,
-    context_alignment: std.mem.Alignment,
+    context_offset: usize,
     result_offset: usize,
+    alloc_len: usize,

     const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

@@ -425,18 +426,57 @@ const AsyncClosure = struct {

     fn contextPointer(ac: *AsyncClosure) [*]u8 {
         const base: [*]u8 = @ptrCast(ac);
-        return base + ac.context_alignment.forward(@sizeOf(AsyncClosure));
+        return base + ac.context_offset;
     }

+    fn init(
+        gpa: Allocator,
+        mode: enum { async, concurrent },
+        result_len: usize,
+        result_alignment: std.mem.Alignment,
+        context: []const u8,
+        context_alignment: std.mem.Alignment,
+        func: *const fn (context: *const anyopaque, result: *anyopaque) void,
+    ) Allocator.Error!*AsyncClosure {
+        const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
+        const worst_case_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + max_context_misalignment);
+        const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len);
+        const alloc_len = worst_case_result_offset + result_len;
+
+        const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), alloc_len)));
+        errdefer comptime unreachable;
+
+        const actual_context_offset = context_alignment.forward(@intFromPtr(ac) + @sizeOf(AsyncClosure)) - @intFromPtr(ac);
+        const actual_result_offset = result_alignment.forward(actual_context_offset + context.len);
Review comment (Contributor):
Here is a case I found where the alignment of the result data is incorrect:

const std = @import("std");

pub fn main() !void {
    var buffer: [1024]u8 align(64) = undefined;
    var fba: std.heap.FixedBufferAllocator = .init(buffer[1..]);

    var threaded: std.Io.Threaded = .init(fba.allocator());
    defer threaded.deinit();
    const io = threaded.io();

    var future = io.async(foo, .{});
    _ = future.await(io);
}

fn foo() Align64 {
    return .{ .data = 5 };
}

const Align64 = struct {
    data: u8 align(64),
};
$ zig run sample.zig
thread 49096 panic: incorrect alignment
/home/techatrix/repos/zig/lib/std/Io.zig:1538:44: 0x1163858 in start (std.zig)
            const result_casted: *Result = @ptrCast(@alignCast(result));
                                           ^
/home/techatrix/repos/zig/lib/std/Io/Threaded.zig:406:16: 0x108cede in start (std.zig)
        ac.func(ac.contextPointer(), ac.resultPointer());
               ^
/home/techatrix/repos/zig/lib/std/Io/Threaded.zig:188:26: 0x10d9a26 in worker (std.zig)
            closure.start(closure);
                         ^
/home/techatrix/repos/zig/lib/std/Thread.zig:558:13: 0x10b2df0 in callFn__anon_16045 (std.zig)
            @call(.auto, f, args);
            ^
/home/techatrix/repos/zig/lib/std/Thread.zig:1534:30: 0x108eb60 in entryFn (std.zig)
                return callFn(f, self.fn_args);
                             ^
/home/techatrix/repos/zig/lib/std/os/linux/x86_64.zig:105:5: 0x10b2ea5 in clone (std.zig)
    asm volatile (
    ^
Aborted (core dumped)

I suspect that the issue is with actual_result_offset, because it tries to align the size/offset instead of the memory address, as is done for actual_context_offset. I haven't looked too much into it, so this may be caused by something else.
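
A minimal sketch of what the address-based computation might look like, assuming that diagnosis is right (hypothetical, untested):

// Align the absolute address of the result, mirroring how actual_context_offset
// is computed, then convert back into an offset relative to the closure's base.
const context_end_addr = @intFromPtr(ac) + actual_context_offset + context.len;
const actual_result_offset = result_alignment.forward(context_end_addr) - @intFromPtr(ac);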

+        ac.* = .{
+            .closure = .{
+                .cancel_tid = .none,
+                .start = start,
+                .is_concurrent = switch (mode) {
+                    .async => false,
+                    .concurrent => true,
+                },
+            },
+            .func = func,
+            .context_offset = actual_context_offset,
+            .result_offset = actual_result_offset,
+            .alloc_len = alloc_len,
+            .reset_event = .unset,
+            .select_condition = null,
+        };
+        @memcpy(ac.contextPointer()[0..context.len], context);
+        return ac;
+    }

-    fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
+    fn waitAndDeinit(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
         ac.reset_event.waitUncancelable();
         @memcpy(result, ac.resultPointer()[0..result.len]);
-        free(ac, gpa, result.len);
+        ac.deinit(gpa);
     }

-    fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void {
+    fn deinit(ac: *AsyncClosure, gpa: Allocator) void {
         const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
-        gpa.free(base[0 .. ac.result_offset + result_len]);
+        gpa.free(base[0..ac.alloc_len]);
     }
 };

@@ -452,44 +492,28 @@ fn async(
         start(context.ptr, result.ptr);
         return null;
     }

     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const cpu_count = t.cpu_count catch {
         return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
             start(context.ptr, result.ptr);
             return null;
         };
     };

     const gpa = t.allocator;
-    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
-    const result_offset = result_alignment.forward(context_offset + context.len);
-    const n = result_offset + result.len;
-    const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
+    const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
         start(context.ptr, result.ptr);
         return null;
-    }));
-
-    ac.* = .{
-        .closure = .{
-            .cancel_tid = .none,
-            .start = AsyncClosure.start,
-            .is_concurrent = false,
-        },
-        .func = start,
-        .context_alignment = context_alignment,
-        .result_offset = result_offset,
-        .reset_event = .unset,
-        .select_condition = null,
-    };
-
-    @memcpy(ac.contextPointer()[0..context.len], context);
+    };

     t.mutex.lock();

     const thread_capacity = cpu_count - 1 + t.concurrent_count;

     t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
         t.mutex.unlock();
-        ac.free(gpa, result.len);
+        ac.deinit(gpa);
         start(context.ptr, result.ptr);
         return null;
     };
@@ -501,7 +525,7 @@ fn async(
     if (t.threads.items.len == 0) {
         assert(t.run_queue.popFirst() == &ac.closure.node);
         t.mutex.unlock();
-        ac.free(gpa, result.len);
+        ac.deinit(gpa);
         start(context.ptr, result.ptr);
         return null;
     }
@@ -530,27 +554,11 @@ fn concurrent(

     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const cpu_count = t.cpu_count catch 1;

     const gpa = t.allocator;
-    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
-    const result_offset = result_alignment.forward(context_offset + context.len);
-    const n = result_offset + result_len;
-    const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch
-        return error.ConcurrencyUnavailable;
-    const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes));
-
-    ac.* = .{
-        .closure = .{
-            .cancel_tid = .none,
-            .start = AsyncClosure.start,
-            .is_concurrent = true,
-        },
-        .func = start,
-        .context_alignment = context_alignment,
-        .result_offset = result_offset,
-        .reset_event = .unset,
-        .select_condition = null,
-    };
-    @memcpy(ac.contextPointer()[0..context.len], context);
+    const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
+        return error.ConcurrencyUnavailable;
+    };

     t.mutex.lock();

@@ -559,7 +567,7 @@

     t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
         t.mutex.unlock();
-        ac.free(gpa, result_len);
+        ac.deinit(gpa);
         return error.ConcurrencyUnavailable;
     };

@@ -569,7 +577,7 @@
     const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         assert(t.run_queue.popFirst() == &ac.closure.node);
         t.mutex.unlock();
-        ac.free(gpa, result_len);
+        ac.deinit(gpa);
         return error.ConcurrencyUnavailable;
     };
     t.threads.appendAssumeCapacity(thread);
@@ -587,8 +595,8 @@ const GroupClosure = struct {
     /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
     node: std.SinglyLinkedList.Node,
     func: *const fn (*Io.Group, context: *anyopaque) void,
-    context_alignment: std.mem.Alignment,
-    context_len: usize,
+    context_offset: usize,
+    alloc_len: usize,

     fn start(closure: *Closure) void {
         const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
@@ -616,22 +624,48 @@ const GroupClosure = struct {
         if (prev_state == (sync_one_pending | sync_is_waiting)) reset_event.set();
     }

-    fn free(gc: *GroupClosure, gpa: Allocator) void {
-        const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
-        gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
-    }
-
-    fn contextOffset(context_alignment: std.mem.Alignment) usize {
-        return context_alignment.forward(@sizeOf(GroupClosure));
-    }
-
-    fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
-        return contextOffset(context_alignment) + context_len;
-    }
-
     fn contextPointer(gc: *GroupClosure) [*]u8 {
         const base: [*]u8 = @ptrCast(gc);
-        return base + contextOffset(gc.context_alignment);
+        return base + gc.context_offset;
     }

+    /// Does not initialize the `node` field.
+    fn init(
+        gpa: Allocator,
+        t: *Threaded,
+        group: *Io.Group,
+        context: []const u8,
+        context_alignment: std.mem.Alignment,
+        func: *const fn (*Io.Group, context: *const anyopaque) void,
+    ) Allocator.Error!*GroupClosure {
+        const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
+        const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment);
+        const alloc_len = worst_case_context_offset + context.len;
+
+        const gc: *GroupClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(GroupClosure), alloc_len)));
+        errdefer comptime unreachable;
+
+        const actual_context_offset = context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc);
+        gc.* = .{
+            .closure = .{
+                .cancel_tid = .none,
+                .start = start,
+                .is_concurrent = false,
+            },
+            .t = t,
+            .group = group,
+            .node = undefined,
+            .func = func,
+            .context_offset = actual_context_offset,
+            .alloc_len = alloc_len,
+        };
+        @memcpy(gc.contextPointer()[0..context.len], context);
+        return gc;
+    }
+
+    fn deinit(gc: *GroupClosure, gpa: Allocator) void {
+        const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
+        gpa.free(base[0..gc.alloc_len]);
+    }

     const sync_is_waiting: usize = 1 << 0;
@@ -646,27 +680,14 @@ fn groupAsync(
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
     if (builtin.single_threaded) return start(group, context.ptr);

     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const cpu_count = t.cpu_count catch 1;

     const gpa = t.allocator;
-    const n = GroupClosure.contextEnd(context_alignment, context.len);
-    const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
         return start(group, context.ptr);
-    }));
-    gc.* = .{
-        .closure = .{
-            .cancel_tid = .none,
-            .start = GroupClosure.start,
-            .is_concurrent = false,
-        },
-        .t = t,
-        .group = group,
-        .node = undefined,
-        .func = start,
-        .context_alignment = context_alignment,
-        .context_len = context.len,
-    };
-    @memcpy(gc.contextPointer()[0..context.len], context);
+    };

     t.mutex.lock();

@@ -678,7 +699,7 @@ fn groupAsync(

     t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
         t.mutex.unlock();
-        gc.free(gpa);
+        gc.deinit(gpa);
         return start(group, context.ptr);
     };

@@ -688,7 +709,7 @@ fn groupAsync(
     const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         assert(t.run_queue.popFirst() == &gc.closure.node);
         t.mutex.unlock();
-        gc.free(gpa);
+        gc.deinit(gpa);
         return start(group, context.ptr);
     };
     t.threads.appendAssumeCapacity(thread);
@@ -730,7 +751,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
         while (true) {
             const gc: *GroupClosure = @fieldParentPtr("node", node);
             const node_next = node.next;
-            gc.free(gpa);
+            gc.deinit(gpa);
             node = node_next orelse break;
         }
     }
@@ -761,7 +782,7 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
         while (true) {
             const gc: *GroupClosure = @fieldParentPtr("node", node);
             const node_next = node.next;
-            gc.free(gpa);
+            gc.deinit(gpa);
             node = node_next orelse break;
         }
     }
@@ -776,7 +797,7 @@ fn await(
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
-    closure.waitAndFree(t.allocator, result);
+    closure.waitAndDeinit(t.allocator, result);
 }

 fn cancel(
@@ -789,7 +810,7 @@ fn cancel(
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
     ac.closure.requestCancel();
-    ac.waitAndFree(t.allocator, result);
+    ac.waitAndDeinit(t.allocator, result);
 }

 fn cancelRequested(userdata: ?*anyopaque) bool {
@@ -2864,7 +2885,8 @@ fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestam
         .real => {
             // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds
             // and uses the NTFS/Windows epoch, which is 1601-01-01.
-            return .{ .nanoseconds = @as(i96, windows.ntdll.RtlGetSystemTimePrecise()) * 100 };
+            const epoch_ns = std.time.epoch.windows * std.time.ns_per_s;
+            return .{ .nanoseconds = @as(i96, windows.ntdll.RtlGetSystemTimePrecise()) * 100 + epoch_ns };
         },
         .awake, .boot => {
             // QPC on windows doesn't fail on >= XP/2000 and includes time suspended.
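
For reference, the epoch translation above works because std.time.epoch.windows is the Unix timestamp of 1601-01-01 (a negative number of seconds), so adding epoch_ns re-bases the 1601-based Windows count onto the Unix epoch, matching the new Clock.real documentation. A minimal standalone sketch of the same arithmetic (hypothetical helper, not part of this diff):

const std = @import("std");

/// Converts a Windows timestamp (100 ns intervals since 1601-01-01)
/// into nanoseconds since the Unix epoch, mirroring the nowWindows change.
fn windowsToUnixNs(intervals_100ns: i64) i96 {
    // std.time.epoch.windows == -11644473600, the Unix timestamp of 1601-01-01.
    const epoch_ns: i96 = @as(i96, std.time.epoch.windows) * std.time.ns_per_s;
    return @as(i96, intervals_100ns) * 100 + epoch_ns;
}

test windowsToUnixNs {
    // Exactly 11_644_473_600 seconds after 1601-01-01 is 1970-01-01T00:00:00Z.
    const intervals: i64 = 11_644_473_600 * 10_000_000; // in 100 ns units
    try std.testing.expectEqual(@as(i96, 0), windowsToUnixNs(intervals));
}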