Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ pub const GossipService = struct {
self.gossip_socket,
self.packet_incoming_channel,
exit_condition,
.empty,
);
exit_condition.ordered.exit_index += 1;

Expand Down Expand Up @@ -435,6 +436,7 @@ pub const GossipService = struct {
self.gossip_socket,
self.packet_outgoing_channel,
exit_condition,
.empty,
);
exit_condition.ordered.exit_index += 1;

Expand Down
4 changes: 3 additions & 1 deletion src/net/packet.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ pub const Packet = struct {
buffer: [DATA_SIZE]u8,
size: usize,
addr: network.EndPoint,
flags: BitFlags(Flag),
flags: Flags,

pub const Flags = BitFlags(Flag);

/// Maximum over-the-wire size of a Transaction
/// 1280 is IPv6 minimum MTU
Expand Down
13 changes: 11 additions & 2 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ const PerThread = struct {
if (bytes_read == 0) return error.SocketClosed;
packet.addr = recv_meta.sender;
packet.size = bytes_read;
packet.flags = st.flags;
try st.channel.send(packet);
}
}
Expand Down Expand Up @@ -437,6 +438,7 @@ pub const SocketThread = struct {
exit: ExitCondition,
direction: Direction,
handle: SocketBackend.Handle,
flags: Packet.Flags,

const Direction = enum { sender, receiver };

Expand All @@ -446,8 +448,9 @@ pub const SocketThread = struct {
socket: UdpSocket,
outgoing_channel: *Channel(Packet),
exit: ExitCondition,
flags: Packet.Flags,
) !*SocketThread {
return spawn(allocator, logger, socket, outgoing_channel, exit, .sender);
return spawn(allocator, logger, socket, outgoing_channel, exit, .sender, flags);
}

pub fn spawnReceiver(
Expand All @@ -456,8 +459,9 @@ pub const SocketThread = struct {
socket: UdpSocket,
incoming_channel: *Channel(Packet),
exit: ExitCondition,
flags: Packet.Flags,
) !*SocketThread {
return spawn(allocator, logger, socket, incoming_channel, exit, .receiver);
return spawn(allocator, logger, socket, incoming_channel, exit, .receiver, flags);
}

fn spawn(
Expand All @@ -467,6 +471,7 @@ pub const SocketThread = struct {
channel: *Channel(Packet),
exit: ExitCondition,
direction: Direction,
flags: Packet.Flags,
) !*SocketThread {
const self = try allocator.create(SocketThread);
errdefer allocator.destroy(self);
Expand All @@ -479,6 +484,7 @@ pub const SocketThread = struct {
.exit = exit,
.direction = direction,
.handle = undefined,
.flags = flags,
};

try SocketBackend.spawn(self);
Expand Down Expand Up @@ -507,6 +513,7 @@ test "SocketThread: overload sendto" {
socket,
&send_channel,
.{ .unordered = &exit },
.empty,
);
defer st.join();
defer exit.store(true, .release);
Expand Down Expand Up @@ -561,6 +568,7 @@ pub const BenchmarkPacketProcessing = struct {
socket,
&incoming_channel,
exit_condition,
.empty,
);
defer incoming_pipe.join();

Expand Down Expand Up @@ -602,6 +610,7 @@ pub const BenchmarkPacketProcessing = struct {
socket,
&outgoing_channel,
exit_condition,
.empty,
);
defer outgoing_pipe.join();

Expand Down
1 change: 0 additions & 1 deletion src/shred_network/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub const repair_message = @import("repair_message.zig");
pub const repair_service = @import("repair_service.zig");
pub const service = @import("service.zig");
pub const shred_deduper = @import("shred_deduper.zig");
pub const shred_processor = @import("shred_processor.zig");
pub const shred_receiver = @import("shred_receiver.zig");
pub const shred_retransmitter = @import("shred_retransmitter.zig");
pub const shred_tracker = @import("shred_tracker.zig");
Expand Down
1 change: 1 addition & 0 deletions src/shred_network/repair_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ pub const RepairRequester = struct {
udp_send_socket,
channel,
.{ .unordered = exit },
.empty,
);

return .{
Expand Down
83 changes: 25 additions & 58 deletions src/shred_network/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const RepairPeerProvider = shred_network.repair_service.RepairPeerProvider;
const RepairRequester = shred_network.repair_service.RepairRequester;
const RepairService = shred_network.repair_service.RepairService;
const ShredReceiver = shred_network.shred_receiver.ShredReceiver;
const ShredReceiverMetrics = shred_network.shred_receiver.ShredReceiverMetrics;

/// Settings which instruct the Shred Network how to behave.
pub const ShredNetworkConfig = struct {
Expand Down Expand Up @@ -85,44 +84,6 @@ pub fn start(
const repair_socket = try bindUdpReusable(conf.repair_port);
const turbine_socket = try bindUdpReusable(conf.turbine_recv_port);

// channels (cant use arena as they need to alloc/free frequently & potentially from multiple sender threads)
const unverified_shred_channel = try Channel(Packet).create(deps.allocator);
try defers.deferCall(Channel(Packet).destroy, .{unverified_shred_channel});
const shreds_to_insert_channel = try Channel(Packet).create(deps.allocator);
try defers.deferCall(Channel(Packet).destroy, .{shreds_to_insert_channel});
const retransmit_channel = try Channel(Packet).create(deps.allocator);
try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});

// receiver (threads)
const shred_receiver = try arena.create(ShredReceiver);
shred_receiver.* = .{
.allocator = deps.allocator,
.keypair = deps.my_keypair,
.exit = deps.exit,
.logger = .from(deps.logger),
.repair_socket = repair_socket,
.turbine_socket = turbine_socket,
.unverified_shred_sender = unverified_shred_channel,
.shred_version = deps.my_shred_version,
.metrics = try deps.registry.initStruct(ShredReceiverMetrics),
.root_slot = conf.root_slot,
};
try service_manager.spawn("Shred Receiver", ShredReceiver.run, .{shred_receiver});

// verifier (thread)
try service_manager.spawn(
"Shred Verifier",
shred_network.shred_verifier.runShredVerifier,
.{
deps.exit,
deps.registry,
unverified_shred_channel,
shreds_to_insert_channel,
if (conf.retransmit) retransmit_channel else null,
deps.epoch_context_mgr.slotLeaders(),
},
);

// tracker (shared state, internal to Shred Network)
const shred_tracker = try arena.create(BasicShredTracker);
shred_tracker.* = try BasicShredTracker.init(
Expand All @@ -133,32 +94,38 @@ pub fn start(
);
try defers.deferCall(BasicShredTracker.deinit, .{shred_tracker});

const shred_inserter = try arena.create(sig.ledger.ShredInserter);
shred_inserter.* = try sig.ledger.ShredInserter.init(
var shred_inserter = try sig.ledger.ShredInserter.init(
deps.allocator,
.from(deps.logger),
deps.registry,
deps.ledger_db,
);
try defers.deferCall(sig.ledger.ShredInserter.deinit, .{shred_inserter});
errdefer shred_inserter.deinit();

// processor (thread)
const processor = shred_network.shred_processor;
// channels (cant use arena as they need to alloc/free frequently &
// potentially from multiple sender threads)
const retransmit_channel = try Channel(Packet).create(deps.allocator);
try defers.deferCall(Channel(Packet).destroy, .{retransmit_channel});

// receiver (threads)
const shred_receiver = try arena.create(ShredReceiver);
shred_receiver.* = try .init(deps.allocator, .from(deps.logger), deps.registry, .{
.keypair = deps.my_keypair,
.exit = deps.exit,
.repair_socket = repair_socket,
.turbine_socket = turbine_socket,
.shred_version = deps.my_shred_version,
.root_slot = conf.root_slot,
.maybe_retransmit_shred_sender = if (conf.retransmit) retransmit_channel else null,
.leader_schedule = deps.epoch_context_mgr.slotLeaders(),
.tracker = shred_tracker,
.inserter = shred_inserter,
});
try defers.deferCall(ShredReceiver.deinit, .{ shred_receiver, deps.allocator });
try service_manager.spawn(
"Shred Processor",
processor.runShredProcessor,
.{
deps.allocator,
processor.Logger.from(deps.logger),
deps.registry,
deps.exit,
processor.Params{
.verified_shred_receiver = shreds_to_insert_channel,
.tracker = shred_tracker,
.inserter = shred_inserter,
.leader_schedule = deps.epoch_context_mgr.slotLeaders(),
},
},
"Shred Receiver",
ShredReceiver.run,
.{ shred_receiver, deps.allocator },
);

// retransmitter (thread)
Expand Down
Loading