Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1421ad0
Upgrade Elastic search client libraries and urlib3.
fressi-elastic Jan 13, 2026
de1c485
Try enabling client compatibility mode to support ES version 8
fressi-elastic Jan 14, 2026
1011049
Update client compatibility version handling.
fressi-elastic Jan 19, 2026
31a59e3
Drop support for distribution version < 8, add support for version 9
fressi-elastic Jan 19, 2026
169050a
Remove obsolete unit test.
fressi-elastic Jan 19, 2026
494a501
Change track dependency installation: add pip to venv and deprecate u…
fressi-elastic Jan 19, 2026
478846c
It removes unnecessary changes.
fressi-elastic Jan 19, 2026
be8fe13
Skip installing dependencies, when all have been removed.
fressi-elastic Jan 19, 2026
2a05ff1
Use mimetype "application/x-ndjson" as default for bulk requests.
fressi-elastic Jan 19, 2026
46c6f19
Rewrite how headers are being reconbined in perform_request method
fressi-elastic Jan 19, 2026
e9018e7
Update requests version.
fressi-elastic Jan 19, 2026
933bc1d
It uses `docker compose` command instead of the now deprecated old `d…
fressi-elastic Jan 20, 2026
38aafd8
It removes unnecessary dependency.
fressi-elastic Jan 20, 2026
24f6479
It uses `docker compose` command instead of the now deprecated old `d…
fressi-elastic Jan 20, 2026
79e299b
Corrected capitalization of 'rally' to 'Rally'.
fressi-elastic Jan 20, 2026
4ec0a24
Simplify slice `dependency[len(deprecated) : len(deprecated) + 1]`
fressi-elastic Jan 20, 2026
63dabea
Refactor and test function `_filter_requirements`.
fressi-elastic Jan 20, 2026
5c86570
Restore `docker` as Python dependency.
fressi-elastic Jan 20, 2026
40def93
Fix Python executable path in `test_loader.py`
fressi-elastic Jan 20, 2026
f0ae21a
Update Docker client library to version 7.1.0
fressi-elastic Jan 20, 2026
b67e108
Revert wrong typo correction.
fressi-elastic Jan 22, 2026
002cabf
It reverts some changes to let client classes better match the origin…
fressi-elastic Jan 22, 2026
45309bd
Use compatibility mode version 8 when server version is unknown.
fressi-elastic Jan 22, 2026
d86e421
Update minimal supported version in CLI and project documentation.
fressi-elastic Jan 22, 2026
1f7eaa4
Set constructor `hosts` parameter default value to `None` for compati…
fressi-elastic Jan 22, 2026
73ea086
Remove workaround to forbid installation of deprecated packages durin…
fressi-elastic Jan 22, 2026
25db777
It fixes it/docker_dev_image_test.py error handling and update server…
fressi-elastic Jan 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Once your changes and tests are ready to submit for review:

Ensure that all tests pass by running `make check-all`. This runs sequentially lint checks, unit tests and integration tests. These can be executed in isolation using `make lint`, `make test` and `make it` respectively, in case you need to iterate over a subset of tests.

Note: Integration tests are much slower than unit tests and require `docker-compose`.
Note: Integration tests are much slower than unit tests and require `docker compose`.

3. Sign the Contributor License Agreement

Expand Down
2 changes: 1 addition & 1 deletion docs/developing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Install the following software packages:

* `uv <https://docs.astral.sh/uv/getting-started/installation/>`_
* JDK version required to build Elasticsearch. Please refer to the `build setup requirements <https://github.com/elastic/elasticsearch/blob/main/CONTRIBUTING.md#contributing-to-the-elasticsearch-codebase>`_.
* `Docker <https://docs.docker.com/install/>`_ and on Linux additionally `docker-compose <https://docs.docker.com/compose/install/>`_.
* `Docker <https://docs.docker.com/install/>`_ and on Linux additionally `docker compose <https://docs.docker.com/compose/install/>`_.
* `jq <https://stedolan.github.io/jq/download/>`_
* git

Expand Down
47 changes: 21 additions & 26 deletions esrally/client/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL

from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query
from esrally.client.common import (
_WARNING_RE,
_quote_query,
combine_headers,
mimetype_headers_to_compat,
)
from esrally.client.context import RequestContextHolder
from esrally.utils import io, versions

Expand Down Expand Up @@ -330,12 +335,10 @@ async def put_lifecycle(self, *args, **kwargs):


class RallyAsyncElasticsearch(AsyncElasticsearch, RequestContextHolder):
def __init__(self, *args, **kwargs):
distribution_version = kwargs.pop("distribution_version", None)
distribution_flavor = kwargs.pop("distribution_flavor", None)
super().__init__(*args, **kwargs)
def __init__(self, hosts: Any = None, *, distribution_version: str | None = None, distribution_flavor: str | None = None, **kwargs):
super().__init__(hosts, **kwargs)
# skip verification at this point; we've already verified this earlier with the synchronous client.
# The async client is used in the hot code path and we use customized overrides (such as that we don't
# The async client is used in the hot code path, and we use customized overrides (such as that we don't
# parse response bodies in some cases for performance reasons, e.g. when using the bulk API).
self._verified_elasticsearch = True
self.distribution_version = distribution_version
Expand Down Expand Up @@ -364,33 +367,25 @@ async def perform_request(
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
# We need to ensure that we provide content-type and accept headers
headers = combine_headers(self._headers, headers)
assert isinstance(headers, dict)
if body is not None:
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}
else:
if headers.get("content-type") is None:
headers["content-type"] = "application/json"
if headers.get("accept") is None:
headers["accept"] = "application/json"

if headers:
request_headers = self._headers.copy()
request_headers.update(headers)
else:
request_headers = self._headers
# It ensures content-type and accept headers are set.
mimetype = "application/json"
if path.endswith("/_bulk"):
mimetype = "application/x-ndjson"
for header in ("content-type", "accept"):
headers.setdefault(header, mimetype)

# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
# see https://github.com/elastic/elasticsearch/issues/51816
# Not applicable to serverless
if not self.is_serverless:
if versions.is_version_identifier(self.distribution_version) and (
versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0")
):
_mimetype_header_to_compat("Accept", request_headers)
_mimetype_header_to_compat("Content-Type", request_headers)
mimetype_headers_to_compat(headers, self.distribution_version)

if params:
target = f"{path}?{_quote_query(params)}"
Expand All @@ -400,7 +395,7 @@ async def perform_request(
meta, resp_body = await self.transport.perform_request(
method,
target,
headers=request_headers,
headers=headers,
body=body,
request_timeout=self._request_timeout,
max_retries=self._max_retries,
Expand Down
42 changes: 28 additions & 14 deletions esrally/client/common.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
import re
from collections.abc import Mapping
from collections.abc import Mapping, MutableMapping
from datetime import date, datetime
from typing import Any

from elastic_transport.client_utils import percent_encode
from elasticsearch import VERSION


def _client_major_version_to_str(version: tuple) -> str:
return str(version[0])
from elasticsearch import VERSION as ES_VERSION

from esrally.utils import versions

_MAJOR_SERVER_VERSION = str(ES_VERSION[0])
_WARNING_RE = re.compile(r"\"([^\"]*)\"")
_COMPAT_MIMETYPE_TEMPLATE = "application/vnd.elasticsearch+%s; compatible-with=" + _client_major_version_to_str(VERSION)
_COMPAT_MIMETYPE_RE = re.compile(r"application/(json|x-ndjson|vnd\.mapbox-vector-tile)")
_COMPAT_MIMETYPE_SUB = _COMPAT_MIMETYPE_TEMPLATE % (r"\g<1>",)


def _mimetype_header_to_compat(header, request_headers):
# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
mimetype = request_headers.get(header, None) if request_headers else None
if mimetype:
request_headers[header] = _COMPAT_MIMETYPE_RE.sub(_COMPAT_MIMETYPE_SUB, mimetype)
def combine_headers(*args: Mapping[str, str] | None) -> dict[str, str]:
combined_headers: dict[str, str] = {}
for header_mapping in args:
if header_mapping:
combined_headers.update(header_mapping)
return combined_headers


def mimetype_headers_to_compat(headers: MutableMapping[str, str], distribution_version: str | None) -> None:
if not headers:
return
if not versions.is_version_identifier(distribution_version):
return
major_version = versions.Version.from_string(distribution_version).major
if major_version < 8:
return

for header in ("accept", "content-type"):
mimetype = headers.get(header)
if not mimetype:
continue
headers[header] = _COMPAT_MIMETYPE_RE.sub(
"application/vnd.elasticsearch+%s; compatible-with=%s" % (r"\g<1>", major_version), mimetype
)


def _escape(value: Any) -> str:
Expand Down
2 changes: 1 addition & 1 deletion esrally/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async def on_request_end(session, trace_config_ctx, params):
hosts=self.hosts,
transport_class=RallyAsyncTransport,
ssl_context=self.ssl_context,
maxsize=self.max_connections,
connections_per_node=self.max_connections,
**self.client_options,
)

Expand Down
44 changes: 21 additions & 23 deletions esrally/client/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
UnsupportedProductError,
)

from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query
from esrally.client.common import (
_WARNING_RE,
_quote_query,
combine_headers,
mimetype_headers_to_compat,
)
from esrally.utils import versions


Expand Down Expand Up @@ -148,25 +153,14 @@ def perform_request(
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
# We need to ensure that we provide content-type and accept headers
if body is not None:
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}
else:
if headers.get("content-type") is None:
headers["content-type"] = "application/json"
if headers.get("accept") is None:
headers["accept"] = "application/json"

if headers:
request_headers = self._headers.copy()
request_headers.update(headers)
else:
request_headers = self._headers
headers = combine_headers(self._headers, headers)
assert isinstance(headers, dict)

if self._verified_elasticsearch is None:
info = self.transport.perform_request(method="GET", target="/", headers=request_headers)
info = self.transport.perform_request(method="GET", target="/", headers=headers)
info_meta = info.meta
info_body = info.body

Expand All @@ -178,15 +172,19 @@ def perform_request(
if self._verified_elasticsearch is not True:
_ProductChecker.raise_error(self._verified_elasticsearch, info_meta, info_body)

if body is not None:
# It ensures content-type and accept headers are set.
mimetype = "application/json"
if path.endswith("/_bulk"):
mimetype = "application/x-ndjson"
for header in ("content-type", "accept"):
headers.setdefault(header, mimetype)

# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
# see https://github.com/elastic/elasticsearch/issues/51816
if not self.is_serverless:
if versions.is_version_identifier(self.distribution_version) and (
versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0")
):
_mimetype_header_to_compat("Accept", headers)
_mimetype_header_to_compat("Content-Type", headers)
mimetype_headers_to_compat(headers, self.distribution_version)

if params:
target = f"{path}?{_quote_query(params)}"
Expand All @@ -196,7 +194,7 @@ def perform_request(
meta, resp_body = self.transport.perform_request(
method,
target,
headers=request_headers,
headers=headers,
body=body,
request_timeout=self._request_timeout,
max_retries=self._max_retries,
Expand Down
2 changes: 1 addition & 1 deletion esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _start_process(self, binary_path):
self._wait_for_healthy_running_container(container_id, DockerLauncher.PROCESS_WAIT_TIMEOUT_SECONDS)

def _docker_compose(self, compose_config, cmd):
return "docker-compose -f {} {}".format(os.path.join(compose_config, "docker-compose.yml"), cmd)
return "docker compose -f {} {}".format(os.path.join(compose_config, "docker-compose.yml"), cmd)

def _get_container_id(self, compose_config):
compose_ps_cmd = self._docker_compose(compose_config, "ps -q")
Expand Down
100 changes: 85 additions & 15 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import sys
import tempfile
import urllib.error
from collections.abc import Callable, Generator
from collections.abc import Callable, Generator, Iterable

import jinja2
import jinja2.exceptions
Expand Down Expand Up @@ -221,7 +221,6 @@ def challenge_info(c):

def load_track(cfg: types.Config, install_dependencies=False):
"""

Loads a track

:param cfg: The config object. It contains the name of the track to load.
Expand All @@ -231,19 +230,90 @@ def load_track(cfg: types.Config, install_dependencies=False):
return _load_single_track(cfg, repo, repo.track_name, install_dependencies)


def _install_dependencies(dependencies):
if dependencies:
log_path = os.path.join(paths.logs(), "dependency.log")
console.info(f"Installing track dependencies [{', '.join(dependencies)}]")
try:
with open(log_path, "ab") as install_log:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", *dependencies, "--upgrade", "--target", paths.libs()],
stdout=install_log,
stderr=install_log,
)
except subprocess.CalledProcessError:
raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See [{install_log.name}] for more information.")
DEPRECATED_PACKAGES = (
"elasticsearch",
"elastic-transport",
)

DEPRECATED_MESSAGE = (
"Track dependencies are deprecated: {deprecated}. "
"Please update the track to rely on the 'elasticsearch' libraries versions as required by Rally. "
"Rally no longer supports installing per-track Elasticsearch client libraries."
)


def _install_dependencies(requirements: Iterable[str]) -> None:
requirements, deprecated = _filter_requirements(requirements)
if deprecated:
message = DEPRECATED_MESSAGE.format(deprecated=", ".join(deprecated))
LOG.warning(message)
console.warn(message)

if not requirements:
LOG.debug("No track requirements to be installed.")
return

log_path = os.path.join(paths.logs(), "dependency.log")
console.info(f"Installing track dependencies: {requirements}.")
os.makedirs(paths.logs(), exist_ok=True)
try:
with open(log_path, "ab") as install_log:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", *requirements, "--upgrade", "--target", paths.libs()],
stdout=install_log,
stderr=install_log,
)
except subprocess.CalledProcessError:
raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See '{log_path}' for more information.")


def _filter_requirements(
requirements: Iterable[str], deprecated_packages: Iterable[str] = DEPRECATED_PACKAGES
) -> tuple[list[str], list[str]]:
"""
It removes deprecated packages from a list of pip requirement strings.

:param requirements: List of strings (e.g., ['flask==2.0', 'requests>=2.0'])
:param deprecated_packages: List of strings (e.g., ['old-pkg', 'deprecated_lib'])
:return: List of strings with deprecated dependencies removed.
"""

# It normalizes the deprecated list for efficient lookup.
normalized_deprecated = {_normalize_package_name(pkg) for pkg in deprecated_packages}

# It defines a pattern to extract the package name before any operators (==, >=, <, [, etc.)
# This regex looks for the start of the string until it hits a non-alphanumeric
# character that isn't a hyphen or underscore.
name_extractor = re.compile(r"^([a-zA-Z0-9\-_.]+)")

filtered_list = []
deprecated_list = []

for line in requirements:
if not (line := line.strip()) or line.startswith("#"):
# It skips comments and empty lines
continue

if match := name_extractor.match(line):
pkg = _normalize_package_name(match.group(1))
if pkg in normalized_deprecated:
deprecated_list.append(pkg)
continue

# If a name doesn't match any deprecated or if it can't parse a name, then it keep it.
filtered_list.append(line)

return filtered_list, deprecated_list


def _normalize_package_name(name: str) -> str:
"""
It normalizes a package name according to PEP 503.

:param name: The package name to normalize.
:return: The normalized package name.
"""
return re.sub(r"[-_.]+", "-", name).lower().strip()


def _load_single_track(cfg: types.Config, track_repository, track_name, install_dependencies=False):
Expand Down
2 changes: 1 addition & 1 deletion esrally/utils/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _versions_pattern(strict):
return VERSIONS if strict else VERSIONS_OPTIONAL


def is_version_identifier(text, strict=True):
def is_version_identifier(text: str | None, strict: bool = True) -> bool:
return text is not None and _versions_pattern(strict).match(text) is not None


Expand Down
5 changes: 1 addition & 4 deletions it/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
from esrally.utils import process

CONFIG_NAMES = ["in-memory-it", "es-it"]
DISTRIBUTIONS = ["8.4.0"]
# There are no ARM distribution artefacts for 6.8.0, which can't be tested on Apple Silicon
if platform.machine() != "arm64":
DISTRIBUTIONS.insert(0, "6.8.0")
DISTRIBUTIONS = ["8.4.0", "9.2.4"]
TRACKS = ["geonames", "nyc_taxis", "http_logs", "nested"]
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

Expand Down
Loading