Skip to content

Commit 7c32062

Browse files
committed
feat: enable targeted-load for 2PC loadgen
This sets up the 2PC load-generator to read the config values the Test Controller already supports to limit throughput to a set amount, optionally increasing it according to a definable schedule. Among other benefits, this allows the tester to constrain load-generators directly rather than by relying on the preseed count to artificially reduce their load. This also includes the relevant config changes to ensure per-loadgen log-levels are respected. Signed-off-by: Sam Stuewe <[email protected]>
1 parent d792c5e commit 7c32062

File tree

3 files changed

+155
-2
lines changed

3 files changed

+155
-2
lines changed

src/util/common/config.cpp

+24
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,13 @@ namespace cbdc::config {
227227
return ss.str();
228228
}
229229

230+
auto get_loadgen_loglevel_key(size_t loadgen_id) -> std::string {
231+
std::stringstream ss;
232+
ss << loadgen_prefix << loadgen_id << config_separator
233+
<< loglevel_postfix;
234+
return ss.str();
235+
}
236+
230237
auto get_sentinel_private_key_key(size_t sentinel_id) -> std::string {
231238
auto ss = std::stringstream();
232239
get_sentinel_key_prefix(ss, sentinel_id);
@@ -614,6 +621,23 @@ namespace cbdc::config {
614621

615622
opts.m_loadgen_count
616623
= cfg.get_ulong(loadgen_count_key).value_or(opts.m_loadgen_count);
624+
opts.m_loadgen_tps_target = cfg.get_ulong(tps_target_key)
625+
.value_or(opts.m_loadgen_tps_target);
626+
opts.m_loadgen_tps_step_time
627+
= cfg.get_ulong(tps_steptime_key)
628+
.value_or(opts.m_loadgen_tps_step_time);
629+
opts.m_loadgen_tps_step_size
630+
= cfg.get_ulong(tps_stepsize_key)
631+
.value_or(opts.m_loadgen_tps_step_size);
632+
opts.m_loadgen_tps_initial = cfg.get_ulong(tps_initial_key)
633+
.value_or(opts.m_loadgen_tps_initial);
634+
for(size_t i{0}; i < opts.m_loadgen_count; ++i) {
635+
const auto loadgen_loglevel_key = get_loadgen_loglevel_key(i);
636+
const auto loadgen_loglevel
637+
= cfg.get_loglevel(loadgen_loglevel_key)
638+
.value_or(defaults::log_level);
639+
opts.m_loadgen_loglevels.push_back(loadgen_loglevel);
640+
}
617641
}
618642

619643
auto read_options(const std::string& config_file)

src/util/common/config.hpp

+17
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ namespace cbdc::config {
100100
static constexpr auto output_count_key = "loadgen_sendtx_output_count";
101101
static constexpr auto invalid_rate_key = "loadgen_invalid_tx_rate";
102102
static constexpr auto fixed_tx_rate_key = "loadgen_fixed_tx_rate";
103+
static constexpr auto loadgen_prefix = "loadgen";
104+
static constexpr auto tps_target_key = "loadgen_tps_target";
105+
static constexpr auto tps_steptime_key = "loadgen_tps_step_time";
106+
static constexpr auto tps_stepsize_key = "loadgen_tps_step_percentage";
107+
static constexpr auto tps_initial_key = "loadgen_tps_step_start";
103108
static constexpr auto archiver_count_key = "archiver_count";
104109
static constexpr auto watchtower_count_key = "watchtower_count";
105110
static constexpr auto watchtower_prefix = "watchtower";
@@ -250,6 +255,18 @@ namespace cbdc::config {
250255
/// Number of load generators over which to split pre-seeded UTXOs.
251256
size_t m_loadgen_count{0};
252257

258+
/// List of loadgen log levels, ordered by loadgen ID.
259+
std::vector<logging::log_level> m_loadgen_loglevels;
260+
261+
/// Maximum Tx/s the loadgens should produce
262+
size_t m_loadgen_tps_target{0};
263+
/// Initial Tx/s to send at test-start
264+
size_t m_loadgen_tps_initial{0};
265+
/// Tx/s to increase on each step
266+
size_t m_loadgen_tps_step_size{0};
267+
/// Time (in milliseconds) to wait before the next step up
268+
size_t m_loadgen_tps_step_time{0};
269+
253270
/// Private keys for sentinels.
254271
std::unordered_map<size_t, privkey_t> m_sentinel_private_keys;
255272

tools/bench/twophase_gen.cpp

+114-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,14 @@ auto main(int argc, char** argv) -> int {
3434
auto cfg = std::get<cbdc::config::options>(cfg_or_err);
3535

3636
auto gen_id = std::stoull(args[2]);
37-
auto logger
38-
= std::make_shared<cbdc::logging::log>(cbdc::logging::log_level::info);
37+
if(gen_id >= cfg.m_loadgen_count) {
38+
std::cerr << "Attempted to run more loadgens than configured"
39+
<< std::endl;
40+
return -1;
41+
}
42+
43+
auto logger = std::make_shared<cbdc::logging::log>(
44+
cfg.m_loadgen_loglevels[gen_id]);
3945

4046
auto sha2_impl = SHA256AutoDetect();
4147
logger->info("using sha2: ", sha2_impl);
@@ -115,6 +121,25 @@ auto main(int argc, char** argv) -> int {
115121
logger->info("Mint confirmed");
116122
}
117123

124+
size_t per_gen_send_limit = 0;
125+
size_t per_gen_step_size = 0;
126+
size_t per_gen_send_tgt = 0;
127+
if(cfg.m_loadgen_tps_target != 0) {
128+
per_gen_send_tgt = cfg.m_loadgen_tps_target / cfg.m_loadgen_count;
129+
per_gen_send_limit = cfg.m_loadgen_tps_initial / cfg.m_loadgen_count;
130+
size_t range = cfg.m_loadgen_tps_target - cfg.m_loadgen_tps_initial;
131+
size_t per_gen_range = range / cfg.m_loadgen_count;
132+
per_gen_step_size = std::max(
133+
std::min(static_cast<size_t>(
134+
per_gen_range * (cfg.m_loadgen_tps_step_size * .01)),
135+
per_gen_range),
136+
size_t{1});
137+
}
138+
139+
if(cfg.m_loadgen_tps_step_time == 0) {
140+
per_gen_send_limit = per_gen_send_tgt;
141+
}
142+
118143
static constexpr auto lookup_timeout = std::chrono::milliseconds(5000);
119144
auto status_client = cbdc::locking_shard::rpc::status_client(
120145
cfg.m_locking_shard_readonly_endpoints,
@@ -161,7 +186,13 @@ auto main(int argc, char** argv) -> int {
161186

162187
constexpr auto send_amt = 5;
163188

189+
uint64_t send_gap{};
164190
uint64_t gen_avg{};
191+
auto ramp_timer_full
192+
= std::chrono::nanoseconds(cfg.m_loadgen_tps_step_time * 1000000);
193+
auto ramp_timer = ramp_timer_full;
194+
auto ramping
195+
= ramp_timer.count() != 0 && per_gen_send_limit != per_gen_send_tgt;
165196
auto gen_thread = std::thread([&]() {
166197
while(running) {
167198
// Determine if we should attempt to send a double-spending
@@ -219,8 +250,51 @@ auto main(int argc, char** argv) -> int {
219250
gen_avg = static_cast<uint64_t>(
220251
(static_cast<double>(gen_t.count()) * average_factor)
221252
+ (static_cast<double>(gen_avg) * (1.0 - average_factor)));
253+
if(ramping) {
254+
if(gen_t >= ramp_timer) {
255+
logger->debug(
256+
"Ramp Timer Exhausted (gen_t). Resetting");
257+
ramp_timer = ramp_timer_full;
258+
per_gen_send_limit
259+
= std::min(per_gen_send_tgt,
260+
per_gen_send_limit + per_gen_step_size);
261+
logger->debug("New Send Limit:", per_gen_send_limit);
262+
if(per_gen_send_limit == per_gen_send_tgt) {
263+
ramping = false;
264+
logger->info("Reached Target Throughput");
265+
}
266+
} else {
267+
ramp_timer -= gen_t;
268+
}
269+
}
270+
auto total_send_time
271+
= std::chrono::nanoseconds(gen_avg * per_gen_send_limit);
272+
if(total_send_time < std::chrono::seconds(1)) {
273+
send_gap
274+
= (std::chrono::seconds(1) - total_send_time).count()
275+
/ per_gen_send_limit;
276+
logger->trace("New send-gap:", send_gap);
277+
}
222278
} else {
223279
std::this_thread::sleep_for(std::chrono::nanoseconds(gen_avg));
280+
if(ramping) {
281+
auto avg = std::chrono::nanoseconds(gen_avg);
282+
if(avg >= ramp_timer) {
283+
logger->debug("Ramp Timer Exhausted (dbl-spend "
284+
"gen_avg). Resetting");
285+
ramp_timer = ramp_timer_full;
286+
per_gen_send_limit
287+
= std::min(per_gen_send_tgt,
288+
per_gen_send_limit + per_gen_step_size);
289+
logger->debug("New Send Limit:", per_gen_send_limit);
290+
if(per_gen_send_limit == per_gen_send_tgt) {
291+
ramping = false;
292+
logger->info("Reached Target Throughput");
293+
}
294+
} else {
295+
ramp_timer -= avg;
296+
}
297+
}
224298
}
225299

226300
// We couldn't send a double-spend or a newly generated valid
@@ -234,6 +308,23 @@ auto main(int argc, char** argv) -> int {
234308
// instead.
235309
static constexpr auto send_delay = std::chrono::seconds(1);
236310
std::this_thread::sleep_for(send_delay);
311+
if(ramping) {
312+
if(send_delay >= ramp_timer) {
313+
logger->debug(
314+
"Ramp Timer Exhausted (send_delay). Resetting");
315+
ramp_timer = ramp_timer_full;
316+
per_gen_send_limit
317+
= std::min(per_gen_send_tgt,
318+
per_gen_send_limit + per_gen_step_size);
319+
logger->debug("New Send Limit:", per_gen_send_limit);
320+
if(per_gen_send_limit == per_gen_send_tgt) {
321+
ramping = false;
322+
logger->info("Reached Target Throughput");
323+
}
324+
} else {
325+
ramp_timer -= send_delay;
326+
}
327+
}
237328
continue;
238329
}
239330

@@ -280,6 +371,27 @@ auto main(int argc, char** argv) -> int {
280371
logger->error("Failure sending transaction to sentinel");
281372
wallet.confirm_inputs(tx.value().m_inputs);
282373
}
374+
375+
auto gap = std::chrono::nanoseconds(send_gap);
376+
if(gap < std::chrono::seconds(1)) {
377+
std::this_thread::sleep_for(gap);
378+
if(ramping) {
379+
if(gap >= ramp_timer) {
380+
logger->debug("Ramp Timer Exhausted (gap). Resetting");
381+
ramp_timer = ramp_timer_full;
382+
per_gen_send_limit
383+
= std::min(per_gen_send_tgt,
384+
per_gen_send_limit + per_gen_step_size);
385+
logger->debug("New Send Limit:", per_gen_send_limit);
386+
if(per_gen_send_limit == per_gen_send_tgt) {
387+
ramping = false;
388+
logger->info("Reached Target Throughput");
389+
}
390+
} else {
391+
ramp_timer -= gap;
392+
}
393+
}
394+
}
283395
}
284396
});
285397

0 commit comments

Comments
 (0)