Skip to content

Commit eb8c1bf

Browse files
committed
Single threaded accountsdb manager & misc changes
1 parent 52470a2 commit eb8c1bf

File tree

3 files changed

+99
-41
lines changed

3 files changed

+99
-41
lines changed

src/accountsdb/db.zig

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,8 +1650,9 @@ pub const AccountsDB = struct {
16501650
self.unrooted_accounts.readWithLock();
16511651
defer unrooted_accounts_lg.unlock();
16521652

1653-
const accounts = (unrooted_accounts.get(account_ref.slot) orelse
1654-
return error.SlotNotFound).items(.account);
1653+
const slots_and_accounts = unrooted_accounts.get(account_ref.slot) orelse
1654+
return error.SlotNotFound;
1655+
const accounts: []Account = slots_and_accounts.items(.account);
16551656
const account = accounts[ref_info.index];
16561657

16571658
return try account.cloneOwned(self.allocator);

src/accountsdb/fuzz.zig

Lines changed: 95 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ pub const TrackedAccount = struct {
4343

4444
pub const RunCmd = struct {
4545
max_slots: ?Slot,
46+
non_sequential_slots: bool,
47+
index_allocation: ?IndexAllocation,
48+
enable_manager: bool,
49+
50+
pub const IndexAllocation = enum { ram, disk };
4651

4752
pub const parser = cli.Parser(RunCmd, .{
4853
.help = .{
@@ -58,10 +63,39 @@ pub const RunCmd = struct {
5863
.config = {},
5964
.help = "The number of slots number which, when surpassed, will exit the fuzzer.",
6065
},
66+
.non_sequential_slots = .{
67+
.kind = .named,
68+
.name_override = null,
69+
.alias = .none,
70+
.default_value = false,
71+
.config = {},
72+
.help = "Enable non-sequential slots.",
73+
},
74+
.index_allocation = .{
75+
.kind = .named,
76+
.name_override = null,
77+
.alias = .none,
78+
.default_value = null,
79+
.config = {},
80+
.help =
81+
\\Whether to use ram or disk for index allocation.
82+
\\Defaults to a random value based on the seed.
83+
,
84+
},
85+
.enable_manager = .{
86+
.kind = .named,
87+
.name_override = null,
88+
.alias = .none,
89+
.default_value = false,
90+
.config = {},
91+
.help = "Enable the accountsdb manager during fuzzer.",
92+
},
6193
},
6294
});
6395
};
6496

97+
const Logger = sig.trace.Logger("accountsdb.fuzz");
98+
6599
pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
66100
var prng_state: std.Random.DefaultPrng = .init(seed);
67101
const random = prng_state.random();
@@ -83,7 +117,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
83117
.max_buffer = 1 << 20,
84118
}, null);
85119
defer std_logger.deinit();
86-
const logger = std_logger.logger("accountsdb.fuzz");
120+
const logger: Logger = std_logger.logger("accountsdb.fuzz");
87121

88122
const run_cmd: RunCmd = cmd: {
89123
var argv_list: std.ArrayListUnmanaged([]const u8) = .empty;
@@ -102,6 +136,11 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
102136
};
103137

104138
const maybe_max_slots = run_cmd.max_slots;
139+
const non_sequential_slots = run_cmd.non_sequential_slots;
140+
const enable_manager = run_cmd.enable_manager;
141+
const index_allocation =
142+
run_cmd.index_allocation orelse
143+
random.enumValue(RunCmd.IndexAllocation);
105144

106145
var fuzz_data_dir = try std.fs.cwd().makeOpenPath(sig.FUZZ_DATA_DIR, .{});
107146
defer fuzz_data_dir.close();
@@ -126,18 +165,20 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
126165
};
127166
};
128167

129-
var last_full_snapshot_validated_slot: Slot = 0;
130-
var last_inc_snapshot_validated_slot: Slot = 0;
168+
logger.info().logf("enable manager: {}", .{enable_manager});
169+
logger.info().logf("index allocation: {s}", .{@tagName(index_allocation)});
170+
logger.info().logf("non-sequential slots: {}", .{non_sequential_slots});
131171

132-
const use_disk = random.boolean();
133-
logger.info().logf("use disk: {}", .{use_disk});
134172
var accounts_db: AccountsDB = try .init(.{
135173
.allocator = allocator,
136174
.logger = .from(logger),
137175
.snapshot_dir = main_accountsdb_dir,
138176
.geyser_writer = null,
139177
.gossip_view = null,
140-
.index_allocation = if (use_disk) .disk else .ram,
178+
.index_allocation = switch (index_allocation) {
179+
.ram => .ram,
180+
.disk => .disk,
181+
},
141182
.number_of_index_shards = sig.accounts_db.db.ACCOUNT_INDEX_SHARDS,
142183
});
143184
defer accounts_db.deinit();
@@ -146,18 +187,13 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
146187
try accounts_db.account_index.expandRefCapacity(1_000_000);
147188

148189
var manager_exit: std.atomic.Value(bool) = .init(false);
149-
const manager_handle: std.Thread = try .spawn(.{}, sig.accounts_db.manager.runLoop, .{
150-
&accounts_db, sig.accounts_db.manager.ManagerLoopConfig{
151-
.exit = &manager_exit,
152-
.slots_per_full_snapshot = 50_000,
153-
.slots_per_incremental_snapshot = 5_000,
154-
.zstd_nb_workers = @intCast(std.Thread.getCpuCount() catch 0),
155-
},
190+
var manager: sig.accounts_db.manager.Manager = try .init(allocator, &accounts_db, .{
191+
.exit = &manager_exit,
192+
.slots_per_full_snapshot = 50_000,
193+
.slots_per_incremental_snapshot = 5_000,
194+
.zstd_nb_workers = @intCast(std.Thread.getCpuCount() catch 0),
156195
});
157-
defer {
158-
manager_exit.store(true, .release);
159-
manager_handle.join();
160-
}
196+
defer manager.deinit(allocator);
161197

162198
var tracked_accounts_rw: sig.sync.RwMux(TrackedAccountsMap) = .init(.empty);
163199
defer {
@@ -185,32 +221,48 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
185221
// any validation (in the .get block of the main fuzzer
186222
// loop, we perform validation)
187223
threads.appendAssumeCapacity(try .spawn(.{}, readRandomAccounts, .{
224+
logger,
188225
&accounts_db,
189226
&tracked_accounts_rw,
190227
seed + thread_i,
191228
&reader_exit,
192229
thread_i,
193230
}));
194-
logger.debug().logf("started readRandomAccounts thread: {}", .{thread_i});
195231
}
196232

233+
var last_full_snapshot_validated_slot: Slot = 0;
234+
var last_inc_snapshot_validated_slot: Slot = 0;
197235
var largest_rooted_slot: Slot = 0;
198-
var slot: Slot = 0;
236+
var top_slot: Slot = 0;
199237

200238
var ancestors: sig.core.Ancestors = .EMPTY;
201239
defer ancestors.deinit(allocator);
202240

203241
// get/put a bunch of accounts
204242
while (true) {
205-
if (maybe_max_slots) |max_slots| if (slot >= max_slots) {
243+
if (maybe_max_slots) |max_slots| if (top_slot >= max_slots) {
206244
logger.info().logf("reached max slots: {}", .{max_slots});
207245
break;
208246
};
209-
defer switch (random.int(u2)) {
210-
0 => slot += random.intRangeAtMost(Slot, 1, 2),
211-
1, 2, 3 => {},
247+
const will_inc_slot = switch (random.int(u2)) {
248+
0 => true,
249+
1, 2, 3 => false,
250+
};
251+
defer if (will_inc_slot) {
252+
top_slot += random.intRangeAtMost(Slot, 1, 2);
253+
};
254+
try ancestors.addSlot(allocator, top_slot);
255+
256+
const current_slot = if (!non_sequential_slots) top_slot else slot: {
257+
const ancestor_slots: []const Slot = ancestors.ancestors.keys();
258+
std.debug.assert(ancestor_slots[ancestor_slots.len - 1] == top_slot);
259+
const ancestor_index = random.intRangeLessThan(
260+
usize,
261+
ancestor_slots.len -| 10,
262+
ancestor_slots.len,
263+
);
264+
break :slot ancestor_slots[ancestor_index];
212265
};
213-
try ancestors.addSlot(allocator, slot);
214266

215267
const action = random.enumValue(enum { put, get });
216268
switch (action) {
@@ -222,7 +274,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
222274
var pubkeys_this_slot: std.AutoHashMapUnmanaged(Pubkey, void) = .empty;
223275
defer pubkeys_this_slot.deinit(allocator);
224276
for (0..N_ACCOUNTS_PER_SLOT) |_| {
225-
var tracked_account = TrackedAccount.initRandom(random, slot);
277+
var tracked_account: TrackedAccount = .initRandom(random, current_slot);
226278

227279
const update_all_existing =
228280
tracked_accounts.count() > N_ACCOUNTS_MAX;
@@ -243,7 +295,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
243295

244296
const account_shared_data = try tracked_account.toAccount(allocator);
245297
defer account_shared_data.deinit(allocator);
246-
try accounts_db.putAccount(slot, pubkey, account_shared_data);
298+
try accounts_db.putAccount(current_slot, pubkey, account_shared_data);
247299

248300
// always overwrite the old slot
249301
try tracked_accounts.put(allocator, pubkey, tracked_account);
@@ -281,7 +333,10 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
281333

282334
const account =
283335
try accounts_db.getAccountWithAncestors(&pubkey, &ancestors_sub) orelse {
284-
logger.err().logf("accounts_db missing tracked account '{}': {}", .{ pubkey, tracked_account });
336+
logger.err().logf(
337+
"accounts_db missing tracked account '{}': {}",
338+
.{ pubkey, tracked_account },
339+
);
285340
return error.MissingAccount;
286341
};
287342
defer account.deinit(allocator);
@@ -297,13 +352,15 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
297352
},
298353
}
299354

300-
const create_new_root = random.boolean();
301-
if (create_new_root) {
302-
largest_rooted_slot = @min(slot, largest_rooted_slot + 2);
355+
const create_new_root =
356+
enable_manager and
357+
will_inc_slot and
358+
random.int(u8) == 0;
359+
if (create_new_root) snapshot_validation: {
360+
largest_rooted_slot = @min(top_slot, largest_rooted_slot + 2);
303361
accounts_db.largest_rooted_slot.store(largest_rooted_slot, .monotonic);
304-
}
362+
try manager.manage(allocator);
305363

306-
snapshot_validation: {
307364
// holding the lock here means that the snapshot archive(s) wont be deleted
308365
// since deletion requires a write lock
309366
const maybe_latest_snapshot_info, //
@@ -449,20 +506,21 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
449506
}
450507

451508
fn readRandomAccounts(
509+
logger: Logger,
452510
db: *AccountsDB,
453511
tracked_accounts_rw: *sig.sync.RwMux(TrackedAccountsMap),
454512
seed: u64,
455513
exit: *std.atomic.Value(bool),
456514
thread_id: usize,
457515
) void {
458-
var prng = std.Random.DefaultPrng.init(seed);
516+
logger.debug().logf("started readRandomAccounts thread: {}", .{thread_id});
517+
defer logger.debug().logf("finishing readRandomAccounts thread: {}", .{thread_id});
518+
519+
var prng: std.Random.DefaultPrng = .init(seed);
459520
const random = prng.random();
460521

461522
while (true) {
462-
if (exit.load(.seq_cst)) {
463-
std.debug.print("finishing readRandomAccounts thread: {}\n", .{thread_id});
464-
return;
465-
}
523+
if (exit.load(.seq_cst)) return;
466524

467525
var pubkeys: [50]Pubkey = undefined;
468526
{

src/accountsdb/manager.zig

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,7 @@ fn flushSlot(db: *AccountsDB, slot: Slot) !FileId {
293293
const unrooted_accounts, var unrooted_accounts_lg = db.unrooted_accounts.readWithLock();
294294
defer unrooted_accounts_lg.unlock();
295295

296-
const pubkeys_and_accounts = unrooted_accounts.get(slot) orelse
297-
return error.SlotNotFound;
296+
const pubkeys_and_accounts = unrooted_accounts.get(slot) orelse return error.SlotNotFound;
298297
break :blk pubkeys_and_accounts;
299298
};
300299

0 commit comments

Comments
 (0)