Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1421ad0
Upgrade Elastic search client libraries and urlib3.
fressi-elastic Jan 13, 2026
de1c485
Try enabling client compatibility mode to support ES version 8
fressi-elastic Jan 14, 2026
1011049
Update client compatibility version handling.
fressi-elastic Jan 19, 2026
31a59e3
Drop support for distribution version < 8, add support for version 9
fressi-elastic Jan 19, 2026
169050a
Remove obsolete unit test.
fressi-elastic Jan 19, 2026
494a501
Change track dependency installation: add pip to venv and deprecate u…
fressi-elastic Jan 19, 2026
478846c
It removes unnecessary changes.
fressi-elastic Jan 19, 2026
be8fe13
Skip installing dependencies, when all have been removed.
fressi-elastic Jan 19, 2026
2a05ff1
Use mimetype "application/x-ndjson" as default for bulk requests.
fressi-elastic Jan 19, 2026
46c6f19
Rewrite how headers are being reconbined in perform_request method
fressi-elastic Jan 19, 2026
e9018e7
Update requests version.
fressi-elastic Jan 19, 2026
933bc1d
It uses `docker compose` command instead of the now deprecated old `d…
fressi-elastic Jan 20, 2026
38aafd8
It removes unnecessary dependency.
fressi-elastic Jan 20, 2026
24f6479
It uses `docker compose` command instead of the now deprecated old `d…
fressi-elastic Jan 20, 2026
79e299b
Corrected capitalization of 'rally' to 'Rally'.
fressi-elastic Jan 20, 2026
4ec0a24
Simplify slice `dependency[len(deprecated) : len(deprecated) + 1]`
fressi-elastic Jan 20, 2026
63dabea
Refactor and test function `_filter_requirements`.
fressi-elastic Jan 20, 2026
5c86570
Restore `docker` as Python dependency.
fressi-elastic Jan 20, 2026
40def93
Fix Python executable path in `test_loader.py`
fressi-elastic Jan 20, 2026
f0ae21a
Update Docker client library to version 7.1.0
fressi-elastic Jan 20, 2026
b67e108
Revert wrong typo correction.
fressi-elastic Jan 22, 2026
002cabf
It reverts some changes to let client classes better match the origin…
fressi-elastic Jan 22, 2026
45309bd
Use compatibility mode version 8 when server version is unknown.
fressi-elastic Jan 22, 2026
d86e421
Update minimal supported version in CLI and project documentation.
fressi-elastic Jan 22, 2026
1f7eaa4
Set constructor `hosts` parameter default value to `None` for compati…
fressi-elastic Jan 22, 2026
73ea086
Remove workaround to forbid installation of deprecated packages durin…
fressi-elastic Jan 22, 2026
25db777
It fixes it/docker_dev_image_test.py error handling and update server…
fressi-elastic Jan 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 21 additions & 26 deletions esrally/client/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL

from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query
from esrally.client.common import (
_WARNING_RE,
_quote_query,
combine_headers,
mimetype_headers_to_compat,
)
from esrally.client.context import RequestContextHolder
from esrally.utils import io, versions

Expand Down Expand Up @@ -330,12 +335,10 @@ async def put_lifecycle(self, *args, **kwargs):


class RallyAsyncElasticsearch(AsyncElasticsearch, RequestContextHolder):
def __init__(self, *args, **kwargs):
distribution_version = kwargs.pop("distribution_version", None)
distribution_flavor = kwargs.pop("distribution_flavor", None)
super().__init__(*args, **kwargs)
def __init__(self, hosts: Any = None, *, distribution_version: str | None = None, distribution_flavor: str | None = None, **kwargs):
super().__init__(hosts, **kwargs)
# skip verification at this point; we've already verified this earlier with the synchronous client.
# The async client is used in the hot code path and we use customized overrides (such as that we don't
# The async client is used in the hot code path, and we use customized overrides (such as that we don't
# parse response bodies in some cases for performance reasons, e.g. when using the bulk API).
self._verified_elasticsearch = True
self.distribution_version = distribution_version
Expand Down Expand Up @@ -364,33 +367,25 @@ async def perform_request(
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
# We need to ensure that we provide content-type and accept headers
headers = combine_headers(self._headers, headers)
assert isinstance(headers, dict)
if body is not None:
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}
else:
if headers.get("content-type") is None:
headers["content-type"] = "application/json"
if headers.get("accept") is None:
headers["accept"] = "application/json"

if headers:
request_headers = self._headers.copy()
request_headers.update(headers)
else:
request_headers = self._headers
# It ensures content-type and accept headers are set.
mimetype = "application/json"
if path.endswith("/_bulk"):
mimetype = "application/x-ndjson"
for header in ("content-type", "accept"):
headers.setdefault(header, mimetype)

# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
# see https://github.com/elastic/elasticsearch/issues/51816
# Not applicable to serverless
if not self.is_serverless:
if versions.is_version_identifier(self.distribution_version) and (
versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0")
):
_mimetype_header_to_compat("Accept", request_headers)
_mimetype_header_to_compat("Content-Type", request_headers)
mimetype_headers_to_compat(headers, self.distribution_version)

if params:
target = f"{path}?{_quote_query(params)}"
Expand All @@ -400,7 +395,7 @@ async def perform_request(
meta, resp_body = await self.transport.perform_request(
method,
target,
headers=request_headers,
headers=headers,
body=body,
request_timeout=self._request_timeout,
max_retries=self._max_retries,
Expand Down
42 changes: 28 additions & 14 deletions esrally/client/common.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
import re
from collections.abc import Mapping
from collections.abc import Mapping, MutableMapping
from datetime import date, datetime
from typing import Any

from elastic_transport.client_utils import percent_encode
from elasticsearch import VERSION


def _client_major_version_to_str(version: tuple) -> str:
return str(version[0])
from elasticsearch import VERSION as ES_VERSION

from esrally.utils import versions

_MAJOR_SERVER_VERSION = str(ES_VERSION[0])
_WARNING_RE = re.compile(r"\"([^\"]*)\"")
_COMPAT_MIMETYPE_TEMPLATE = "application/vnd.elasticsearch+%s; compatible-with=" + _client_major_version_to_str(VERSION)
_COMPAT_MIMETYPE_RE = re.compile(r"application/(json|x-ndjson|vnd\.mapbox-vector-tile)")
_COMPAT_MIMETYPE_SUB = _COMPAT_MIMETYPE_TEMPLATE % (r"\g<1>",)


def _mimetype_header_to_compat(header, request_headers):
# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
mimetype = request_headers.get(header, None) if request_headers else None
if mimetype:
request_headers[header] = _COMPAT_MIMETYPE_RE.sub(_COMPAT_MIMETYPE_SUB, mimetype)
def combine_headers(*args: Mapping[str, str] | None) -> dict[str, str]:
combined_headers: dict[str, str] = {}
for header_mapping in args:
if header_mapping:
combined_headers.update(header_mapping)
return combined_headers


def mimetype_headers_to_compat(headers: MutableMapping[str, str], distribution_version: str | None) -> None:
if not headers:
return
if not versions.is_version_identifier(distribution_version):
return
major_version = versions.Version.from_string(distribution_version).major
if major_version < 8:
return

for header in ("accept", "content-type"):
mimetype = headers.get(header)
if not mimetype:
continue
headers[header] = _COMPAT_MIMETYPE_RE.sub(
"application/vnd.elasticsearch+%s; compatible-with=%s" % (r"\g<1>", major_version), mimetype
)


def _escape(value: Any) -> str:
Expand Down
2 changes: 1 addition & 1 deletion esrally/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async def on_request_end(session, trace_config_ctx, params):
hosts=self.hosts,
transport_class=RallyAsyncTransport,
ssl_context=self.ssl_context,
maxsize=self.max_connections,
connections_per_node=self.max_connections,
**self.client_options,
)

Expand Down
44 changes: 21 additions & 23 deletions esrally/client/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
UnsupportedProductError,
)

from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query
from esrally.client.common import (
_WARNING_RE,
_quote_query,
combine_headers,
mimetype_headers_to_compat,
)
from esrally.utils import versions


Expand Down Expand Up @@ -148,25 +153,14 @@ def perform_request(
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
# We need to ensure that we provide content-type and accept headers
if body is not None:
if headers is None:
headers = {"content-type": "application/json", "accept": "application/json"}
else:
if headers.get("content-type") is None:
headers["content-type"] = "application/json"
if headers.get("accept") is None:
headers["accept"] = "application/json"

if headers:
request_headers = self._headers.copy()
request_headers.update(headers)
else:
request_headers = self._headers
headers = combine_headers(self._headers, headers)
assert isinstance(headers, dict)

if self._verified_elasticsearch is None:
info = self.transport.perform_request(method="GET", target="/", headers=request_headers)
info = self.transport.perform_request(method="GET", target="/", headers=headers)
info_meta = info.meta
info_body = info.body

Expand All @@ -178,15 +172,19 @@ def perform_request(
if self._verified_elasticsearch is not True:
_ProductChecker.raise_error(self._verified_elasticsearch, info_meta, info_body)

if body is not None:
# It ensures content-type and accept headers are set.
mimetype = "application/json"
if path.endswith("/_bulk"):
mimetype = "application/x-ndjson"
for header in ("content-type", "accept"):
headers.setdefault(header, mimetype)

# Converts all parts of a Accept/Content-Type headers
# from application/X -> application/vnd.elasticsearch+X
# see https://github.com/elastic/elasticsearch/issues/51816
if not self.is_serverless:
if versions.is_version_identifier(self.distribution_version) and (
versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0")
):
_mimetype_header_to_compat("Accept", headers)
_mimetype_header_to_compat("Content-Type", headers)
mimetype_headers_to_compat(headers, self.distribution_version)

if params:
target = f"{path}?{_quote_query(params)}"
Expand All @@ -196,7 +194,7 @@ def perform_request(
meta, resp_body = self.transport.perform_request(
method,
target,
headers=request_headers,
headers=headers,
body=body,
request_timeout=self._request_timeout,
max_retries=self._max_retries,
Expand Down
60 changes: 46 additions & 14 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import sys
import tempfile
import urllib.error
from collections.abc import Callable, Generator
from collections.abc import Callable, Generator, Iterable

import jinja2
import jinja2.exceptions
Expand Down Expand Up @@ -231,19 +231,51 @@ def load_track(cfg: types.Config, install_dependencies=False):
return _load_single_track(cfg, repo, repo.track_name, install_dependencies)


def _install_dependencies(dependencies):
if dependencies:
log_path = os.path.join(paths.logs(), "dependency.log")
console.info(f"Installing track dependencies [{', '.join(dependencies)}]")
try:
with open(log_path, "ab") as install_log:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", *dependencies, "--upgrade", "--target", paths.libs()],
stdout=install_log,
stderr=install_log,
)
except subprocess.CalledProcessError:
raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See [{install_log.name}] for more information.")
DEPRECATED_DEPENDENCIES = [
"elasticsearch",
"elastic-transport",
]


def _remove_deprecated_dependency(dependencies: Iterable[str]) -> tuple[list[str], list[str]]:
invalid: list[str] = []
valid: list[str] = []
for dependency in dependencies:
for deprecated in DEPRECATED_DEPENDENCIES:
if dependency.startswith(deprecated) and dependency[len(deprecated) : len(deprecated) + 1] in "<=>!~":
invalid.append(deprecated)
break
else:
valid.append(dependency)
return valid, invalid


def _install_dependencies(dependencies: Iterable[str]):
dependencies, deprecated = _remove_deprecated_dependency(set(dependencies))
if deprecated:
message = (
f"Track dependencies are deprecated: {', '.join(deprecated)}. "
f"Please update the track to rely on the 'elasticsearch' library version required by rally. "
f"Rally no longer supports installing deprecated Elasticsearch client libraries."
)
LOG.warning(message)
console.warn(message)

if not dependencies:
LOG.debug("No track dependency to be installed.")
return

log_path = os.path.join(paths.logs(), "dependency.log")
console.info(f"Installing track dependencies [{', '.join(dependencies)}]")
try:
with open(log_path, "ab") as install_log:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", *dependencies, "--upgrade", "--target", paths.libs()],
stdout=install_log,
stderr=install_log,
)
except subprocess.CalledProcessError:
raise exceptions.SystemSetupError(f"Installation of track dependencies failed. See [{install_log.name}] for more information.")


def _load_single_track(cfg: types.Config, track_repository, track_name, install_dependencies=False):
Expand Down
2 changes: 1 addition & 1 deletion esrally/utils/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _versions_pattern(strict):
return VERSIONS if strict else VERSIONS_OPTIONAL


def is_version_identifier(text, strict=True):
def is_version_identifier(text: str | None, strict: bool = True) -> bool:
return text is not None and _versions_pattern(strict).match(text) is not None


Expand Down
5 changes: 1 addition & 4 deletions it/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
from esrally.utils import process

CONFIG_NAMES = ["in-memory-it", "es-it"]
DISTRIBUTIONS = ["8.4.0"]
# There are no ARM distribution artefacts for 6.8.0, which can't be tested on Apple Silicon
if platform.machine() != "arm64":
DISTRIBUTIONS.insert(0, "6.8.0")
DISTRIBUTIONS = ["8.4.0", "9.2.4"]
TRACKS = ["geonames", "nyc_taxis", "http_logs", "nested"]
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ classifiers = [
################################################################################################
dependencies = [
# License: Apache 2.0
"elasticsearch[async]==8.6.1",
"elastic-transport==8.4.1",
"elasticsearch[async]==9.2.1",
"elastic-transport==9.2.1",
# License: MIT
"urllib3==1.26.19",
"urllib3==2.6.3",
# License: Apache 2.0
"aiohttp==3.13.3",
"aiosignal==1.4.0",
Expand Down Expand Up @@ -92,6 +92,7 @@ dependencies = [
"hatch==1.3.1",
"hatchling==1.6.0",
"wheel==0.45.1",
"pip==25.2",
]

[project.optional-dependencies]
Expand Down
24 changes: 0 additions & 24 deletions tests/client/common_test.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/client/factory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def test_create_async_client_with_api_key_auth_override(self, es):
hosts=["https://localhost:9200"],
transport_class=RallyAsyncTransport,
ssl_context=f.ssl_context,
maxsize=f.max_connections,
connections_per_node=f.max_connections,
verify_certs=True,
serializer=f.client_options["serializer"],
api_key=api_key,
Expand Down
3 changes: 3 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import math
import random
import typing
from unittest import mock

import elastic_transport
Expand Down Expand Up @@ -5968,7 +5969,9 @@ async def test_create_ilm_policy_without_request_params(self, es):

@mock.patch("esrally.client.asynchronous.IlmClient")
@pytest.mark.asyncio
@typing.no_type_check
async def test_RallyIlmClient_rewrites_kwargs(self, es_ilm):

es = RallyAsyncElasticsearch(hosts=["http://localhost:9200"])
es_ilm.put_lifecycle = mock.AsyncMock(return_value={})

Expand Down
Loading