diff --git a/build.zig.zon b/build.zig.zon index 015935032d..eb2971201e 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -50,8 +50,8 @@ .hash = "secp256k1-0.5.0-2oLmGRAHAACjbp4BBLAoCU6Sotb_gqChbYOIIvBejN0V", }, .tracy = .{ - .url = "git+https://github.com/Syndica/tracy-zig#8fcc7521aa47e6dc8bf75de888f4ed4ed503c142", - .hash = "zig_tracy-0.12.2-4TLLRxpkAABhUEzSm2wXjioLd_-qruRviTnehDTbhZYR", + .url = "git+https://github.com/Syndica/tracy-zig#e4dfba6d9b2c6cbae9575c8f0df1794ca674514f", + .hash = "zig_tracy-0.12.2-4TLLRxFkAADqlI3jJRIfVpqKW99BKIjSi4aLKYGJnIYM", }, }, } diff --git a/src/accountsdb/snapshot/data.zig b/src/accountsdb/snapshot/data.zig index 75cd1daf06..6bdd771695 100644 --- a/src/accountsdb/snapshot/data.zig +++ b/src/accountsdb/snapshot/data.zig @@ -945,7 +945,7 @@ pub const FullSnapshotFileInfo = struct { } const str_max_len = std.fmt.count("{d}", .{std.math.maxInt(Slot)}); - const end_max = @max(filename.len, start + str_max_len + 1); + const end_max = @min(filename.len, start + str_max_len + 1); const filename_trunc = filename[0..end_max]; const end = std.mem.indexOfScalarPos(u8, filename_trunc, start + 1, '-') orelse return error.MissingSlotDelimiter; @@ -1086,7 +1086,7 @@ pub const IncrementalSnapshotFileInfo = struct { } const str_max_len = std.fmt.count("{d}", .{std.math.maxInt(Slot)}); - const end_max = @max(filename.len, start + str_max_len + 1); + const end_max = @min(filename.len, start + str_max_len + 1); const filename_trunc = filename[0..end_max]; const end = std.mem.indexOfScalarPos(u8, filename_trunc, start + 1, '-') orelse return error.MissingSlotDelimiter; @@ -1107,7 +1107,7 @@ pub const IncrementalSnapshotFileInfo = struct { } const str_max_len = std.fmt.count("{d}", .{std.math.maxInt(Slot)}); - const end_max = @max(filename.len, start + str_max_len + 1); + const end_max = @min(filename.len, start + str_max_len + 1); const filename_trunc = filename[0..end_max]; const end = std.mem.indexOfScalarPos(u8, filename_trunc, start + 1, '-') 
orelse return error.MissingSlotDelimiter; diff --git a/src/gossip/data.zig b/src/gossip/data.zig index 6b33561cee..75a67c0e53 100644 --- a/src/gossip/data.zig +++ b/src/gossip/data.zig @@ -557,6 +557,8 @@ pub const LegacyContactInfo = struct { /// call ContactInfo.deinit to free pub fn toContactInfo(self: *const LegacyContactInfo, allocator: std.mem.Allocator) !ContactInfo { var ci = ContactInfo.init(allocator, self.id, self.wallclock, self.shred_version); + errdefer ci.deinit(); + try ci.setSocket(.gossip, self.gossip); try ci.setSocket(.turbine_recv, self.turbine_recv); try ci.setSocket(.turbine_recv_quic, self.turbine_recv_quic); @@ -567,6 +569,7 @@ pub const LegacyContactInfo = struct { try ci.setSocket(.rpc, self.rpc); try ci.setSocket(.rpc_pubsub, self.rpc_pubsub); try ci.setSocket(.serve_repair, self.serve_repair); + return ci; } diff --git a/src/gossip/ping_pong.zig b/src/gossip/ping_pong.zig index 8a1e41b859..a2e50a7cec 100644 --- a/src/gossip/ping_pong.zig +++ b/src/gossip/ping_pong.zig @@ -124,12 +124,22 @@ pub const PingCache = struct { cache_capacity: usize, ) error{OutOfMemory}!Self { std.debug.assert(rate_limit_delay.asNanos() <= ttl.asNanos() / 2); + + var pings = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity); + errdefer pings.deinit(); + + var pongs = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity); + errdefer pongs.deinit(); + + var pending_cache = try LruCache(.non_locking, Hash, PubkeyAndSocketAddr).init(allocator, cache_capacity); + errdefer pending_cache.deinit(); + return Self{ .ttl = ttl, .rate_limit_delay = rate_limit_delay, - .pings = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity), - .pongs = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity), - .pending_cache = try LruCache(.non_locking, Hash, PubkeyAndSocketAddr).init(allocator, 
cache_capacity), + .pings = pings, + .pongs = pongs, + .pending_cache = pending_cache, .allocator = allocator, }; } diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 08e5ec8740..e29a2b01e0 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -223,12 +223,15 @@ pub const GossipService = struct { // setup channels for communication between threads var packet_incoming_channel = try Channel(Packet).create(allocator); errdefer packet_incoming_channel.destroy(); + packet_incoming_channel.name = "gossip packet incoming channel"; var packet_outgoing_channel = try Channel(Packet).create(allocator); errdefer packet_outgoing_channel.destroy(); + packet_outgoing_channel.name = "gossip packet outgoing channel"; var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator); errdefer verified_incoming_channel.destroy(); + verified_incoming_channel.name = "gossip verified incoming channel"; // setup the socket (bind with read-timeout) const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified; @@ -249,10 +252,13 @@ pub const GossipService = struct { errdefer gossip_table.deinit(); // setup the active set for push messages - const active_set = ActiveSet.init(allocator); + var active_set = ActiveSet.init(allocator); + errdefer active_set.deinit(); // setup entrypoints var entrypoints = ArrayList(Entrypoint).init(allocator); + errdefer entrypoints.deinit(); + if (maybe_entrypoints) |entrypoint_addrs| { try entrypoints.ensureTotalCapacityPrecise(entrypoint_addrs.len); for (entrypoint_addrs) |entrypoint_addr| { @@ -261,25 +267,32 @@ pub const GossipService = struct { } // setup ping/pong cache - const ping_cache = try PingCache.init( + var ping_cache = try PingCache.init( allocator, PING_CACHE_TTL, PING_CACHE_RATE_LIMIT_DELAY, PING_CACHE_CAPACITY, ); + errdefer ping_cache.deinit(); const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key); const my_shred_version = 
my_contact_info.shred_version; - const failed_pull_hashes = HashTimeQueue.init(allocator); + + var failed_pull_hashes = HashTimeQueue.init(allocator); + errdefer failed_pull_hashes.deinit(); + const metrics = try GossipMetrics.init(); const exit_counter = try allocator.create(Atomic(u64)); + errdefer allocator.destroy(exit_counter); exit_counter.* = Atomic(u64).init(0); const exit = try allocator.create(Atomic(bool)); + errdefer allocator.destroy(exit); exit.* = Atomic(bool).init(false); - const service_manager = ServiceManager.init(allocator, .from(logger), exit, "gossip", .{}); + var service_manager = ServiceManager.init(allocator, .from(logger), exit, "gossip", .{}); + errdefer service_manager.deinit(); return .{ .allocator = allocator, @@ -3324,6 +3337,30 @@ test "init, exit, and deinit" { } } +test "leak checked gossip init" { + const testfn = struct { + fn f(allocator: std.mem.Allocator) !void { + var my_keypair = try KeyPair.generateDeterministic(@splat(1)); + const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key); + const contact_info = try localhostTestContactInfo(my_pubkey); + errdefer contact_info.deinit(); + + var gossip_service = try GossipService.init( + allocator, + allocator, + contact_info, + my_keypair, + null, + .FOR_TESTS, + ); + gossip_service.shutdown(); + gossip_service.deinit(); + } + }.f; + + try std.testing.checkAllAllocationFailures(std.testing.allocator, testfn, .{}); +} + const fuzz_service = sig.gossip.fuzz_service; pub const BenchmarkGossipServiceGeneral = struct { @@ -3576,6 +3613,7 @@ pub const BenchmarkGossipServicePullRequests = struct { fn localhostTestContactInfo(id: Pubkey) !ContactInfo { comptime std.debug.assert(@import("builtin").is_test); // should only be used for testin var contact_info = try LegacyContactInfo.default(id).toContactInfo(std.testing.allocator); + errdefer contact_info.deinit(); try contact_info.setSocket(.gossip, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 0)); return contact_info; } diff --git 
a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig index e16807fadf..3a4b024c65 100644 --- a/src/ledger/shred_inserter/shred_inserter.zig +++ b/src/ledger/shred_inserter/shred_inserter.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const tracy = @import("tracy"); const sig = @import("../../sig.zig"); const ledger = @import("../lib.zig"); const lib = @import("lib.zig"); @@ -196,6 +197,9 @@ pub const ShredInserter = struct { is_repaired: []const bool, options: Options, ) !Result { + const zone = tracy.Zone.init(@src(), .{ .name = "insertShreds" }); + defer zone.deinit(); + const timestamp = sig.time.Instant.now(); /////////////////////////// // check inputs for validity and edge cases @@ -231,70 +235,79 @@ pub const ShredInserter = struct { var shred_insertion_timer = try Timer.start(); var newly_completed_data_sets = ArrayList(CompletedDataSetInfo).init(allocator); errdefer newly_completed_data_sets.deinit(); - for (shreds, is_repaired) |shred, is_repair| { - const shred_source: ShredSource = if (is_repair) .repaired else .turbine; - switch (shred) { - .data => |data_shred| { - if (options.shred_tracker) |tracker| { - tracker.registerDataShred(&shred.data, timestamp) catch |err| { - self.metrics.register_shred_error.observe(@errorCast(err)); - switch (err) { - error.SlotUnderflow, error.SlotOverflow => {}, - else => return err, + + { + const z = tracy.Zone.init(@src(), .{ .name = "insert received shreds" }); + defer z.deinit(); + + for (shreds, is_repaired) |shred, is_repair| { + const shred_source: ShredSource = if (is_repair) .repaired else .turbine; + switch (shred) { + .data => |data_shred| { + if (options.shred_tracker) |tracker| { + tracker.registerDataShred(&shred.data, timestamp) catch |err| { + self.metrics.register_shred_error.observe(@errorCast(err)); + switch (err) { + error.SlotUnderflow, error.SlotOverflow => {}, + else => return err, + } + }; + } + if (self.checkInsertDataShred( + data_shred, + &state, + 
merkle_root_validator, + write_batch, + options.is_trusted, + options.slot_leaders, + shred_source, + )) |completed_data_sets| { + if (is_repair) { + self.metrics.num_repair.inc(); } - }; - } - if (self.checkInsertDataShred( - data_shred, - &state, - merkle_root_validator, - write_batch, - options.is_trusted, - options.slot_leaders, - shred_source, - )) |completed_data_sets| { - if (is_repair) { - self.metrics.num_repair.inc(); + defer completed_data_sets.deinit(); + try newly_completed_data_sets.appendSlice(completed_data_sets.items); + self.metrics.num_inserted.inc(); + } else |e| switch (e) { + error.Exists => if (is_repair) { + self.metrics.num_repaired_data_shreds_exists.inc(); + } else { + self.metrics.num_turbine_data_shreds_exists.inc(); + }, + error.InvalidShred => self.metrics.num_data_shreds_invalid.inc(), + // error.LedgerError => { + // self.metrics.num_data_shreds_ledger_error.inc(); + // // TODO improve this (maybe should be an error set) + // }, + else => return e, // TODO explicit } - defer completed_data_sets.deinit(); - try newly_completed_data_sets.appendSlice(completed_data_sets.items); - self.metrics.num_inserted.inc(); - } else |e| switch (e) { - error.Exists => if (is_repair) { - self.metrics.num_repaired_data_shreds_exists.inc(); - } else { - self.metrics.num_turbine_data_shreds_exists.inc(); - }, - error.InvalidShred => self.metrics.num_data_shreds_invalid.inc(), - // error.LedgerError => { - // self.metrics.num_data_shreds_ledger_error.inc(); - // // TODO improve this (maybe should be an error set) - // }, - else => return e, // TODO explicit - } - }, - .code => |code_shred| { - // TODO error handling? - _ = try self.checkInsertCodeShred( - code_shred, - &state, - merkle_root_validator, - write_batch, - options.is_trusted, - shred_source, - ); - }, + }, + .code => |code_shred| { + // TODO error handling? 
+ _ = try self.checkInsertCodeShred( + code_shred, + &state, + merkle_root_validator, + write_batch, + options.is_trusted, + shred_source, + ); + }, + } } + self.metrics.insert_shreds_elapsed_us.add(shred_insertion_timer.read().asMicros()); } - self.metrics.insert_shreds_elapsed_us.add(shred_insertion_timer.read().asMicros()); - ///////////////////////////////////// // recover shreds and insert them // var shred_recovery_timer = try Timer.start(); var valid_recovered_shreds = ArrayList([]const u8).init(allocator); defer valid_recovered_shreds.deinit(); + if (options.slot_leaders) |leaders| { + const z = tracy.Zone.init(@src(), .{ .name = "recover shreds and insert" }); + defer z.deinit(); + var reed_solomon_cache = try ReedSolomonCache.init(allocator); defer reed_solomon_cache.deinit(); const recovered_shreds = try self.tryShredRecovery( @@ -388,55 +401,65 @@ pub const ShredInserter = struct { // var merkle_chaining_timer = try Timer.start(); - const em0_keys, const em0_values = state.erasure_metas.items(); - for (em0_keys, em0_values) |erasure_set, working_em| if (working_em == .dirty) { - const slot = erasure_set.slot; - const erasure_meta: ErasureMeta = working_em.dirty; - if (try self.hasDuplicateShredsInSlot(slot)) { - continue; - } - // First code shred from this erasure batch, check the forward merkle root chaining - const shred_id = ShredId{ - .slot = slot, - .index = @intCast(erasure_meta.first_received_code_index), - .shred_type = .code, + { + const z = tracy.Zone.init(@src(), .{ .name = "check forward chaining" }); + defer z.deinit(); + + const em0_keys, const em0_values = state.erasure_metas.items(); + for (em0_keys, em0_values) |erasure_set, working_em| if (working_em == .dirty) { + const slot = erasure_set.slot; + const erasure_meta: ErasureMeta = working_em.dirty; + if (try self.hasDuplicateShredsInSlot(slot)) { + continue; + } + // First code shred from this erasure batch, check the forward merkle root chaining + const shred_id = ShredId{ + .slot = 
slot, + .index = @intCast(erasure_meta.first_received_code_index), + .shred_type = .code, + }; + // unreachable: Erasure meta was just created, initial shred must exist + const shred = state.just_inserted_shreds.get(shred_id).?; + // TODO: agave discards the result here. should we also? + _ = try merkle_root_validator.checkForwardChaining( + shred.code, + erasure_meta, + state.merkleRootMetas(), + ); }; - // unreachable: Erasure meta was just created, initial shred must exist - const shred = state.just_inserted_shreds.get(shred_id).?; - // TODO: agave discards the result here. should we also? - _ = try merkle_root_validator.checkForwardChaining( - shred.code, - erasure_meta, - state.merkleRootMetas(), - ); - }; + } ////////////////////////////////////////////////////// // check backward chaining for each merkle root // - var merkle_root_metas_iter = state.merkle_root_metas.iterator(); - while (merkle_root_metas_iter.next()) |mrm_entry| { - const erasure_set_id = mrm_entry.key_ptr.*; - const working_merkle_root_meta = mrm_entry.value_ptr; - if (working_merkle_root_meta.* == .clean or - try self.hasDuplicateShredsInSlot(erasure_set_id.slot)) - { - continue; + { + const z = tracy.Zone.init(@src(), .{ .name = "check backward chaining" }); + defer z.deinit(); + + var merkle_root_metas_iter = state.merkle_root_metas.iterator(); + while (merkle_root_metas_iter.next()) |mrm_entry| { + const erasure_set_id = mrm_entry.key_ptr.*; + const working_merkle_root_meta = mrm_entry.value_ptr; + if (working_merkle_root_meta.* == .clean or + try self.hasDuplicateShredsInSlot(erasure_set_id.slot)) + { + continue; + } + // First shred from this erasure batch, check the backwards merkle root chaining + const merkle_root_meta = working_merkle_root_meta.asRef(); + const shred_id = ShredId{ + .slot = erasure_set_id.slot, + .index = merkle_root_meta.first_received_shred_index, + .shred_type = merkle_root_meta.first_received_shred_type, + }; + // unreachable: Merkle root meta was just 
created, initial shred must exist + const shred = state.just_inserted_shreds.get(shred_id).?; + // TODO: agave discards the result here. should we also? + _ = try merkle_root_validator.checkBackwardChaining(shred, state.erasureMetas()); } - // First shred from this erasure batch, check the backwards merkle root chaining - const merkle_root_meta = working_merkle_root_meta.asRef(); - const shred_id = ShredId{ - .slot = erasure_set_id.slot, - .index = merkle_root_meta.first_received_shred_index, - .shred_type = merkle_root_meta.first_received_shred_type, - }; - // unreachable: Merkle root meta was just created, initial shred must exist - const shred = state.just_inserted_shreds.get(shred_id).?; - // TODO: agave discards the result here. should we also? - _ = try merkle_root_validator.checkBackwardChaining(shred, state.erasureMetas()); - } - self.metrics.merkle_chaining_elapsed_us.add(merkle_chaining_timer.read().asMicros()); + self.metrics.merkle_chaining_elapsed_us.add(merkle_chaining_timer.read().asMicros()); + } /////////////////////////// // commit and return diff --git a/src/ledger/shred_inserter/slot_chaining.zig b/src/ledger/shred_inserter/slot_chaining.zig index 977c577756..9302a60385 100644 --- a/src/ledger/shred_inserter/slot_chaining.zig +++ b/src/ledger/shred_inserter/slot_chaining.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const tracy = @import("tracy"); const sig = @import("../../sig.zig"); const ledger = @import("../lib.zig"); const shred_inserter = @import("lib.zig"); @@ -25,6 +26,9 @@ pub fn handleChaining( write_batch: *WriteBatch, working_set: *AutoHashMap(u64, SlotMetaWorkingSetEntry), ) !void { + const zone = tracy.Zone.init(@src(), .{ .name = "handleChaining" }); + defer zone.deinit(); + const count = working_set.count(); if (count == 0) return; // TODO is this correct? 
diff --git a/src/ledger/shred_inserter/working_state.zig b/src/ledger/shred_inserter/working_state.zig index 9e783bdb9d..bea913c54c 100644 --- a/src/ledger/shred_inserter/working_state.zig +++ b/src/ledger/shred_inserter/working_state.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const tracy = @import("tracy"); const sig = @import("../../sig.zig"); const ledger = @import("../lib.zig"); const shred_inserter = @import("lib.zig"); @@ -198,6 +199,9 @@ pub const PendingInsertShredsState = struct { } pub fn commit(self: *Self) !void { + const zone = tracy.Zone.init(@src(), .{ .name = "commit" }); + defer zone.deinit(); + var commit_working_sets_timer = try Timer.start(); // TODO: inputs and outputs of this function may need to be fleshed out diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig index 97a7ee47d1..f9ec295a92 100644 --- a/src/net/socket_utils.zig +++ b/src/net/socket_utils.zig @@ -2,6 +2,7 @@ const std = @import("std"); const builtin = @import("builtin"); const sig = @import("../sig.zig"); const network = @import("zig-network"); +const tracy = @import("tracy"); const xev = @import("xev"); const Allocator = std.mem.Allocator; @@ -352,8 +353,10 @@ const PerThread = struct { pub const Handle = std.Thread; pub fn spawn(st: *SocketThread) !void { + const sender_fn = if (builtin.os.tag == .linux) runSenderBatched else runSender; + st.handle = switch (st.direction) { - .sender => try std.Thread.spawn(.{}, runSender, .{st}), + .sender => try std.Thread.spawn(.{}, sender_fn, .{st}), .receiver => try std.Thread.spawn(.{}, runReceiver, .{st}), }; } @@ -363,6 +366,9 @@ const PerThread = struct { } fn runReceiver(st: *SocketThread) !void { + const zone = tracy.Zone.init(@src(), .{ .name = "runReceiver" }); + defer zone.deinit(); + defer { st.exit.afterExit(); st.logger.info().log("readSocket loop closed"); @@ -389,6 +395,9 @@ const PerThread = struct { } fn runSender(st: *SocketThread) !void { + const zone = tracy.Zone.init(@src(), .{ .name = "runSender" 
}); + defer zone.deinit(); + defer { // empty the channel while (st.channel.tryReceive()) |_| {} @@ -424,6 +433,108 @@ const PerThread = struct { } } } + + fn runSenderBatched(st: *SocketThread) !void { + const zone = tracy.Zone.init(@src(), .{ .name = "runSender (batched)" }); + defer zone.deinit(); + + defer { + // empty the channel + while (st.channel.tryReceive()) |_| {} + st.exit.afterExit(); + st.logger.debug().log("sendSocket loop closed"); + } + + var packets: std.ArrayListUnmanaged(Packet) = try .initCapacity( + st.allocator, + PACKETS_PER_BATCH, + ); + defer packets.deinit(st.allocator); + + // temp data needed for sending packets + const Msg = struct { + hdr: std.os.linux.mmsghdr_const, + sock_addr: network.EndPoint.SockAddr, + iovec: std.posix.iovec_const, + }; + + var msgs: std.MultiArrayList(Msg) = .empty; + defer msgs.deinit(st.allocator); + try msgs.setCapacity(st.allocator, PACKETS_PER_BATCH); + + while (true) { + st.channel.waitToReceive(st.exit) catch break; + + // drain packets and channel + while (!st.channel.isEmpty() or packets.items.len > 0) { + + // refill packets buf from channel + while (packets.items.len < PACKETS_PER_BATCH) { + packets.appendAssumeCapacity(st.channel.tryReceive() orelse break); + } + + defer msgs.clearRetainingCapacity(); + std.debug.assert(msgs.len == 0); + + // setup for sending packet batch + for (packets.items) |packet| { + // we're just filling buffers here, let's not error + errdefer comptime unreachable; + + const new_msg_idx = msgs.addOneAssumeCapacity(); + const msgs_slice = msgs.slice(); + + const new_io_vec: *std.posix.iovec_const = + &msgs_slice.items(.iovec)[new_msg_idx]; + new_io_vec.* = .{ .base = packet.data().ptr, .len = packet.size }; + + const new_sock_addr: *network.EndPoint.SockAddr = + &msgs_slice.items(.sock_addr)[new_msg_idx]; + new_sock_addr.* = toSocketAddress(packet.addr); + + const sock_addr: *std.posix.sockaddr, const sock_size: u32 = // + switch (new_sock_addr.*) { + inline else => |*sock| 
.{ @ptrCast(sock), @sizeOf(@TypeOf(sock.*)) }, + }; + + const new_hdr: *std.os.linux.mmsghdr_const = + &msgs_slice.items(.hdr)[new_msg_idx]; + new_hdr.* = .{ + .hdr = .{ + .name = sock_addr, + .namelen = sock_size, + .iov = new_io_vec[0..1], + .iovlen = 1, + .control = null, + .controllen = 0, + .flags = 0, + }, + .len = 0, + }; + } + + std.debug.assert(msgs.len == packets.items.len); + std.debug.assert(msgs.len <= PACKETS_PER_BATCH); + + // send off packet batch + const messages_sent = sendmmsg( + st.socket.internal, + msgs.items(.hdr), + 0, + ) catch |e| blk: { + st.logger.err().logf("sendmmsg error: {s}", .{@errorName(e)}); + break :blk msgs.len; // skip all packets in this batch + }; + + std.mem.copyForwards( + Packet, + packets.items[0 .. packets.items.len - messages_sent], + packets.items[messages_sent..packets.items.len], + ); + packets.items.len -= messages_sent; + } + } + } }; // TODO: Evaluate when XevThread socket backend is beneficial. @@ -491,6 +602,71 @@ pub const SocketThread = struct { } }; +fn toSocketAddress(self: network.EndPoint) network.EndPoint.SockAddr { + return switch (self.address) { + .ipv4 => |addr| network.EndPoint.SockAddr{ + .ipv4 = .{ + .family = std.posix.AF.INET, + .port = std.mem.nativeToBig(u16, self.port), + .addr = @bitCast(addr.value), + .zero = [_]u8{0} ** 8, + }, + }, + .ipv6 => |addr| network.EndPoint.SockAddr{ + .ipv6 = .{ + .family = std.posix.AF.INET6, + .port = std.mem.nativeToBig(u16, self.port), + .flowinfo = 0, + .addr = addr.value, + .scope_id = addr.scope_id, + }, + }, + }; +} + +/// std.posix.sendmsg ported over to use linux's sendmmsg instead +fn sendmmsg( + /// The file descriptor of the sending socket. 
+ sockfd: std.posix.socket_t, + /// Message header and iovecs + msgvec: []std.os.linux.mmsghdr_const, + flags: u32, +) std.posix.SendMsgError!usize { + while (true) { + const rc = std.os.linux.sendmmsg(sockfd, msgvec.ptr, @intCast(msgvec.len), flags); + + return switch (std.posix.errno(rc)) { + .SUCCESS => @intCast(rc), + .ACCES => error.AccessDenied, + .AGAIN => error.WouldBlock, + .ALREADY => error.FastOpenAlreadyInProgress, + .BADF => unreachable, // always a race condition + .CONNRESET => error.ConnectionResetByPeer, + .DESTADDRREQ => unreachable, // The socket is not connection-mode, and no peer address is set. + .FAULT => unreachable, // An invalid user space address was specified for an argument. + .INTR => continue, + .INVAL => unreachable, // Invalid argument passed. + .ISCONN => unreachable, // connection-mode socket was connected already but a recipient was specified + .MSGSIZE => error.MessageTooBig, + .NOBUFS => error.SystemResources, + .NOMEM => error.SystemResources, + .NOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket. + .OPNOTSUPP => unreachable, // Some bit in the flags argument is inappropriate for the socket type. 
+ .PIPE => error.BrokenPipe, + .AFNOSUPPORT => error.AddressFamilyNotSupported, + .LOOP => error.SymLinkLoop, + .NAMETOOLONG => error.NameTooLong, + .NOENT => error.FileNotFound, + .NOTDIR => error.NotDir, + .HOSTUNREACH => error.NetworkUnreachable, + .NETUNREACH => error.NetworkUnreachable, + .NOTCONN => error.SocketNotConnected, + .NETDOWN => error.NetworkSubsystemFailed, + else => |err| std.posix.unexpectedErrno(err), + }; + } +} + test "SocketThread: overload sendto" { const allocator = std.testing.allocator; diff --git a/src/replay/exec_async.zig b/src/replay/exec_async.zig index 2e19075f55..6ad735ec9a 100644 --- a/src/replay/exec_async.zig +++ b/src/replay/exec_async.zig @@ -164,7 +164,7 @@ pub const ReplaySlotFuture = struct { switch (poll_result) { .done => |val| return val, // TODO: consider futex-based wait like ResetEvent - .pending => std.time.sleep(100 * std.time.ns_per_ms), + .pending => std.time.sleep(0), } } } diff --git a/src/shred_network/repair_service.zig b/src/shred_network/repair_service.zig index ce6522d7b5..3ea85a2a5f 100644 --- a/src/shred_network/repair_service.zig +++ b/src/shred_network/repair_service.zig @@ -274,7 +274,9 @@ pub const RepairService = struct { /// Allows the number of repair requester threads to scale up when a lot of /// repairs are necessary, and scale down during normal operation. fn numRequesterThreads(num_requests: usize) usize { - const target_requests_per_thread = 100; + // serialising a repair request takes approx 50us + const target_requests_per_thread = 20000; + const target_threads = num_requests / target_requests_per_thread; return @max(1, @min(target_threads, maxRequesterThreads())); } @@ -282,8 +284,12 @@ fn numRequesterThreads(num_requests: usize) usize { /// Sets the maximum number of repair threads to either 16 or half the cpu /// count, whatever is less. 
fn maxRequesterThreads() u32 { - const cpu_count = std.Thread.getCpuCount() catch 1; - return @min(16, cpu_count / 2); + // Fast enough for sending ~60K requests/second in testing. We can tweak this later, but this + // will likely require either a) making our packet sender faster, or b) adding backpressure to + // our channels. Currently too many threads will overwhelm our request sender thread, and cause + // its channel to grow indefinitely. + + return 4; } /// Sleeps an appropriate duration after sending some repair requests. @@ -361,6 +367,7 @@ pub const RepairRequester = struct { exit: *Atomic(bool), ) !Self { const channel = try Channel(Packet).create(allocator); + channel.name = "repair requester channel(Packet)"; errdefer channel.destroy(); const thread = try SocketThread.spawnSender( @@ -371,6 +378,8 @@ pub const RepairRequester = struct { .{ .unordered = exit }, ); + thread.handle.setName("repreq spwnSndr") catch {}; + return .{ .allocator = allocator, .logger = .from(logger), diff --git a/src/shred_network/service.zig b/src/shred_network/service.zig index bc60f761df..3ecdf308d3 100644 --- a/src/shred_network/service.zig +++ b/src/shred_network/service.zig @@ -87,10 +87,15 @@ pub fn start( // channels (cant use arena as they need to alloc/free frequently & potentially from multiple sender threads) const unverified_shred_channel = try Channel(Packet).create(deps.allocator); + unverified_shred_channel.name = "unverified shred channel (Packet)"; try defers.deferCall(Channel(Packet).destroy, .{unverified_shred_channel}); + const shreds_to_insert_channel = try Channel(Packet).create(deps.allocator); + shreds_to_insert_channel.name = "shreds to insert channel (Packet)"; try defers.deferCall(Channel(Packet).destroy, .{shreds_to_insert_channel}); + const retransmit_channel = try Channel(Packet).create(deps.allocator); + retransmit_channel.name = "retransmit channel (Packet)"; try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel}); // receiver 
(threads) diff --git a/src/shred_network/shred_processor.zig b/src/shred_network/shred_processor.zig index ed2efe3c6a..dc5904bb03 100644 --- a/src/shred_network/shred_processor.zig +++ b/src/shred_network/shred_processor.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const tracy = @import("tracy"); const sig = @import("../sig.zig"); const shred_network = @import("lib.zig"); @@ -82,6 +83,9 @@ fn runShredProcessorOnceOver( metrics: Metrics, params: Params, ) !void { + const zone = tracy.Zone.init(@src(), .{ .name = "runShredProcessorOnceOver" }); + defer zone.deinit(); + for (shreds_buffer.items(.shred)) |shred| shred.deinit(); shreds_buffer.clearRetainingCapacity(); std.debug.assert(shreds_buffer.capacity >= MAX_SHREDS_PER_ITER); diff --git a/src/shred_network/shred_receiver.zig b/src/shred_network/shred_receiver.zig index 080e45a12c..21c82f1238 100644 --- a/src/shred_network/shred_receiver.zig +++ b/src/shred_network/shred_receiver.zig @@ -1,5 +1,6 @@ const std = @import("std"); const network = @import("zig-network"); +const tracy = @import("tracy"); const sig = @import("../sig.zig"); const shred_network = @import("lib.zig"); @@ -51,6 +52,7 @@ pub const ShredReceiver = struct { // Cretae pipe from response_sender -> repair_socket const response_sender = try Channel(Packet).create(self.allocator); + response_sender.name = "response sender channel (Packet)"; defer response_sender.destroy(); const response_sender_thread = try SocketThread.spawnSender( @@ -90,8 +92,18 @@ pub const ShredReceiver = struct { exit: ExitCondition, comptime is_repair: bool, ) !void { + const zone = tracy.Zone.init(@src(), .{ .name = std.fmt.comptimePrint( + "runPacketHandler ({s})", + .{if (is_repair) "repair" else "turbine"}, + ) }); + defer zone.deinit(); + // Setup a channel. 
const receiver = try Channel(Packet).create(self.allocator); + receiver.name = std.fmt.comptimePrint( + "{s} receiver channel (Packet)", + .{if (is_repair) "repair" else "turbine"}, + ); defer receiver.destroy(); // Receive from the socket into the channel. @@ -111,6 +123,15 @@ pub const ShredReceiver = struct { while (receiver.tryReceive()) |packet| { self.metrics.incReceived(is_repair); packet_count += 1; + tracy.plot( + u32, + std.fmt.comptimePrint( + "packets received ({s})", + .{if (is_repair) "repair" else "turbine"}, + ), + @intCast(packet_count), + ); + try self.handlePacket(packet, response_sender, is_repair); } self.metrics.observeBatchSize(is_repair, packet_count); diff --git a/src/sync/channel.zig b/src/sync/channel.zig index 8131fadb0f..655af8c9b8 100644 --- a/src/sync/channel.zig +++ b/src/sync/channel.zig @@ -16,6 +16,7 @@ pub fn Channel(T: type) type { allocator: Allocator, event: std.Thread.ResetEvent = .{}, send_hook: ?*SendHook = null, + name: [:0]const u8 = std.fmt.comptimePrint("channel ({s})", .{@typeName(T)}), pub const SendHook = struct { /// Called after the channel has pushed the value. 
@@ -151,6 +152,8 @@ pub fn Channel(T: type) type { const zone = tracy.Zone.init(@src(), .{ .name = "Channel.send" }); defer zone.deinit(); + defer tracy.plot(u32, channel.name, @intCast(channel.len())); + if (channel.closed.load(.monotonic)) { return error.ChannelClosed; } diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index 83f2a00053..24be101fd9 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -62,6 +62,7 @@ pub const Service = struct { exit: *AtomicBool, ) !Service { const send_channel = try Channel(Packet).create(allocator); + send_channel.name = "transaction sender channel(Packet)"; errdefer send_channel.destroy(); return .{ diff --git a/src/utils/lru.zig b/src/utils/lru.zig index ba9505009d..6f992f4e54 100644 --- a/src/utils/lru.zig +++ b/src/utils/lru.zig @@ -101,6 +101,7 @@ pub fn LruCacheCustom( .mux = if (kind == .locking) Mutex{} else undefined, .deinit_context = deinit_context, }; + errdefer self.hashmap.deinit(); // pre allocate enough capacity for max items since we will use // assumed capacity and non-clobber methods