forked from aiidateam/plumpy
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #6 from unkcpz/rmq-out
The refactoring is targeting to decouple the dependencies of using kiwipy+rmq as the communicator for the process control. By forming a `Coordinator` protocol contract, the different type of rmq/kiwipy related codes are removed out from plumpy logic. The new contract also pave the way to make it clearly show how a new type coordinator can be implemented (future examples will be the `tatzelwurm` a task broker that has scheduler support and file based task broker require no background service). For the prototype of how a coordinator should look like, the `MockCoordinator` in `tests/utils` is the coordinator that store things in memory, and can serve as the lightweight ephemeral daemon without persistent functionality. Another major change here is hand write the resolver of future by mimic how tho asyncio does for wrapping `concurrent.futures.Future` into `asyncio.Future`. I use the same way to convert `asyncio.Future` into `concurent.futures.Future` (which is the `kiwipy.Future` as alias). - move the `aio_pika` import lazily by moving the rmq exceptions to `rmq` module, this can increase the performance of `import aiida; aiida.orm`. - ~~`CancellableAction` using composite to behave as a Future like object.~~ - use `asyncio.Future` in favor of alias `plumpy.Future` and - use `concurrent.futures.Future` instead of alias `kiwipy.Future`. - Hand write `_chain` and `_copy_future` since we can not just rely on the API of asyncio that is not exposed. - Forming the `coordinator/Communicator` protocol. - Just forming the `coordinator/Coordinator` protocol and wrap rmq/communicator as a coordinator that not require changs in kiwipy. - Mock the coordinator for unit test. - test against aiida-core see what need to be changed there and improve here. (aiidateam/aiida-core#6675) - The API for plumpy process can be more compact instead of using kiwipy/rmq "subscriber" concept. (how to replace rpc pattern??)
- Loading branch information
Showing
26 changed files
with
1,565 additions
and
1,006 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# -*- coding: utf-8 -*- | ||
from __future__ import annotations | ||
|
||
from collections.abc import Sequence | ||
from typing import Any, Protocol | ||
|
||
from plumpy import loaders | ||
from plumpy.message import MessageType | ||
from plumpy.utils import PID_TYPE | ||
|
||
ProcessResult = Any | ||
ProcessStatus = Any | ||
|
||
|
||
class ProcessController(Protocol): | ||
""" | ||
Control processes using coroutines that will send messages and wait | ||
(in a non-blocking way) for their response | ||
""" | ||
|
||
def get_status(self, pid: 'PID_TYPE') -> ProcessStatus: | ||
""" | ||
Get the status of a process with the given PID | ||
:param pid: the process id | ||
:return: the status response from the process | ||
""" | ||
... | ||
|
||
def pause_process(self, pid: 'PID_TYPE', msg: Any | None = None) -> ProcessResult: | ||
""" | ||
Pause the process | ||
:param pid: the pid of the process to pause | ||
:param msg: optional pause message | ||
:return: True if paused, False otherwise | ||
""" | ||
... | ||
|
||
def play_process(self, pid: 'PID_TYPE') -> ProcessResult: | ||
""" | ||
Play the process | ||
:param pid: the pid of the process to play | ||
:return: True if played, False otherwise | ||
""" | ||
... | ||
|
||
def kill_process(self, pid: 'PID_TYPE', msg: MessageType | None = None) -> ProcessResult: | ||
""" | ||
Kill the process | ||
:param pid: the pid of the process to kill | ||
:param msg: optional kill message | ||
:return: True if killed, False otherwise | ||
""" | ||
... | ||
|
||
def continue_process( | ||
self, pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False, no_reply: bool = False | ||
) -> ProcessResult | None: | ||
""" | ||
Continue the process | ||
:param _communicator: the communicator | ||
:param pid: the pid of the process to continue | ||
:param tag: the checkpoint tag to continue from | ||
""" | ||
... | ||
|
||
async def launch_process( | ||
self, | ||
process_class: str, | ||
init_args: Sequence[Any] | None = None, | ||
init_kwargs: dict[str, Any] | None = None, | ||
persist: bool = False, | ||
loader: loaders.ObjectLoader | None = None, | ||
nowait: bool = False, | ||
no_reply: bool = False, | ||
) -> ProcessResult: | ||
""" | ||
Launch a process given the class and constructor arguments | ||
:param process_class: the class of the process to launch | ||
:param init_args: the constructor positional arguments | ||
:param init_kwargs: the constructor keyword arguments | ||
:param persist: should the process be persisted | ||
:param loader: the classloader to use | ||
:param nowait: if True, don't wait for the process to send a response, just return the pid | ||
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value | ||
:return: the result of launching the process | ||
""" | ||
... | ||
|
||
async def execute_process( | ||
self, | ||
process_class: str, | ||
init_args: Sequence[Any] | None = None, | ||
init_kwargs: dict[str, Any] | None = None, | ||
loader: loaders.ObjectLoader | None = None, | ||
nowait: bool = False, | ||
no_reply: bool = False, | ||
) -> ProcessResult: | ||
""" | ||
Execute a process. This call will first send a create task and then a continue task over | ||
the communicator. This means that if communicator messages are durable then the process | ||
will run until the end even if this interpreter instance ceases to exist. | ||
:param process_class: the process class to execute | ||
:param init_args: the positional arguments to the class constructor | ||
:param init_kwargs: the keyword arguments to the class constructor | ||
:param loader: the class loader to use | ||
:param nowait: if True, don't wait for the process to send a response | ||
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value | ||
:return: the result of executing the process | ||
""" | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# -*- coding: utf-8 -*- | ||
from __future__ import annotations | ||
|
||
from typing import TYPE_CHECKING, Any, Callable, Hashable, Pattern, Protocol | ||
|
||
if TYPE_CHECKING: | ||
# identifiers for subscribers | ||
ID_TYPE = Hashable | ||
Subscriber = Callable[..., Any] | ||
# RPC subscriber params: communicator, msg | ||
RpcSubscriber = Callable[['Coordinator', Any], Any] | ||
# Task subscriber params: communicator, task | ||
TaskSubscriber = Callable[['Coordinator', Any], Any] | ||
# Broadcast subscribers params: communicator, body, sender, subject, correlation id | ||
BroadcastSubscriber = Callable[['Coordinator', Any, Any, Any, ID_TYPE], Any] | ||
|
||
|
||
class Coordinator(Protocol): | ||
# XXX: naming - 'add_message_handler' | ||
def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ... | ||
|
||
# XXX: naming - 'add_broadcast_handler' | ||
def add_broadcast_subscriber( | ||
self, | ||
subscriber: 'BroadcastSubscriber', | ||
subject_filters: list[Hashable | Pattern[str]] | None = None, | ||
sender_filters: list[Hashable | Pattern[str]] | None = None, | ||
identifier: 'ID_TYPE | None' = None, | ||
) -> Any: ... | ||
|
||
# XXX: naming - absorbed into 'add_message_handler' | ||
def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ... | ||
|
||
def remove_rpc_subscriber(self, identifier: 'ID_TYPE | None') -> None: ... | ||
|
||
def remove_broadcast_subscriber(self, identifier: 'ID_TYPE | None') -> None: ... | ||
|
||
def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: ... | ||
|
||
def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ... | ||
|
||
def broadcast_send( | ||
self, | ||
body: Any | None, | ||
sender: 'ID_TYPE | None' = None, | ||
subject: str | None = None, | ||
correlation_id: 'ID_TYPE | None' = None, | ||
) -> Any: ... | ||
|
||
def task_send(self, task: Any, no_reply: bool = False) -> Any: ... | ||
|
||
def close(self) -> None: ... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.