Transport interface design #6686

Closed
unkcpz opened this issue Jan 8, 2025 · 2 comments
Labels
type/feature request status undecided

Comments

@unkcpz
Member

unkcpz commented Jan 8, 2025

@khsrali, @agoscinski and I discussed what the proper interface for the transport plugin should be so that it supports both sync and async implementations.

The problem with the design of #6626 is that it defines three interfaces.
It has _BaseTransport, the internal interface that provides some shared methods; this base class is supposed to be inherited only by BlockingTransport and AsyncTransport.
Both BlockingTransport and AsyncTransport carry two styles of methods to support both async and sync calls.
Take mkdir as an example: there is also mkdir_async, and both exist in BlockingTransport and in AsyncTransport.
In BlockingTransport, the mkdir_async method does not need to be implemented (and overriding it is forbidden); it is constructed automatically from the sync version of mkdir.
In AsyncTransport, the mkdir_async method is supposed to be implemented, and the sync mkdir is constructed automatically from the async implementation.
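
To make the two-directional derivation concrete, here is a minimal sketch of the pattern described above. It is not the code of #6626: the class bodies, the use of asyncio.run to drive the coroutine, and the DemoBlocking/DemoAsync subclasses are assumptions made only for illustration.

```python
import asyncio


class BlockingTransport:
    """Sync-first base: subclasses implement mkdir(); mkdir_async() is derived."""

    def mkdir(self, path):
        raise NotImplementedError

    async def mkdir_async(self, path):
        # Derived from the blocking implementation (assumed mechanism).
        return self.mkdir(path)


class AsyncTransport:
    """Async-first base: subclasses implement mkdir_async(); mkdir() is derived."""

    async def mkdir_async(self, path):
        raise NotImplementedError

    def mkdir(self, path):
        # Derived by driving the coroutine to completion (assumed mechanism).
        return asyncio.run(self.mkdir_async(path))


class DemoBlocking(BlockingTransport):  # hypothetical plugin
    def mkdir(self, path):
        return f"blocking mkdir {path}"


class DemoAsync(AsyncTransport):  # hypothetical plugin
    async def mkdir_async(self, path):
        return f"async mkdir {path}"


blocking_result = asyncio.run(DemoBlocking().mkdir_async("/tmp/a"))
async_result = DemoAsync().mkdir("/tmp/b")
```

Every operation ends up with two names on both classes, which is the duplication this issue is about.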

A good design should take three groups of people into consideration:

  1. aiida-core developers, who don't want to deal with three types of interface when dispatching the methods used inside the aiida-core code base.
  2. Transport developers, who need one clear contract to look at when implementing a new transport.
  3. Plugin developers (e.g. of aiida-quantumespresso), whose plugins use the transport functionality.

I can understand the reasons behind having three interfaces for the same type of operations:

  1. Backward compatibility for plugins (e.g. aiida-quantumespresso) that already call transport methods directly. Such a plugin has no information about the transport it uses, which might be async or sync. If it is async and the method is called without await, the function does not run.
  2. Inside aiida-core, there are places where the transport is used synchronously, without await, and these require that the method call is not awaited. Changing all of them is out of the scope of the PR, and not all of the changes are necessary.
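
The first point can be reproduced in a few lines. This is a small sketch with a made-up FakeAsyncTransport; it only shows that calling an async method without await creates a coroutine object and never executes the body.

```python
import asyncio

calls = []


class FakeAsyncTransport:  # hypothetical stand-in for an async transport plugin
    async def mkdir(self, path):
        calls.append(path)


transport = FakeAsyncTransport()

# Legacy sync-style call: this only builds a coroutine object.
coro = transport.mkdir("/tmp/a")
ran_without_await = list(calls)  # the body has not run
coro.close()  # close it explicitly to avoid a "never awaited" warning

# The body runs only once the coroutine is driven by an event loop.
asyncio.run(transport.mkdir("/tmp/b"))
ran_with_loop = list(calls)
```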

Having two different names for a method makes it possible to distinguish which type of method should be called.
But if the function is async, the signature async def foobar() already says so; async def foobar_async() is redundant.

When talking about aiida plugins, what we actually use in aiida-core is the dependency injection pattern: the core code base assumes the plugin has certain methods implemented.
aiida-core calls those interface methods (this is also known as duck typing, the pattern recommended in Python) and leaves the actual implementation to the plugin.
The interface forms the contract of the class for plugin developers.
Going back to why we have an interface for the plugin type, it serves two purposes:

  1. The interface forms the contract that tells transport plugin developers which methods need to be implemented.
  2. The interface tells anyone using the transport (either aiida-core or another plugin) which methods can be called.

For the first purpose, transport plugin developers want to look at one and the same contract, whether the plugin is async or sync, and know from it which methods to provide.
For the second purpose, inside the aiida-core daemon, whenever the transport is called to interact with a remote resource, all the functions are called in an async manner. This is the new feature introduced by aiidateam/plumpy#272, with the corresponding changes applied to aiida-core in Ali's PR.

However, a problem arises when the transport is used outside aiida-core.
If the method name is the same for both async and sync, calling the async version of transport.mkdir() without putting it in the event loop won't run the function.
This leads to the question: "do we expect users outside aiida-core to call async functions directly?" My answer is: NO.
I remember Martin told me why the original design put all the complex back-and-forth conversion between async and sync into plumpy, introducing the synchronous RmqThreadCommunicator that wraps the async RmqCommunicator: we don't want to make plugin developers deal with any async programming.
Now the transport becomes an exception, because an async implementation can dramatically improve performance, so we make the move.
But apart from that, it is better if other plugins still only require regular synchronous programming.
Thus, I think we can assume that the transport is always called synchronously from outside.

Here is my proposal.

  • Have only one interface (e.g. transport.mkdir) that forms the contract for both async and sync implementations.
  • The asyncssh plugin implements async def mkdir(): and the paramiko ssh plugin implements def mkdir():.
  • In aiida-core, the execmanager module needs to call coroutines. So before calling transport.mkdir(), an is_coroutine check is required, converting the method to a coroutine as is done in plumpy. The transport can first be converted to an async transport, and then all methods can be called with await.
  • Outside aiida-core, things are the opposite, since the async function will be called synchronously. But we cannot ask the user who calls the function to do the is_coroutine check and create an event loop to run the async method. The solution is a wrapper that makes a sync transport from any kind of transport and exposes that interface to the outside world.
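
The per-call is_coroutine check from the third bullet could look roughly like the following. The helper name ensure_coroutine and the demo classes are hypothetical and written only for this sketch; this is not plumpy's actual code.

```python
import asyncio
import functools
import inspect


def ensure_coroutine(func):
    """Return func unchanged if it is already async, otherwise wrap it (hypothetical helper)."""
    if inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # The sync function still runs inline and blocks the loop while it does.
        return func(*args, **kwargs)

    return wrapper


class SyncDemo:  # hypothetical sync plugin
    def mkdir(self, path):
        return f"sync {path}"


class AsyncDemo:  # hypothetical async plugin
    async def mkdir(self, path):
        return f"async {path}"


async def make_dir(transport, path):
    # Core code can always await, whichever flavor of transport it was given.
    return await ensure_coroutine(transport.mkdir)(path)


sync_result = asyncio.run(make_dir(SyncDemo(), "/tmp/a"))
async_result = asyncio.run(make_dir(AsyncDemo(), "/tmp/b"))
```

Doing this at every call site is repetitive, which is why converting the whole transport once through a proxy is the more attractive variant.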

Some code snippets of how I think it will work.

For the interface, I provide a protocol so that both async and sync classes can work with it.

from typing import Protocol, runtime_checkable

# runtime_checkable allows isinstance() checks against the protocol.
@runtime_checkable
class Transport(Protocol):

    def open(self):
        ...

    def mkdir(self):
        ...

    def copy(self):
        ...

The synchronous and asynchronous transport plugins would then be:

# Pseudocode: paramiko.*() and asyncssh.*() stand in for the real library calls.
class BlockingSshTransport:

    def open(self):
        paramiko.open()

    def mkdir(self):
        paramiko.mkdir()

    def copy(self):
        paramiko.copy()


class AsyncSshTransport:

    async def open(self):
        await asyncssh.open()

    async def mkdir(self):
        await asyncssh.mkdir()

    async def copy(self):
        await asyncssh.copy()

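Both transport objects conform to the Transport protocol, which can be checked with isinstance(trans_obj, Transport). Note that this runtime check only verifies that the methods exist, not whether they are sync or async.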
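
As a quick check of that claim, the following sketch uses made-up SyncDemo, AsyncDemo and Incomplete classes (with illustrative method parameters) to show that a runtime_checkable protocol accepts both flavors and only rejects a class that is missing methods.

```python
import inspect
from typing import Protocol, runtime_checkable


@runtime_checkable
class Transport(Protocol):
    def open(self): ...
    def mkdir(self, path): ...
    def copy(self, src, dst): ...


class SyncDemo:  # hypothetical sync plugin
    def open(self): return "open"
    def mkdir(self, path): return path
    def copy(self, src, dst): return (src, dst)


class AsyncDemo:  # hypothetical async plugin
    async def open(self): return "open"
    async def mkdir(self, path): return path
    async def copy(self, src, dst): return (src, dst)


class Incomplete:  # lacks mkdir and copy
    def open(self): return "open"


sync_ok = isinstance(SyncDemo(), Transport)
async_ok = isinstance(AsyncDemo(), Transport)
incomplete_ok = isinstance(Incomplete(), Transport)

# Telling the two flavors apart needs a separate check.
is_async = inspect.iscoroutinefunction(AsyncDemo().mkdir)
```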

In the execmanager module, take the upload_calculation function as an example: it can be changed to an async function after the plumpy PR I mentioned above.
Inside, it has an await transport.copy(). To make this callable for a sync transport, I introduce the following async transport proxy.

import asyncio
from functools import wraps

class AsyncProxy:
    def __init__(self, sync_instance):
        self._sync_instance = sync_instance

    def __getattr__(self, name):
        attr = getattr(self._sync_instance, name)
        
        # If it's not callable, return as is.
        if not callable(attr):
            return attr

        # If the attribute is already an async function, return as is.
        if asyncio.iscoroutinefunction(attr):
            return attr
        
        # Wrap the callable in an asynchronous function.
        @wraps(attr)
        async def async_wrapper(*args, **kwargs):
            # Call the wrapped sync method and return its result.
            return attr(*args, **kwargs)

        # XXX: use loop.run_in_executor with a thread pool?? It might give a performance boost even for a sync transport with Python 3.13.
        
        return async_wrapper


async_transport = AsyncProxy(sync_transport)   # converted internally for execmanager right after the transport is obtained.
async def upload_calculation(transport=async_transport):
    ...
    await transport.copy(...)
    ...

Conversely, when the transport is called from outside aiida-core, the method has always been assumed to be called synchronously.
Take an example from aiida-quantumespresso, where the pwimmigrant.py module calls transport.get() (in fact the only place where this happens).
For this to work, the transport passed to the call needs to be a synchronous one.
The only API aiida-core provides for getting the transport is authinfo.py::AuthInfo.get_transport, so we can add a get_sync_transport() that converts an async transport to a sync one.
If we want to keep compatibility outside aiida-core, we can have get_transport always return the sync transport and add a get_async_transport that does the AsyncProxy conversion.

The SyncProxy would look like this:

import asyncio

class SyncProxy:
    def __init__(self, async_instance, loop):
        self._async_instance = async_instance
        # It is better to pass the loop in explicitly than to rely on `asyncio.get_event_loop()` as in https://github.com/aiidateam/aiida-core/pull/6626
        self._loop = loop  # Store the provided event loop

    def __getattr__(self, name):
        attr = getattr(self._async_instance, name)

        # If attribute is not callable, return it.
        if not callable(attr):
            return attr

        # If attribute is an async function, wrap it to run synchronously using the given loop.
        if asyncio.iscoroutinefunction(attr):
            def sync_wrapper(*args, **kwargs):
                # Use the provided loop to run the coroutine until complete.
                # We use nest_asyncio in aiida-core, so this is fine.
                return self._loop.run_until_complete(attr(*args, **kwargs))
            return sync_wrapper

        # Otherwise return the attribute directly.
        return attr


# in authinfo.py
class AuthInfo():

    ...

    def get_transport(self, loop) -> 'Transport':
        """Return a fully configured transport that can be used to connect to the computer set for this instance."""
        ...
        transport = transport_class(machine=computer.hostname, **self.get_auth_params())
        return SyncProxy(transport, loop)
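
A small usage sketch of the SyncProxy idea, under stated assumptions: the proxy is repeated in compact form so the example is self-contained, inspect.iscoroutinefunction is used in place of asyncio.iscoroutinefunction, FakeAsyncTransport is made up, and a fresh event loop is created here instead of relying on nest_asyncio.

```python
import asyncio
import inspect


class SyncProxy:
    """Compact copy of the proxy above, repeated so the example runs on its own."""

    def __init__(self, async_instance, loop):
        self._async_instance = async_instance
        self._loop = loop

    def __getattr__(self, name):
        attr = getattr(self._async_instance, name)
        if inspect.iscoroutinefunction(attr):
            def sync_wrapper(*args, **kwargs):
                # Drive the coroutine to completion on the loop we were given.
                return self._loop.run_until_complete(attr(*args, **kwargs))
            return sync_wrapper
        # Plain attributes and sync methods pass through untouched.
        return attr


class FakeAsyncTransport:  # hypothetical async plugin
    hostname = "localhost"

    async def mkdir(self, path):
        await asyncio.sleep(0)
        return f"created {path}"


loop = asyncio.new_event_loop()
try:
    transport = SyncProxy(FakeAsyncTransport(), loop)
    result = transport.mkdir("/tmp/demo")  # called like a normal sync method
    host = transport.hostname
finally:
    loop.close()
```

Without nest_asyncio, run_until_complete raises if the given loop is already running, so this only works as written when called from plain synchronous code.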

To summarize, if what I proposed above works, it cuts down the number of changed lines needed to support both types of transport.
It then provides a single source of contract to implement.
Whether the transport in use is async or sync is made explicit by wrapping it in a Proxy class, so it is used in the correct manner.
There is a single place to control which event loop runs the async function when it needs to run synchronously.
As a bonus, while writing the pseudocode for AsyncProxy above, I realized the sync function might be fine to run with loop.run_in_executor, which uses a thread pool by default. This may solve the thread-blocking problem in transport interactions by letting the operating system manage thread scheduling.
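
The run_in_executor idea could be sketched as follows. The wrapper name in_thread and the blocking_copy function are hypothetical; passing None as the executor selects the loop's default ThreadPoolExecutor.

```python
import asyncio
import functools
import threading
import time


def in_thread(func):
    """Wrap a blocking function so it runs in the loop's default thread pool (hypothetical helper)."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor takes positional args only, so bind kwargs with partial.
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
    return wrapper


def blocking_copy(src, dst):
    time.sleep(0.05)  # stands in for a blocking transport call
    on_main_thread = threading.current_thread() is threading.main_thread()
    return (src, dst, on_main_thread)


async def main():
    async_copy = in_thread(blocking_copy)
    # Both copies are handed to worker threads; the event loop stays responsive.
    return await asyncio.gather(async_copy("a", "b"), async_copy("c", "d"))


results = asyncio.run(main())
```

Whether this is safe for a real transport depends on the underlying connection object being usable from several threads, which this sketch does not address.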

Pinging @danielhollas @giovannipizzi, who were involved in the discussion during the coding week. Do you see any problem with this design?

@unkcpz unkcpz added the type/feature request status undecided label Jan 8, 2025
@agoscinski
Contributor

We (@unkcpz, @khsrali and I) discussed this issue in person. We basically concluded that this design provides more flexibility in usage, but this flexibility comes with no additional use case, so the additional complexity of this design is not worth pursuing.

To elaborate a bit, with this design one could use the transport more generically:

def foo(transport: Transport):
    """This function can act on sync and async transport plugins."""
    with transport.open():
        ...

# In some part of the code
transport = get_sync_transport()
foo(transport)
...
# In some other part of the code
async_transport = get_async_transport()
foo(async_transport)

However, the implementation of a function is typically very different when it is written concurrently, so it is unlikely that this additional flexibility will be of any benefit. On the other hand, overriding __getattr__ makes the usage less transparent for the user, especially because in this case we are executing a wrapped function instead of the actual function. The multiple layers of wrapping also make the functions harder to understand, which adds maintenance cost.
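
To illustrate why one generic function rarely serves both cases, here is a hedged sketch with made-up names (upload_all_sync, upload_all_async, SyncDemo, AsyncDemo): the sequential version is a plain loop, while the concurrent version is structured around asyncio.gather.

```python
import asyncio


def upload_all_sync(transport, files):
    # Sequential: each copy finishes before the next one starts.
    return [transport.copy(name) for name in files]


async def upload_all_async(transport, files):
    # Concurrent: all copies are scheduled together and awaited as a group.
    return await asyncio.gather(*(transport.copy(name) for name in files))


class SyncDemo:  # hypothetical sync plugin
    def copy(self, name):
        return f"copied {name}"


class AsyncDemo:  # hypothetical async plugin
    async def copy(self, name):
        await asyncio.sleep(0)
        return f"copied {name}"


files = ["a.in", "b.in"]
sync_results = upload_all_sync(SyncDemo(), files)
async_results = asyncio.run(upload_all_async(AsyncDemo(), files))
```

The results are the same, but the control flow differs, so the caller usually has to know which kind of transport it holds anyway.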

@unkcpz
Member Author

unkcpz commented Jan 10, 2025

Especially, in this case we are executing a wrapped function instead of the actual function.

This is true.

The other small benefit is that, with the event loop set in one place, it shows we could possibly make sync ssh jobs run multi-threaded. But this is out of the scope of the current implementation, and it could also be done there by duplicating the change for every transport operation.

        # If attribute is an async function, wrap it to run synchronously using the given loop.
        if asyncio.iscoroutinefunction(attr):
            def sync_wrapper(*args, **kwargs):
                # Use the provided loop to run the coroutine until complete.
                # We use nest_asyncio in aiida-core, so this is fine.
                return self._loop.run_until_complete(attr(*args, **kwargs))
            return sync_wrapper

        # Otherwise return the attribute directly.
        return attr

I think in the end the direct "user" of the API is @khsrali, for supporting asyncssh and firecrest. If he thinks this API is hard to follow, then there is no strong reason to go for it.

Thanks for the discussion @agoscinski @khsrali.
