Skip to content

Commit

Permalink
rptest: Updating parameters and test structure
Browse files Browse the repository at this point in the history
  Test will run OMB with minimal workload to measure idle latency and
  then use target partitions number to measure desired parameters
  • Loading branch information
savex committed Sep 20, 2023
1 parent 512ee6c commit 37612c4
Showing 1 changed file with 114 additions and 26 deletions.
140 changes: 114 additions & 26 deletions tests/rptest/redpanda_cloud_tests/high_throughput_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import time
import json

from ducktape.mark import ignore, ok_to_fail
from ducktape.mark import ignore, ok_to_fail, parametrize
from ducktape.tests.test import TestContext
from ducktape.utils.util import wait_until
from rptest.clients.rpk import RpkTool
Expand Down Expand Up @@ -163,6 +163,22 @@ def traffic_generator(context, redpanda, tier_cfg, *args, **kwargs):
) >= tier_cfg.egress_rate, f"Observed consumer throughput {consumer_throughput} too low, expected: {tier_cfg.egress_rate}"


@contextmanager
def omb_runner(context, redpanda, driver, workload, omb_config):
bench = OpenMessagingBenchmark(context, redpanda, driver,
(workload, omb_config))
bench.start()
try:
benchmark_time_min = bench.benchmark_time() + 1
bench.wait(timeout_sec=benchmark_time_min * 60)
yield bench
except:
redpanda.logger.exception("Exception within OMB")
raise
finally:
bench.stop()


def get_globals_value(globals, key_name, default=None):
_config = {}
if RedpandaServiceCloud.GLOBAL_CLOUD_CLUSTER_CONFIG in globals:
Expand Down Expand Up @@ -231,7 +247,12 @@ def tearDown(self):
# the same cluster is used. Therefore state between runs will still exist,
# remove the topic after each test run to clean out old state.
if RedpandaServiceCloud.GLOBAL_CLOUD_CLUSTER_CONFIG in self._ctx.globals:
self.rpk.delete_topic(self.topic)
try:
self.rpk.delete_topic(self.topic)
except:
# Not really important what type of error is
# Its either topic or cluster already deleted
pass

def load_many_segments(self):
"""
Expand Down Expand Up @@ -1153,38 +1174,105 @@ def producer_complete():
consumer.stop()
consumer.free()

@cluster(num_nodes=3, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_ht004_minpartomb(self):
validator_overrides = {
OMBSampleConfigurations.E2E_LATENCY_50PCT:
[OMBSampleConfigurations.lte(51)],
OMBSampleConfigurations.E2E_LATENCY_AVG:
[OMBSampleConfigurations.lte(145)],
}
partitions_per_topic = self.config.partitions_max_scaled
workload = {
def _prepare_omb_workload(self, ramp_time, duration, partitions, rate,
msg_size):
return {
"name": "HT004-MINPARTOMB",
"topics": 1,
"partitions_per_topic": partitions_per_topic,
"partitions_per_topic": partitions,
"subscriptions_per_topic": 1,
"consumer_per_subscription": 3,
"producers_per_topic": 1,
"producer_rate": int(self.config.ingress_rate_scaled / 8),
"message_size": 8 * KiB,
"producer_rate": rate,
"message_size": msg_size,
"consumer_backlog_size_GB": 0,
"test_duration_minutes": 1,
"warmup_duration_minutes": 1,
"test_duration_minutes": duration,
"warmup_duration_minutes": ramp_time,
"use_randomized_payloads": True,
"random_bytes_ratio": 0.5,
"randomized_payload_pool_size": 100,
}

benchmark = OpenMessagingBenchmark(
self._ctx, self.redpanda, "SIMPLE_DRIVER",
(workload, OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
| validator_overrides))

benchmark.start()
benchmark_time_min = benchmark.benchmark_time() + 1
benchmark.wait(timeout_sec=benchmark_time_min * 60)
benchmark.check_succeed()
@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
@parametrize(partitions="min")
@parametrize(partitions="max")
def test_htt_partitions_omb(self, partitions):
def _format_metrics(idle, tier):
keys = idle.keys()
return "\n".join([f"{k} = {idle[k]} / {tier[k]} " for k in keys])

def _get_metrics(bench):
return list(
json.loads(bench.node.account.ssh_output(
bench.chart_cmd)).values())[0]

# Cleanup default topic
self.rpk.delete_topic(self.topic)

# Get values for almost idle cluster load
_min_idle_lat = 1000
rampup_time = 1
idle_runtime = 2
main_runtime = 30
idle_rate = 1 * MiB
main_rate = self.tier_config.ingress_rate
msg_size = 8 * KiB
# Assume we have 1 partition per shard,
# then number of CPU should be equal to min number of partitions
# to get idle-like activity
_num_partitions = 4

if partitions not in ["min", "max"]:
raise RuntimeError("Test parameter for partitions invalid")

idle_validators = {
OMBSampleConfigurations.E2E_LATENCY_50PCT:
[OMBSampleConfigurations.lte(_min_idle_lat)],
OMBSampleConfigurations.E2E_LATENCY_AVG:
[OMBSampleConfigurations.lte(_min_idle_lat * 3)],
}
idle_workload = self._prepare_omb_workload(rampup_time, idle_runtime,
_num_partitions, idle_rate,
msg_size)
with omb_runner(
self._ctx, self.redpanda, "SIMPLE_DRIVER", idle_workload,
OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
| idle_validators) as omb:
idle_metrics = _get_metrics(omb)

# Get values for idle workload
k_e2e_50pct = idle_metrics[OMBSampleConfigurations.E2E_LATENCY_50PCT]
k_e2e_avg = idle_metrics[OMBSampleConfigurations.E2E_LATENCY_AVG]

# Calculate target throughput latencies
target_e2e_50pct = k_e2e_50pct + 51
target_e2e_avg = k_e2e_avg + 145

# Measure with target load
validator_overrides = {
OMBSampleConfigurations.E2E_LATENCY_50PCT:
[OMBSampleConfigurations.lte(target_e2e_50pct)],
OMBSampleConfigurations.E2E_LATENCY_AVG:
[OMBSampleConfigurations.lte(target_e2e_avg)],
}
# Select number of partitions
if partitions == "min":
_num_partitions = self.tier_config.partitions_min
elif partitions == "max":
_num_partitions = self.tier_config.partitions_upper_limit

workload = self._prepare_omb_workload(rampup_time, main_runtime,
_num_partitions, main_rate,
msg_size)
with omb_runner(
self._ctx, self.redpanda, "SIMPLE_DRIVER", workload,
OMBSampleConfigurations.UNIT_TEST_LATENCY_VALIDATOR
| validator_overrides) as omb:
metrics = _get_metrics(omb)
# Tier metrics should not diviate from idle
# metrics more than 145 ms on the average
self.logger.info('Workload metrics (idle/tier): '
'"{}"'.format(
_format_metrics(idle_metrics, metrics)))
# Assert test results
omb.check_succeed()

0 comments on commit 37612c4

Please sign in to comment.