From c146b013f8fd437672f7517f4f7a7a5cc3eada57 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Fri, 26 Jul 2024 14:06:13 +0200 Subject: [PATCH 01/10] Add http status to bulk request metrics --- esrally/driver/runner.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 137bf4a4d..cb7c03ee0 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -562,6 +562,7 @@ def _utf8len(line): total_document_size_bytes = 0 with_action_metadata = mandatory(params, "action-metadata-present", self) + request_status = response.meta.status if isinstance(params["body"], bytes): bulk_lines = params["body"].split(b"\n") elif isinstance(params["body"], str): @@ -598,6 +599,7 @@ def _utf8len(line): shards_histogram[sk]["item-count"] += 1 if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): bulk_error_count += 1 + request_status = max(request_status, data["status"]) self.extract_error_details(error_details, data) else: bulk_success_count += 1 @@ -610,6 +612,7 @@ def _utf8len(line): "shards_histogram": list(shards_histogram.values()), "bulk-request-size-bytes": bulk_request_size_bytes, "total-document-size-bytes": total_document_size_bytes, + "http-status": request_status, } if bulk_error_count > 0: stats["error-type"] = "bulk" @@ -624,6 +627,7 @@ def simple_stats(self, bulk_size, unit, response): bulk_success_count = bulk_size if unit == "docs" else None bulk_error_count = 0 error_details = set() + request_status = response.meta.status # parse lazily on the fast path props = parse(response, ["errors", "took"]) @@ -635,6 +639,7 @@ def simple_stats(self, bulk_size, unit, response): for item in parsed_response["items"]: data = next(iter(item.values())) if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): + request_status = max(request_status, data["status"]) bulk_error_count += 1 self.extract_error_details(error_details, data) else: @@ -644,6 +649,7 @@ def simple_stats(self, bulk_size, unit, response): "success": bulk_error_count == 0, "success-count": bulk_success_count, "error-count": bulk_error_count, + "http-status": request_status, } if bulk_error_count > 0: stats["error-type"] = "bulk" From f4e897d1b214d4c718b02699e3aa8105e790e532 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Fri, 26 Jul 2024 14:45:17 +0200 Subject: [PATCH 02/10] Check if is ApiResponse first --- esrally/driver/runner.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index cb7c03ee0..52537c159 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -32,6 +32,7 @@ from types import FunctionType from typing import List, Optional +from elastic_transport import ApiResponse import ijson from esrally import exceptions, track, types @@ -562,7 +563,10 @@ def _utf8len(line): total_document_size_bytes = 0 with_action_metadata = mandatory(params, "action-metadata-present", self) - request_status = response.meta.status + if isinstance(response, ApiResponse): + request_status = response.meta.status + else: + request_status = -1 if isinstance(params["body"], bytes): bulk_lines = params["body"].split(b"\n") elif isinstance(params["body"], str): @@ -627,7 +631,10 @@ def simple_stats(self, bulk_size, unit, response): bulk_success_count = bulk_size if unit == "docs" else None bulk_error_count = 0 error_details = set() - request_status = response.meta.status + if isinstance(response, ApiResponse): + request_status = 
response.meta.status + else: + request_status = -1 # parse lazily on the fast path props = parse(response, ["errors", "took"]) From b879ca8691444ae2b25ef984ac4dc1e477ef351e Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Mon, 29 Jul 2024 17:59:55 +0200 Subject: [PATCH 03/10] Add retry --- esrally/driver/runner.py | 54 +++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 52537c159..51d84bbc8 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -32,7 +32,6 @@ from types import FunctionType from typing import List, Optional -from elastic_transport import ApiResponse import ijson from esrally import exceptions, track, types @@ -506,7 +505,6 @@ async def __call__(self, es, params): """ detailed_results = params.get("detailed-results", False) api_kwargs = self._default_kw_params(params) - bulk_params = {} if "timeout" in params: bulk_params["timeout"] = params["timeout"] @@ -534,8 +532,23 @@ async def __call__(self, es, params): response = await es.bulk(params=bulk_params, **api_kwargs) else: response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) - - stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response) + retry_stats = {} + for i in range(3): # this can be configurable later + stats, lines_to_retry = ( + self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response, params) + ) + if len(lines_to_retry) == 0: + stats.update(retry_stats) + break + if len(lines_to_retry) % 2 > 0: + self.logger.warn(f"Lines to retry was not divisible by 2. Actual: {lines_to_retry}") + retry_stats[f"attempt_{i}"] = stats + api_kwargs["body"] = lines_to_retry + bulk_size = len(lines_to_retry) / 2 + response = await es.bulk(params=bulk_params, **api_kwargs) + request_status = response.meta.status + if request_status == 400: + self.logger.warn(f"400 after retry. 
Payload: {lines_to_retry}") meta_data = { "index": params.get("index"), @@ -562,11 +575,7 @@ def _utf8len(line): bulk_request_size_bytes = 0 total_document_size_bytes = 0 with_action_metadata = mandatory(params, "action-metadata-present", self) - - if isinstance(response, ApiResponse): - request_status = response.meta.status - else: - request_status = -1 + request_status = response.meta.status if isinstance(params["body"], bytes): bulk_lines = params["body"].split(b"\n") elif isinstance(params["body"], str): @@ -627,27 +636,38 @@ def _utf8len(line): return stats - def simple_stats(self, bulk_size, unit, response): + def simple_stats(self, bulk_size, unit, response, params): bulk_success_count = bulk_size if unit == "docs" else None bulk_error_count = 0 error_details = set() - if isinstance(response, ApiResponse): - request_status = response.meta.status - else: - request_status = -1 + request_status = response.meta.status # parse lazily on the fast path props = parse(response, ["errors", "took"]) - + if isinstance(params["body"], bytes): + bulk_lines = params["body"].split(b"\n") + elif isinstance(params["body"], str): + bulk_lines = params["body"].split("\n") + elif isinstance(params["body"], list): + bulk_lines = params["body"] + else: + raise exceptions.DataError("bulk body is not of type bytes, string, or list") + lines_to_retry = [] if props.get("errors", False): # determine success count regardless of unit because we need to iterate through all items anyway bulk_success_count = 0 # Reparse fully in case of errors - this will be slower parsed_response = json.loads(response.getvalue()) - for item in parsed_response["items"]: + for i, item in enumerate(parsed_response["items"]): data = next(iter(item.values())) if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): request_status = max(request_status, data["status"]) bulk_error_count += 1 + if data["status"] == 429: # don't retry other statuses right now. 
+ possible_failed_item = bulk_lines[ + (i * 2) + ] # this is for the "action" line - its possible sometimes this may not be here, we will need to take account of that probably in main function + possible_failed_item = bulk_lines[(i * 2) + 1] + lines_to_retry.append(possible_failed_item) self.extract_error_details(error_details, data) else: bulk_success_count += 1 @@ -661,7 +681,7 @@ def simple_stats(self, bulk_size, unit, response): if bulk_error_count > 0: stats["error-type"] = "bulk" stats["error-description"] = self.error_description(error_details) - return stats + return stats, lines_to_retry def extract_error_details(self, error_details, data): error_data = data.get("error", {}) From bad2ba263c2fdebb5440ae0b52d2b8cfb9b22d38 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Wed, 31 Jul 2024 11:56:02 +0200 Subject: [PATCH 04/10] Fix bulk retry on 429 --- esrally/driver/runner.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 51d84bbc8..2605992f3 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -535,13 +535,11 @@ async def __call__(self, es, params): retry_stats = {} for i in range(3): # this can be configurable later stats, lines_to_retry = ( - self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response, params) + self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response, api_kwargs) ) if len(lines_to_retry) == 0: stats.update(retry_stats) break - if len(lines_to_retry) % 2 > 0: - self.logger.warn(f"Lines to retry was not divisible by 2. Actual: {lines_to_retry}") retry_stats[f"attempt_{i}"] = stats api_kwargs["body"] = lines_to_retry bulk_size = len(lines_to_retry) / 2 @@ -549,7 +547,6 @@ async def __call__(self, es, params): request_status = response.meta.status if request_status == 400: self.logger.warn(f"400 after retry. Payload: {lines_to_retry}") - meta_data = { "index": params.get("index"), "weight": bulk_size, @@ -641,6 +638,7 @@ def simple_stats(self, bulk_size, unit, response, params): bulk_error_count = 0 error_details = set() request_status = response.meta.status + doc_status = -1 # parse lazily on the fast path props = parse(response, ["errors", "took"]) if isinstance(params["body"], bytes): @@ -660,14 +658,11 @@ def simple_stats(self, bulk_size, unit, response, params): for i, item in enumerate(parsed_response["items"]): data = next(iter(item.values())) if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): - request_status = max(request_status, data["status"]) + doc_status = max(doc_status, data["status"]) bulk_error_count += 1 if data["status"] == 429: # don't retry other statuses right now. 
- possible_failed_item = bulk_lines[ - (i * 2) - ] # this is for the "action" line - its possible sometimes this may not be here, we will need to take account of that probably in main function - possible_failed_item = bulk_lines[(i * 2) + 1] - lines_to_retry.append(possible_failed_item) + lines_to_retry.append(bulk_lines[(i * 2)]) + lines_to_retry.append(bulk_lines[(i * 2) + 1]) self.extract_error_details(error_details, data) else: bulk_success_count += 1 @@ -676,8 +671,11 @@ def simple_stats(self, bulk_size, unit, response, params): "success": bulk_error_count == 0, "success-count": bulk_success_count, "error-count": bulk_error_count, - "http-status": request_status, + "request-status": request_status, } + if doc_status > 0: + stats["doc-status"] = doc_status # if we have not encountered any errors, we will never have inspected the status for each doc + # alternatively we could set it to the same as request-status, even though in reality they tend to be status 201 rather than 200 if bulk_error_count > 0: stats["error-type"] = "bulk" stats["error-description"] = self.error_description(error_details) From a1eba72fd6b1dabdc4aa6a480690d954d3ce3420 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Wed, 31 Jul 2024 20:04:18 +0200 Subject: [PATCH 05/10] Extra stats --- esrally/driver/runner.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 2605992f3..b337c2738 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -532,21 +532,32 @@ async def __call__(self, es, params): response = await es.bulk(params=bulk_params, **api_kwargs) else: response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) - retry_stats = {} + retry_stats = [] + total_success = total_error = total_time = 0 for i in range(3): # this can be configurable later stats, lines_to_retry = ( self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response, api_kwargs) ) + retry_stats.append(stats) if len(lines_to_retry) == 0: - stats.update(retry_stats) break - retry_stats[f"attempt_{i}"] = stats api_kwargs["body"] = lines_to_retry bulk_size = len(lines_to_retry) / 2 response = await es.bulk(params=bulk_params, **api_kwargs) request_status = response.meta.status if request_status == 400: self.logger.warn(f"400 after retry. Payload: {lines_to_retry}") + if len(retry_stats) == 1: + stats = retry_stats[0] + else: + for stats in retry_stats: + total_success += stats["success-count"] + total_error += stats["error-count"] + total_time += stats["took"] + retry_count = len(retry_stats) + + stats = {"success-count": total_success, "error-count": total_error, "retry-count": retry_count, "took": total_time, "success": True, "retried": True} + meta_data = { "index": params.get("index"), "weight": bulk_size, From e6518e43eec7371924431888333cd7615d3e32c3 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Fri, 16 Jan 2026 11:33:44 +0100 Subject: [PATCH 06/10] Improve approach --- esrally/driver/runner.py | 81 +++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 7db1c6936..f6fe5511d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -495,6 +495,7 @@ async def __call__(self, es, params): * ``refresh``: If ``"true"``, Elasticsearch will issue an async refresh to the index; i.e., ``?refresh=true``. 
If ``"wait_for"``, Elasticsearch issues a synchronous refresh to the index; i.e., ``?refresh=wait_for``. If ``"false""``, Elasticsearch will use refresh defaults; i.e., ``?refresh=false``. + * ``retries_on_429``: the number of times to retry the bulk request on document level 429s. Defaults to 0 (do not retry). """ detailed_results = params.get("detailed-results", False) api_kwargs = self._default_kw_params(params) @@ -514,6 +515,9 @@ async def __call__(self, es, params): with_action_metadata = mandatory(params, "action-metadata-present", self) bulk_size = mandatory(params, "bulk-size", self) unit = mandatory(params, "unit", self) + retries_on_429 = params.get("retries_on_429", 0) + if retries_on_429 > 0: + self.logger.info("Will retry up to %d times on document-level 429s.", retries_on_429) # parse responses lazily in the standard case - responses might be large thus parsing skews results and if no # errors have occurred we only need a small amount of information from the potentially large response. if not detailed_results: @@ -527,19 +531,28 @@ async def __call__(self, es, params): response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) retry_stats = [] total_success = total_error = total_time = 0 - for i in range(3): # this can be configurable later - stats, lines_to_retry = ( - self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response, api_kwargs) - ) + stats, lines_to_retry = ( + self.detailed_stats(params, response, emit_lines_to_retry=retries_on_429 > 0) + if detailed_results + else self.simple_stats(bulk_size, unit, response, api_kwargs, emit_lines_to_retry=retries_on_429 > 0) + ) + for _ in range(retries_on_429): retry_stats.append(stats) if len(lines_to_retry) == 0: break + self.logger.warning("Retrying %d documents that previously resulted in a 429.", len(lines_to_retry) / 2) api_kwargs["body"] = lines_to_retry - bulk_size = len(lines_to_retry) / 2 + bulk_size = len(lines_to_retry) / 2 # at this point the data always contains action metadata. response = await es.bulk(params=bulk_params, **api_kwargs) + stats, lines_to_retry = ( + self.detailed_stats(params, response, emit_lines_to_retry=True) + if detailed_results + else self.simple_stats(bulk_size, unit, response, api_kwargs, emit_lines_to_retry=True) + ) request_status = response.meta.status - if request_status == 400: - self.logger.warn(f"400 after retry. Payload: {lines_to_retry}") + if request_status not in (200, 201, 429): + self.logger.warning("%s after bulk request retry. 
Payload: %s", request_status, lines_to_retry) + break if len(retry_stats) == 1: stats = retry_stats[0] else: @@ -549,11 +562,18 @@ async def __call__(self, es, params): total_time += stats["took"] retry_count = len(retry_stats) - stats = {"success-count": total_success, "error-count": total_error, "retry-count": retry_count, "took": total_time, "success": True, "retried": True} + stats = { + "success-count": total_success, + "error-count": total_error, + "retry-count": retry_count, + "took": total_time, + "success": len(lines_to_retry) == 0, + "retried": retry_count > 0, + } meta_data = { "index": params.get("index"), - "weight": bulk_size, + "weight": params.get("bulk-size"), "unit": unit, } meta_data.update(stats) @@ -561,7 +581,7 @@ async def __call__(self, es, params): meta_data["error-type"] = "bulk" return meta_data - def detailed_stats(self, params, response): + def detailed_stats(self, params, response, emit_lines_to_retry=False): def _utf8len(line): if isinstance(line, bytes): return len(line) @@ -577,14 +597,8 @@ def _utf8len(line): total_document_size_bytes = 0 with_action_metadata = mandatory(params, "action-metadata-present", self) request_status = response.meta.status - if isinstance(params["body"], bytes): - bulk_lines = params["body"].split(b"\n") - elif isinstance(params["body"], str): - bulk_lines = params["body"].split("\n") - elif isinstance(params["body"], list): - bulk_lines = params["body"] - else: - raise exceptions.DataError("bulk body is not of type bytes, string, or list") + bulk_lines = self.get_bulk_lines(params) + lines_to_retry = [] for line_number, data in enumerate(bulk_lines): line_size = _utf8len(data) @@ -596,7 +610,7 @@ def _utf8len(line): bulk_request_size_bytes += line_size - for item in response["items"]: + for i, item in enumerate(response["items"]): # there is only one (top-level) item op, data = next(iter(item.items())) if op not in ops: @@ -615,6 +629,9 @@ def _utf8len(line): bulk_error_count += 1 request_status = max(request_status, data["status"]) self.extract_error_details(error_details, data) + if data["status"] == 429 and emit_lines_to_retry: # don't retry other statuses right now. 
+ lines_to_retry.append(bulk_lines[(i * 2)]) + lines_to_retry.append(bulk_lines[(i * 2) + 1]) else: bulk_success_count += 1 stats = { @@ -635,9 +652,19 @@ def _utf8len(line): if "ingest_took" in response: stats["ingest_took"] = response["ingest_took"] - return stats + return stats, lines_to_retry + + def get_bulk_lines(self, params): + if isinstance(params["body"], bytes): + return params["body"].split(b"\n") + elif isinstance(params["body"], str): + return params["body"].split("\n") + elif isinstance(params["body"], list): + return params["body"] + else: + raise exceptions.DataError("bulk body is not of type bytes, string, or list") - def simple_stats(self, bulk_size, unit, response, params): + def simple_stats(self, bulk_size, unit, response, params, emit_lines_to_retry=False): bulk_success_count = bulk_size if unit == "docs" else None bulk_error_count = 0 error_details = set() @@ -645,16 +672,10 @@ def simple_stats(self, bulk_size, unit, response, params): doc_status = -1 # parse lazily on the fast path props = parse(response, ["errors", "took"]) - if isinstance(params["body"], bytes): - bulk_lines = params["body"].split(b"\n") - elif isinstance(params["body"], str): - bulk_lines = params["body"].split("\n") - elif isinstance(params["body"], list): - bulk_lines = params["body"] - else: - raise exceptions.DataError("bulk body is not of type bytes, string, or list") + lines_to_retry = [] if props.get("errors", False): + bulk_lines = self.get_bulk_lines(params) # determine success count regardless of unit because we need to iterate through all items anyway bulk_success_count = 0 # Reparse fully in case of errors - this will be slower @@ -664,7 +685,7 @@ def simple_stats(self, bulk_size, unit, response, params): if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): doc_status = max(doc_status, data["status"]) bulk_error_count += 1 - if data["status"] == 429: # don't retry other statuses right now. + if data["status"] == 429 and emit_lines_to_retry: # don't retry other statuses right now. 
lines_to_retry.append(bulk_lines[(i * 2)]) lines_to_retry.append(bulk_lines[(i * 2) + 1]) self.extract_error_details(error_details, data) From 2996be139a295dbb1ec9a62e032ce23c7b1f81ab Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Fri, 16 Jan 2026 22:55:39 +0100 Subject: [PATCH 07/10] Fixup tests, retain document size for detailed_stats --- esrally/driver/runner.py | 55 ++-- tests/driver/driver_test.py | 6 +- tests/driver/runner_test.py | 575 ++++++++++++++++++++---------------- 3 files changed, 367 insertions(+), 269 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 4071a663c..ba92d887d 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -530,14 +530,14 @@ async def __call__(self, es, params): else: response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs) retry_stats = [] - total_success = total_error = total_time = 0 + total_success = total_error = total_time = sum_bulk_request_size_bytes = sum_total_document_size_bytes = 0 stats, lines_to_retry = ( self.detailed_stats(params, response, emit_lines_to_retry=retries_on_429 > 0) if detailed_results else self.simple_stats(bulk_size, unit, response, api_kwargs, emit_lines_to_retry=retries_on_429 > 0) ) + retry_stats.append(stats) for _ in range(retries_on_429): - retry_stats.append(stats) if len(lines_to_retry) == 0: break self.logger.warning("Retrying %d documents that previously resulted in a 429.", len(lines_to_retry) / 2) @@ -549,6 +549,7 @@ async def __call__(self, es, params): if detailed_results else self.simple_stats(bulk_size, unit, response, api_kwargs, emit_lines_to_retry=True) ) + retry_stats.append(stats) request_status = response.meta.status if request_status not in (200, 201, 429): self.logger.warning("%s after bulk request retry. 
Payload: %s", request_status, lines_to_retry) @@ -560,16 +561,31 @@ async def __call__(self, es, params): total_success += stats["success-count"] total_error += stats["error-count"] total_time += stats["took"] + sum_bulk_request_size_bytes += stats["bulk-request-size-bytes"] + sum_total_document_size_bytes += stats["total-document-size-bytes"] retry_count = len(retry_stats) - - stats = { - "success-count": total_success, - "error-count": total_error, - "retry-count": retry_count, - "took": total_time, - "success": len(lines_to_retry) == 0, - "retried": retry_count > 0, - } + if detailed_results: + stats = { + "success-count": total_success, + "error-count": total_error, + "retry-count": retry_count, + "took": total_time, + "success": len(lines_to_retry) == 0, + "retried": retry_count > 0, + "bulk-request-size-bytes": sum_bulk_request_size_bytes, + "total-document-size-bytes": sum_total_document_size_bytes, + "ops": {}, # detailed per-op stats are not aggregated over retries + "shards_histogram": [], # detailed per-shard stats are not aggregated over retries + } + else: + stats = { + "success-count": total_success, + "error-count": total_error, + "retry-count": retry_count, + "took": total_time, + "success": len(lines_to_retry) == 0, + "retried": retry_count > 0, + } meta_data = { "index": params.get("index"), @@ -609,7 +625,7 @@ def _utf8len(line): total_document_size_bytes += line_size bulk_request_size_bytes += line_size - + max_doc_status = -1 for i, item in enumerate(response["items"]): # there is only one (top-level) item op, data = next(iter(item.items())) @@ -627,7 +643,7 @@ def _utf8len(line): shards_histogram[sk]["item-count"] += 1 if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): bulk_error_count += 1 - request_status = max(request_status, data["status"]) + max_doc_status = max(max_doc_status, data["status"]) self.extract_error_details(error_details, data) if data["status"] == 429 and emit_lines_to_retry: # don't retry other statuses right now. lines_to_retry.append(bulk_lines[(i * 2)]) @@ -643,7 +659,8 @@ def _utf8len(line): "shards_histogram": list(shards_histogram.values()), "bulk-request-size-bytes": bulk_request_size_bytes, "total-document-size-bytes": total_document_size_bytes, - "http-status": request_status, + "request-status": request_status, + "max-doc-status": max_doc_status if max_doc_status > 0 else request_status, } if bulk_error_count > 0: stats["error-type"] = "bulk" @@ -669,7 +686,7 @@ def simple_stats(self, bulk_size, unit, response, params, emit_lines_to_retry=Fa bulk_error_count = 0 error_details = set() request_status = response.meta.status - doc_status = -1 + max_doc_status = -1 # parse lazily on the fast path props = parse(response, ["errors", "took"]) @@ -682,8 +699,8 @@ def simple_stats(self, bulk_size, unit, response, params, emit_lines_to_retry=Fa parsed_response = json.loads(response.getvalue()) for i, item in enumerate(parsed_response["items"]): data = next(iter(item.values())) + max_doc_status = max(max_doc_status, data["status"]) if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0): - doc_status = max(doc_status, data["status"]) bulk_error_count += 1 if data["status"] == 429 and emit_lines_to_retry: # don't retry other statuses right now. 
lines_to_retry.append(bulk_lines[(i * 2)]) @@ -697,10 +714,10 @@ def simple_stats(self, bulk_size, unit, response, params, emit_lines_to_retry=Fa "success-count": bulk_success_count, "error-count": bulk_error_count, "request-status": request_status, + "max-doc-status": ( + max_doc_status if max_doc_status > 0 else request_status + ), # if we have not encountered any errors, we will never have inspected the status for each doc } - if doc_status > 0: - stats["doc-status"] = doc_status # if we have not encountered any errors, we will never have inspected the status for each doc - # alternatively we could set it to the same as request-status, even though in reality they tend to be status 201 rather than 200 if bulk_error_count > 0: stats["error-type"] = "bulk" stats["error-description"] = self.error_description(error_details) diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index b35147ed6..784b7379a 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -1465,8 +1465,12 @@ def setup_method(self, method): async def test_execute_schedule_in_throughput_mode(self, es): task_start = time.perf_counter() es.new_request_context.return_value = self.StaticRequestTiming(task_start=task_start) + _headers = elastic_transport.HttpHeaders() + _headers["content-type"] = "application/json" + _node = elastic_transport.NodeConfig(scheme="http", host="localhost", port=9200) + _meta = elastic_transport.ApiResponseMeta(status=200, http_version="1.1", headers=_headers, duration=0.05, node=_node) - es.bulk = mock.AsyncMock(return_value=io.BytesIO(b'{"errors": false, "took": 8}')) + es.bulk = mock.AsyncMock(return_value=elastic_transport.ApiResponse(body=io.BytesIO(b'{"errors": false, "took": 8}'), meta=_meta)) params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource) test_track = track.Track(name="unittest", description="unittest track", indices=None, challenges=None) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index ae2875dd9..fc829b6ce 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -27,6 +27,7 @@ import elastic_transport import elasticsearch import pytest +from elastic_transport import ApiResponse, ApiResponseMeta, HttpHeaders, NodeConfig from esrally import client, config, exceptions from esrally.client.asynchronous import RallyAsyncElasticsearch @@ -408,6 +409,12 @@ def _build_bulk_body(*lines): class TestBulkIndexRunner: + _headers = HttpHeaders() + _headers["content-type"] = "application/json" + _node = NodeConfig(scheme="http", host="localhost", port=9200) + + BULK_RESPONSE_META = ApiResponseMeta(status=200, http_version="1.1", headers=_headers, duration=0.1, node=_node) + @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_bulk_index_missing_params(self, es): @@ -415,7 +422,9 @@ async def test_bulk_index_missing_params(self, es): "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() @@ -444,7 +453,10 @@ async def test_bulk_index_success_with_timeout(self, es): "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk 
= runner.BulkIndex() @@ -466,6 +478,8 @@ async def test_bulk_index_success_with_timeout(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": None, "weight": 3, @@ -484,7 +498,9 @@ async def test_bulk_index_success_with_metadata(self, es): "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() @@ -505,6 +521,8 @@ async def test_bulk_index_success_with_metadata(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": None, "weight": 3, @@ -523,7 +541,9 @@ async def test_simple_bulk_with_timeout_and_headers(self, es): "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() @@ -546,6 +566,8 @@ async def test_simple_bulk_with_timeout_and_headers(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": "test1", "weight": 3, @@ -572,7 +594,9 @@ async def test_bulk_index_success_without_metadata_with_doc_type(self, es): "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() bulk_params = { @@ -591,6 +615,8 @@ async def test_bulk_index_success_without_metadata_with_doc_type(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": "test-index", "weight": 3, @@ -609,7 +635,9 @@ async def test_bulk_index_success_without_metadata_and_without_doc_type(self, es "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() bulk_params = { @@ -627,6 +655,8 @@ async def test_bulk_index_success_without_metadata_and_without_doc_type(self, es result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": "test-index", "weight": 3, @@ -641,7 +671,7 @@ async def test_bulk_index_success_without_metadata_and_without_doc_type(self, es @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_bulk_index_error(self, es): - bulk_response = { + bulk_response_body = { "took": 5, "errors": True, "items": [ @@ -650,8 +680,9 @@ async def test_bulk_index_error(self, es): {"index": {"status": 404, "_shards": {"total": 2, "successful": 0, "failed": 2}}}, ], } - - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response_body).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() @@ -673,6 +704,8 @@ async def test_bulk_index_error(self, es): result = await bulk(es, bulk_params) assert 
result == { + "max-doc-status": 500, + "request-status": 200, "took": 5, "index": "test", "weight": 3, @@ -723,7 +756,9 @@ async def test_bulk_index_error_no_shards(self, es): ], } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() @@ -746,6 +781,8 @@ async def test_bulk_index_error_no_shards(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 429, "took": 20, "index": "test", "weight": 3, @@ -819,7 +856,9 @@ async def test_mixed_bulk_with_simple_stats(self, es): }, ], } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() bulk_params = { @@ -843,6 +882,8 @@ async def test_mixed_bulk_with_simple_stats(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 500, "took": 30, "index": "test", "weight": 4, @@ -861,89 +902,92 @@ async def test_mixed_bulk_with_simple_stats(self, es): @pytest.mark.asyncio async def test_mixed_bulk_with_detailed_stats_body_as_string(self, es): es.bulk = mock.AsyncMock( - return_value={ - "took": 30, - "ingest_took": 20, - "errors": True, - "items": [ - { - "index": { - "_index": "test", - "_type": "type1", - "_id": "1", - "_version": 1, - "result": "created", - "_shards": {"total": 2, "successful": 1, "failed": 0}, - "created": True, - "status": 201, - "_seq_no": 0, - } - }, - { - "update": { - "_index": "test", - "_type": "type1", - "_id": "2", - "_version": 2, - "result": "updated", - "_shards": {"total": 2, "successful": 1, "failed": 0}, - "status": 200, - "_seq_no": 1, - } - }, - { - "index": { - "_index": "test", - "_type": "type1", - "_id": "3", - "_version": 1, - "result": "noop", - "_shards": {"total": 2, "successful": 0, "failed": 2}, - "created": False, - "status": 500, - "_seq_no": -2, - } - }, - { - "index": { - "_index": "test", - "_type": "type1", - "_id": "4", - "_version": 1, - "result": "noop", - "_shards": {"total": 2, "successful": 1, "failed": 1}, - "created": False, - "status": 500, - "_seq_no": -2, - } - }, - { - "index": { - "_index": "test", - "_type": "type1", - "_id": "5", - "_version": 1, - "result": "created", - "_shards": {"total": 2, "successful": 1, "failed": 0}, - "created": True, - "status": 201, - "_seq_no": 4, - } - }, - { - "update": { - "_index": "test", - "_type": "type1", - "_id": "6", - "_version": 2, - "result": "noop", - "_shards": {"total": 2, "successful": 0, "failed": 2}, - "status": 404, - "_seq_no": 5, - } - }, - ], - } + return_value=ApiResponse( + body={ + "took": 30, + "ingest_took": 20, + "errors": True, + "items": [ + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "created": True, + "status": 201, + "_seq_no": 0, + } + }, + { + "update": { + "_index": "test", + "_type": "type1", + "_id": "2", + "_version": 2, + "result": "updated", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "status": 200, + "_seq_no": 1, + } + }, + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "3", + "_version": 1, + "result": "noop", + "_shards": {"total": 2, "successful": 0, 
"failed": 2}, + "created": False, + "status": 500, + "_seq_no": -2, + } + }, + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "4", + "_version": 1, + "result": "noop", + "_shards": {"total": 2, "successful": 1, "failed": 1}, + "created": False, + "status": 500, + "_seq_no": -2, + } + }, + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "5", + "_version": 1, + "result": "created", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "created": True, + "status": 201, + "_seq_no": 4, + } + }, + { + "update": { + "_index": "test", + "_type": "type1", + "_id": "6", + "_version": 2, + "result": "noop", + "_shards": {"total": 2, "successful": 0, "failed": 2}, + "status": 404, + "_seq_no": 5, + } + }, + ], + }, + meta=self.BULK_RESPONSE_META, + ) ) bulk = runner.BulkIndex() @@ -972,6 +1016,8 @@ async def test_mixed_bulk_with_detailed_stats_body_as_string(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 500, "took": 30, "ingest_took": 20, "index": "test", @@ -1005,26 +1051,29 @@ async def test_mixed_bulk_with_detailed_stats_body_as_string(self, es): @pytest.mark.asyncio async def test_simple_bulk_with_detailed_stats_body_as_list(self, es): es.bulk = mock.AsyncMock( - return_value={ - "took": 30, - "ingest_took": 20, - "errors": False, - "items": [ - { - "index": { - "_index": "test", - "_type": "type1", - "_id": "1", - "_version": 1, - "result": "created", - "_shards": {"total": 2, "successful": 1, "failed": 0}, - "created": True, - "status": 201, - "_seq_no": 0, + return_value=ApiResponse( + body={ + "took": 30, + "ingest_took": 20, + "errors": False, + "items": [ + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "created": True, + "status": 201, + "_seq_no": 0, + } } - } - ], - } + ], + }, + meta=self.BULK_RESPONSE_META, + ) ) bulk = runner.BulkIndex() @@ -1043,6 +1092,8 @@ async def test_simple_bulk_with_detailed_stats_body_as_list(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 30, "ingest_took": 20, "index": "test", @@ -1074,26 +1125,29 @@ async def test_simple_bulk_with_detailed_stats_body_as_list(self, es): @pytest.mark.asyncio async def test_simple_bulk_with_detailed_stats_body_as_bytes(self, es): es.bulk = mock.AsyncMock( - return_value={ - "took": 30, - "ingest_took": 20, - "errors": False, - "items": [ - { - "index": { - "_index": "bytes", - "_type": "bytes1", - "_id": "1", - "_version": 1, - "result": "created", - "_shards": {"total": 1, "successful": 1, "failed": 0}, - "created": True, - "status": 201, - "_seq_no": 0, + return_value=ApiResponse( + body={ + "took": 30, + "ingest_took": 20, + "errors": False, + "items": [ + { + "index": { + "_index": "bytes", + "_type": "bytes1", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": {"total": 1, "successful": 1, "failed": 0}, + "created": True, + "status": 201, + "_seq_no": 0, + } } - } - ], - } + ], + }, + meta=self.BULK_RESPONSE_META, + ) ) bulk = runner.BulkIndex() @@ -1109,6 +1163,8 @@ async def test_simple_bulk_with_detailed_stats_body_as_bytes(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 30, "ingest_took": 20, "index": "test", @@ -1140,26 +1196,29 @@ async def test_simple_bulk_with_detailed_stats_body_as_bytes(self, es): @pytest.mark.asyncio async 
def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, es): es.bulk = mock.AsyncMock( - return_value={ - "took": 30, - "ingest_took": 20, - "errors": False, - "items": [ - { - "index": { - "_index": "test", - "_type": "type1", - "_id": "1", - "_version": 1, - "result": "created", - "_shards": {"total": 2, "successful": 1, "failed": 0}, - "created": True, - "status": 201, - "_seq_no": 0, + return_value=ApiResponse( + body={ + "took": 30, + "ingest_took": 20, + "errors": False, + "items": [ + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "created": True, + "status": 201, + "_seq_no": 0, + } } - } - ], - } + ], + }, + meta=self.BULK_RESPONSE_META, + ) ) bulk = runner.BulkIndex() @@ -1183,25 +1242,28 @@ async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, e @pytest.mark.asyncio async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es): es.bulk = mock.AsyncMock( - return_value={ - "took": 5, - "errors": True, - "items": [ - { - "create": { - "_index": "test", - "_type": "_doc", - "_id": "6UNLsn0BfMD3e6iftbdV", - "status": 429, - "error": { - "type": "cluster_block_exception", - "reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded " - "flood-stage watermark, index has read-only-allow-delete block];", - }, + return_value=ApiResponse( + body={ + "took": 5, + "errors": True, + "items": [ + { + "create": { + "_index": "test", + "_type": "_doc", + "_id": "6UNLsn0BfMD3e6iftbdV", + "status": 429, + "error": { + "type": "cluster_block_exception", + "reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded " + "flood-stage watermark, index has read-only-allow-delete block];", + }, + } } - } - ], - } + ], + }, + meta=self.BULK_RESPONSE_META, + ) ) bulk = runner.BulkIndex() @@ -1223,6 +1285,8 @@ async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es): mocked_warning_logger.assert_has_calls([mock.call("Bulk request failed: [%s]", result["error-description"])]) assert result == { + "request-status": 200, + "max-doc-status": 429, "took": 5, "index": "test", "weight": 1, @@ -1247,83 +1311,86 @@ async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es): @pytest.mark.asyncio async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es): es.bulk = mock.AsyncMock( - return_value={ - "took": 5, - "errors": True, - "items": [ - { - "create": { - "_index": "test", - "status": 409, - "error": { - "type": "version_conflict_engine_exception", - "reason": "[1]: version conflict, document already exists (current version [1])", - }, - } - }, - { - "create": { - "_index": "test", - "status": 409, - "error": { - "type": "version_conflict_engine_exception", - "reason": "[2]: version conflict, document already exists (current version [1])", - }, - } - }, - { - "create": { - "_index": "test", - "status": 409, - "error": { - "type": "version_conflict_engine_exception", - "reason": "[3]: version conflict, document already exists (current version [1])", - }, - } - }, - { - "create": { - "_index": "test", - "status": 409, - "error": { - "type": "version_conflict_engine_exception", - "reason": "[4]: version conflict, document already exists (current version [1])", - }, - } - }, - { - "create": { - "_index": "test", - "status": 409, - "error": { - "type": "version_conflict_engine_exception", - "reason": "[5]: version conflict, document 
already exists (current version [1])", - }, - } - }, - { - "create": { - "_index": "test", - "status": 409, - "error": { - "type": "version_conflict_engine_exception", - "reason": "[6]: version conflict, document already exists (current version [1])", - }, - } - }, - { - "create": { - "_index": "test", - "status": 429, - "error": { - "type": "cluster_block_exception", - "reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded " - "flood-stage watermark, index has read-only-allow-delete block];", - }, - } - }, - ], - } + return_value=ApiResponse( + body={ + "took": 5, + "errors": True, + "items": [ + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[1]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[2]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[3]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[4]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[5]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 409, + "error": { + "type": "version_conflict_engine_exception", + "reason": "[6]: version conflict, document already exists (current version [1])", + }, + } + }, + { + "create": { + "_index": "test", + "status": 429, + "error": { + "type": "cluster_block_exception", + "reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded " + "flood-stage watermark, index has read-only-allow-delete block];", + }, + } + }, + ], + }, + meta=self.BULK_RESPONSE_META, + ) ) bulk = runner.BulkIndex() @@ -1357,6 +1424,8 @@ async def test_bulk_index_error_produces_detailed_stats_body_with_limit(self, es mocked_warning_logger.assert_has_calls([mock.call("Bulk request failed: [%s]", result["error-description"])]) assert result == { + "request-status": 200, + "max-doc-status": 429, "took": 5, "index": "test", "weight": 7, @@ -1389,7 +1458,7 @@ async def test_bulk_index_success_with_refresh_default(self, es): "took": 8, "items": [{"create": {"_index": "test", "result": "created", "status": 201}}], } - es.bulk = mock.AsyncMock(return_value=bulk_response) + es.bulk = mock.AsyncMock(return_value=ApiResponse(body=bulk_response, meta=self.BULK_RESPONSE_META)) bulk = runner.BulkIndex() @@ -1407,6 +1476,8 @@ async def test_bulk_index_success_with_refresh_default(self, es): result = await bulk(es, dict(bulk_params)) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": "test", "weight": 1, @@ -1430,7 +1501,7 @@ async def test_bulk_index_success_with_refresh_true(self, es): "took": 8, "items": [{"create": {"_index": "test", "result": "created", "status": 201, "forced_refresh": True}}], } - es.bulk = mock.AsyncMock(return_value=bulk_response) + es.bulk = mock.AsyncMock(return_value=ApiResponse(body=bulk_response, meta=self.BULK_RESPONSE_META)) bulk = 
runner.BulkIndex() @@ -1448,6 +1519,8 @@ async def test_bulk_index_success_with_refresh_true(self, es): result = await bulk(es, dict(bulk_params)) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": "test", "weight": 1, @@ -1470,7 +1543,9 @@ async def test_bulk_index_success_with_refresh_wait_for(self, es): "errors": False, "took": 8, } - es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode())) + es.bulk = mock.AsyncMock( + return_value=ApiResponse(body=io.BytesIO(json.dumps(bulk_response).encode()), meta=self.BULK_RESPONSE_META) + ) bulk = runner.BulkIndex() @@ -1487,6 +1562,8 @@ async def test_bulk_index_success_with_refresh_wait_for(self, es): result = await bulk(es, bulk_params) assert result == { + "request-status": 200, + "max-doc-status": 200, "took": 8, "index": "test", "weight": 1, From 4f75b96a14977723c0481df1367c0218c047702d Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Tue, 20 Jan 2026 10:55:14 +0100 Subject: [PATCH 08/10] Remove excessive logging --- esrally/driver/runner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index ba92d887d..bbdb6b4fa 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -516,8 +516,6 @@ async def __call__(self, es, params): bulk_size = mandatory(params, "bulk-size", self) unit = mandatory(params, "unit", self) retries_on_429 = params.get("retries_on_429", 0) - if retries_on_429 > 0: - self.logger.info("Will retry up to %d times on document-level 429s.", retries_on_429) # parse responses lazily in the standard case - responses might be large thus parsing skews results and if no # errors have occurred we only need a small amount of information from the potentially large response. if not detailed_results: From 04879dbb744934119bde082d1486230778c38b89 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Wed, 21 Jan 2026 13:53:09 +0100 Subject: [PATCH 09/10] Fix for simple --- esrally/driver/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index bbdb6b4fa..91a7c67dc 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -559,8 +559,8 @@ async def __call__(self, es, params): total_success += stats["success-count"] total_error += stats["error-count"] total_time += stats["took"] - sum_bulk_request_size_bytes += stats["bulk-request-size-bytes"] - sum_total_document_size_bytes += stats["total-document-size-bytes"] + sum_bulk_request_size_bytes += stats.get("bulk-request-size-bytes", 0) + sum_total_document_size_bytes += stats.get("total-document-size-bytes", 0) retry_count = len(retry_stats) if detailed_results: stats = { From c2f1704e9d8fc817aaacb1975a62e812478d182c Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Wed, 21 Jan 2026 14:02:34 +0100 Subject: [PATCH 10/10] Remove misleading comment --- esrally/driver/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 91a7c67dc..887ae0d84 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -523,7 +523,6 @@ async def __call__(self, es, params): if with_action_metadata: api_kwargs.pop("index", None) - # only half of the lines are documents response = await es.bulk(params=bulk_params, **api_kwargs) else: response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)
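
Taken together, the series makes document-level 429 retries an opt-in feature of the bulk runner. The dict below is a hypothetical illustration, not taken from the patches: it shows the new knob alongside the existing bulk parameters that BulkIndex.__call__ reads via mandatory()/params.get() in the diffs above. Only "retries_on_429" is introduced by this series; the concrete values are made up.

    # Hypothetical params dict as the BulkIndex runner would receive it after
    # this series. Keys mirror the lookups in BulkIndex.__call__; values are
    # illustrative only.
    bulk_params = {
        "body": '{"index": {"_index": "test"}}\n{"message": "hello"}\n',
        "action-metadata-present": True,  # retried payloads always carry action metadata
        "index": "test",
        "bulk-size": 1,
        "unit": "docs",
        "detailed-results": False,
        "retries_on_429": 3,  # new in this series; 0 (the default) keeps the old behaviour
    }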
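
The core of the change is mapping each failed bulk item back to the request lines that produced it. The following is a minimal standalone sketch of that selection logic, outside Rally, assuming — as the patches do for retried payloads — that every document line is preceded by an action-metadata line, so item i corresponds to lines 2*i and 2*i+1; the function name and sample data are illustrative.

    def lines_to_retry_429(bulk_lines, parsed_response):
        """Collect the action/document line pairs whose bulk item returned 429."""
        retry = []
        for i, item in enumerate(parsed_response["items"]):
            data = next(iter(item.values()))  # each item wraps exactly one op, e.g. {"index": {...}}
            if data["status"] == 429:  # only back-pressure is retried; other errors are permanent
                retry.append(bulk_lines[2 * i])      # action-metadata line
                retry.append(bulk_lines[2 * i + 1])  # document line
        return retry

    body = [
        '{"index": {"_index": "test"}}', '{"message": "a"}',
        '{"index": {"_index": "test"}}', '{"message": "b"}',
    ]
    response = {"errors": True, "items": [{"index": {"status": 201}}, {"index": {"status": 429}}]}
    assert lines_to_retry_429(body, response) == body[2:]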
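
When at least one retry happens, the per-attempt stats are folded into a single result. Below is a sketch of the aggregation rule from patch 07, including the patch-09 tolerance for byte-size keys that are absent on the simple-stats path; all_retried_lines_consumed is an illustrative stand-in for the patch's len(lines_to_retry) == 0 check, and per-op/per-shard detail is deliberately not aggregated, matching the comment in the diff.

    def aggregate_retry_stats(retry_stats, all_retried_lines_consumed):
        """Fold per-attempt bulk stats into one result, as in BulkIndex.__call__."""
        if len(retry_stats) == 1:  # no retry happened; report the single attempt unchanged
            return retry_stats[0]
        return {
            "success-count": sum(s["success-count"] for s in retry_stats),
            "error-count": sum(s["error-count"] for s in retry_stats),
            "took": sum(s["took"] for s in retry_stats),
            "retry-count": len(retry_stats),
            "success": all_retried_lines_consumed,  # True only if nothing was left to retry
            "retried": True,
            # patch 09: size stats may be missing on the simple path, hence .get(..., 0)
            "bulk-request-size-bytes": sum(s.get("bulk-request-size-bytes", 0) for s in retry_stats),
            "total-document-size-bytes": sum(s.get("total-document-size-bytes", 0) for s in retry_stats),
        }

    attempt_1 = {"success-count": 8, "error-count": 2, "took": 30}
    attempt_2 = {"success-count": 2, "error-count": 0, "took": 5}
    print(aggregate_retry_stats([attempt_1, attempt_2], all_retried_lines_consumed=True))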