Skip to content
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
.hash = "secp256k1-0.5.0-2oLmGRAHAACjbp4BBLAoCU6Sotb_gqChbYOIIvBejN0V",
},
.tracy = .{
.url = "git+https://github.com/Syndica/tracy-zig#8fcc7521aa47e6dc8bf75de888f4ed4ed503c142",
.hash = "zig_tracy-0.12.2-4TLLRxpkAABhUEzSm2wXjioLd_-qruRviTnehDTbhZYR",
.url = "git+https://github.com/Syndica/tracy-zig#e4dfba6d9b2c6cbae9575c8f0df1794ca674514f",
.hash = "zig_tracy-0.12.2-4TLLRxFkAADqlI3jJRIfVpqKW99BKIjSi4aLKYGJnIYM",
},
},
}
6 changes: 3 additions & 3 deletions src/accountsdb/snapshot/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ pub const FullSnapshotFileInfo = struct {
}

const str_max_len = std.fmt.count("{d}", .{std.math.maxInt(Slot)});
const end_max = @max(filename.len, start + str_max_len + 1);
const end_max = @min(filename.len, start + str_max_len + 1);
const filename_trunc = filename[0..end_max];
const end = std.mem.indexOfScalarPos(u8, filename_trunc, start + 1, '-') orelse
return error.MissingSlotDelimiter;
Expand Down Expand Up @@ -1086,7 +1086,7 @@ pub const IncrementalSnapshotFileInfo = struct {
}

const str_max_len = std.fmt.count("{d}", .{std.math.maxInt(Slot)});
const end_max = @max(filename.len, start + str_max_len + 1);
const end_max = @min(filename.len, start + str_max_len + 1);
const filename_trunc = filename[0..end_max];
const end = std.mem.indexOfScalarPos(u8, filename_trunc, start + 1, '-') orelse
return error.MissingSlotDelimiter;
Expand All @@ -1107,7 +1107,7 @@ pub const IncrementalSnapshotFileInfo = struct {
}

const str_max_len = std.fmt.count("{d}", .{std.math.maxInt(Slot)});
const end_max = @max(filename.len, start + str_max_len + 1);
const end_max = @min(filename.len, start + str_max_len + 1);
const filename_trunc = filename[0..end_max];
const end = std.mem.indexOfScalarPos(u8, filename_trunc, start + 1, '-') orelse
return error.MissingSlotDelimiter;
Expand Down
3 changes: 3 additions & 0 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ pub const LegacyContactInfo = struct {
/// call ContactInfo.deinit to free
pub fn toContactInfo(self: *const LegacyContactInfo, allocator: std.mem.Allocator) !ContactInfo {
var ci = ContactInfo.init(allocator, self.id, self.wallclock, self.shred_version);
errdefer ci.deinit();

try ci.setSocket(.gossip, self.gossip);
try ci.setSocket(.turbine_recv, self.turbine_recv);
try ci.setSocket(.turbine_recv_quic, self.turbine_recv_quic);
Expand All @@ -567,6 +569,7 @@ pub const LegacyContactInfo = struct {
try ci.setSocket(.rpc, self.rpc);
try ci.setSocket(.rpc_pubsub, self.rpc_pubsub);
try ci.setSocket(.serve_repair, self.serve_repair);

return ci;
}

Expand Down
16 changes: 13 additions & 3 deletions src/gossip/ping_pong.zig
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,22 @@ pub const PingCache = struct {
cache_capacity: usize,
) error{OutOfMemory}!Self {
std.debug.assert(rate_limit_delay.asNanos() <= ttl.asNanos() / 2);

var pings = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity);
errdefer pings.deinit();

var pongs = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity);
errdefer pongs.deinit();

var pending_cache = try LruCache(.non_locking, Hash, PubkeyAndSocketAddr).init(allocator, cache_capacity);
errdefer pending_cache.deinit();

return Self{
.ttl = ttl,
.rate_limit_delay = rate_limit_delay,
.pings = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity),
.pongs = try LruCache(.non_locking, PubkeyAndSocketAddr, std.time.Instant).init(allocator, cache_capacity),
.pending_cache = try LruCache(.non_locking, Hash, PubkeyAndSocketAddr).init(allocator, cache_capacity),
.pings = pings,
.pongs = pongs,
.pending_cache = pending_cache,
.allocator = allocator,
};
}
Expand Down
46 changes: 42 additions & 4 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,15 @@ pub const GossipService = struct {
// setup channels for communication between threads
var packet_incoming_channel = try Channel(Packet).create(allocator);
errdefer packet_incoming_channel.destroy();
packet_incoming_channel.name = "gossip packet incoming channel";

var packet_outgoing_channel = try Channel(Packet).create(allocator);
errdefer packet_outgoing_channel.destroy();
packet_outgoing_channel.name = "gossip packet outgoing channel";

var verified_incoming_channel = try Channel(GossipMessageWithEndpoint).create(allocator);
errdefer verified_incoming_channel.destroy();
packet_outgoing_channel.name = "gossip verified incoming channel";

// setup the socket (bind with read-timeout)
const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified;
Expand All @@ -249,10 +252,13 @@ pub const GossipService = struct {
errdefer gossip_table.deinit();

// setup the active set for push messages
const active_set = ActiveSet.init(allocator);
var active_set = ActiveSet.init(allocator);
errdefer active_set.deinit();

// setup entrypoints
var entrypoints = ArrayList(Entrypoint).init(allocator);
errdefer entrypoints.deinit();

if (maybe_entrypoints) |entrypoint_addrs| {
try entrypoints.ensureTotalCapacityPrecise(entrypoint_addrs.len);
for (entrypoint_addrs) |entrypoint_addr| {
Expand All @@ -261,25 +267,32 @@ pub const GossipService = struct {
}

// setup ping/pong cache
const ping_cache = try PingCache.init(
var ping_cache = try PingCache.init(
allocator,
PING_CACHE_TTL,
PING_CACHE_RATE_LIMIT_DELAY,
PING_CACHE_CAPACITY,
);
errdefer ping_cache.deinit();

const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key);
const my_shred_version = my_contact_info.shred_version;
const failed_pull_hashes = HashTimeQueue.init(allocator);

var failed_pull_hashes = HashTimeQueue.init(allocator);
errdefer failed_pull_hashes.deinit();

const metrics = try GossipMetrics.init();

const exit_counter = try allocator.create(Atomic(u64));
errdefer allocator.destroy(exit_counter);
exit_counter.* = Atomic(u64).init(0);

const exit = try allocator.create(Atomic(bool));
errdefer allocator.destroy(exit);
exit.* = Atomic(bool).init(false);

const service_manager = ServiceManager.init(allocator, .from(logger), exit, "gossip", .{});
var service_manager = ServiceManager.init(allocator, .from(logger), exit, "gossip", .{});
errdefer service_manager.deinit();

return .{
.allocator = allocator,
Expand Down Expand Up @@ -3324,6 +3337,30 @@ test "init, exit, and deinit" {
}
}

test "leak checked gossip init" {
const testfn = struct {
fn f(allocator: std.mem.Allocator) !void {
var my_keypair = try KeyPair.generateDeterministic(@splat(1));
const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key);
const contact_info = try localhostTestContactInfo(my_pubkey);
errdefer contact_info.deinit();

var gossip_service = try GossipService.init(
allocator,
allocator,
contact_info,
my_keypair,
null,
.FOR_TESTS,
);
gossip_service.shutdown();
gossip_service.deinit();
}
}.f;

try std.testing.checkAllAllocationFailures(std.testing.allocator, testfn, .{});
}

const fuzz_service = sig.gossip.fuzz_service;

pub const BenchmarkGossipServiceGeneral = struct {
Expand Down Expand Up @@ -3576,6 +3613,7 @@ pub const BenchmarkGossipServicePullRequests = struct {
fn localhostTestContactInfo(id: Pubkey) !ContactInfo {
comptime std.debug.assert(@import("builtin").is_test); // should only be used for testin
var contact_info = try LegacyContactInfo.default(id).toContactInfo(std.testing.allocator);
errdefer contact_info.deinit();
try contact_info.setSocket(.gossip, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 0));
return contact_info;
}
Expand Down
Loading