diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 56baeb9237..e5311e3830 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -87,7 +87,7 @@ async def transport_task(transport_queue, authinfo): transport = authinfo.get_transport() # safe_open_interval = transport.get_safe_open_interval() - safe_open_interval = 15 + safe_open_interval = 30 # Check here if last_open_time > safe_interval, one could immediately open the transport # This should be the very first request, after a while @@ -98,7 +98,7 @@ def do_open(): _LOGGER.debug('Transport request opening transport for %s', authinfo) try: transport.open() - self._last_open_time = datetime.now() + self._last_open_time = timezone.localtime(timezone.now()) except Exception as exception: _LOGGER.error('exception occurred while trying to open transport:\n %s', exception) transport_request.future.set_exception(exception) @@ -114,45 +114,34 @@ def do_open(): # to this handle would otherwise keep the Process context (and thus the process itself) in memory. # See https://github.com/aiidateam/aiida-core/issues/4698 - # if self._last_request_special: - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # self._last_request_special = False + # First request, submit immediately + # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? + if self._last_close_time is None: + open_callback_handle = self._loop.call_soon(do_open, context=contextvars.Context()) + self._last_request_special = True - # # First request, submit immediately - # # ? Are these attributes persistet, or is a new TransportQueue instance created for every transport task? - # if self._last_close_time is None: - # open_callback_handle = self._loop.call_later(1, do_open, context=contextvars.Context()) - # self._last_request_special = True + elif self._last_request_special: + open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) + self._last_request_special = False - # # self._last_close_time = datetime.strptime(self._last_close_time, '%Y-%m-%dT%H:%M:%S.%f%z') - # else: - # timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() + else: + timedelta_seconds = (timezone.localtime(timezone.now()) - self._last_close_time).total_seconds() - # if timedelta_seconds > safe_open_interval: - # # ! This could also be `_loop.call_soon` which has an implicit delay of 0s + if timedelta_seconds > safe_open_interval: + # ! This could also be `_loop.call_soon` which has an implicit delay of 0s - # open_callback_handle = self._loop.call_later(timedelta_seconds-safe_open_interval, do_open, context=contextvars.Context()) - # self._last_request_special = True + open_timedelta = timedelta_seconds-safe_open_interval + open_callback_handle = self._loop.call_later(open_timedelta, do_open, context=contextvars.Context()) + self._last_request_special = True - # else: - # # If the last one was a special request, wait the safe_open_interval - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - - # if self._last_request_special: - - # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # self._last_request_special = False - - # else: - # open_callback_handle = self._loop.call_later(timedelta_seconds, do_open, context=contextvars.Context()) - # self._last_request_special = True - # open_callback_handle = self._loop.call_later(5, do_open, context=contextvars.Context()) + else: + # If the last one was a special request, wait the safe_open_interval + open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) try: transport_request.count += 1 - self._last_submission_time = datetime.now() yield transport_request.future except asyncio.CancelledError: # note this is only required in python<=3.7, @@ -169,21 +158,26 @@ def do_open(): if transport_request.count == 0: if transport_request.future.done(): - # if (datetime.now() - self._last_open_time).total_seconds() > 5: + def do_close(): + """Close the transport if conditions are met.""" + transport_request.future.result().close() + # self._last_close_time = timezone.localtime(timezone.now()) - # def close_transport(): - # """Close the transport if conditions are met.""" - # transport_request.future.result().close() + close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() + + if close_timedelta > safe_open_interval: # Also here logic when transport should be closed immediately, or when via call_later? - # close_callback_handle = self._loop.call_later(5, close_transport, context=contextvars.Context()) + close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) + self._last_close_time = timezone.localtime(timezone.now()) + else: + close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) + # transport_request.transport_closer = close_callback_handle # This should be replaced with the call_later close_callback_handle invocation - transport_request.future.result().close() - # When storing in `AuthInfo`, had to convert to str to be storeable in the DB - # self._last_close_time = timezone.localtime(timezone.now()).strftime('%Y-%m-%dT%H:%M:%S.%f%z') - self._last_close_time = timezone.localtime(timezone.now()) + # transport_request.future.result().close() + elif open_callback_handle is not None: open_callback_handle.cancel()