diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 442bec544..b7c114d1d 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1018,6 +1018,20 @@ def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_ self.throughput_calculator = ThroughputCalculator() self.downsample_factor = downsample_factor + def put_sample(self, sample, name, unit, meta_data): + self.metrics_store.put_value_cluster_level( + name=name, + value=convert.seconds_to_ms(getattr(sample, name)), + unit=unit, + task=sample.task.name, + operation=sample.operation_name, + operation_type=sample.operation_type, + sample_type=sample.sample_type, + absolute_time=sample.absolute_time, + relative_time=sample.relative_time, + meta_data=meta_data, + ) + def __call__(self, raw_samples): if len(raw_samples) == 0: return @@ -1036,58 +1050,40 @@ def __call__(self, raw_samples): sample.request_meta_data, client_id_meta_data, ) - - self.metrics_store.put_value_cluster_level( - name="latency", - value=convert.seconds_to_ms(sample.latency), - unit="ms", - task=sample.task.name, - operation=sample.operation_name, - operation_type=sample.operation_type, - sample_type=sample.sample_type, - absolute_time=sample.absolute_time, - relative_time=sample.relative_time, - meta_data=meta_data, + self.put_sample( + sample, + "latency", + "ms", + meta_data, ) - - self.metrics_store.put_value_cluster_level( - name="service_time", - value=convert.seconds_to_ms(sample.service_time), - unit="ms", - task=sample.task.name, - operation=sample.operation_name, - operation_type=sample.operation_type, - sample_type=sample.sample_type, - absolute_time=sample.absolute_time, - relative_time=sample.relative_time, - meta_data=meta_data, + self.put_sample( + sample, + "service_time", + "ms", + meta_data, ) - - self.metrics_store.put_value_cluster_level( - name="processing_time", - value=convert.seconds_to_ms(sample.processing_time), - unit="ms", - task=sample.task.name, - operation=sample.operation_name, - operation_type=sample.operation_type, - sample_type=sample.sample_type, - absolute_time=sample.absolute_time, - relative_time=sample.relative_time, - meta_data=meta_data, + self.put_sample( + sample, + "processing_time", + "ms", + meta_data, ) for timing in sample.dependent_timings: - self.metrics_store.put_value_cluster_level( - name="service_time", - value=convert.seconds_to_ms(timing.service_time), - unit="ms", - task=timing.task.name, - operation=timing.operation_name, - operation_type=timing.operation_type, - sample_type=timing.sample_type, - absolute_time=timing.absolute_time, - relative_time=timing.relative_time, - meta_data=self.merge(timing.request_meta_data, client_id_meta_data), + if isinstance(timing, dict) and "dependent_timings" in timing: + for sub_timing in timing.get("dependent_timings", []): + self.put_sample( + sub_timing, + "service_time", + "ms", + self.merge(sub_timing.request_meta_data, client_id_meta_data), + ) + + self.put_sample( + timing, + "service_time", + "ms", + self.merge(timing.request_meta_data, client_id_meta_data), ) end = time.perf_counter() @@ -1546,26 +1542,51 @@ def dependent_timings(self): for t in self._dependent_timing: timing = t.pop("dependent_timing") meta_data = self._merge(self.request_meta_data, t) - yield Sample( - self.client_id, - timing["absolute_time"], - timing["request_start"], - self.task_start, - self.task, - self.sample_type, - meta_data, - 0, - timing["service_time"], - 0, - 0, - self.total_ops, - self.total_ops_unit, - self.time_period, - self.percent_completed, - None, - timing["operation"], - timing["operation-type"], - ) + if isinstance(timing, dict): + yield Sample( + self.client_id, + timing["absolute_time"], + timing["request_start"], + self.task_start, + self.task, + self.sample_type, + meta_data, + 0, + timing["service_time"], + 0, + 0, + self.total_ops, + self.total_ops_unit, + self.time_period, + self.percent_completed, + None, + timing["operation"], + timing["operation-type"], + ) + else: + for sub_timing in timing: + st = sub_timing.pop("dependent_timing", None) + sub_meta_data = self._merge(meta_data, sub_timing) + yield Sample( + self.client_id, + st["absolute_time"], + st["request_start"], + self.task_start, + self.task, + self.sample_type, + sub_meta_data, + 0, + st["service_time"], + 0, + 0, + self.total_ops, + self.total_ops_unit, + self.time_period, + self.percent_completed, + None, + st["operation"], + st["operation-type"], + ) def __repr__(self, *args, **kwargs): return ( diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index ce2d00262..f059a28c1 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2654,6 +2654,7 @@ def __init__(self, *args, **kwargs): "delete-async-search", "field-caps", ] + self.operations_without_request_timing = ["composite"] async def run_stream(self, es, stream, connection_limit): streams = [] @@ -2674,7 +2675,10 @@ async def run_stream(self, es, stream, connection_limit): raise exceptions.RallyAssertionError( f"Unsupported operation-type [{op_type}]. Use one of [{', '.join(self.supported_op_types)}]." ) - runner = RequestTiming(runner_for(op_type)) + if op_type not in self.operations_without_request_timing: + runner = RequestTiming(runner_for(op_type)) + else: + runner = runner_for(op_type) async with connection_limit: async with runner: response = await runner({"default": es}, item)