|
14 | 14 | import time
|
15 | 15 | import json
|
16 | 16 |
|
17 |
| -from ducktape.mark import ignore, ok_to_fail |
| 17 | +from ducktape.mark import ignore, ok_to_fail, parametrize |
18 | 18 | from ducktape.tests.test import TestContext
|
19 | 19 | from ducktape.utils.util import wait_until
|
20 | 20 | from rptest.clients.rpk import RpkTool
|
@@ -1153,38 +1153,92 @@ def producer_complete():
|
1153 | 1153 | consumer.stop()
|
1154 | 1154 | consumer.free()
|
1155 | 1155 |
|
1156 |
| - @cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST) |
1157 |
| - def test_ht004_minpartomb(self): |
1158 |
| - validator_overrides = { |
1159 |
| - OMBSampleConfigurations.E2E_LATENCY_50PCT: |
1160 |
| - [OMBSampleConfigurations.lte(51)], |
1161 |
| - OMBSampleConfigurations.E2E_LATENCY_AVG: |
1162 |
| - [OMBSampleConfigurations.lte(145)], |
1163 |
| - } |
1164 |
| - partitions_per_topic = self.config.partitions_max_scaled |
1165 |
| - workload = { |
| 1156 | + def _prepare_omb_workload(self, ramp_time, duration, partitions, rate, |
| 1157 | + msg_size): |
| 1158 | + return { |
1166 | 1159 | "name": "HT004-MINPARTOMB",
|
1167 | 1160 | "topics": 1,
|
1168 |
| - "partitions_per_topic": partitions_per_topic, |
| 1161 | + "partitions_per_topic": partitions, |
1169 | 1162 | "subscriptions_per_topic": 1,
|
1170 | 1163 | "consumer_per_subscription": 3,
|
1171 | 1164 | "producers_per_topic": 1,
|
1172 |
| - "producer_rate": int(self.config.ingress_rate_scaled / 8), |
1173 |
| - "message_size": 8 * KiB, |
| 1165 | + "producer_rate": rate, |
| 1166 | + "message_size": msg_size, |
1174 | 1167 | "consumer_backlog_size_GB": 0,
|
1175 |
| - "test_duration_minutes": 1, |
1176 |
| - "warmup_duration_minutes": 1, |
| 1168 | + "test_duration_minutes": duration, |
| 1169 | + "warmup_duration_minutes": ramp_time, |
1177 | 1170 | "use_randomized_payloads": True,
|
1178 | 1171 | "random_bytes_ratio": 0.5,
|
1179 | 1172 | "randomized_payload_pool_size": 100,
|
1180 | 1173 | }
|
1181 | 1174 |
|
1182 |
| - benchmark = OpenMessagingBenchmark( |
| 1175 | + def _run_bench(self, workload, validator_overrides): |
| 1176 | + _bench = OpenMessagingBenchmark( |
1183 | 1177 | self._ctx, self.redpanda, "SIMPLE_DRIVER",
|
1184 | 1178 | (workload, OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
|
1185 | 1179 | | validator_overrides))
|
| 1180 | + _bench.start() |
| 1181 | + benchmark_time_min = _bench.benchmark_time() + 1 |
| 1182 | + _bench.wait(timeout_sec=benchmark_time_min * 60) |
| 1183 | + _metrics = json.loads(_bench.node.account.ssh_output(_bench.chart_cmd)) |
| 1184 | + return _bench, list(_metrics.values())[0] |
| 1185 | + |
| 1186 | + @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST) |
| 1187 | + @parametrize(partitions="min") |
| 1188 | + @parametrize(partitions="max") |
| 1189 | + def test_htt_partitions_omb(self, partitions): |
| 1190 | + def _format_metrics(idle, tier): |
| 1191 | + keys = idle.keys() |
| 1192 | + return "\n".join([f"{k} = {idle[k]} / {tier[k]} " for k in keys]) |
| 1193 | + |
| 1194 | + # Get values for almost idle cluster load |
| 1195 | + _min_idle_lat = 1000 |
| 1196 | + # Assume we have 1 partition per shard, |
| 1197 | + # then number of CPU should be equal to min number of partitions |
| 1198 | + # to get idle-like activity |
| 1199 | + _num_partitions = 8 |
| 1200 | + |
| 1201 | + if partitions not in ["min", "max"]: |
| 1202 | + raise RuntimeError("Test parameter for partitions invalid") |
| 1203 | + |
| 1204 | + idle_validators = { |
| 1205 | + OMBSampleConfigurations.E2E_LATENCY_50PCT: |
| 1206 | + [OMBSampleConfigurations.lte(_min_idle_lat)], |
| 1207 | + OMBSampleConfigurations.E2E_LATENCY_AVG: |
| 1208 | + [OMBSampleConfigurations.lte(_min_idle_lat * 3)], |
| 1209 | + } |
| 1210 | + idle_workload = self._prepare_omb_workload(1, 2, _num_partitions, |
| 1211 | + 1 * MiB, 8 * KiB) |
| 1212 | + _, idle_metrics = self._run_bench(idle_workload, idle_validators) |
1186 | 1213 |
|
1187 |
| - benchmark.start() |
1188 |
| - benchmark_time_min = benchmark.benchmark_time() + 1 |
1189 |
| - benchmark.wait(timeout_sec=benchmark_time_min * 60) |
| 1214 | + # Get values for idle workload |
| 1215 | + k_e2e_50pct = idle_metrics[OMBSampleConfigurations.E2E_LATENCY_50PCT] |
| 1216 | + k_e2e_avg = idle_metrics[OMBSampleConfigurations.E2E_LATENCY_AVG] |
| 1217 | + |
| 1218 | + # Calculate target throughput latencies |
| 1219 | + target_e2e_50pct = k_e2e_50pct + 51 |
| 1220 | + target_e2e_avg = k_e2e_avg + 145 |
| 1221 | + |
| 1222 | + # Measure with target load |
| 1223 | + validator_overrides = { |
| 1224 | + OMBSampleConfigurations.E2E_LATENCY_50PCT: |
| 1225 | + [OMBSampleConfigurations.lte(target_e2e_50pct)], |
| 1226 | + OMBSampleConfigurations.E2E_LATENCY_AVG: |
| 1227 | + [OMBSampleConfigurations.lte(target_e2e_avg)], |
| 1228 | + } |
| 1229 | + # Select number of partitions |
| 1230 | + if partitions == "min": |
| 1231 | + _num_partitions = self.tier_config.partitions_min |
| 1232 | + elif partitions == "max": |
| 1233 | + _num_partitions = self.tier_config.partitions_upper_limit |
| 1234 | + |
| 1235 | + workload = self._prepare_omb_workload(1, 2, _num_partitions, |
| 1236 | + self.tier_config.ingress_rate, |
| 1237 | + 8 * KiB) |
| 1238 | + benchmark, metrics = self._run_bench(workload, validator_overrides) |
1190 | 1239 | benchmark.check_succeed()
|
| 1240 | + |
| 1241 | + # Tier metrics should not diviate from idle |
| 1242 | + # metrics more than 145 ms on the average |
| 1243 | + self.logger.info('Workload metrics (idle/tier): ' |
| 1244 | + '"{}"'.format(_format_metrics(idle_metrics, metrics))) |
0 commit comments