diff --git a/src/gossip/service.zig b/src/gossip/service.zig
index c8ac4c7ac..5827ef012 100644
--- a/src/gossip/service.zig
+++ b/src/gossip/service.zig
@@ -404,6 +404,7 @@ pub const GossipService = struct {
             self.gossip_socket,
             self.packet_incoming_channel,
             exit_condition,
+            .empty,
         );
 
         exit_condition.ordered.exit_index += 1;
@@ -435,6 +436,7 @@ pub const GossipService = struct {
             self.gossip_socket,
             self.packet_outgoing_channel,
             exit_condition,
+            .empty,
         );
 
         exit_condition.ordered.exit_index += 1;
diff --git a/src/net/packet.zig b/src/net/packet.zig
index a2009cb9f..f96561ec4 100644
--- a/src/net/packet.zig
+++ b/src/net/packet.zig
@@ -8,7 +8,9 @@ pub const Packet = struct {
     buffer: [DATA_SIZE]u8,
     size: usize,
     addr: network.EndPoint,
-    flags: BitFlags(Flag),
+    flags: Flags,
+
+    pub const Flags = BitFlags(Flag);
 
     /// Maximum over-the-wire size of a Transaction
     /// 1280 is IPv6 minimum MTU
diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig
index 97a7ee47d..e50570553 100644
--- a/src/net/socket_utils.zig
+++ b/src/net/socket_utils.zig
@@ -384,6 +384,7 @@ const PerThread = struct {
             if (bytes_read == 0) return error.SocketClosed;
             packet.addr = recv_meta.sender;
             packet.size = bytes_read;
+            packet.flags = st.flags;
             try st.channel.send(packet);
         }
     }
@@ -437,6 +438,7 @@ pub const SocketThread = struct {
     exit: ExitCondition,
     direction: Direction,
     handle: SocketBackend.Handle,
+    flags: Packet.Flags,
 
     const Direction = enum { sender, receiver };
 
@@ -446,8 +448,9 @@ pub const SocketThread = struct {
         socket: UdpSocket,
         outgoing_channel: *Channel(Packet),
         exit: ExitCondition,
+        flags: Packet.Flags,
     ) !*SocketThread {
-        return spawn(allocator, logger, socket, outgoing_channel, exit, .sender);
+        return spawn(allocator, logger, socket, outgoing_channel, exit, .sender, flags);
     }
 
     pub fn spawnReceiver(
@@ -456,8 +459,9 @@ pub const SocketThread = struct {
         socket: UdpSocket,
         incoming_channel: *Channel(Packet),
         exit: ExitCondition,
+        flags: Packet.Flags,
     ) !*SocketThread {
-        return spawn(allocator, logger, socket, incoming_channel, exit, .receiver);
+        return spawn(allocator, logger, socket, incoming_channel, exit, .receiver, flags);
     }
 
     fn spawn(
@@ -467,6 +471,7 @@ pub const SocketThread = struct {
         channel: *Channel(Packet),
         exit: ExitCondition,
         direction: Direction,
+        flags: Packet.Flags,
    ) !*SocketThread {
         const self = try allocator.create(SocketThread);
         errdefer allocator.destroy(self);
@@ -479,6 +484,7 @@ pub const SocketThread = struct {
             .exit = exit,
             .direction = direction,
             .handle = undefined,
+            .flags = flags,
         };
 
         try SocketBackend.spawn(self);
@@ -507,6 +513,7 @@ test "SocketThread: overload sendto" {
         socket,
         &send_channel,
         .{ .unordered = &exit },
+        .empty,
     );
     defer st.join();
     defer exit.store(true, .release);
@@ -561,6 +568,7 @@ pub const BenchmarkPacketProcessing = struct {
             socket,
             &incoming_channel,
             exit_condition,
+            .empty,
         );
         defer incoming_pipe.join();
 
@@ -602,6 +610,7 @@ pub const BenchmarkPacketProcessing = struct {
             socket,
             &outgoing_channel,
             exit_condition,
+            .empty,
         );
         defer outgoing_pipe.join();
diff --git a/src/shred_network/lib.zig b/src/shred_network/lib.zig
index 75ed15007..29afbabe5 100644
--- a/src/shred_network/lib.zig
+++ b/src/shred_network/lib.zig
@@ -2,7 +2,6 @@ pub const repair_message = @import("repair_message.zig");
 pub const repair_service = @import("repair_service.zig");
 pub const service = @import("service.zig");
 pub const shred_deduper = @import("shred_deduper.zig");
-pub const shred_processor = @import("shred_processor.zig");
 pub const shred_receiver = @import("shred_receiver.zig");
@import("shred_receiver.zig"); pub const shred_retransmitter = @import("shred_retransmitter.zig"); pub const shred_tracker = @import("shred_tracker.zig"); diff --git a/src/shred_network/repair_service.zig b/src/shred_network/repair_service.zig index f151dc6da..280bb0598 100644 --- a/src/shred_network/repair_service.zig +++ b/src/shred_network/repair_service.zig @@ -369,6 +369,7 @@ pub const RepairRequester = struct { udp_send_socket, channel, .{ .unordered = exit }, + .empty, ); return .{ diff --git a/src/shred_network/service.zig b/src/shred_network/service.zig index bc60f761d..baae81bf2 100644 --- a/src/shred_network/service.zig +++ b/src/shred_network/service.zig @@ -26,7 +26,6 @@ const RepairPeerProvider = shred_network.repair_service.RepairPeerProvider; const RepairRequester = shred_network.repair_service.RepairRequester; const RepairService = shred_network.repair_service.RepairService; const ShredReceiver = shred_network.shred_receiver.ShredReceiver; -const ShredReceiverMetrics = shred_network.shred_receiver.ShredReceiverMetrics; /// Settings which instruct the Shred Network how to behave. pub const ShredNetworkConfig = struct { @@ -85,44 +84,6 @@ pub fn start( const repair_socket = try bindUdpReusable(conf.repair_port); const turbine_socket = try bindUdpReusable(conf.turbine_recv_port); - // channels (cant use arena as they need to alloc/free frequently & potentially from multiple sender threads) - const unverified_shred_channel = try Channel(Packet).create(deps.allocator); - try defers.deferCall(Channel(Packet).destroy, .{unverified_shred_channel}); - const shreds_to_insert_channel = try Channel(Packet).create(deps.allocator); - try defers.deferCall(Channel(Packet).destroy, .{shreds_to_insert_channel}); - const retransmit_channel = try Channel(Packet).create(deps.allocator); - try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel}); - - // receiver (threads) - const shred_receiver = try arena.create(ShredReceiver); - shred_receiver.* = .{ - .allocator = deps.allocator, - .keypair = deps.my_keypair, - .exit = deps.exit, - .logger = .from(deps.logger), - .repair_socket = repair_socket, - .turbine_socket = turbine_socket, - .unverified_shred_sender = unverified_shred_channel, - .shred_version = deps.my_shred_version, - .metrics = try deps.registry.initStruct(ShredReceiverMetrics), - .root_slot = conf.root_slot, - }; - try service_manager.spawn("Shred Receiver", ShredReceiver.run, .{shred_receiver}); - - // verifier (thread) - try service_manager.spawn( - "Shred Verifier", - shred_network.shred_verifier.runShredVerifier, - .{ - deps.exit, - deps.registry, - unverified_shred_channel, - shreds_to_insert_channel, - if (conf.retransmit) retransmit_channel else null, - deps.epoch_context_mgr.slotLeaders(), - }, - ); - // tracker (shared state, internal to Shred Network) const shred_tracker = try arena.create(BasicShredTracker); shred_tracker.* = try BasicShredTracker.init( @@ -133,32 +94,38 @@ pub fn start( ); try defers.deferCall(BasicShredTracker.deinit, .{shred_tracker}); - const shred_inserter = try arena.create(sig.ledger.ShredInserter); - shred_inserter.* = try sig.ledger.ShredInserter.init( + var shred_inserter = try sig.ledger.ShredInserter.init( deps.allocator, .from(deps.logger), deps.registry, deps.ledger_db, ); - try defers.deferCall(sig.ledger.ShredInserter.deinit, .{shred_inserter}); + errdefer shred_inserter.deinit(); - // processor (thread) - const processor = shred_network.shred_processor; + // channels (cant use arena as they need to alloc/free frequently & + 
+    // potentially from multiple sender threads)
+    const retransmit_channel = try Channel(Packet).create(deps.allocator);
+    try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});
+
+    // receiver (threads)
+    const shred_receiver = try arena.create(ShredReceiver);
+    shred_receiver.* = try .init(deps.allocator, .from(deps.logger), deps.registry, .{
+        .keypair = deps.my_keypair,
+        .exit = deps.exit,
+        .repair_socket = repair_socket,
+        .turbine_socket = turbine_socket,
+        .shred_version = deps.my_shred_version,
+        .root_slot = conf.root_slot,
+        .maybe_retransmit_shred_sender = if (conf.retransmit) retransmit_channel else null,
+        .leader_schedule = deps.epoch_context_mgr.slotLeaders(),
+        .tracker = shred_tracker,
+        .inserter = shred_inserter,
+    });
+    try defers.deferCall(ShredReceiver.deinit, .{ shred_receiver, deps.allocator });
     try service_manager.spawn(
-        "Shred Processor",
-        processor.runShredProcessor,
-        .{
-            deps.allocator,
-            processor.Logger.from(deps.logger),
-            deps.registry,
-            deps.exit,
-            processor.Params{
-                .verified_shred_receiver = shreds_to_insert_channel,
-                .tracker = shred_tracker,
-                .inserter = shred_inserter,
-                .leader_schedule = deps.epoch_context_mgr.slotLeaders(),
-            },
-        },
+        "Shred Receiver",
+        ShredReceiver.run,
+        .{ shred_receiver, deps.allocator },
     );
 
     // retransmitter (thread)
diff --git a/src/shred_network/shred_processor.zig b/src/shred_network/shred_processor.zig
deleted file mode 100644
index ed2efe3c6..000000000
--- a/src/shred_network/shred_processor.zig
+++ /dev/null
@@ -1,183 +0,0 @@
-const std = @import("std");
-const sig = @import("../sig.zig");
-const shred_network = @import("lib.zig");
-
-const layout = sig.ledger.shred.layout;
-
-const Allocator = std.mem.Allocator;
-const Atomic = std.atomic.Value;
-
-const BasicShredTracker = shred_network.shred_tracker.BasicShredTracker;
-const Channel = sig.sync.Channel;
-const Counter = sig.prometheus.Counter;
-const Histogram = sig.prometheus.Histogram;
-const Packet = sig.net.Packet;
-const Registry = sig.prometheus.Registry;
-const Shred = sig.ledger.shred.Shred;
-const ShredInserter = sig.ledger.ShredInserter;
-const SlotOutOfBounds = shred_network.shred_tracker.SlotOutOfBounds;
-const VariantCounter = sig.prometheus.VariantCounter;
-
-// The identifier for the scoped logger used in this file.
-pub const Logger = sig.trace.Logger("shred_processor");
-
-pub const Params = struct {
-    /// shred verifier --> me
-    verified_shred_receiver: *Channel(Packet),
-    tracker: *BasicShredTracker,
-    inserter: *ShredInserter,
-    leader_schedule: sig.core.leader_schedule.SlotLeaders,
-};
-
-/// Analogous to [WindowService](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/window_service.rs#L395)
-pub fn runShredProcessor(
-    allocator: Allocator,
-    logger: Logger,
-    registry: *Registry(.{}),
-    exit: *Atomic(bool),
-    params: Params,
-) !void {
-    const metrics = try registry.initStruct(Metrics);
-
-    var shreds_buffer: ShredsBuffer = .empty;
-    defer shreds_buffer.deinit(allocator);
-    defer for (shreds_buffer.items(.shred)) |shred| shred.deinit();
-    try shreds_buffer.ensureTotalCapacity(allocator, MAX_SHREDS_PER_ITER);
-
-    while (true) {
-        params.verified_shred_receiver.waitToReceive(.{ .unordered = exit }) catch break;
-        try runShredProcessorOnceOver(
-            allocator,
-            logger,
-            &shreds_buffer,
-            metrics,
-            params,
-        );
-    }
-}
-
-const Metrics = struct {
-    passed_to_inserter_count: *Counter,
-    skipped_slot_count: *Counter,
-    insertion_batch_size: *Histogram,
-    register_shred_error: *VariantCounter(SlotOutOfBounds),
-    skip_slots_error: *VariantCounter(SlotOutOfBounds),
-    set_last_shred_error: *VariantCounter(SlotOutOfBounds),
-
-    pub const prefix = "shred_processor";
-    pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8);
-};
-
-const ShredsBuffer = std.MultiArrayList(struct {
-    shred: Shred,
-    is_required: bool,
-});
-
-const MAX_SHREDS_PER_ITER = 1024;
-
-fn runShredProcessorOnceOver(
-    allocator: Allocator,
-    logger: Logger,
-    shreds_buffer: *ShredsBuffer,
-    metrics: Metrics,
-    params: Params,
-) !void {
-    for (shreds_buffer.items(.shred)) |shred| shred.deinit();
-    shreds_buffer.clearRetainingCapacity();
-    std.debug.assert(shreds_buffer.capacity >= MAX_SHREDS_PER_ITER);
-
-    while (params.verified_shred_receiver.tryReceive()) |packet| {
-        const shred_payload = layout.getShred(&packet) orelse return error.InvalidVerifiedShred;
-        const shred = Shred.fromPayload(allocator, shred_payload) catch |e| {
-            logger.err().logf(
-                "failed to process verified shred {?}.{?}: {}",
-                .{ layout.getSlot(shred_payload), layout.getIndex(shred_payload), e },
-            );
-            continue;
-        };
-        shreds_buffer.appendAssumeCapacity(.{
-            .shred = shred,
-            .is_required = packet.flags.isSet(.repair),
-        });
-        if (shreds_buffer.len == MAX_SHREDS_PER_ITER) break;
-    }
-
-    metrics.insertion_batch_size.observe(shreds_buffer.len);
-    metrics.passed_to_inserter_count.add(shreds_buffer.len);
-    const result = try params.inserter.insertShreds(
-        shreds_buffer.items(.shred),
-        shreds_buffer.items(.is_required),
-        .{
-            .slot_leaders = params.leader_schedule,
-            .shred_tracker = params.tracker,
-        },
-    );
-    result.deinit();
-}
-
-test runShredProcessorOnceOver {
-    const allocator = std.testing.allocator;
-
-    var ledger_db = try sig.ledger.tests.TestDB.init(@src());
-    defer ledger_db.deinit();
-
-    var registry: Registry(.{}) = .init(allocator);
-    defer registry.deinit();
-
-    const verified_shred_channel: *Channel(Packet) = try .create(allocator);
-    defer verified_shred_channel.destroy();
-
-    var shred_tracker: BasicShredTracker = try .init(allocator, 0, .noop, &registry);
-    defer shred_tracker.deinit();
-
-    var shred_inserter: ShredInserter = try .init(allocator, .noop, &registry, ledger_db);
-    defer shred_inserter.deinit();
-
-    const dummy_leader_schedule: sig.core.leader_schedule.SlotLeaders = .{
-        .state = undefined,
-        .getFn = struct {
-            fn getSlotLeader(_: *anyopaque, _: sig.core.Slot) ?sig.core.Pubkey {
-                return null;
-            }
-        }.getSlotLeader,
-    };
-    std.debug.assert(dummy_leader_schedule.get(0) == null);
-
-    const params: Params = .{
-        .verified_shred_receiver = verified_shred_channel,
-        .tracker = &shred_tracker,
-        .inserter = &shred_inserter,
-        .leader_schedule = dummy_leader_schedule,
-    };
-
-    var shreds_buffer: ShredsBuffer = .empty;
-    defer shreds_buffer.deinit(allocator);
-    defer for (shreds_buffer.items(.shred)) |shred| shred.deinit();
-    try shreds_buffer.ensureTotalCapacity(allocator, MAX_SHREDS_PER_ITER);
-
-    const metrics = try registry.initStruct(Metrics);
-
-    try std.testing.expectEqual(
-        {},
-        runShredProcessorOnceOver(allocator, .noop, &shreds_buffer, metrics, params),
-    );
-
-    try verified_shred_channel.send(.ANY_EMPTY);
-    try std.testing.expectEqual(
-        {},
-        runShredProcessorOnceOver(allocator, .noop, &shreds_buffer, metrics, params),
-    );
-
-    for (0..MAX_SHREDS_PER_ITER * 3) |_| {
-        try verified_shred_channel.send(.ANY_EMPTY);
-    }
-    try std.testing.expectEqual(
-        {},
-        runShredProcessorOnceOver(allocator, .noop, &shreds_buffer, metrics, params),
-    );
-
-    try std.testing.expectEqual(
-        {},
-        runShredProcessorOnceOver(allocator, .noop, &shreds_buffer, metrics, params),
-    );
-}
diff --git a/src/shred_network/shred_receiver.zig b/src/shred_network/shred_receiver.zig
index 13926f25a..0182ffef2 100644
--- a/src/shred_network/shred_receiver.zig
+++ b/src/shred_network/shred_receiver.zig
@@ -5,12 +5,14 @@ const shred_network = @import("lib.zig");
 
 const bincode = sig.bincode;
 const layout = sig.ledger.shred.layout;
+const shred_verifier = shred_network.shred_verifier;
 
 const Allocator = std.mem.Allocator;
 const Atomic = std.atomic.Value;
 
 const KeyPair = sig.identity.KeyPair;
 const Socket = network.Socket;
 
+const BasicShredTracker = shred_network.shred_tracker.BasicShredTracker;
 const Channel = sig.sync.Channel;
 const Counter = sig.prometheus.Counter;
 const Histogram = sig.prometheus.Histogram;
@@ -18,136 +20,224 @@ const Packet = sig.net.Packet;
 const Ping = sig.gossip.Ping;
 const Pong = sig.gossip.Pong;
 const RepairMessage = shred_network.repair_message.RepairMessage;
+const Shred = sig.ledger.shred.Shred;
+const ShredInserter = sig.ledger.ShredInserter;
 const Slot = sig.core.Slot;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
 const SocketThread = sig.net.SocketThread;
 const ExitCondition = sig.sync.ExitCondition;
 const VariantCounter = sig.prometheus.VariantCounter;
 
 const Logger = sig.trace.Logger("shred_receiver");
+const VerifiedMerkleRoots = sig.utils.lru.LruCache(.non_locking, sig.core.Hash, void);
 
 /// Analogous to [ShredFetchStage](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/shred_fetch_stage.rs#L34)
 pub const ShredReceiver = struct {
-    allocator: Allocator,
-    keypair: *const KeyPair,
-    exit: *Atomic(bool),
+    params: Params,
     logger: Logger,
-    repair_socket: Socket,
-    turbine_socket: Socket,
-    /// me --> shred verifier
-    unverified_shred_sender: *Channel(Packet),
-    shred_version: *const Atomic(u16),
+
+    incoming_shreds: Channel(Packet),
+    outgoing_pongs: Channel(Packet),
+
     metrics: ShredReceiverMetrics,
-    root_slot: Slot, // TODO: eventually, this should be handled by BankForks
+    verifier_metrics: shred_verifier.Metrics,
+
+    verified_merkle_roots: VerifiedMerkleRoots,
+    shred_batch: std.MultiArrayList(struct { shred: Shred, is_repair: bool }),
+
+    const Params = struct {
+        keypair: *const KeyPair,
+        exit: *Atomic(bool),
+
+        repair_socket: Socket,
+        turbine_socket: Socket,
+
+        /// me --> retransmit service
+        maybe_retransmit_shred_sender: ?*Channel(Packet),
+
+        shred_version: *const Atomic(u16),
+
+        root_slot: Slot,
+        leader_schedule: SlotLeaders,
+
+        /// shared with repair
+        tracker: *BasicShredTracker,
+        inserter: ShredInserter,
+    };
+
+    pub fn init(
+        allocator: Allocator,
+        logger: Logger,
+        registry: *sig.prometheus.Registry(.{}),
+        params: Params,
+    ) !ShredReceiver {
+        var incoming_shreds = try Channel(Packet).init(allocator);
+        errdefer incoming_shreds.deinit();
+
+        var outgoing_pongs = try Channel(Packet).init(allocator);
+        errdefer outgoing_pongs.deinit();
+
+        var verified_merkle_roots = try VerifiedMerkleRoots.init(allocator, 1024);
+        errdefer verified_merkle_roots.deinit();
+
+        const metrics = try registry.initStruct(ShredReceiverMetrics);
+        const verifier_metrics = try registry.initStruct(shred_verifier.Metrics);
+
+        return ShredReceiver{
+            .params = params,
+            .logger = logger,
+            .incoming_shreds = incoming_shreds,
+            .outgoing_pongs = outgoing_pongs,
+            .metrics = metrics,
+            .verifier_metrics = verifier_metrics,
+            .verified_merkle_roots = verified_merkle_roots,
+            .shred_batch = .empty,
+        };
+    }
 
-    const Self = @This();
+    pub fn deinit(self: *ShredReceiver, allocator: Allocator) void {
+        self.incoming_shreds.deinit();
+        self.outgoing_pongs.deinit();
+        self.verified_merkle_roots.deinit();
+        self.shred_batch.deinit(allocator);
+    }
 
     /// Run threads to listen/send over socket and handle all incoming packets.
     /// Returns when exit is set to true.
-    pub fn run(self: *Self) !void {
+    pub fn run(self: *ShredReceiver, allocator: Allocator) !void {
         defer self.logger.info().log("exiting shred receiver");
         errdefer self.logger.err().log("error in shred receiver");
 
-        const exit = ExitCondition{ .unordered = self.exit };
-
-        // Cretae pipe from response_sender -> repair_socket
-        const response_sender = try Channel(Packet).create(self.allocator);
-        defer response_sender.destroy();
+        const exit = ExitCondition{ .unordered = self.params.exit };
+        // Create pipe from outgoing_pongs -> repair_socket
         const response_sender_thread = try SocketThread.spawnSender(
-            self.allocator,
+            allocator,
             .from(self.logger),
-            self.repair_socket,
-            response_sender,
+            self.params.repair_socket,
+            &self.outgoing_pongs,
             exit,
+            .empty,
         );
         defer response_sender_thread.join();
 
-        // Run a packetHandler thread which pipes from repair_socket -> handlePacket.
-        const response_thread = try std.Thread.spawn(.{}, runPacketHandler, .{
-            self,
-            response_sender,
-            self.repair_socket,
-            exit,
-            true, // is_repair
-        });
-        defer response_thread.join();
-
-        // Run a packetHandler thread which pipes from turbine_socket -> handlePacket.
-        const turbine_thread = try std.Thread.spawn(.{}, runPacketHandler, .{
-            self,
-            response_sender,
-            self.turbine_socket,
+        // Create pipe from repair_socket -> incoming_shreds tagged .repair
+        const repair_receiver_thread = try SocketThread.spawnReceiver(
+            allocator,
+            .from(self.logger),
+            self.params.repair_socket,
+            &self.incoming_shreds,
             exit,
-            false, // is_repair
-        });
-        defer turbine_thread.join();
-    }
+            .from(.repair),
+        );
+        defer repair_receiver_thread.join();
 
-    fn runPacketHandler(
-        self: *Self,
-        response_sender: *Channel(Packet),
-        receiver_socket: Socket,
-        exit: ExitCondition,
-        comptime is_repair: bool,
-    ) !void {
-        // Setup a channel.
-        const receiver = try Channel(Packet).create(self.allocator);
-        defer receiver.destroy();
-
-        // Receive from the socket into the channel.
-        const receiver_thread = try SocketThread.spawnReceiver(
-            self.allocator,
+        // Create pipe from turbine_socket -> incoming_shreds without tagging.
+        const turbine_receiver_thread = try SocketThread.spawnReceiver(
+            allocator,
             .from(self.logger),
-            receiver_socket,
-            receiver,
+            self.params.turbine_socket,
+            &self.incoming_shreds,
             exit,
+            .empty,
         );
-        defer receiver_thread.join();
+        defer turbine_receiver_thread.join();
 
-        // Handle packets from the channel.
+        // Handle all incoming shreds from the channel.
         while (true) {
-            receiver.waitToReceive(exit) catch break;
-            var packet_count: usize = 0;
-            while (receiver.tryReceive()) |packet| {
-                self.metrics.incReceived(is_repair);
-                packet_count += 1;
-                try self.handlePacket(packet, response_sender, is_repair);
+            self.incoming_shreds.waitToReceive(exit) catch break;
+            try self.handleBatch(allocator);
+        }
+    }
+
+    fn handleBatch(self: *ShredReceiver, allocator: Allocator) !void {
+        defer {
+            for (self.shred_batch.items(.shred)) |shred| shred.deinit();
+            self.shred_batch.clearRetainingCapacity();
+        }
+
+        while (self.incoming_shreds.tryReceive()) |packet| {
+            const is_repair = packet.flags.isSet(.repair);
+            self.metrics.incReceived(is_repair);
+
+            if (try self.handlePacket(allocator, packet)) |shred| {
+                try self.shred_batch.append(allocator, .{
+                    .shred = shred,
+                    .is_repair = is_repair,
+                });
+                if (self.shred_batch.len == MAX_SHREDS_PER_ITER) break;
             }
-            self.metrics.observeBatchSize(is_repair, packet_count);
         }
+
+        const result = try self.params.inserter.insertShreds(
+            self.shred_batch.items(.shred),
+            self.shred_batch.items(.is_repair),
+            .{
+                .slot_leaders = self.params.leader_schedule,
+                .shred_tracker = self.params.tracker,
+            },
+        );
+        self.metrics.passed_to_inserter_count.add(self.shred_batch.len);
+        result.deinit();
+
+        self.metrics.batch_size.observe(self.shred_batch.len);
     }
 
-    /// Handle a single packet and return.
-    fn handlePacket(
-        self: Self,
-        packet: Packet,
-        response_sender: *Channel(Packet),
-        comptime is_repair: bool,
-    ) !void {
+    const MAX_SHREDS_PER_ITER = 1024;
+
+    /// Handle a single packet, returning its shred if the packet holds a valid one.
+    fn handlePacket(self: *ShredReceiver, allocator: Allocator, packet: Packet) !?Shred {
         if (packet.size == REPAIR_RESPONSE_SERIALIZED_PING_BYTES) {
-            if (try self.handlePing(&packet)) |pong_packet| {
-                try response_sender.send(pong_packet);
+            if (try handlePing(
+                allocator,
+                &packet,
+                self.metrics,
+                self.params.keypair,
+            )) |pong_packet| {
+                try self.outgoing_pongs.send(pong_packet);
                 self.metrics.pong_sent_count.inc();
             }
+            return null;
         } else {
             const max_slot = std.math.maxInt(Slot); // TODO agave uses BankForks for this
-            validateShred(&packet, self.root_slot, self.shred_version, max_slot) catch |err| {
+            validateShred(
+                &packet,
+                self.params.root_slot,
+                self.params.shred_version,
+                max_slot,
+            ) catch |err| {
                 self.metrics.discard.observe(err);
-                return;
+                return null;
             };
-            var our_packet = packet;
-            if (is_repair) our_packet.flags.set(.repair);
             self.metrics.satisfactory_shred_count.inc();
-            try self.unverified_shred_sender.send(our_packet);
-        }
-    }
 
-    /// Handle a ping message and returns the repair message.
-    fn handlePing(self: *const Self, packet: *const Packet) !?Packet {
-        return handlePingInner(self.allocator, packet, self.metrics, self.keypair);
+            shred_verifier.verifyShred(
+                &packet,
+                self.params.leader_schedule,
+                &self.verified_merkle_roots,
+                self.verifier_metrics,
+            ) catch |err| {
+                self.verifier_metrics.fail.observe(err);
+                return null;
+            };
+            self.verifier_metrics.verified_count.inc();
+
+            if (self.params.maybe_retransmit_shred_sender) |retransmit_shred_sender| {
+                try retransmit_shred_sender.send(packet);
+            }
+
+            const shred_payload = layout.getShred(&packet) orelse return error.InvalidVerifiedShred;
+            return Shred.fromPayload(allocator, shred_payload) catch |err| {
+                self.logger.err().logf(
+                    "failed to deserialize verified shred {?}.{?}: {}",
+                    .{ layout.getSlot(shred_payload), layout.getIndex(shred_payload), err },
+                );
+                return null;
+            };
+        }
     }
 
-    fn handlePingInner(
+    fn handlePing(
         allocator: std.mem.Allocator,
         packet: *const Packet,
         metrics: ShredReceiverMetrics,
@@ -180,6 +270,88 @@ pub const ShredReceiver = struct {
     }
 };
 
+test "handleBatch/handlePacket" {
+    const allocator = std.testing.allocator;
+    const keypair = try sig.identity.KeyPair.generateDeterministic(.{1} ** 32);
+    const root_slot = 0;
+    const invalid_socket = Socket{
+        .family = .ipv4,
+        .internal = -1,
+        .endpoint = null,
+    };
+
+    var registry = sig.prometheus.Registry(.{}).init(allocator);
+    defer registry.deinit();
+
+    var epoch_ctx = try sig.adapter.EpochContextManager.init(allocator, .DEFAULT);
+    defer epoch_ctx.deinit();
+
+    var ledger_db = try sig.ledger.tests.TestDB.init(@src());
+    defer ledger_db.deinit();
+
+    var shred_tracker = try sig.shred_network.shred_tracker.BasicShredTracker.init(
+        allocator,
+        root_slot + 1,
+        .noop,
+        &registry,
+    );
+    defer shred_tracker.deinit();
+
+    var shred_inserter = try sig.ledger.ShredInserter.init(
+        allocator,
+        .noop,
+        &registry,
+        ledger_db,
+    );
+    defer shred_inserter.deinit();
+
+    var exit = Atomic(bool).init(false);
+    const shred_version = Atomic(u16).init(0);
+
+    var shred_receiver = try ShredReceiver.init(allocator, .noop, &registry, .{
+        .keypair = &keypair,
+        .exit = &exit,
+        .repair_socket = invalid_socket,
+        .turbine_socket = invalid_socket,
+        .shred_version = &shred_version,
+        .root_slot = root_slot,
+        .maybe_retransmit_shred_sender = null,
+        .leader_schedule = epoch_ctx.slotLeaders(),
+        .tracker = &shred_tracker,
+        .inserter = shred_inserter,
+    });
+    defer shred_receiver.deinit(allocator);
+
+    // test repair packet
+    {
+        const ping = try Ping.init(.{1} ** 32, &keypair);
+        const addr = sig.net.SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 88);
+        var packet = try Packet.initFromBincode(addr, RepairPing{ .ping = ping });
+        packet.flags = .from(.repair);
+        try shred_receiver.incoming_shreds.send(packet);
+    }
+
+    // test shred packet
+    {
+        const shreds = try sig.ledger.tests.loadShredsFromFile(
+            allocator,
+            sig.TEST_DATA_DIR ++ "shreds/merkle_root_metas_coding_test_shreds_3_1228.bin",
+        );
+        defer sig.ledger.tests.deinitShreds(allocator, shreds);
+        const shred_data = shreds[0].payload();
+
+        var packet: Packet = undefined;
+        @memcpy(packet.buffer[0..shred_data.len], shred_data);
+        packet.size = @intCast(shred_data.len);
+        packet.addr = .{ .address = .{ .ipv4 = .init(0, 0, 0, 0) }, .port = 0 };
+        packet.flags = .{};
+
+        try shred_receiver.incoming_shreds.send(packet);
+    }
+
+    try shred_receiver.handleBatch(allocator);
+}
+
 test "handlePing" {
     const allocator = std.testing.allocator;
     var metrics_registry = sig.prometheus.Registry(.{}).init(allocator);
@@ -195,7 +367,7 @@ test "handlePing" {
     const input_ping_packet = try Packet.initFromBincode(addr, RepairPing{ .ping = ping });
     const expected_pong_packet = try Packet.initFromBincode(addr, RepairMessage{ .pong = pong });
 
-    const actual_pong_packet = try ShredReceiver.handlePingInner(
+    const actual_pong_packet = try ShredReceiver.handlePing(
         allocator,
         &input_ping_packet,
         shred_metrics,
@@ -208,7 +380,7 @@ test "handlePing" {
     var evil_ping = ping;
     evil_ping.from = sig.core.Pubkey.fromPublicKey(&evil_keypair.public_key);
     const evil_ping_packet = try Packet.initFromBincode(addr, RepairPing{ .ping = evil_ping });
-    try std.testing.expectEqual(null, try ShredReceiver.handlePingInner(
+    try std.testing.expectEqual(null, try ShredReceiver.handlePing(
         allocator,
         &evil_ping_packet,
         shred_metrics,
@@ -277,12 +449,12 @@ pub const ShredReceiverMetrics = struct {
     turbine_received_count: *Counter,
     repair_received_count: *Counter,
     satisfactory_shred_count: *Counter,
+    passed_to_inserter_count: *Counter,
     valid_ping_count: *Counter,
     ping_deserialize_fail_count: *Counter,
     ping_verify_fail_count: *Counter,
     pong_sent_count: *Counter,
-    repair_batch_size: *Histogram,
-    turbine_batch_size: *Histogram,
+    batch_size: *Histogram,
     discard: *VariantCounter(ShredValidationError),
 
     pub const prefix = "shred_receiver";
@@ -296,18 +468,6 @@ pub const ShredReceiverMetrics = struct {
             self.turbine_received_count.inc();
         }
     }
-
-    pub fn observeBatchSize(
-        self: *const ShredReceiverMetrics,
-        is_repair: bool,
-        packet_count: usize,
-    ) void {
-        if (is_repair) {
-            self.repair_batch_size.observe(packet_count);
-        } else {
-            self.turbine_batch_size.observe(packet_count);
-        }
-    }
 };
 
 /// Something about the shred was unexpected, so we will discard it.
diff --git a/src/shred_network/shred_retransmitter.zig b/src/shred_network/shred_retransmitter.zig
index 7809460e4..d9f96e245 100644
--- a/src/shred_network/shred_retransmitter.zig
+++ b/src/shred_network/shred_retransmitter.zig
@@ -122,6 +122,7 @@ pub fn runShredRetransmitter(params: ShredRetransmitterParams) !void {
         retransmit_socket,
         &retransmit_to_socket_channel,
         .{ .unordered = params.exit },
+        .empty,
     );
     defer sender_thread.join();
 
diff --git a/src/shred_network/shred_verifier.zig b/src/shred_network/shred_verifier.zig
index 257085648..7e001bc70 100644
--- a/src/shred_network/shred_verifier.zig
+++ b/src/shred_network/shred_verifier.zig
@@ -1,67 +1,17 @@
-const std = @import("std");
 const sig = @import("../sig.zig");
 
 const shred_layout = sig.ledger.shred.layout;
 
-const Atomic = std.atomic.Value;
-
-const Channel = sig.sync.Channel;
 const Counter = sig.prometheus.Counter;
 const Histogram = sig.prometheus.Histogram;
 const Packet = sig.net.Packet;
-const Registry = sig.prometheus.Registry;
 const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
 const VariantCounter = sig.prometheus.VariantCounter;
 
 const VerifiedMerkleRoots = sig.utils.lru.LruCache(.non_locking, sig.core.Hash, void);
 
-/// Analogous to [run_shred_sigverify](https://github.com/anza-xyz/agave/blob/8c5a33a81a0504fd25d0465bed35d153ff84819f/turbine/src/sigverify_shreds.rs#L82)
-pub fn runShredVerifier(
-    exit: *Atomic(bool),
-    registry: *Registry(.{}),
-    /// shred receiver --> me
-    unverified_shred_receiver: *Channel(Packet),
-    /// me --> shred processor
-    verified_shred_sender: *Channel(Packet),
-    /// me --> retransmit service
-    maybe_retransmit_shred_sender: ?*Channel(Packet),
-    leader_schedule: SlotLeaders,
-) !void {
-    const metrics = try registry.initStruct(Metrics);
-    var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024);
-    while (true) {
-        unverified_shred_receiver.waitToReceive(.{ .unordered = exit }) catch break;
-
-        var packet_count: usize = 0;
-        while (unverified_shred_receiver.tryReceive()) |packet| {
-            packet_count += 1;
-            metrics.received_count.inc();
-            if (verifyShred(&packet, leader_schedule, &verified_merkle_roots, metrics)) |_| {
-                metrics.verified_count.inc();
-                try verified_shred_sender.send(packet);
-                if (maybe_retransmit_shred_sender) |retransmit_shred_sender| {
-                    try retransmit_shred_sender.send(packet);
-                }
-            } else |err| {
-                metrics.fail.observe(err);
-            }
-        }
-        metrics.batch_size.observe(packet_count);
-    }
-}
-
-pub const ShredVerificationFailure = error{
-    InsufficientShredSize,
-    SlotMissing,
-    SignatureMissing,
-    SignedDataMissing,
-    LeaderUnknown,
-    FailedVerification,
-    FailedCaching,
-};
-
 /// Analogous to [verify_shred_cpu](https://github.com/anza-xyz/agave/blob/83e7d84bcc4cf438905d07279bc07e012a49afd9/ledger/src/sigverify_shreds.rs#L35)
-fn verifyShred(
+pub fn verifyShred(
     packet: *const Packet,
     leader_schedule: SlotLeaders,
     verified_merkle_roots: *VerifiedMerkleRoots,
@@ -81,7 +31,17 @@
     verified_merkle_roots.insert(signed_data, {}) catch return error.FailedCaching;
 }
 
-const Metrics = struct {
+pub const ShredVerificationFailure = error{
+    InsufficientShredSize,
+    SlotMissing,
+    SignatureMissing,
+    SignedDataMissing,
+    LeaderUnknown,
+    FailedVerification,
+    FailedCaching,
+};
+
+pub const Metrics = struct {
     received_count: *Counter,
     verified_count: *Counter,
     cache_miss_count: *Counter,
diff --git a/src/utils/bitflags.zig b/src/utils/bitflags.zig
index 19ae07b6a..efc3d4136 100644
--- a/src/utils/bitflags.zig
+++ b/src/utils/bitflags.zig
@@ -6,6 +6,8 @@ pub fn BitFlags(comptime FlagEnum: type) type {
 
         pub const Flag = FlagEnum;
 
+        pub const empty = Self{ .state = 0 };
+
         pub fn from(flag: FlagEnum) Self {
             return .{ .state = @intFromEnum(flag) };
         }