diff --git a/esrally/track/params.py b/esrally/track/params.py index 141cf2977..55894fbd5 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -594,6 +594,8 @@ def __init__(self, track, params, **kwargs): self.on_conflict = None self.recency = None + self.use_create = params.get("use-create", False) + self.corpora = self.used_corpora(track, params) if len(self.corpora) == 0: @@ -644,6 +646,7 @@ def __init__(self, track, params, **kwargs): self.on_conflict, self.recency, self.pipeline, + self.use_create, self._params, ) @@ -705,6 +708,7 @@ def __init__( on_conflict, recency, pipeline=None, + use_create=False, original_params=None, ): """ @@ -719,6 +723,8 @@ def __init__( :param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones. May be None. :param pipeline: The name of the ingest pipeline to run. + :param use_create: Should generated bulk operations "create" so that duplicate `_id`s fail (True) or use "index" so that duplicate + `_id`s overwrite (False). :param original_params: The original dict passed to the parent parameter source. """ self.corpora = corpora @@ -732,6 +738,7 @@ def __init__( self.on_conflict = on_conflict self.recency = recency self.pipeline = pipeline + self.use_create = use_create self.original_params = original_params # this is only intended for unit-testing self.create_reader = original_params.pop("__create_reader", create_default_reader) @@ -778,6 +785,7 @@ def _init_internal_params(self): self.recency, self.pipeline, self.original_params, + self.use_create, self.create_reader, ) @@ -912,11 +920,10 @@ def chain(*iterables): def create_default_reader( - docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency + docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency, use_create ): source = Slice(io.MmapSource, offset, num_lines) target = None - use_create = False if docs.target_index: target = docs.target_index elif docs.target_data_stream: @@ -952,6 +959,7 @@ def create_readers( conflict_probability: float, on_conflict: str, recency: str, + use_create: bool, create_reader: Callable[..., IndexDataReader], ) -> list[IndexDataReader]: """ @@ -982,7 +990,7 @@ def create_readers( ) if num_docs > 0: reader: IndexDataReader = create_reader( - docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency + docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency, use_create ) reader_queue.append(reader) total_readers += 1 @@ -1065,6 +1073,7 @@ def bulk_data_based( recency, pipeline, original_params, + use_create, create_reader=create_default_reader, ): """ @@ -1098,6 +1107,7 @@ def bulk_data_based( conflict_probability, on_conflict, recency, + use_create, create_reader, ) return bulk_generator(chain(*readers), pipeline, original_params) diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 2ca5d21cc..d3d9e1bab 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -1355,6 +1355,7 @@ def test_generate_two_bulks(self): on_conflict=None, recency=None, pipeline=None, + use_create=False, original_params={"my-custom-parameter": "foo", "my-custom-parameter-2": True}, create_reader=self.create_test_reader([["1", "2", "3", "4", "5"], ["6", "7", "8"]]), ) @@ -1431,6 +1432,7 @@ def test_generate_bulks_from_multiple_corpora(self): on_conflict=None, recency=None, pipeline=None, + use_create=False, original_params={"my-custom-parameter": "foo", "my-custom-parameter-2": True}, create_reader=self.create_test_reader([["1", "2", "3", "4", "5"]]), ) @@ -1553,6 +1555,7 @@ def test_internal_params_take_precedence(self): on_conflict=None, recency=None, pipeline=None, + use_create=False, original_params={"body": "foo", "custom-param": "bar"}, create_reader=self.create_test_reader([["1", "2", "3"]]), )