Skip to content

Commit 25792ab

Browse files
committed
rptest: Updating parameters and test structure
Test will run OMB with minimal workload to measure idle latency and then use target partitions number to measure desired parameters
1 parent 512ee6c commit 25792ab

File tree

1 file changed

+108
-25
lines changed

1 file changed

+108
-25
lines changed

tests/rptest/redpanda_cloud_tests/high_throughput_test.py

+108-25
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import time
1515
import json
1616

17-
from ducktape.mark import ignore, ok_to_fail
17+
from ducktape.mark import ignore, ok_to_fail, parametrize
1818
from ducktape.tests.test import TestContext
1919
from ducktape.utils.util import wait_until
2020
from rptest.clients.rpk import RpkTool
@@ -163,6 +163,22 @@ def traffic_generator(context, redpanda, tier_cfg, *args, **kwargs):
163163
) >= tier_cfg.egress_rate, f"Observed consumer throughput {consumer_throughput} too low, expected: {tier_cfg.egress_rate}"
164164

165165

166+
@contextmanager
167+
def omb_runner(context, redpanda, driver, workload, omb_config):
168+
bench = OpenMessagingBenchmark(context, redpanda, driver,
169+
(workload, omb_config))
170+
bench.start()
171+
try:
172+
benchmark_time_min = bench.benchmark_time() + 1
173+
bench.wait(timeout_sec=benchmark_time_min * 60)
174+
yield bench
175+
except:
176+
redpanda.logger.exception("Exception within OMB")
177+
raise
178+
finally:
179+
bench.stop()
180+
181+
166182
def get_globals_value(globals, key_name, default=None):
167183
_config = {}
168184
if RedpandaServiceCloud.GLOBAL_CLOUD_CLUSTER_CONFIG in globals:
@@ -1153,38 +1169,105 @@ def producer_complete():
11531169
consumer.stop()
11541170
consumer.free()
11551171

1156-
@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
1157-
def test_ht004_minpartomb(self):
1158-
validator_overrides = {
1159-
OMBSampleConfigurations.E2E_LATENCY_50PCT:
1160-
[OMBSampleConfigurations.lte(51)],
1161-
OMBSampleConfigurations.E2E_LATENCY_AVG:
1162-
[OMBSampleConfigurations.lte(145)],
1163-
}
1164-
partitions_per_topic = self.config.partitions_max_scaled
1165-
workload = {
1172+
def _prepare_omb_workload(self, ramp_time, duration, partitions, rate,
1173+
msg_size):
1174+
return {
11661175
"name": "HT004-MINPARTOMB",
11671176
"topics": 1,
1168-
"partitions_per_topic": partitions_per_topic,
1177+
"partitions_per_topic": partitions,
11691178
"subscriptions_per_topic": 1,
11701179
"consumer_per_subscription": 3,
11711180
"producers_per_topic": 1,
1172-
"producer_rate": int(self.config.ingress_rate_scaled / 8),
1173-
"message_size": 8 * KiB,
1181+
"producer_rate": rate,
1182+
"message_size": msg_size,
11741183
"consumer_backlog_size_GB": 0,
1175-
"test_duration_minutes": 1,
1176-
"warmup_duration_minutes": 1,
1184+
"test_duration_minutes": duration,
1185+
"warmup_duration_minutes": ramp_time,
11771186
"use_randomized_payloads": True,
11781187
"random_bytes_ratio": 0.5,
11791188
"randomized_payload_pool_size": 100,
11801189
}
11811190

1182-
benchmark = OpenMessagingBenchmark(
1183-
self._ctx, self.redpanda, "SIMPLE_DRIVER",
1184-
(workload, OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
1185-
| validator_overrides))
1186-
1187-
benchmark.start()
1188-
benchmark_time_min = benchmark.benchmark_time() + 1
1189-
benchmark.wait(timeout_sec=benchmark_time_min * 60)
1190-
benchmark.check_succeed()
1191+
@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
1192+
@parametrize(partitions="min")
1193+
@parametrize(partitions="max")
1194+
def test_htt_partitions_omb(self, partitions):
1195+
def _format_metrics(idle, tier):
1196+
keys = idle.keys()
1197+
return "\n".join([f"{k} = {idle[k]} / {tier[k]} " for k in keys])
1198+
1199+
def _get_metrics(bench):
1200+
return list(
1201+
json.loads(bench.node.account.ssh_output(
1202+
bench.chart_cmd)).values())[0]
1203+
1204+
# Cleanup default topic
1205+
self.rpk.delete_topic(self.topic)
1206+
1207+
# Get values for almost idle cluster load
1208+
_min_idle_lat = 1000
1209+
rampup_time = 1
1210+
idle_runtime = 2
1211+
main_runtime = 30
1212+
idle_rate = 1 * MiB
1213+
main_rate = self.tier_config.ingress_rate
1214+
msg_size = 8 * KiB
1215+
# Assume we have 1 partition per shard,
1216+
# then number of CPU should be equal to min number of partitions
1217+
# to get idle-like activity
1218+
_num_partitions = 4
1219+
1220+
if partitions not in ["min", "max"]:
1221+
raise RuntimeError("Test parameter for partitions invalid")
1222+
1223+
idle_validators = {
1224+
OMBSampleConfigurations.E2E_LATENCY_50PCT:
1225+
[OMBSampleConfigurations.lte(_min_idle_lat)],
1226+
OMBSampleConfigurations.E2E_LATENCY_AVG:
1227+
[OMBSampleConfigurations.lte(_min_idle_lat * 3)],
1228+
}
1229+
idle_workload = self._prepare_omb_workload(rampup_time, idle_runtime,
1230+
_num_partitions, idle_rate,
1231+
msg_size)
1232+
with omb_runner(
1233+
self._ctx, self.redpanda, "SIMPLE_DRIVER", idle_workload,
1234+
OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
1235+
| idle_validators) as omb:
1236+
idle_metrics = _get_metrics(omb)
1237+
1238+
# Get values for idle workload
1239+
k_e2e_50pct = idle_metrics[OMBSampleConfigurations.E2E_LATENCY_50PCT]
1240+
k_e2e_avg = idle_metrics[OMBSampleConfigurations.E2E_LATENCY_AVG]
1241+
1242+
# Calculate target throughput latencies
1243+
target_e2e_50pct = k_e2e_50pct + 51
1244+
target_e2e_avg = k_e2e_avg + 145
1245+
1246+
# Measure with target load
1247+
validator_overrides = {
1248+
OMBSampleConfigurations.E2E_LATENCY_50PCT:
1249+
[OMBSampleConfigurations.lte(target_e2e_50pct)],
1250+
OMBSampleConfigurations.E2E_LATENCY_AVG:
1251+
[OMBSampleConfigurations.lte(target_e2e_avg)],
1252+
}
1253+
# Select number of partitions
1254+
if partitions == "min":
1255+
_num_partitions = self.tier_config.partitions_min
1256+
elif partitions == "max":
1257+
_num_partitions = self.tier_config.partitions_upper_limit
1258+
1259+
workload = self._prepare_omb_workload(rampup_time, main_runtime,
1260+
_num_partitions, main_rate,
1261+
msg_size)
1262+
with omb_runner(
1263+
self._ctx, self.redpanda, "SIMPLE_DRIVER", workload,
1264+
OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
1265+
| validator_overrides) as omb:
1266+
metrics = _get_metrics(omb)
1267+
# Tier metrics should not diviate from idle
1268+
# metrics more than 145 ms on the average
1269+
self.logger.info('Workload metrics (idle/tier): '
1270+
'"{}"'.format(
1271+
_format_metrics(idle_metrics, metrics)))
1272+
# Assert test results
1273+
omb.check_succeed()

0 commit comments

Comments
 (0)