Skip to content
Draft
Show file tree
Hide file tree
Changes from 95 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
665557f
Revert "Allow configuring the process startup method to be used for c…
fressi-elastic Aug 26, 2025
ba07027
Refactor actor system initialization and retrieval.
fressi-elastic Sep 8, 2025
5a021dd
Implements AsyncActor
fressi-elastic Sep 18, 2025
1771a5d
Revert unintended changes.
fressi-elastic Sep 18, 2025
4767108
Revert unintended changes.
fressi-elastic Sep 18, 2025
b7f7e4e
Revert unintended changes.
fressi-elastic Sep 18, 2025
de24507
Create method for getting Config subclass from context and add a test…
fressi-elastic Sep 18, 2025
41a544a
Merge branch 'master' of github.com:elastic/rally
fressi-elastic Sep 18, 2025
b78f157
Fix issues with configuration context.
fressi-elastic Sep 18, 2025
0931be4
Improve configuration and actor system tests.
fressi-elastic Sep 18, 2025
3fb3f49
Use a port range as default actor admin port for TCP and UDP system b…
fressi-elastic Sep 18, 2025
dd2e114
Revert unrelated changes
fressi-elastic Sep 19, 2025
ef89968
Ignore problems loading logging configuration.
fressi-elastic Sep 19, 2025
9a13e3e
Pass all typing annotations checks in the new esrally.actors package.
fressi-elastic Sep 19, 2025
b594887
Improve request context implementation.
fressi-elastic Sep 19, 2025
127715f
Rename Context as ActorContext.
fressi-elastic Sep 19, 2025
5951451
Move implementation of create method inside of the context implementa…
fressi-elastic Sep 19, 2025
aa4dc82
Add test cases for async actor implementation.
fressi-elastic Sep 22, 2025
50e43ba
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Sep 22, 2025
9cf79a6
Ensures blocking requests can be cancelled.
fressi-elastic Sep 22, 2025
c0db245
It update code style and documentation.
fressi-elastic Sep 22, 2025
7f4e555
Add support for ping request inside the base async actor class.
fressi-elastic Sep 23, 2025
15ddc82
Fixing support for python 3.9
fressi-elastic Sep 23, 2025
30732d2
Remove support for UDP actor system base.
fressi-elastic Sep 23, 2025
67d0632
Some further refactory to reduce code duplication and support task ca…
fressi-elastic Sep 25, 2025
c69486c
Make ActorSystem.ask asynchronousish.
fressi-elastic Sep 26, 2025
4816c0e
Add some documentation in the code (_actor.py)
fressi-elastic Sep 26, 2025
6816e5e
Add some documentation in the code (_config.py)
fressi-elastic Sep 26, 2025
27ee00a
Write a new fixture to run test functions from inside of an AsyncActo…
fressi-elastic Sep 28, 2025
7aabfc0
Remove pending tasks and some other smaller refactory in actor context.
fressi-elastic Sep 29, 2025
4e5dc18
Ensure pending results is clean after request cancellation.
fressi-elastic Sep 29, 2025
3bb4cc6
Just return the wrapped future instead of the task after performing t…
fressi-elastic Sep 29, 2025
482e9ad
Add actor type name and request id to task name if no name is given t…
fressi-elastic Sep 29, 2025
1a1f603
It adds some extra documentation in the code.
fressi-elastic Sep 29, 2025
d6321d1
Add option to allow picking up a random unused port for Thespian admi…
fressi-elastic Sep 29, 2025
0925770
Merge branch 'master' of github.com:elastic/rally
fressi-elastic Sep 29, 2025
d32aee7
Add support for Python3.13 (pass unit tests).
fressi-elastic Sep 29, 2025
dff998d
Revert unnecessary changes.
fressi-elastic Sep 30, 2025
8e4f56f
Revert unnecessary changes.
fressi-elastic Sep 30, 2025
df04aa1
Revert unnecessary changes.
fressi-elastic Sep 30, 2025
91ae14a
Update the uv.lock file.
fressi-elastic Sep 30, 2025
f2965c7
Add entries for Python v3.13 to the CI testing matrix.
fressi-elastic Sep 30, 2025
ec0c82b
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Sep 30, 2025
5d32663
Workaround issue when log definition files is not present.
fressi-elastic Sep 30, 2025
80ed753
Use `None` as failback value for logDefs when logging configuration f…
fressi-elastic Sep 30, 2025
e37d7b9
It drops support for Python 3.9
fressi-elastic Sep 30, 2025
7f4fee3
Fix docs/migrate.rst
fressi-elastic Sep 30, 2025
4ca911d
Remove statement: from __future__ import annotations
fressi-elastic Sep 30, 2025
cb01f3a
Fix esrally/track/params.py and update uv.lock file.
fressi-elastic Sep 30, 2025
001f33b
Revert changes to esrally/storage/_range.py
fressi-elastic Sep 30, 2025
78d294f
Skip testing version 3.12 in buildkite pipeline.
fressi-elastic Sep 30, 2025
4f20116
Fix some other annotation and one pylint founding.
fressi-elastic Sep 30, 2025
11d5159
Merge branch 'python3.13' into actors
fressi-elastic Sep 30, 2025
272b72c
Remove lines: from __future__ import annotations
fressi-elastic Sep 30, 2025
a140c78
Workaround issue blocking when sending external request.
fressi-elastic Oct 1, 2025
f5658ff
It removes 'from __future__ import annotations' statements.
fressi-elastic Oct 2, 2025
9e217ef
Update CI python versions file.
fressi-elastic Oct 2, 2025
23e1314
Remove unnecessary changes.
fressi-elastic Oct 2, 2025
ab11c9b
Update CI python versions.
fressi-elastic Oct 2, 2025
4e06935
Set Python3.13 as the default version.
fressi-elastic Oct 2, 2025
45a9015
Let `make test` and `make it` use python 3.13
fressi-elastic Oct 2, 2025
0e9cc67
Add standard-imghdr because imghdr has been removed from python 3.13
fressi-elastic Oct 2, 2025
48f3da0
Update docker image version.
fressi-elastic Oct 2, 2025
0699feb
Use managed python version for preparing venv directory.
fressi-elastic Oct 2, 2025
8c75c66
Refactor Makefile:
fressi-elastic Oct 2, 2025
073d5f4
Add goal to run IT for serverless.
fressi-elastic Oct 2, 2025
210565c
Add goal for rally_tracks_compat
fressi-elastic Oct 2, 2025
bf47095
Fix PHONY goals.
fressi-elastic Oct 2, 2025
7ff36ad
Update python versions for mypy and black
fressi-elastic Oct 2, 2025
6cbea73
Update .buildkite pipeline files.
fressi-elastic Oct 2, 2025
e3f2857
Fix UV installation step.
fressi-elastic Oct 2, 2025
0678a19
Debug CI scripts
fressi-elastic Oct 2, 2025
6fbff06
Fix Dockerfile and CI scripts.
fressi-elastic Oct 2, 2025
d2ffdb5
Fix VENV path in Dockerfile.
fressi-elastic Oct 2, 2025
a92dbf6
Use levacy VIRTUAL_ENV as name for VENV_DIR
fressi-elastic Oct 2, 2025
5ceb63e
Force Python version when running it_serverless test cases.
fressi-elastic Oct 3, 2025
35a216f
Export PY_VERSION as an environment variable.
fressi-elastic Oct 3, 2025
930ac3a
Update uv version in Docerfile.
fressi-elastic Oct 3, 2025
caea1a3
It drops the check for max python version.
fressi-elastic Oct 4, 2025
9dea73e
Drop dependency on tox/nox.
fressi-elastic Oct 4, 2025
a08fa72
It ensures the rally plugin is installed before using for tests.
fressi-elastic Oct 4, 2025
f408e14
It renames Makefile goal rally_tracks_compat -> it_tracks_compat
fressi-elastic Oct 4, 2025
a7a0e6e
It adds some comment in the Makefile.
fressi-elastic Oct 4, 2025
7f2915f
Merge branch 'python3.13' into actors
fressi-elastic Oct 5, 2025
9b38493
Remove unnecessary requirements and update wheel version.
fressi-elastic Oct 6, 2025
70a88c2
Update Makefile error message.
fressi-elastic Oct 6, 2025
3f943fe
Update add_missing_loggers_to_config to make it a little easier to read.
fressi-elastic Oct 6, 2025
432cda8
Merge branch 'python3.13' into actors
fressi-elastic Oct 6, 2025
86f2357
It gets few fixes back from storage.manager branch.
fressi-elastic Oct 6, 2025
929dc0f
Add an example test to demonstrate the new AsyncActor at work.
fressi-elastic Oct 6, 2025
8b2ba23
Improve example readability.
fressi-elastic Oct 6, 2025
bbb7d9d
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Oct 6, 2025
1953694
Revert changes to net.resolve.
fressi-elastic Oct 6, 2025
5743a2b
Update and test convert.range function.
fressi-elastic Oct 6, 2025
711e211
Relax acceptance levels of test_execute_schedule_throughput_throttled.
fressi-elastic Oct 7, 2025
07ac669
Starting final clean up and some simplifications.
fressi-elastic Oct 9, 2025
2f6d948
Fix actor_test.py.
fressi-elastic Oct 9, 2025
7384a34
Clean 3 left over.
fressi-elastic Oct 9, 2025
a299647
Provide a default configuration for actors process when starting befo…
fressi-elastic Oct 10, 2025
541a407
Add some additional documentation in the code.
fressi-elastic Oct 10, 2025
814e9a3
Add more code documentation and other small cleaning of the code.
fressi-elastic Oct 10, 2025
2bc32cd
Fix system_test.
fressi-elastic Oct 10, 2025
e0be984
Merge branch 'master' of github.com:elastic/rally into actors
fressi-elastic Oct 10, 2025
c62c62e
Make sure actor system is shut down at process termination by default.
fressi-elastic Oct 10, 2025
f83b2a5
Handle actor system shut down in listen_for_result() closure.
fressi-elastic Oct 10, 2025
ccb578f
Ensure actor subprocesses have conviguration details to eventually jo…
fressi-elastic Oct 10, 2025
f47cc37
Fix test stability on Python-3.10
fressi-elastic Oct 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions esrally/actors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from thespian.actors import ( # type: ignore[import-untyped]
Actor,
ActorAddress,
ActorExitRequest,
ActorSystem,
PoisonMessage,
WakeupMessage,
)

from esrally.actors._actor import AsyncActor, get_actor, respond
from esrally.actors._config import ActorConfig
from esrally.actors._context import (
ActorContext,
ActorContextError,
create_actor,
create_task,
get_actor_context,
ping,
request,
send,
shutdown,
wait_for,
)
from esrally.actors._system import SystemBase, get_actor_system, init_actor_system
354 changes: 354 additions & 0 deletions esrally/actors/_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import collections
import contextvars
import dataclasses
import inspect
import logging
from collections.abc import Awaitable, Coroutine
from typing import Any, TypeVar

from thespian import actors # type: ignore[import-untyped]

from esrally.actors._config import ActorConfig
from esrally.actors._context import (
ActorContext,
ActorContextError,
get_actor_context,
set_actor_context,
)
from esrally.actors._proto import (
CancelledResponse,
CancelRequest,
PingRequest,
PongResponse,
Request,
ResultResponse,
RunningTaskResponse,
)
from esrally.config import init_config

LOG = logging.getLogger(__name__)


def get_actor() -> "AsyncActor":
"""It returns the actor where the actual message is being received."""
return get_actor_request_context().actor


def get_actor_request_context() -> "ActorRequestContext":
"""It retrieve details about the context where the actual message is being received."""
ctx = get_actor_context()
if not isinstance(ctx, ActorRequestContext):
raise ActorContextError(f"Context is not a AsyncActorContext: {ctx!r}")
return ctx


def respond(status: Any = None, error: Exception | None = None) -> None:
"""It sends a response message to the sender actor."""
get_actor_request_context().respond(status=status, error=error)


R = TypeVar("R")


@dataclasses.dataclass
class ActorRequestContext(ActorContext):
"""Actor context being used while some received messages is being processed by an actor."""

# sender represents the address of the actor that sent current request message.
sender: actors.ActorAddress | None = None

# req_id represents the unique request ID carried by current request message.
req_id: str = ""

# pending_tasks contains sets of asyncio tasks (indexed by req_id) to be cancelled in case of a CancelRequest is
# received.
pending_tasks: dict[str, set[asyncio.Task]] = dataclasses.field(default_factory=lambda: collections.defaultdict(set))

# responded will be true after current request message has been responded. The purpose of this flag is avoiding
# returning multiple responses to the same 'req_id'.
responded: bool = False

@property
def name(self) -> str:
return f"{super().name}|{self.req_id}"

@property
def actor(self) -> "AsyncActor":
"""actor property returns the local actor where the current messages are being received."""
assert isinstance(self.handler, AsyncActor)
return self.handler

def create_task(self, coro: Coroutine[None, None, R], *, name: str | None = None) -> asyncio.Task[R]:
"""create_task is a wrapper around asyncio.AbstractEventLoop.create_task

While processing a request message will register create task for cancellation in case a CancelRequest message
is received.

Please note that while processing a request from inside an actor, all tasks created by calling this method will
be cancelled in case of a CancelRequest message. This could also include requests that have been forwarded to
other actors. To prevent this to happen, please use asyncio.create_task function instead.

:param coro: The coroutine to wrap.
:param name: The name of the task.
:return: The wrapper task.
"""
task = super().create_task(coro, name=name)
if self.req_id:

def remove_task(f: asyncio.Task) -> None:
tasks = self.pending_tasks.get(self.req_id)
if tasks is not None:
tasks.discard(f)
if not tasks:
del self.pending_tasks[self.req_id]

task.add_done_callback(remove_task)

self.pending_tasks[self.req_id].add(task)
self.send(self.sender, RunningTaskResponse(req_id=self.req_id, name=task.get_name()))
return task

def receive_message(self, message: Any, sender: actors.ActorAddress) -> bool:
"""receive_message is called by the actor when receiving a new message."""

# It processes requests.
if isinstance(message, Request) and self.receive_request(message, sender):
return True

# It receives application configuration.
if isinstance(message, ActorConfig) and self.receive_actor_config(message, sender):
return True

# It processes responses.
if super().receive_message(message, sender):
return True

# It dispatches the message back to the actor.
actors.ActorTypeDispatcher.receiveMessage(self.actor, message, sender)
return True

def receive_actor_config(self, cfg: ActorConfig, sender: actors.ActorAddress) -> bool:
"""It adds the configuration to the actor contextvars.Context."""
self.log.debug("Received configuration message: %s", cfg)
init_config(cfg, force=True)
return False

def receive_request(self, request: Request, sender: actors.ActorAddress) -> bool:
"""It processes a request message."""
try:
ctx = get_actor_context()
except ActorContextError:
ctx = None
if ctx is not self:
# It ensures the request is processed within this actor context.
with self.enter():
return self.receive_request(request, sender)

self.log.debug("Received request from actor %s: %s", sender, request)
# It sets context variables that will later be used to respond the request.
self.sender = sender
self.req_id = request.req_id
try:
# It cancels all tasks of a previous request.
if isinstance(request, CancelRequest) and self.cancel_request(request.message):
return True

# It processes a ping request.
if isinstance(request, PingRequest) and self.receive_ping_request(request):
return True

# It dispatches the request enveloped message to the actor.
self.respond(actors.ActorTypeDispatcher.receiveMessage(self.actor, request.message, sender))
return True

except Exception as error:
# It sends the error as a response.
self.respond(error=error)
return True

def receive_cancel_request(self, request: CancelRequest) -> bool:
"""It processes a cancel request message."""
self.cancel_request(request.message)
return True

def cancel_request(self, message: Any = None) -> bool:
"""It cancels all pending tasks of the current request. Then it sends back a CancelledResponse to notify request sender."""
for t in self.pending_tasks.pop(self.req_id, []):
if not t.done():
t.cancel(message)
if not self.responded:
# It notifies the request sender the request has been cancelled.
self.respond(CancelledResponse(self.req_id, message))
return True

def receive_ping_request(self, request: PingRequest) -> bool:
"""It processes a ping request message."""
if request.destination in [None, self.actor.myAddress]:
# It responds to the ping request.
self.respond(PongResponse(request.req_id, request.message))
else:
# It forwards the request to the destination actor.
self.respond(self.request(request.destination, request))
return True

def respond(self, status: Any = None, error: Exception | None = None) -> None:
"""It sends a response message to the sender actor."""
# It ensures a request gets responded only once in the scope of this context.
if self.responded:
if error is not None:
# The error will eventually reach requester in the form of a PoisonMessage
raise error
if status is not None:
raise ActorContextError("Already responded.")

if error is None and inspect.isawaitable(status):
# It schedules a task to wait for the actual status before responding.
# Please note that the final status could be another awaitable that would
# make it spawning another task to wait for it. This should ensure there
# should always be a response for every request which processing come to
# an end. All tasks will be cancelled in case of a CancelRequest message is received.
self.create_task(self.respond_later(status), name=f"respond_later({status!r})")
return

if self.req_id:
# The status or the error will eventually reach requester in the form of a Response message,
# so that the requester can match the target future by using its req_id.
response = ResultResponse.from_status(self.req_id, status, error)
elif error is None:
# The status will eventually reach requester in the form of a standalone message without any req_id.
response = status
else:
# The error will eventually reach requester in the form of a PoisonMessage
raise error

if response is None:
self.log.debug("No response sent to actor %s: %r", self.sender, response)
else:
# It finally sends a response.
self.send(self.sender, response)
self.log.debug("Sent response to actor %s: %r", self.sender, response)

# Reaching this point it mean there is nothing more to respond to the request and all pending tasks can be
# cancelled.
self.responded = True
self.cancel_request()

async def respond_later(self, status: Awaitable) -> None:
"""respond_later awaits for a response to get ready.

It makes sures a response is always sent to the request sender, even if the awaited task is cancelled.
"""
try:
self.respond(status=await status)
except Exception as error:
self.respond(error=error)
except asyncio.CancelledError as error:
self.respond(CancelledResponse(message=str(error), req_id=self.req_id))
raise


class AsyncActor(actors.ActorTypeDispatcher):
"""Override the thespian ActorTypeDispatcher with some few additional features.

Additional features include:
- It uses its own `asyncio` event loop to run asynchronous tasks (co-routines). The loop is set as current during
messages processing.
- It periodically run pending tasks from its event loop.
- The methods processing a message type can be async coroutines, on which case they will be scheduled for execution
as an async task of the actor event loop. While messages are being processed by these co-routines, other messages
and loop events can be processed from the actor, making the actor truly asynchronous.
- It implements `request` method, an awaitable version of `Actor.send` method. It sends a message with a unique
request ID, and until a response with the same ID is received, from the target actor, other messages and loop
events are being processed from the actor.
- When receiving an ActorConfig message it inits context configuration with it.
- When receiving a PoisonErrorMessage as response of `request` method, it translates it to a PoisonError and raises
it as a possible outcome of waiting for a response.
- When receiving an ActorExitRequest, it cancels all pending tasks from the loop, then stops the loop, so that
request senders should receive CancelledError while waiting for a response.
- It creates its own logger.
"""

def __init__(self) -> None:
super().__init__()
self._log: logging.Logger | None = None
self._loop: asyncio.AbstractEventLoop | None = None
self._sent_requests: dict[str, asyncio.Future] = {}
self._request_tasks: dict[str, set[asyncio.Task]] = collections.defaultdict(set)
self._ctx: contextvars.Context = contextvars.copy_context()

@property
def log(self) -> logging.Logger:
"""It returns the logger of this actor."""
if self._log is None:
self._log = self.logger(f"{type(self).__module__}.{type(self).__name__}")
return self._log

@property
def loop(self) -> asyncio.AbstractEventLoop:
"""It returns the event loop of this actor."""
loop = self._loop
if loop is None:
self._loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# It schedules the periodic execution of actor's asyncio tasks.
self._run_pending_tasks()
return loop

def receiveMessage(self, message: Any, sender: actors.ActorAddress) -> None:
"""It makes sure the message is handled with the actor context variables."""
ctx = ActorRequestContext(
handler=self, pending_results=self._sent_requests, loop=self.loop, log=self.log, pending_tasks=self._request_tasks
)
self._ctx.run(ctx.receive_message, message, sender)

def receiveMsg_ActorConfig(self, message: ActorConfig, sender: actors.ActorAddress) -> None:
self.log.debug("Received actor config: %r", message)

def receiveMsg_ActorExitRequest(self, message: actors.ActorExitRequest, sender: actors.ActorAddress) -> None:
"""It cancels all pending tasks in the event loop, then stops the loop and finally unlink the current context."""
while self._request_tasks:
_, tasks = self._request_tasks.popitem()
for task in tasks:
task.cancel("Actor exit.")
if self._loop is not None:
self._loop.stop()
self._loop = None
set_actor_context(None)

_RUN_PENDING_TASKS = "RunPendingTasks"

def receiveMsg_WakeupMessage(self, message: actors.WakeupMessage, sender: actors.ActorAddress) -> None:
"""It executes pending tasks on a scheduled time period."""
if message.payload is None:
return None
if message.payload == self._RUN_PENDING_TASKS:
self._run_pending_tasks()
return None
# It processes payload message as a standalone message.
super().receiveMessage(message.payload, sender)

def _run_pending_tasks(self) -> None:
try:
self.loop.run_until_complete(nop())
finally:
self.wakeupAfter(0.001, self._RUN_PENDING_TASKS)


async def nop() -> None: ...
Loading