Implement dynamic safe_interval for TransportQueue to reduce wait times #7145
base: main
Conversation
- Add last_close_times tracking dict to TransportQueue
- Calculate wait_interval based on time since last transport close
- Open immediately if no previous close or safe_interval has passed
- Wait only remaining time if some time has already passed
- Update last_close_time when transport is closed
- Add test for dynamic safe interval behavior

Co-authored-by: khsrali <[email protected]>
- Remove trailing whitespace from docstring
- Move time import to top level
- Remove duplicate import statements in tests
- Update class docstring to explain dynamic wait behavior

Co-authored-by: khsrali <[email protected]>
Codecov Report ❌ Patch coverage is
Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #7145       +/-   ##
===========================================
- Coverage   79.61%   29.18%   -50.42%
===========================================
  Files         566      566
  Lines       43572    43537       -35
===========================================
- Hits        34684    12703    -21981
- Misses       8888    30834    +21946
```
Pull request overview
This PR implements dynamic safe_interval calculation for the TransportQueue to reduce unnecessary wait times before transport operations. The enhancement tracks when each transport connection was last closed and calculates wait intervals dynamically, opening connections immediately when sufficient time has passed since the last close, rather than imposing a fixed wait on every request.
Key Changes:
- Added per-authinfo tracking of last close times to enable dynamic wait calculation
- Modified wait interval logic to open immediately for first requests or when safe_interval has elapsed
- Added comprehensive test coverage for the new dynamic behavior
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/aiida/engine/transports.py | Implements dynamic safe_interval by tracking last close times in _last_close_times dict and calculating wait intervals based on time elapsed since last close |
| tests/engine/test_transport.py | Adds test_dynamic_safe_interval to verify immediate opening on first request and after safe_interval has elapsed; moves import time to module level |
```python
""":param loop: An asyncio event, will use `asyncio.get_event_loop()` if not supplied"""
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._transport_requests: Dict[Hashable, TransportRequest] = {}
self._last_close_times: Dict[Hashable, float] = {}
```
Copilot AI · Dec 12, 2025
The _last_close_times dictionary will grow unbounded as new authinfo instances are used over time. This could lead to a memory leak in long-running daemon processes that work with many different computers/authinfo objects. Consider implementing a cleanup mechanism, such as removing entries after they become stale (e.g., older than a configurable threshold like 24 hours), or using an LRU cache with a maximum size to automatically evict old entries.
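One way to address the unbounded growth mentioned above is an LRU-style store with a maximum size. This is a sketch under the assumption that eviction by insertion order is acceptable; the class and method names here are illustrative, not part of the actual PR:

```python
import time
from collections import OrderedDict


class BoundedCloseTimes:
    """Hypothetical bounded store for last-close timestamps.

    Evicts the oldest entry once ``max_size`` is exceeded, so a long-running
    daemon worker cannot accumulate entries for every authinfo it ever saw.
    """

    def __init__(self, max_size: int = 1024):
        self._max_size = max_size
        self._data: 'OrderedDict[int, float]' = OrderedDict()

    def record_close(self, authinfo_pk: int) -> None:
        """Store the close time for an authinfo, evicting the oldest if full."""
        self._data.pop(authinfo_pk, None)  # re-inserting moves it to the end
        self._data[authinfo_pk] = time.time()
        while len(self._data) > self._max_size:
            self._data.popitem(last=False)  # drop the least-recently closed

    def get(self, authinfo_pk: int):
        """Return the last close time for an authinfo, or None if unknown."""
        return self._data.get(authinfo_pk)
```

A time-based variant (dropping entries older than, say, 24 hours) would work equally well, since entries older than `safe_interval` carry no information anyway.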
```python
# Calculate the actual wait time based on when the transport was last closed
last_close_time = self._last_close_times.get(authinfo.pk, None)
current_time = time.time()

if last_close_time is None:
    # Never opened before, open immediately
    wait_interval = 0
else:
    time_since_last_close = current_time - last_close_time
    if time_since_last_close >= safe_open_interval:
        # Enough time has passed, open immediately
        wait_interval = 0
    else:
        # Not enough time has passed, wait for the remaining time
        wait_interval = safe_open_interval - time_since_last_close
```
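The branching above collapses to a single `max` expression; a minimal standalone sketch of the equivalent logic (function name is illustrative):

```python
def compute_wait(last_close_time, current_time, safe_open_interval):
    """Remaining wait before a transport may reopen.

    Equivalent to the branched logic: no previous close, or a close more
    than ``safe_open_interval`` seconds ago, means no wait at all.
    """
    if last_close_time is None:
        return 0.0
    return max(0.0, safe_open_interval - (current_time - last_close_time))
```

For example, with a 30 s interval and a close 10 s ago, only the remaining 20 s are waited.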
Copilot AI · Dec 12, 2025
The dynamic wait interval calculation causes the existing test_safe_interval test to fail. The old test expects each successive transport request to wait for the full safe_interval, but with this implementation, the first request opens immediately (wait_interval = 0). This breaks the test's assumption that iteration 0 should take at least safe_interval time. The test_safe_interval test was designed to verify that the safe_interval is respected between successive requests, and this change fundamentally alters that behavior. Consider whether the test should be updated to reflect the new behavior, or if there's a way to make both behaviors work correctly.
```python
trans = await request
time_elapsed = time.time() - time_start
# Should open immediately or very quickly
assert time_elapsed < 0.1, f'First transport took too long to open: {time_elapsed}s'
```
Copilot AI · Dec 12, 2025
The hard-coded time threshold of 0.1 seconds for asserting immediate opening could be flaky on heavily loaded systems or slow CI runners. Consider using a more generous threshold (e.g., 0.2 seconds) or making it configurable to improve test reliability across different environments.
```python
trans = await request
time_elapsed = time.time() - time_start
# Should wait approximately the safe interval since not enough time has passed
assert time_elapsed >= 0.4, f'Second transport opened too quickly: {time_elapsed}s'
```
Copilot AI · Dec 12, 2025
The time threshold of 0.4 seconds (for a 0.5s safe interval) provides only a 0.1s margin, which could lead to flaky test failures on slow systems. Consider using a more generous lower bound (e.g., 0.35 seconds) to improve test reliability.
Suggested change:

```diff
- assert time_elapsed >= 0.4, f'Second transport opened too quickly: {time_elapsed}s'
+ assert time_elapsed >= 0.35, f'Second transport opened too quickly: {time_elapsed}s'
```
```python
trans = await request
time_elapsed = time.time() - time_start
# Should open immediately since safe interval has passed
assert time_elapsed < 0.1, f'Third transport took too long to open: {time_elapsed}s'
```
Copilot AI · Dec 12, 2025
The hard-coded time threshold of 0.1 seconds for asserting immediate opening could be flaky on heavily loaded systems or slow CI runners. Consider using a more generous threshold or making it configurable to improve test reliability.
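One way to make such timing assertions less flaky, along the lines the review suggests, is to measure with a monotonic clock and keep the bounds in one helper so the margins are easy to adjust. This is a sketch; the helper name and the 0.5 s allowance are illustrative choices, not from the PR:

```python
import time


def assert_elapsed(start: float, *, at_least: float = 0.0, at_most: float = float('inf')) -> float:
    """Check that the monotonic time elapsed since ``start`` is within bounds."""
    elapsed = time.monotonic() - start
    assert at_least <= elapsed <= at_most, f'elapsed {elapsed:.3f}s outside [{at_least}, {at_most}]'
    return elapsed


# Example: a "should be immediate" check with a CI-friendly 0.5 s allowance.
start = time.monotonic()
assert_elapsed(start, at_most=0.5)
```

Using `time.monotonic()` rather than `time.time()` also protects the measurement from wall-clock adjustments during the test.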
Quick calculations (e.g., 10s runtime) experienced 120s overhead due to fixed 30s `safe_interval` waits before each transport operation (upload, submit, retrieve, final retrieve). This resulted in 4×30s = 120s of unnecessary waiting.

Changes
- Track the last close time per authinfo in a `TransportQueue._last_close_times` dict
- Compute the wait as `wait = max(0, safe_interval - time_since_close)`

Implementation

Impact

For a 10s calculation with default `safe_interval=30`:

State is maintained per daemon worker (not in the database) to avoid multi-worker synchronization overhead. Backward compatible: existing behavior is preserved when requests occur within the safe_interval.
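As a rough model of the impact claim, assuming an otherwise idle worker where each transport operation's wait depends only on the time since the previous close (a simplification of the real daemon scheduling; names here are illustrative):

```python
SAFE_INTERVAL = 30  # seconds, the default
TRANSPORT_OPS = 4   # upload, submit, retrieve, final retrieve

# Fixed-wait behaviour: every operation waits the full interval.
fixed_overhead = TRANSPORT_OPS * SAFE_INTERVAL  # 4 x 30 s = 120 s


def dynamic_overhead(gap_between_closes: float) -> float:
    """Overhead under the dynamic scheme: the first open is free, and each
    later open only waits out the remainder of the interval since the
    previous transport close."""
    return (TRANSPORT_OPS - 1) * max(0.0, SAFE_INTERVAL - gap_between_closes)
```

Under this model, a near-instant calculation already drops from 120 s to 90 s of waiting just from the free first open, and any time spent between closes further reduces the remaining waits.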
Original prompt
This section details the original issue to be resolved.
<issue_title>Make `safe_interval` more dynamic for quick transport tasks</issue_title>

<issue_description>As realized together with @giovannipizzi while debugging things for our new cluster at PSI: when submitting a simple calculation (execution takes about 10s) for testing purposes, with the default `safe_interval=30` in the `Computer` configuration, one has to wait an additional 90s until the job is done (30s each for the `upload`, `submit`, and `retrieve` tasks). This is to be expected, of course, and one could just reduce the `safe_interval` (albeit increasing the risk of SSH overloads).

However, the `upload` task in that case is truly the first `Transport` task being executed by the daemon worker, so it could, in principle, enter immediately (the same holds if jobs were run previously, but longer ago than the `safe_interval`). I locally implemented a first version (thanks to @giovannipizzi's input) that does this by adding a `last_close_time` attribute (currently added to the `authinfo` metadata for a first PoC). In the `request_transport` method of the `TransportQueue`, the difference between the current time and the `last_close_time` is checked, and if it is larger than `safe_interval`, the `Transport` is opened immediately, bypassing the `safe_interval` (or `safe_open_interval` as it is called in `transports.py`).

In addition, the waiting times for the `submit` and `retrieve` tasks could also be reduced. It seems like currently the `safe_interval` is imposed on all of them, even if they finish very quickly (I assume because they all open a transport connection via SSH). So we were thinking whether it's possible to make this a bit more sophisticated, e.g. by adding special transport requests that could make use of the open transport, and keeping a transport whose task has finished open for a short time longer (also quickly discussed with @mbercx). Of course, one would still need to make sure SSH doesn't get overloaded, that the implementation works under heavy loads (not just individual test calculations), and one would also have to consider how this all works with multiple daemon workers. Again with @giovannipizzi, I had a quick look, but it seems like the implementation would be a bit more involved. So I'm wondering what the others think, and whether this is feasible and worth investing more time into. Pinging @khsrali who has looked a bit more into transports.</issue_description>

Comments on the Issue (you are @copilot in this section)
@giovannipizzi Thanks for the nice write-up @GeigerJ2! Just some minor additional comments/clarifications:

- Actually, even for a millisecond run, the time to wait is 120s (or generally 4 times the safe interval), rather than 3x (90s):
(and I guess one adds another 30s if there is also stashing involved)
- The initial implementation keeps the time in the metadata of the authinfo, but already while discussing with Julian we realized it's better not to put it there, as it is shared by all daemon workers, and could lead to wrong results, collisions, and exceptions when multiple workers write the same DB row, etc. Better to just keep it in another local attribute `self.last_close_time`, parallel to `self._transport_requests`. On the other hand, I just realized that if you are running from a local interpreter, and maybe submitting `run()` from a bash "for" loop (e.g. of `verdi run` commands), this might bypass the limit, as all of them will think that nothing was submitted before. But probably this is OK with the current implementation? Fixing it properly would require making the whole concept of a safe_interval not specific to a worker, but global to an AiiDA profile.
- In the implementation discussed above, in addition to setting the first parameter of `call_later` to zero if more than `safe_interval` seconds have passed since the last call, I would also set the waiting time based on the difference `current_time - last_close_time`, so e.g. you only wait 10 seconds if you closed the transport 20 seconds ago. The points above solve the waiting of the first 30 seconds. For the other 3x30 seconds, the idea is that probably in this case the connection was just closed less than a second before, i.e. the time for AiiDA to change state. If we could keep the connection open for a configurable time after the last command (say with a default of 5 or 10 seconds), a full single submission could go down to probably just < ...
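The suggestion to wait only the remaining difference could look roughly like this when scheduling the open via `call_later` (a sketch; `do_open` and the surrounding queue machinery are assumed, and the real `TransportQueue` code differs in detail):

```python
import time


def schedule_open(loop, last_close_time, safe_interval, do_open):
    """Schedule ``do_open`` after only the *remaining* part of the interval.

    Closed 20 s ago with a 30 s interval -> wait 10 s; closed long ago,
    or never closed before -> wait 0 s and open immediately.
    """
    if last_close_time is None:
        delay = 0.0
    else:
        delay = max(0.0, safe_interval - (time.time() - last_close_time))
    return loop.call_later(delay, do_open)
```

Whether this runs per worker or per profile is exactly the open question raised above; the sketch is agnostic to that choice.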
`safe_interval` more dynamic for quick transport tasks #6544