119 changes: 98 additions & 21 deletions esrally/driver/runner.py
@@ -495,10 +495,10 @@ async def __call__(self, es, params):
* ``refresh``: If ``"true"``, Elasticsearch will issue an async refresh to the index; i.e., ``?refresh=true``.
If ``"wait_for"``, Elasticsearch issues a synchronous refresh to the index; i.e., ``?refresh=wait_for``.
If ``"false""``, Elasticsearch will use refresh defaults; i.e., ``?refresh=false``.
* ``retries_on_429``: the number of times to retry the bulk request on document level 429s. Defaults to 0 (do not retry).
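Example (hypothetical) operation params enabling a single retry on per-document 429s::

    {
        "body": bulk_body,
        "action-metadata-present": True,
        "bulk-size": 500,
        "unit": "docs",
        "retries_on_429": 1,
    }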
"""
detailed_results = params.get("detailed-results", False)
api_kwargs = self._default_kw_params(params)

bulk_params = {}
if "timeout" in params:
bulk_params["timeout"] = params["timeout"]
@@ -515,31 +515,86 @@ async def __call__(self, es, params):
with_action_metadata = mandatory(params, "action-metadata-present", self)
bulk_size = mandatory(params, "bulk-size", self)
unit = mandatory(params, "unit", self)
retries_on_429 = params.get("retries_on_429", 0)
# parse responses lazily in the standard case - responses might be large thus parsing skews results and if no
# errors have occurred we only need a small amount of information from the potentially large response.
if not detailed_results:
es.return_raw_response()

if with_action_metadata:
api_kwargs.pop("index", None)
# only half of the lines are documents
response = await es.bulk(params=bulk_params, **api_kwargs)
else:
response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)

stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response)
retry_stats = []
total_success = total_error = total_time = sum_bulk_request_size_bytes = sum_total_document_size_bytes = 0
stats, lines_to_retry = (
self.detailed_stats(params, response, emit_lines_to_retry=retries_on_429 > 0)
if detailed_results
else self.simple_stats(bulk_size, unit, response, api_kwargs, emit_lines_to_retry=retries_on_429 > 0)
)
retry_stats.append(stats)
for _ in range(retries_on_429):
if len(lines_to_retry) == 0:
break
self.logger.warning("Retrying %d documents that previously resulted in a 429.", len(lines_to_retry) / 2)
api_kwargs["body"] = lines_to_retry
bulk_size = len(lines_to_retry) // 2  # at this point the data always contains action metadata.
Contributor:
> at this point the data always contains action metadata

This is slightly off-topic:

I've spent some time digging since I saw this comment. I think the bulk runner always receives action-and-metadata lines in the body param today (see here). If the corpus does not include them, they are generated in earlier processing stages. I don't quite understand this part of the above code:

        if with_action_metadata:
            api_kwargs.pop("index", None)
            # only half of the lines are documents
            response = await es.bulk(params=bulk_params, **api_kwargs)
        else:
            response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)

The "only half of the lines are documents" comment suggests the else clause is different, but it isn't. There's nothing in the bulk() method of the ES client that would magically add action-and-metadata lines. Also, doc_type is ignored, I think; it's a leftover from old ES versions.

I think we could simplify / remove this.

Member Author:

Yeah, it seems at one point we calculated the number of documents here, but that was removed and the comment was not. I'll remove the comment for now; I think I'd need to test a little more whether we have code paths that use with_action_metadata or not.
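For context on the line pairing the retry logic relies on, here is a hypothetical two-document bulk body (index name and fields are made up):

    bulk_body = [
        '{"index": {"_index": "logs"}}',  # action-and-metadata line for doc 0
        '{"message": "first"}',           # document line for doc 0
        '{"index": {"_index": "logs"}}',  # action-and-metadata line for doc 1
        '{"message": "second"}',          # document line for doc 1
    ]
    # item i in the bulk response maps back to lines i * 2 and i * 2 + 1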

response = await es.bulk(params=bulk_params, **api_kwargs)
stats, lines_to_retry = (
self.detailed_stats(params, response, emit_lines_to_retry=True)
if detailed_results
else self.simple_stats(bulk_size, unit, response, api_kwargs, emit_lines_to_retry=True)
)
retry_stats.append(stats)
request_status = response.meta.status
if request_status not in (200, 201, 429):
self.logger.warning("%s after bulk request retry. Payload: %s", request_status, lines_to_retry)
break
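# aggregate stats across the initial attempt and any retries; a single attempt is used verbatim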
if len(retry_stats) == 1:
stats = retry_stats[0]
else:
for stats in retry_stats:
total_success += stats["success-count"]
total_error += stats["error-count"]
total_time += stats["took"]
sum_bulk_request_size_bytes += stats.get("bulk-request-size-bytes", 0)
sum_total_document_size_bytes += stats.get("total-document-size-bytes", 0)
retry_count = len(retry_stats)
if detailed_results:
stats = {
"success-count": total_success,
"error-count": total_error,
"retry-count": retry_count,
"took": total_time,
"success": len(lines_to_retry) == 0,
"retried": retry_count > 0,
"bulk-request-size-bytes": sum_bulk_request_size_bytes,
"total-document-size-bytes": sum_total_document_size_bytes,
"ops": {}, # detailed per-op stats are not aggregated over retries
"shards_histogram": [], # detailed per-shard stats are not aggregated over retries
}
else:
stats = {
"success-count": total_success,
"error-count": total_error,
"retry-count": retry_count,
"took": total_time,
"success": len(lines_to_retry) == 0,
"retried": retry_count > 0,
}
Contributor (comment on lines +564 to +585):

This exposes the details of stats at the __call__() level, which were previously hidden in either simple_stats() or detailed_stats(). Can we avoid this? We could have a method that iterates through the response documents, and:

  • calls another method (passed as an argument) that increases the stats counters for each document,
  • builds a retry list (optionally); see the sketch below.
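A minimal sketch of that shape (method and argument names are hypothetical, not part of this PR):

    def _process_items(self, response_items, bulk_lines, on_item, collect_retries=False):
        # single pass over the response documents; per-document counting is
        # delegated to the on_item callback
        lines_to_retry = []
        for i, item in enumerate(response_items):
            # there is only one (top-level) entry per item
            op, data = next(iter(item.items()))
            on_item(op, data)  # update success/error counters, histograms, ...
            if collect_retries and data["status"] == 429:
                # each response item maps back to an (action-metadata, document) line pair
                lines_to_retry.extend(bulk_lines[i * 2 : i * 2 + 2])
        return lines_to_retry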


meta_data = {
"index": params.get("index"),
"weight": bulk_size,
"weight": params.get("bulk-size"),
"unit": unit,
}
meta_data.update(stats)
if not stats["success"]:
meta_data["error-type"] = "bulk"
return meta_data

def detailed_stats(self, params, response):
def detailed_stats(self, params, response, emit_lines_to_retry=False):
def _utf8len(line):
if isinstance(line, bytes):
return len(line)
Expand All @@ -554,15 +609,9 @@ def _utf8len(line):
bulk_request_size_bytes = 0
total_document_size_bytes = 0
with_action_metadata = mandatory(params, "action-metadata-present", self)

if isinstance(params["body"], bytes):
bulk_lines = params["body"].split(b"\n")
elif isinstance(params["body"], str):
bulk_lines = params["body"].split("\n")
elif isinstance(params["body"], list):
bulk_lines = params["body"]
else:
raise exceptions.DataError("bulk body is not of type bytes, string, or list")
request_status = response.meta.status
bulk_lines = self.get_bulk_lines(params)
lines_to_retry = []

for line_number, data in enumerate(bulk_lines):
line_size = _utf8len(data)
Expand All @@ -573,8 +622,8 @@ def _utf8len(line):
total_document_size_bytes += line_size

bulk_request_size_bytes += line_size

for item in response["items"]:
max_doc_status = -1
for i, item in enumerate(response["items"]):
# there is only one (top-level) item
op, data = next(iter(item.items()))
if op not in ops:
Expand All @@ -591,7 +640,11 @@ def _utf8len(line):
shards_histogram[sk]["item-count"] += 1
if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
bulk_error_count += 1
max_doc_status = max(max_doc_status, data["status"])
self.extract_error_details(error_details, data)
if data["status"] == 429 and emit_lines_to_retry: # don't retry other statuses right now.
lines_to_retry.append(bulk_lines[(i * 2)])
lines_to_retry.append(bulk_lines[(i * 2) + 1])
else:
bulk_success_count += 1
stats = {
Expand All @@ -603,6 +656,8 @@ def _utf8len(line):
"shards_histogram": list(shards_histogram.values()),
"bulk-request-size-bytes": bulk_request_size_bytes,
"total-document-size-bytes": total_document_size_bytes,
"request-status": request_status,
"max-doc-status": max_doc_status if max_doc_status > 0 else request_status,
}
if bulk_error_count > 0:
stats["error-type"] = "bulk"
Expand All @@ -611,24 +666,42 @@ def _utf8len(line):
if "ingest_took" in response:
stats["ingest_took"] = response["ingest_took"]

return stats
return stats, lines_to_retry

def get_bulk_lines(self, params):
if isinstance(params["body"], bytes):
return params["body"].split(b"\n")
elif isinstance(params["body"], str):
return params["body"].split("\n")
elif isinstance(params["body"], list):
return params["body"]
else:
raise exceptions.DataError("bulk body is not of type bytes, string, or list")

def simple_stats(self, bulk_size, unit, response):
def simple_stats(self, bulk_size, unit, response, params, emit_lines_to_retry=False):
bulk_success_count = bulk_size if unit == "docs" else None
bulk_error_count = 0
error_details = set()
request_status = response.meta.status
max_doc_status = -1
# parse lazily on the fast path
props = parse(response, ["errors", "took"])

lines_to_retry = []
if props.get("errors", False):
bulk_lines = self.get_bulk_lines(params)
# determine success count regardless of unit because we need to iterate through all items anyway
bulk_success_count = 0
# Reparse fully in case of errors - this will be slower
parsed_response = json.loads(response.getvalue())
for item in parsed_response["items"]:
for i, item in enumerate(parsed_response["items"]):
data = next(iter(item.values()))
max_doc_status = max(max_doc_status, data["status"])
if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
bulk_error_count += 1
if data["status"] == 429 and emit_lines_to_retry: # don't retry other statuses right now.
lines_to_retry.append(bulk_lines[(i * 2)])
lines_to_retry.append(bulk_lines[(i * 2) + 1])
self.extract_error_details(error_details, data)
else:
bulk_success_count += 1
Expand All @@ -637,11 +710,15 @@ def simple_stats(self, bulk_size, unit, response):
"success": bulk_error_count == 0,
"success-count": bulk_success_count,
"error-count": bulk_error_count,
"request-status": request_status,
"max-doc-status": (
max_doc_status if max_doc_status > 0 else request_status
), # if we have not encountered any errors, we will never have inspected the status for each doc
}
if bulk_error_count > 0:
stats["error-type"] = "bulk"
stats["error-description"] = self.error_description(error_details)
return stats
return stats, lines_to_retry

def extract_error_details(self, error_details, data):
error_data = data.get("error", {})
6 changes: 5 additions & 1 deletion tests/driver/driver_test.py
@@ -1465,8 +1465,12 @@ def setup_method(self, method):
async def test_execute_schedule_in_throughput_mode(self, es):
task_start = time.perf_counter()
es.new_request_context.return_value = self.StaticRequestTiming(task_start=task_start)
_headers = elastic_transport.HttpHeaders()
_headers["content-type"] = "application/json"
_node = elastic_transport.NodeConfig(scheme="http", host="localhost", port=9200)
_meta = elastic_transport.ApiResponseMeta(status=200, http_version="1.1", headers=_headers, duration=0.05, node=_node)
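# minimal transport metadata so the runner under test can read response.meta.status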

es.bulk = mock.AsyncMock(return_value=io.BytesIO(b'{"errors": false, "took": 8}'))
es.bulk = mock.AsyncMock(return_value=elastic_transport.ApiResponse(body=io.BytesIO(b'{"errors": false, "took": 8}'), meta=_meta))

params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource)
test_track = track.Track(name="unittest", description="unittest track", indices=None, challenges=None)