Fix a race condition in TransportQueue, and set semaphore on exec_command_wait_async
#7144
base: main
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #7144 +/- ##
==========================================
- Coverage 79.61% 79.60% -0.00%
==========================================
Files 566 566
Lines 43572 43580 +8
==========================================
+ Hits 34684 34687 +3
- Misses 8888 8893 +5
FAILED tests/engine/test_transport.py::TestTransportQueue::test_new_request_during_close_gets_fresh_transport - AssertionError: Each sequential request should get a fresh transport
This test might be flaky. EDIT:
Pull request overview
This PR fixes a critical race condition in TransportQueue and adds semaphore control to exec_command_wait_async so that concurrent commands do not overwhelm the SSH connection. The race condition occurred when closing an async transport yielded to the event loop (via nest_asyncio in plumpy), which allowed new tasks to receive a reference to a transport that was being closed. The fix removes the transport request from the dictionary before calling close(), which prevents this interleaving.
- Moved `_transport_requests.pop()` to execute before `transport.close()` in TransportQueue to fix race condition
- Added semaphore wrapping to `exec_command_wait_async` to limit concurrent SSH subchannels
- Added comprehensive regression tests for both the race condition and semaphore behavior
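The ordering issue described above can be reproduced in a minimal sketch with plain asyncio. This is not the AiiDA code: `FakeTransport`, `release`, `request`, and `run` are hypothetical names, and an ordinary `await asyncio.sleep(0)` stands in for the yield that a nested `run_until_complete()` call would cause under nest_asyncio.

```python
import asyncio


class FakeTransport:
    """Hypothetical stand-in for a transport; not the AiiDA class."""

    def __init__(self):
        self.closing = False

    async def close(self):
        self.closing = True
        # Yield to the event loop, as a nested close could do.
        await asyncio.sleep(0)


async def release(requests, key, pop_first):
    """Release the transport registered under `key`, in either order."""
    if pop_first:
        transport = requests.pop(key)  # fixed order: unregister, then close
        await transport.close()
    else:
        await requests[key].close()  # buggy order: close, then unregister
        requests.pop(key, None)


async def request(requests, key):
    """Return the registered transport, or register a fresh one."""
    if key not in requests:
        requests[key] = FakeTransport()
    return requests[key]


async def run(pop_first):
    """Return True if a new request received a transport that is closing."""
    requests = {"computer": FakeTransport()}
    closer = asyncio.create_task(release(requests, "computer", pop_first))
    await asyncio.sleep(0)  # let the closer start and reach its yield point
    transport = await request(requests, "computer")
    got_closing = transport.closing
    await closer
    return got_closing


if __name__ == "__main__":
    print("close before pop:", asyncio.run(run(pop_first=False)))
    print("pop before close:", asyncio.run(run(pop_first=True)))
```

With the close-first order, the second request finds the entry still registered and receives the closing transport. With the pop-first order, the entry is already gone, so the request creates a fresh one.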
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/aiida/engine/transports.py | Moved transport request removal before close() call to fix race condition where new tasks could get closing transports |
| src/aiida/transports/plugins/ssh_async.py | Added semaphore control to exec_command_wait_async and error handling to close_async |
| tests/engine/test_transport.py | Added regression tests verifying transport request removal ordering and fresh transport creation |
| tests/transports/test_asyncssh_plugin.py | Restructured into test class with tests for semaphore release after errors and concurrent operation limiting |
| src/aiida/schedulers/scheduler.py | Removed redundant transport context manager (transport already opened per docstring) |
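The semaphore behavior that the tests in the table target (limiting concurrent operations and releasing the semaphore after errors) can be sketched as follows. This is an illustration under assumptions, not the plugin's implementation: `LimitedRunner`, its `exec_command` method, and the `max_concurrent` parameter are hypothetical, and a short sleep stands in for the remote command.

```python
import asyncio


class LimitedRunner:
    """Hypothetical wrapper; names are illustrative, not the plugin's API."""

    def __init__(self, max_concurrent):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.active = 0  # commands currently inside the semaphore
        self.peak = 0  # highest value `active` ever reached

    async def exec_command(self, fail=False):
        # `async with` releases the semaphore even if the body raises.
        async with self._semaphore:
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                await asyncio.sleep(0.01)  # stand-in for the remote command
                if fail:
                    raise RuntimeError("command failed")
                return 0
            finally:
                self.active -= 1


async def demo():
    runner = LimitedRunner(max_concurrent=2)
    results = await asyncio.gather(
        *(runner.exec_command(fail=(i == 0)) for i in range(6)),
        return_exceptions=True,
    )
    return runner.peak, results


if __name__ == "__main__":
    peak, results = asyncio.run(demo())
    print(peak, results)
```

Because the failing command still releases its slot, the remaining commands all complete, and at no point do more than two run at once.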
6eeae1b to bf24acc
agoscinski left a comment
While the first bug could also happen without nest_asyncio, using nest_asyncio makes such bugs more likely, since we now have async calls inside functions that are completely sync and therefore not marked as places where concurrent execution can occur.
I think the PR and tests are really good; I have only minor comments to make the bug easier to understand. As discussed, the PR will be split into two commits, since it fixes two bugs.
# 1. close() is called, which for AsyncTransport uses run_until_complete()
# 2. With nest_asyncio (used by plumpy), this can yield to the event loop
# 3. Another task might enter and get the same transport_request
# 4. That task tries to use the transport that's being closed -> error
I tried to improve the message by being more verbose. I'm not sure about it, so adapt it as you wish.
- # 1. close() is called, which for AsyncTransport uses run_until_complete()
- # 2. With nest_asyncio (used by plumpy), this can yield to the event loop
- # 3. Another task might enter and get the same transport_request
- # 4. That task tries to use the transport that's being closed -> error
+ # 1. close() is called, which for AsyncTransport uses run_until_complete(close_async)
+ # 2. With nest_asyncio (used by plumpy), this call yields back to the event loop
+ # 3. The event loop schedules close_async, then continues running other tasks - for example one that requests the transport which is scheduled to be closed
+ # 4. The task now using the transport awaits during some operation; the close_async task then closes the transport while it is still in use -> error
Fix #7119
This PR:
- Fixes a race condition in `TransportQueue`
- Sets a semaphore on `exec_command_wait_async`, to limit the failures