Skip to content

Commit 2e2b3ee

Browse files
committed
Single threaded accountsdb manager & misc changes
1 parent 1b17081 commit 2e2b3ee

File tree

3 files changed

+96
-41
lines changed

3 files changed

+96
-41
lines changed

src/accountsdb/db.zig

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,8 +1650,9 @@ pub const AccountsDB = struct {
16501650
self.unrooted_accounts.readWithLock();
16511651
defer unrooted_accounts_lg.unlock();
16521652

1653-
const accounts = (unrooted_accounts.get(account_ref.slot) orelse
1654-
return error.SlotNotFound).items(.account);
1653+
const slots_and_accounts = unrooted_accounts.get(account_ref.slot) orelse
1654+
return error.SlotNotFound;
1655+
const accounts: []Account = slots_and_accounts.items(.account);
16551656
const account = accounts[ref_info.index];
16561657

16571658
return try account.cloneOwned(self.allocator);

src/accountsdb/fuzz.zig

Lines changed: 92 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ pub const TrackedAccount = struct {
4343

4444
pub const RunCmd = struct {
4545
max_slots: ?Slot,
46+
non_sequential_slots: bool,
47+
index_allocation: ?IndexAllocation,
48+
enable_manager: bool,
49+
50+
pub const IndexAllocation = enum { ram, disk };
4651

4752
pub const parser = cli.Parser(RunCmd, .{
4853
.help = .{
@@ -58,10 +63,36 @@ pub const RunCmd = struct {
5863
.config = {},
5964
.help = "The number of slots number which, when surpassed, will exit the fuzzer.",
6065
},
66+
.non_sequential_slots = .{
67+
.kind = .named,
68+
.name_override = null,
69+
.alias = .none,
70+
.default_value = false,
71+
.config = {},
72+
.help = "Enable non-sequential slots.",
73+
},
74+
.index_allocation = .{
75+
.kind = .named,
76+
.name_override = null,
77+
.alias = .none,
78+
.default_value = null,
79+
.config = {},
80+
.help = "Whether to use ram or disk for index allocation. Defaults to a random value based on the seed.",
81+
},
82+
.enable_manager = .{
83+
.kind = .named,
84+
.name_override = null,
85+
.alias = .none,
86+
.default_value = false,
87+
.config = {},
88+
.help = "Enable the accountsdb manager during fuzzer.",
89+
},
6190
},
6291
});
6392
};
6493

94+
const Logger = sig.trace.Logger("accountsdb.fuzz");
95+
6596
pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
6697
var prng_state: std.Random.DefaultPrng = .init(seed);
6798
const random = prng_state.random();
@@ -83,7 +114,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
83114
.max_buffer = 1 << 20,
84115
}, null);
85116
defer std_logger.deinit();
86-
const logger = std_logger.logger("accountsdb.fuzz");
117+
const logger: Logger = std_logger.logger("accountsdb.fuzz");
87118

88119
const run_cmd: RunCmd = cmd: {
89120
var argv_list: std.ArrayListUnmanaged([]const u8) = .empty;
@@ -102,6 +133,11 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
102133
};
103134

104135
const maybe_max_slots = run_cmd.max_slots;
136+
const non_sequential_slots = run_cmd.non_sequential_slots;
137+
const enable_manager = run_cmd.enable_manager;
138+
const index_allocation =
139+
run_cmd.index_allocation orelse
140+
random.enumValue(RunCmd.IndexAllocation);
105141

106142
var fuzz_data_dir = try std.fs.cwd().makeOpenPath(sig.FUZZ_DATA_DIR, .{});
107143
defer fuzz_data_dir.close();
@@ -126,18 +162,20 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
126162
};
127163
};
128164

129-
var last_full_snapshot_validated_slot: Slot = 0;
130-
var last_inc_snapshot_validated_slot: Slot = 0;
165+
logger.info().logf("enable manager: {}", .{enable_manager});
166+
logger.info().logf("index allocation: {s}", .{@tagName(index_allocation)});
167+
logger.info().logf("non-sequential slots: {}", .{non_sequential_slots});
131168

132-
const use_disk = random.boolean();
133-
logger.info().logf("use disk: {}", .{use_disk});
134169
var accounts_db: AccountsDB = try .init(.{
135170
.allocator = allocator,
136171
.logger = .from(logger),
137172
.snapshot_dir = main_accountsdb_dir,
138173
.geyser_writer = null,
139174
.gossip_view = null,
140-
.index_allocation = if (use_disk) .disk else .ram,
175+
.index_allocation = switch (index_allocation) {
176+
.ram => .ram,
177+
.disk => .disk,
178+
},
141179
.number_of_index_shards = sig.accounts_db.db.ACCOUNT_INDEX_SHARDS,
142180
});
143181
defer accounts_db.deinit();
@@ -146,18 +184,13 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
146184
try accounts_db.account_index.expandRefCapacity(1_000_000);
147185

148186
var manager_exit: std.atomic.Value(bool) = .init(false);
149-
const manager_handle: std.Thread = try .spawn(.{}, sig.accounts_db.manager.runLoop, .{
150-
&accounts_db, sig.accounts_db.manager.ManagerLoopConfig{
151-
.exit = &manager_exit,
152-
.slots_per_full_snapshot = 50_000,
153-
.slots_per_incremental_snapshot = 5_000,
154-
.zstd_nb_workers = @intCast(std.Thread.getCpuCount() catch 0),
155-
},
187+
var manager: sig.accounts_db.manager.Manager = try .init(allocator, &accounts_db, .{
188+
.exit = &manager_exit,
189+
.slots_per_full_snapshot = 50_000,
190+
.slots_per_incremental_snapshot = 5_000,
191+
.zstd_nb_workers = @intCast(std.Thread.getCpuCount() catch 0),
156192
});
157-
defer {
158-
manager_exit.store(true, .release);
159-
manager_handle.join();
160-
}
193+
defer manager.deinit(allocator);
161194

162195
var tracked_accounts_rw: sig.sync.RwMux(TrackedAccountsMap) = .init(.empty);
163196
defer {
@@ -185,32 +218,48 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
185218
// any validation (in the .get block of the main fuzzer
186219
// loop, we perform validation)
187220
threads.appendAssumeCapacity(try .spawn(.{}, readRandomAccounts, .{
221+
logger,
188222
&accounts_db,
189223
&tracked_accounts_rw,
190224
seed + thread_i,
191225
&reader_exit,
192226
thread_i,
193227
}));
194-
logger.debug().logf("started readRandomAccounts thread: {}", .{thread_i});
195228
}
196229

230+
var last_full_snapshot_validated_slot: Slot = 0;
231+
var last_inc_snapshot_validated_slot: Slot = 0;
197232
var largest_rooted_slot: Slot = 0;
198-
var slot: Slot = 0;
233+
var top_slot: Slot = 0;
199234

200235
var ancestors: sig.core.Ancestors = .EMPTY;
201236
defer ancestors.deinit(allocator);
202237

203238
// get/put a bunch of accounts
204239
while (true) {
205-
if (maybe_max_slots) |max_slots| if (slot >= max_slots) {
240+
if (maybe_max_slots) |max_slots| if (top_slot >= max_slots) {
206241
logger.info().logf("reached max slots: {}", .{max_slots});
207242
break;
208243
};
209-
defer switch (random.int(u2)) {
210-
0 => slot += random.intRangeAtMost(Slot, 1, 2),
211-
1, 2, 3 => {},
244+
const will_inc_slot = switch (random.int(u2)) {
245+
0 => true,
246+
1, 2, 3 => false,
247+
};
248+
defer if (will_inc_slot) {
249+
top_slot += random.intRangeAtMost(Slot, 1, 2);
250+
};
251+
try ancestors.addSlot(allocator, top_slot);
252+
253+
const current_slot = if (!non_sequential_slots) top_slot else slot: {
254+
const ancestor_slots: []const Slot = ancestors.ancestors.keys();
255+
std.debug.assert(ancestor_slots[ancestor_slots.len - 1] == top_slot);
256+
const ancestor_index = random.intRangeLessThan(
257+
usize,
258+
ancestor_slots.len -| 10,
259+
ancestor_slots.len,
260+
);
261+
break :slot ancestor_slots[ancestor_index];
212262
};
213-
try ancestors.addSlot(allocator, slot);
214263

215264
const action = random.enumValue(enum { put, get });
216265
switch (action) {
@@ -222,7 +271,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
222271
var pubkeys_this_slot: std.AutoHashMapUnmanaged(Pubkey, void) = .empty;
223272
defer pubkeys_this_slot.deinit(allocator);
224273
for (0..N_ACCOUNTS_PER_SLOT) |_| {
225-
var tracked_account = TrackedAccount.initRandom(random, slot);
274+
var tracked_account: TrackedAccount = .initRandom(random, current_slot);
226275

227276
const update_all_existing =
228277
tracked_accounts.count() > N_ACCOUNTS_MAX;
@@ -243,7 +292,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
243292

244293
const account_shared_data = try tracked_account.toAccount(allocator);
245294
defer account_shared_data.deinit(allocator);
246-
try accounts_db.putAccount(slot, pubkey, account_shared_data);
295+
try accounts_db.putAccount(current_slot, pubkey, account_shared_data);
247296

248297
// always overwrite the old slot
249298
try tracked_accounts.put(allocator, pubkey, tracked_account);
@@ -281,7 +330,10 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
281330

282331
const account =
283332
try accounts_db.getAccountWithAncestors(&pubkey, &ancestors_sub) orelse {
284-
logger.err().logf("accounts_db missing tracked account '{}': {}", .{ pubkey, tracked_account });
333+
logger.err().logf(
334+
"accounts_db missing tracked account '{}': {}",
335+
.{ pubkey, tracked_account },
336+
);
285337
return error.MissingAccount;
286338
};
287339
defer account.deinit(allocator);
@@ -297,13 +349,15 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
297349
},
298350
}
299351

300-
const create_new_root = random.boolean();
301-
if (create_new_root) {
302-
largest_rooted_slot = @min(slot, largest_rooted_slot + 2);
352+
const create_new_root =
353+
enable_manager and
354+
will_inc_slot and
355+
random.int(u8) == 0;
356+
if (create_new_root) snapshot_validation: {
357+
largest_rooted_slot = @min(top_slot, largest_rooted_slot + 2);
303358
accounts_db.largest_rooted_slot.store(largest_rooted_slot, .monotonic);
304-
}
359+
try manager.manage(allocator);
305360

306-
snapshot_validation: {
307361
// holding the lock here means that the snapshot archive(s) wont be deleted
308362
// since deletion requires a write lock
309363
const maybe_latest_snapshot_info, //
@@ -449,20 +503,21 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
449503
}
450504

451505
fn readRandomAccounts(
506+
logger: Logger,
452507
db: *AccountsDB,
453508
tracked_accounts_rw: *sig.sync.RwMux(TrackedAccountsMap),
454509
seed: u64,
455510
exit: *std.atomic.Value(bool),
456511
thread_id: usize,
457512
) void {
458-
var prng = std.Random.DefaultPrng.init(seed);
513+
logger.debug().logf("started readRandomAccounts thread: {}", .{thread_id});
514+
defer logger.debug().logf("finishing readRandomAccounts thread: {}", .{thread_id});
515+
516+
var prng: std.Random.DefaultPrng = .init(seed);
459517
const random = prng.random();
460518

461519
while (true) {
462-
if (exit.load(.seq_cst)) {
463-
std.debug.print("finishing readRandomAccounts thread: {}\n", .{thread_id});
464-
return;
465-
}
520+
if (exit.load(.seq_cst)) return;
466521

467522
var pubkeys: [50]Pubkey = undefined;
468523
{

src/accountsdb/manager.zig

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,7 @@ fn flushSlot(db: *AccountsDB, slot: Slot) !FileId {
301301
const unrooted_accounts, var unrooted_accounts_lg = db.unrooted_accounts.readWithLock();
302302
defer unrooted_accounts_lg.unlock();
303303

304-
const pubkeys_and_accounts = unrooted_accounts.get(slot) orelse
305-
return error.SlotNotFound;
304+
const pubkeys_and_accounts = unrooted_accounts.get(slot) orelse return error.SlotNotFound;
306305
break :blk pubkeys_and_accounts;
307306
};
308307

0 commit comments

Comments
 (0)