diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index ca8dbf2d72..938054058b 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -54,6 +54,7 @@ class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" + async def get_transport(authinfo, transport_queue, cancellable): transport_requests = transport_queue._transport_requests last_transport_request = transport_requests.get(authinfo.pk, None) @@ -72,6 +73,7 @@ async def get_transport(authinfo, transport_queue, cancellable): else: transport_queue._last_request_special = True + async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture): """Transport task that will attempt to upload the files of a job calculation to the remote. @@ -160,7 +162,6 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca authinfo = node.get_authinfo() async def do_submit(): - transport = get_transport(authinfo=authinfo, transport_queue=transport_queue, cancellable=cancellable) print('a') diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 24b3d97435..f9cb9f471f 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -13,7 +13,6 @@ import contextvars import logging import traceback -from datetime import datetime from typing import TYPE_CHECKING, Awaitable, Dict, Hashable, Iterator, Optional from aiida.common import timezone @@ -32,6 +31,7 @@ def __init__(self): self.future: asyncio.Future = asyncio.Future() self.count = 0 + class TransportCloseRequest: """Information kept about close request for a transport object""" @@ -39,6 +39,7 @@ def __init__(self): self.future: asyncio.Future = asyncio.Future() self.count = 0 + class TransportQueue: """A queue to get transport objects from authinfo. This class allows clients to register their interest in a transport object which will be provided at @@ -140,15 +141,17 @@ def do_open(): else: # If the last one was a special request, wait the difference between safe_open_interval and lost - open_callback_handle = self._loop.call_later(safe_open_interval-open_timedelta, do_open, context=contextvars.Context()) + open_callback_handle = self._loop.call_later( + safe_open_interval - open_timedelta, do_open, context=contextvars.Context() + ) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # ? This logic is implemented in `tasks.py` instead. + # ? This logic is implemented in `tasks.py` instead. # else: # transport = authinfo.get_transport() # return transport - # If transport_request is open already + # If transport_request is open already try: transport_request.count += 1 yield transport_request.future @@ -172,18 +175,19 @@ def do_close(): transport_request.future.result().close() self._last_close_time = timezone.localtime(timezone.now()) - close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() + close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() if close_timedelta > safe_open_interval: - # Also here logic when transport should be closed immediately, or when via call_later? close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) self._last_close_time = timezone.localtime(timezone.now()) self._transport_requests.pop(authinfo.pk, None) else: - close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) + close_callback_handle = self._loop.call_later( + safe_open_interval, do_close, context=contextvars.Context() + ) self._transport_requests.pop(authinfo.pk, None) - + # transport_request.transport_closer = close_callback_handle # This should be replaced with the call_later close_callback_handle invocation