14
14
import time
15
15
import json
16
16
17
- from ducktape .mark import ignore , ok_to_fail
17
+ from ducktape .mark import ignore , ok_to_fail , parametrize
18
18
from ducktape .tests .test import TestContext
19
19
from ducktape .utils .util import wait_until
20
20
from rptest .clients .rpk import RpkTool
30
30
from rptest .services .openmessaging_benchmark_configs import \
31
31
OMBSampleConfigurations
32
32
from rptest .services .producer_swarm import ProducerSwarm
33
- from rptest .services .redpanda_cloud import AdvertisedTierConfigs , CloudTierName
33
+ from rptest .services .redpanda_cloud import AdvertisedTierConfigs , CloudTierName , get_tier_name
34
34
from rptest .services .redpanda import (RESTART_LOG_ALLOW_LIST , MetricsEndpoint ,
35
35
SISettings , RedpandaServiceCloud )
36
36
from rptest .services .rpk_consumer import RpkConsumer
@@ -163,13 +163,13 @@ def traffic_generator(context, redpanda, tier_cfg, *args, **kwargs):
163
163
) >= tier_cfg .egress_rate , f"Observed consumer throughput { consumer_throughput } too low, expected: { tier_cfg .egress_rate } "
164
164
165
165
166
- def get_globals_value (globals , key_name , default = None ):
166
+ def get_cloud_globals (globals ):
167
167
_config = {}
168
168
if RedpandaServiceCloud .GLOBAL_CLOUD_CLUSTER_CONFIG in globals :
169
169
# Load needed config values from cloud section
170
170
# of globals prior to actual cluster creation
171
171
_config = globals [RedpandaServiceCloud .GLOBAL_CLOUD_CLUSTER_CONFIG ]
172
- return _config . get ( key_name , default )
172
+ return _config
173
173
174
174
175
175
class HighThroughputTest (RedpandaTest ):
@@ -179,11 +179,8 @@ class HighThroughputTest(RedpandaTest):
179
179
180
180
def __init__ (self , test_ctx : TestContext , * args , ** kwargs ):
181
181
self ._ctx = test_ctx
182
- # Get tier value
183
- cloud_tier_str = get_globals_value (self ._ctx .globals ,
184
- "config_profile_name" ,
185
- default = "tier-1-aws" )
186
- cloud_tier = CloudTierName (cloud_tier_str )
182
+ # Get tier name
183
+ cloud_tier = get_tier_name (get_cloud_globals (self ._ctx .globals ))
187
184
extra_rp_conf = None
188
185
num_brokers = None
189
186
@@ -1152,3 +1149,93 @@ def producer_complete():
1152
1149
1153
1150
consumer .stop ()
1154
1151
consumer .free ()
1152
+
1153
+ def _prepare_omb_workload (self , ramp_time , duration , partitions , rate ,
1154
+ msg_size ):
1155
+ return {
1156
+ "name" : "HT004-MINPARTOMB" ,
1157
+ "topics" : 1 ,
1158
+ "partitions_per_topic" : partitions ,
1159
+ "subscriptions_per_topic" : 1 ,
1160
+ "consumer_per_subscription" : 3 ,
1161
+ "producers_per_topic" : 1 ,
1162
+ "producer_rate" : rate ,
1163
+ "message_size" : msg_size ,
1164
+ "consumer_backlog_size_GB" : 0 ,
1165
+ "test_duration_minutes" : duration ,
1166
+ "warmup_duration_minutes" : ramp_time ,
1167
+ "use_randomized_payloads" : True ,
1168
+ "random_bytes_ratio" : 0.5 ,
1169
+ "randomized_payload_pool_size" : 100 ,
1170
+ }
1171
+
1172
+ def _run_bench (self , workload , validator_overrides ):
1173
+ _bench = OpenMessagingBenchmark (
1174
+ self ._ctx , self .redpanda , "SIMPLE_DRIVER" ,
1175
+ (workload , OMBSampleConfigurations .UNIT_TEST_LATENCY_VALIDATOR
1176
+ | validator_overrides ))
1177
+ _bench .start ()
1178
+ benchmark_time_min = _bench .benchmark_time () + 1
1179
+ _bench .wait (timeout_sec = benchmark_time_min * 60 )
1180
+ _metrics = json .loads (_bench .node .account .ssh_output (_bench .chart_cmd ))
1181
+ return _bench , list (_metrics .values ())[0 ]
1182
+
1183
+ @cluster (num_nodes = 6 , log_allow_list = RESTART_LOG_ALLOW_LIST )
1184
+ @parametrize (partitions = "min" )
1185
+ @parametrize (partitions = "max" )
1186
+ def test_htt_partitions_omb (self , partitions ):
1187
+ def _format_metrics (idle , tier ):
1188
+ keys = idle .keys ()
1189
+ return "\n " .join ([f"{ k } = { idle [k ]} / { tier [k ]} " for k in keys ])
1190
+
1191
+ # Get values for almost idle cluster load
1192
+ _min_idle_lat = 1000
1193
+ # Assume we have 1 partition per shard,
1194
+ # then number of CPU should be equal to min number of partitions
1195
+ # to get idle-like activity
1196
+ _num_partitions = 8
1197
+
1198
+ if partitions not in ["min" , "max" ]:
1199
+ raise RuntimeError ("Test parameter for partitions invalid" )
1200
+
1201
+ idle_validators = {
1202
+ OMBSampleConfigurations .E2E_LATENCY_50PCT :
1203
+ [OMBSampleConfigurations .lte (_min_idle_lat )],
1204
+ OMBSampleConfigurations .E2E_LATENCY_AVG :
1205
+ [OMBSampleConfigurations .lte (_min_idle_lat * 3 )],
1206
+ }
1207
+ idle_workload = self ._prepare_omb_workload (1 , 2 , _num_partitions ,
1208
+ 1 * MiB , 8 * KiB )
1209
+ _ , idle_metrics = self ._run_bench (idle_workload , idle_validators )
1210
+
1211
+ # Get values for idle workload
1212
+ k_e2e_50pct = idle_metrics [OMBSampleConfigurations .E2E_LATENCY_50PCT ]
1213
+ k_e2e_avg = idle_metrics [OMBSampleConfigurations .E2E_LATENCY_AVG ]
1214
+
1215
+ # Calculate target throughput latencies
1216
+ target_e2e_50pct = k_e2e_50pct + 51
1217
+ target_e2e_avg = k_e2e_avg + 145
1218
+
1219
+ # Measure with target load
1220
+ validator_overrides = {
1221
+ OMBSampleConfigurations .E2E_LATENCY_50PCT :
1222
+ [OMBSampleConfigurations .lte (target_e2e_50pct )],
1223
+ OMBSampleConfigurations .E2E_LATENCY_AVG :
1224
+ [OMBSampleConfigurations .lte (target_e2e_avg )],
1225
+ }
1226
+ # Select number of partitions
1227
+ if partitions == "min" :
1228
+ _num_partitions = self .tier_config .partitions_min
1229
+ elif partitions == "max" :
1230
+ _num_partitions = self .tier_config .partitions_upper_limit
1231
+
1232
+ workload = self ._prepare_omb_workload (1 , 2 , _num_partitions ,
1233
+ self .tier_config .ingress_rate ,
1234
+ 8 * KiB )
1235
+ benchmark , metrics = self ._run_bench (workload , validator_overrides )
1236
+ benchmark .check_succeed ()
1237
+
1238
+ # Tier metrics should not diviate from idle
1239
+ # metrics more than 145 ms on the average
1240
+ self .logger .info ('Workload metrics (idle/tier): '
1241
+ '"{}"' .format (_format_metrics (idle_metrics , metrics )))
0 commit comments