27 commits (diff shows changes from 14 commits)
1421ad0
Upgrade Elasticsearch client libraries and urllib3.
fressi-elastic Jan 13, 2026
de1c485
Try enabling client compatibility mode to support ES version 8
fressi-elastic Jan 14, 2026
1011049
Update client compatibility version handling.
fressi-elastic Jan 19, 2026
31a59e3
Drop support for distribution version < 8, add support for version 9
fressi-elastic Jan 19, 2026
169050a
Remove obsolete unit test.
fressi-elastic Jan 19, 2026
494a501
Change track dependency installation: add pip to venv and deprecate u…
fressi-elastic Jan 19, 2026
478846c
It removes unnecessary changes.
fressi-elastic Jan 19, 2026
be8fe13
Skip installing dependencies, when all have been removed.
fressi-elastic Jan 19, 2026
2a05ff1
Use mimetype "application/x-ndjson" as default for bulk requests.
fressi-elastic Jan 19, 2026
46c6f19
Rewrite how headers are being recombined in perform_request method
fressi-elastic Jan 19, 2026
e9018e7
Update requests version.
fressi-elastic Jan 19, 2026
933bc1d
It uses `docker compose` command instead of the now deprecated old `d…
fressi-elastic Jan 20, 2026
38aafd8
It removes unnecessary dependency.
fressi-elastic Jan 20, 2026
24f6479
It uses `docker compose` command instead of the now deprecated old `d…
fressi-elastic Jan 20, 2026
79e299b
Corrected capitalization of 'rally' to 'Rally'.
fressi-elastic Jan 20, 2026
4ec0a24
Simplify slice `dependency[len(deprecated) : len(deprecated) + 1]`
fressi-elastic Jan 20, 2026
63dabea
Refactor and test function `_filter_requirements`.
fressi-elastic Jan 20, 2026
5c86570
Restore `docker` as Python dependency.
fressi-elastic Jan 20, 2026
40def93
Fix Python executable path in `test_loader.py`
fressi-elastic Jan 20, 2026
f0ae21a
Update Docker client library to version 7.1.0
fressi-elastic Jan 20, 2026
b67e108
Revert wrong typo correction.
fressi-elastic Jan 22, 2026
002cabf
It reverts some changes to let client classes better match the origin…
fressi-elastic Jan 22, 2026
45309bd
Use compatibility mode version 8 when server version is unknown.
fressi-elastic Jan 22, 2026
d86e421
Update minimal supported version in CLI and project documentation.
fressi-elastic Jan 22, 2026
1f7eaa4
Set constructor `hosts` parameter default value to `None` for compati…
fressi-elastic Jan 22, 2026
73ea086
Remove workaround to forbid installation of deprecated packages durin…
fressi-elastic Jan 22, 2026
25db777
It fixes it/docker_dev_image_test.py error handling and update server…
fressi-elastic Jan 22, 2026
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -60,7 +60,7 @@ Once your changes and tests are ready to submit for review:

Ensure that all tests pass by running `make check-all`. This runs sequentially lint checks, unit tests and integration tests. These can be executed in isolation using `make lint`, `make test` and `make it` respectively, in case you need to iterate over a subset of tests.

Note: Integration tests are much slower than unit tests and require `docker-compose`.
Note: Integration tests are much slower than unit tests and require `docker compose`.

3. Sign the Contributor License Agreement

2 changes: 1 addition & 1 deletion docs/developing.rst
@@ -8,7 +8,7 @@ Install the following software packages:

* `uv <https://docs.astral.sh/uv/getting-started/installation/>`_
* JDK version required to build Elasticsearch. Please refer to the `build setup requirements <https://github.com/elastic/elasticsearch/blob/main/CONTRIBUTING.md#contributing-to-the-elasticsearch-codebase>`_.
* `Docker <https://docs.docker.com/install/>`_ and on Linux additionally `docker-compose <https://docs.docker.com/compose/install/>`_.
* `Docker <https://docs.docker.com/install/>`_ and on Linux additionally `docker compose <https://docs.docker.com/compose/install/>`_.
* `jq <https://stedolan.github.io/jq/download/>`_
* git

47 changes: 21 additions & 26 deletions esrally/client/asynchronous.py
@@ -44,7 +44,12 @@
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL

from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query
from esrally.client.common import (
_WARNING_RE,
_quote_query,
combine_headers,
mimetype_headers_to_compat,
)
from esrally.client.context import RequestContextHolder
from esrally.utils import io, versions

@@ -330,12 +335,10 @@ async def put_lifecycle(self, *args, **kwargs):


class RallyAsyncElasticsearch(AsyncElasticsearch, RequestContextHolder):
def __init__(self, *args, **kwargs):
distribution_version = kwargs.pop("distribution_version", None)
distribution_flavor = kwargs.pop("distribution_flavor", None)
super().__init__(*args, **kwargs)
def __init__(self, hosts: Any = None, *, distribution_version: str | None = None, distribution_flavor: str | None = None, **kwargs):
super().__init__(hosts, **kwargs)
# skip verification at this point; we've already verified this earlier with the synchronous client.
# The async client is used in the hot code path and we use customized overrides (such as that we don't
# The async client is used in the hot code path, and we use customized overrides (such as that we don't
# parse response bodies in some cases for performance reasons, e.g. when using the bulk API).
self._verified_elasticsearch = True
self.distribution_version = distribution_version
@@ -364,33 +367,25 @@ async def perform_request(
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
# We need to ensure that we provide content-type and accept headers
headers = combine_headers(self._headers, headers)
assert isinstance(headers, dict)
if body is not None:
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}
else:
if headers.get("content-type") is None:
headers["content-type"] = "application/json"
if headers.get("accept") is None:
headers["accept"] = "application/json"

if headers:
request_headers = self._headers.copy()
request_headers.update(headers)
else:
request_headers = self._headers
# It ensures content-type and accept headers are set.
mimetype = "application/json"
if path.endswith("/_bulk"):
mimetype = "application/x-ndjson"
for header in ("content-type", "accept"):
headers.setdefault(header, mimetype)

# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
# see https://github.com/elastic/elasticsearch/issues/51816
# Not applicable to serverless
if not self.is_serverless:
if versions.is_version_identifier(self.distribution_version) and (
versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0")
):
_mimetype_header_to_compat("Accept", request_headers)
_mimetype_header_to_compat("Content-Type", request_headers)
mimetype_headers_to_compat(headers, self.distribution_version)

if params:
target = f"{path}?{_quote_query(params)}"
@@ -400,7 +395,7 @@
meta, resp_body = await self.transport.perform_request(
method,
target,
headers=request_headers,
headers=headers,
body=body,
request_timeout=self._request_timeout,
max_retries=self._max_retries,
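For illustration only (not part of this diff): a minimal sketch of how the revised constructor is expected to be called, with the distribution parameters now keyword-only. The host URL and version strings are assumptions.

from esrally.client.asynchronous import RallyAsyncElasticsearch

# hosts can be omitted or passed positionally; distribution_version and
# distribution_flavor must now be given by keyword (illustrative values).
client = RallyAsyncElasticsearch(
    hosts=["http://localhost:9200"],
    distribution_version="9.2.4",
    distribution_flavor="default",
)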
42 changes: 28 additions & 14 deletions esrally/client/common.py
@@ -1,28 +1,42 @@
import re
from collections.abc import Mapping
from collections.abc import Mapping, MutableMapping
from datetime import date, datetime
from typing import Any

from elastic_transport.client_utils import percent_encode
from elasticsearch import VERSION


def _client_major_version_to_str(version: tuple) -> str:
return str(version[0])
from elasticsearch import VERSION as ES_VERSION

from esrally.utils import versions

_MAJOR_SERVER_VERSION = str(ES_VERSION[0])
_WARNING_RE = re.compile(r"\"([^\"]*)\"")
_COMPAT_MIMETYPE_TEMPLATE = "application/vnd.elasticsearch+%s; compatible-with=" + _client_major_version_to_str(VERSION)
_COMPAT_MIMETYPE_RE = re.compile(r"application/(json|x-ndjson|vnd\.mapbox-vector-tile)")
_COMPAT_MIMETYPE_SUB = _COMPAT_MIMETYPE_TEMPLATE % (r"\g<1>",)


def _mimetype_header_to_compat(header, request_headers):
# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
mimetype = request_headers.get(header, None) if request_headers else None
if mimetype:
request_headers[header] = _COMPAT_MIMETYPE_RE.sub(_COMPAT_MIMETYPE_SUB, mimetype)
def combine_headers(*args: Mapping[str, str] | None) -> dict[str, str]:
combined_headers: dict[str, str] = {}
for header_mapping in args:
if header_mapping:
combined_headers.update(header_mapping)
return combined_headers


def mimetype_headers_to_compat(headers: MutableMapping[str, str], distribution_version: str | None) -> None:
if not headers:
return
if not versions.is_version_identifier(distribution_version):
return
major_version = versions.Version.from_string(distribution_version).major
if major_version < 8:
return

for header in ("accept", "content-type"):
mimetype = headers.get(header)
if not mimetype:
continue
headers[header] = _COMPAT_MIMETYPE_RE.sub(
"application/vnd.elasticsearch+%s; compatible-with=%s" % (r"\g<1>", major_version), mimetype
)


def _escape(value: Any) -> str:
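A short usage sketch of the two new helpers (illustrative only; the header values and version string are assumptions):

from esrally.client.common import combine_headers, mimetype_headers_to_compat

# Later mappings win on key conflicts; None arguments are skipped.
headers = combine_headers({"x-opaque-id": "rally"}, {"accept": "application/json"})

# Rewrites accept/content-type for an 8.x or 9.x server; for an older,
# unparsable, or unknown version the headers are left untouched.
mimetype_headers_to_compat(headers, "9.1.0")
print(headers["accept"])  # application/vnd.elasticsearch+json; compatible-with=9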
2 changes: 1 addition & 1 deletion esrally/client/factory.py
@@ -260,7 +260,7 @@ async def on_request_end(session, trace_config_ctx, params):
hosts=self.hosts,
transport_class=RallyAsyncTransport,
ssl_context=self.ssl_context,
maxsize=self.max_connections,
connections_per_node=self.max_connections,
**self.client_options,
)

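This rename tracks the 8.x client, where the 7.x connection-pool option maxsize was replaced by connections_per_node. A standalone sketch of the equivalent call (URL and pool size are assumptions):

from elasticsearch import AsyncElasticsearch

# elasticsearch-py 8.x and later: connections_per_node replaces the old maxsize keyword.
es = AsyncElasticsearch("http://localhost:9200", connections_per_node=10)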
44 changes: 21 additions & 23 deletions esrally/client/synchronous.py
@@ -37,7 +37,12 @@
UnsupportedProductError,
)

from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query
from esrally.client.common import (
_WARNING_RE,
_quote_query,
combine_headers,
mimetype_headers_to_compat,
)
from esrally.utils import versions


@@ -148,25 +153,14 @@ def perform_request(
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
# We need to ensure that we provide content-type and accept headers
if body is not None:
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}
else:
if headers.get("content-type") is None:
headers["content-type"] = "application/json"
if headers.get("accept") is None:
headers["accept"] = "application/json"

if headers:
request_headers = self._headers.copy()
request_headers.update(headers)
else:
request_headers = self._headers
headers = combine_headers(self._headers, headers)
assert isinstance(headers, dict)

if self._verified_elasticsearch is None:
info = self.transport.perform_request(method="GET", target="/", headers=request_headers)
info = self.transport.perform_request(method="GET", target="/", headers=headers)
info_meta = info.meta
info_body = info.body

@@ -178,15 +172,19 @@
if self._verified_elasticsearch is not True:
_ProductChecker.raise_error(self._verified_elasticsearch, info_meta, info_body)

if body is not None:
# It ensures content-type and accept headers are set.
mimetype = "application/json"
if path.endswith("/_bulk"):
mimetype = "application/x-ndjson"
for header in ("content-type", "accept"):
headers.setdefault(header, mimetype)

# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
# see https://github.com/elastic/elasticsearch/issues/51816
if not self.is_serverless:
if versions.is_version_identifier(self.distribution_version) and (
versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0")
):
_mimetype_header_to_compat("Accept", headers)
_mimetype_header_to_compat("Content-Type", headers)
mimetype_headers_to_compat(headers, self.distribution_version)

if params:
target = f"{path}?{_quote_query(params)}"
@@ -196,7 +194,7 @@
meta, resp_body = self.transport.perform_request(
method,
target,
headers=request_headers,
headers=headers,
body=body,
request_timeout=self._request_timeout,
max_retries=self._max_retries,
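To make the combined effect concrete, an end-to-end sketch of the new header handling for a bulk request; it mirrors the logic above rather than calling perform_request, and the path, extra header, and version are assumptions:

from esrally.client.common import combine_headers, mimetype_headers_to_compat

path = "/logs/_bulk"
headers = combine_headers({"x-opaque-id": "rally"}, None)

# Bulk requests default to ndjson, everything else to JSON.
mimetype = "application/x-ndjson" if path.endswith("/_bulk") else "application/json"
for header in ("content-type", "accept"):
    headers.setdefault(header, mimetype)

mimetype_headers_to_compat(headers, "8.17.0")
# content-type and accept are now "application/vnd.elasticsearch+x-ndjson; compatible-with=8"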
2 changes: 1 addition & 1 deletion esrally/mechanic/launcher.py
@@ -65,7 +65,7 @@ def _start_process(self, binary_path):
self._wait_for_healthy_running_container(container_id, DockerLauncher.PROCESS_WAIT_TIMEOUT_SECONDS)

def _docker_compose(self, compose_config, cmd):
return "docker-compose -f {} {}".format(os.path.join(compose_config, "docker-compose.yml"), cmd)
return "docker compose -f {} {}".format(os.path.join(compose_config, "docker-compose.yml"), cmd)

def _get_container_id(self, compose_config):
compose_ps_cmd = self._docker_compose(compose_config, "ps -q")
60 changes: 46 additions & 14 deletions esrally/track/loader.py
@@ -24,7 +24,7 @@
import sys
import tempfile
import urllib.error
from collections.abc import Callable, Generator
from collections.abc import Callable, Generator, Iterable

import jinja2
import jinja2.exceptions
@@ -231,19 +231,51 @@ def load_track(cfg: types.Config, install_dependencies=False):
return _load_single_track(cfg, repo, repo.track_name, install_dependencies)


def _install_dependencies(dependencies):
if dependencies:
log_path = os.path.join(paths.logs(), "dependency.log")
console.info(f"Installing track dependencies [{', '.join(dependencies)}]")
try:
with open(log_path, "ab") as install_log:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", *dependencies, "--upgrade", "--target", paths.libs()],
stdout=install_log,
stderr=install_log,
)
except subprocess.CalledProcessError:
raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See [{install_log.name}] for more information.")
DEPRECATED_DEPENDENCIES = [
"elasticsearch",
"elastic-transport",
]


def _remove_deprecated_dependency(dependencies: Iterable[str]) -> tuple[list[str], list[str]]:
invalid: list[str] = []
valid: list[str] = []
for dependency in dependencies:
for deprecated in DEPRECATED_DEPENDENCIES:
if dependency.startswith(deprecated) and dependency[len(deprecated) : len(deprecated) + 1] in "<=>!~":
invalid.append(deprecated)
break
else:
valid.append(dependency)
return valid, invalid


def _install_dependencies(dependencies: Iterable[str]):
dependencies, deprecated = _remove_deprecated_dependency(set(dependencies))
if deprecated:
message = (
f"Track dependencies are deprecated: {', '.join(deprecated)}. "
f"Please update the track to rely on the 'elasticsearch' library version required by rally. "
f"Rally no longer supports installing deprecated Elasticsearch client libraries."
)
LOG.warning(message)
console.warn(message)

if not dependencies:
LOG.debug("No track dependency to be installed.")
return

log_path = os.path.join(paths.logs(), "dependency.log")
console.info(f"Installing track dependencies [{', '.join(dependencies)}]")
try:
with open(log_path, "ab") as install_log:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", *dependencies, "--upgrade", "--target", paths.libs()],
stdout=install_log,
stderr=install_log,
)
except subprocess.CalledProcessError:
raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See [{install_log.name}] for more information.")


def _load_single_track(cfg: types.Config, track_repository, track_name, install_dependencies=False):
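The filter's behaviour in a nutshell (an illustrative call; the requirement strings are made up):

from esrally.track.loader import _remove_deprecated_dependency

valid, invalid = _remove_deprecated_dependency(["elasticsearch==8.6.1", "elasticsearch-dsl>=8.0", "pandas<3"])
# valid   == ["elasticsearch-dsl>=8.0", "pandas<3"]  (a "-" after the package name is not a version operator)
# invalid == ["elasticsearch"]  (only the deprecated package name is reported, not the full requirement)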
2 changes: 1 addition & 1 deletion esrally/utils/versions.py
@@ -29,7 +29,7 @@ def _versions_pattern(strict):
return VERSIONS if strict else VERSIONS_OPTIONAL


def is_version_identifier(text, strict=True):
def is_version_identifier(text: str | None, strict: bool = True) -> bool:
return text is not None and _versions_pattern(strict).match(text) is not None


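With the widened annotation, callers such as mimetype_headers_to_compat can pass a possibly unknown version straight through. For example (illustrative):

from esrally.utils import versions

print(versions.is_version_identifier("9.2.4"))  # True
print(versions.is_version_identifier(None))     # False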
5 changes: 1 addition & 4 deletions it/__init__.py
@@ -31,10 +31,7 @@
from esrally.utils import process

CONFIG_NAMES = ["in-memory-it", "es-it"]
DISTRIBUTIONS = ["8.4.0"]
# There are no ARM distribution artefacts for 6.8.0, which can't be tested on Apple Silicon
if platform.machine() != "arm64":
DISTRIBUTIONS.insert(0, "6.8.0")
DISTRIBUTIONS = ["8.4.0", "9.2.4"]
TRACKS = ["geonames", "nyc_taxis", "http_logs", "nested"]
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

2 changes: 1 addition & 1 deletion it/conftest.py
@@ -30,7 +30,7 @@ def check_prerequisites():
print("Checking prerequisites...")
if process.run_subprocess_with_logging("docker ps") != 0:
raise AssertionError("Docker must be installed and the daemon must be up and running to run integration tests.")
if process.run_subprocess_with_logging("docker-compose --help") != 0:
if process.run_subprocess_with_logging("docker compose --help") != 0:
raise AssertionError("Docker Compose is required to run integration tests.")


6 changes: 3 additions & 3 deletions it/docker_dev_image_test.py
@@ -50,7 +50,7 @@ def test_docker_override_cmd():
def run_docker_compose_test(test_command):
try:
if run_docker_compose_up(test_command) != 0:
raise AssertionError(f"The docker-compose test failed with test command: {test_command}")
raise AssertionError(f"The docker compose test failed with test command: {test_command}")
finally:
# Always ensure proper cleanup regardless of results
run_docker_compose_down()
@@ -64,11 +64,11 @@ def run_docker_compose_up(test_command):
env_variables["RALLY_VERSION_TAG"] = version.__version__

return process.run_subprocess_with_logging(
f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml up --abort-on-container-exit",
f"docker compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml up --abort-on-container-exit",
env=env_variables,
)


def run_docker_compose_down():
if process.run_subprocess_with_logging(f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml down -v") != 0:
if process.run_subprocess_with_logging(f"docker compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml down -v") != 0:
raise AssertionError("Failed to stop running containers from docker-compose-tests.yml")