test(connection): add regression tests for MULTI/EXEC tx-queue stall#7273
Conversation
…7272) Two new tests reproduce the incident where a MULTI/EXEC reply blocked on send holds the shard lock, stalling other transactions:

- test_multi_exec_stuck_send_stalls_tx: verifies that a clogger connection whose 1 MB EXEC reply fills the TCP send buffer holds the shard lock and blocks a competing MULTI/SET; send_timeout=3 disconnects the clogger and unblocks the queue.
- test_multi_exec_phantom_connections: reproduces the CLIENT LIST diagnostic signal from the incident — ghost connections that RST-close while their main fiber is stuck in ScheduleSingleHop appear with addr=0.0.0.0 and phase=process, matching the production snapshot.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
🤖 Augment PR Summary
Summary: Adds regression coverage for issue #7272, where a blocked MULTI/EXEC reply can stall other transactions by holding a shard lock.
Technical Notes: These tests intentionally induce TCP backpressure (the client stops reading) to force server-side sends to block.
    await check_phantoms()

    flood_task.cancel()
This test doesn’t ensure cleanup on failure: flood_task.cancel() isn’t awaited and writer.close() isn’t followed by await writer.wait_closed(), which can leave a flooding task/transport running and make later tests flaky. Consider ensuring task cancellation and stream closure happen in a finally and are awaited.
Severity: medium
Other Locations
tests/dragonfly/connection_test.py:2312, tests/dragonfly/connection_test.py:2313
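A minimal sketch of the suggested fix, assuming the test's existing check_phantoms, flood_task, and writer names; the helper itself and the contextlib.suppress around the awaited cancellation are illustrative, not the repo's code:

```python
import asyncio
import contextlib

async def run_with_cleanup(check_phantoms, flood_task, writer):
    """Hypothetical helper: run the assertion, then always tear down the flooder."""
    try:
        await check_phantoms()
    finally:
        flood_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await flood_task            # ensure the flooding coroutine has actually exited
        writer.close()
        await writer.wait_closed()      # wait for the transport to be fully torn down
```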
    # ScheduleSingleHop on the shard lock held by the clogger.
    def recv_line(s):
        buf = b""
        while not buf.endswith(b"\r\n"):
recv_line() can hang indefinitely if recv() returns b"" (peer closed) or if the server never sends a \r\n-terminated line, because the loop never breaks. Consider adding an EOF check and a socket timeout so the test fails fast instead of stalling CI.
Severity: medium
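A fail-fast variant could look roughly like this, keeping the test's blocking-socket style; the 5-second timeout is an illustrative value:

```python
import socket

def recv_line(s: socket.socket, timeout: float = 5.0) -> bytes:
    """Read one CRLF-terminated line, failing fast on EOF or a stalled server."""
    s.settimeout(timeout)                # recv() now raises socket.timeout instead of hanging
    buf = b""
    while not buf.endswith(b"\r\n"):
        chunk = s.recv(256)
        if not chunk:                    # peer closed the connection before sending CRLF
            raise ConnectionError(f"connection closed mid-line, got {buf!r}")
        buf += chunk
    return buf
```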
Pull request overview
Adds Python integration tests to reproduce and detect the MULTI/EXEC tx-queue stall incident described in #7272, focusing on blocked socket sends during EXEC and the resulting “phantom” connections in CLIENT LIST.
Changes:
- Add `test_multi_exec_phantom_connections` to reproduce `addr=0.0.0.0` + `phase=process` phantom connections when a clogger holds the shard lock and ghosts RST-close.
- Add `test_multi_exec_stuck_send_stalls_tx` to reproduce a shard-lock stall caused by a blocked send during EXEC, and verify recovery via `send_timeout`.
    def recv_line(s):
        buf = b""
        while not buf.endswith(b"\r\n"):
            buf += s.recv(256)
        return buf

    ghosts = []
    for i in range(3):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger_rst)
        s.connect(("127.0.0.1", df_server.port))
        s.sendall(f"CLIENT SETNAME ghost{i}\r\n".encode())
        recv_line(s)  # +OK
        s.sendall(b"MULTI\r\n")
        recv_line(s)  # +OK
        s.sendall(b"SET key v\r\n")
        recv_line(s)  # +QUEUED
    flood_task = asyncio.create_task(flood())

    @assert_eventually(times=600)
    async def wait_clogger_stuck():
        clients = await admin.client_list()
        v = next((c for c in clients if c.get("name") == "clogger"), None)
        assert v is not None and v["phase"] == "send"

    await wait_clogger_stuck()
| writer.write(b"CLIENT SETNAME clogger\r\n") | ||
| await writer.drain() | ||
| await reader.readline() # +OK | ||
|
|
||
| async def flood(): | ||
| cmd = b"MULTI\r\nGET mkey\r\nEXEC\r\n" | ||
| while True: | ||
| try: | ||
| writer.write(cmd) | ||
| await writer.drain() | ||
| except Exception: | ||
| break | ||
|
|
||
| flood_task = asyncio.create_task(flood()) | ||
|
|
||
| @assert_eventually(times=600) | ||
| async def wait_clogger_in_send(): | ||
| clients = await admin.client_list() | ||
| v = next((c for c in clients if c.get("name") == "clogger"), None) | ||
| assert v is not None and v["phase"] == "send" | ||
|
|
||
| await wait_clogger_in_send() | ||
|
|
||
| # A competing MULTI/SET on the same key must block: the shard lock is still | ||
| # held by the clogger's EXEC handler suspended inside Send(). | ||
| other = df_server.client() | ||
| pipe = other.pipeline(transaction=True) | ||
| pipe.set("mkey", "new") | ||
| compete = asyncio.create_task(pipe.execute()) | ||
|
|
||
| await asyncio.sleep(0.5) | ||
| assert not compete.done(), "Competing MULTI/EXEC should be stuck waiting for the shard lock" | ||
|
|
||
| # send_timeout=3 shuts down the clogger socket via ShutdownSelfBlocking(), which | ||
| # fails the blocked Write(), resumes the fiber, calls UnlockMulti(), and lets | ||
| # the competing transaction proceed. |
Replace the send_timeout=3 mechanism with an explicit RST-close via transport.abort() (SO_LINGER=0 already set). transport.abort() skips asyncio's write-buffer flush, so the RST fires immediately even with a large flood backlog. Both tests now work without any server-side timeout configuration, making the blocked state observable for as long as needed.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
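A minimal sketch of that mechanism, assuming the clogger was opened with asyncio.open_connection; the helper names are illustrative, not the test's exact code:

```python
import asyncio
import socket
import struct

async def open_clogger(host: str, port: int):
    # Arm SO_LINGER with a zero timeout so an abortive close emits RST rather than FIN.
    reader, writer = await asyncio.open_connection(host, port)
    sock = writer.get_extra_info("socket")
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
    return reader, writer

def rst_close(writer: asyncio.StreamWriter) -> None:
    # abort() skips flushing asyncio's write buffer, so the RST fires immediately
    # even when the flood() coroutine has queued a large backlog of MULTI/EXEC commands.
    writer.transport.abort()
```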
Extend CLIENT LIST with two finer-grained phases to make the issue-#7272 stall scenario immediately identifiable:

- phase=send (pre-existing): connection fiber is blocked inside SinkReplyBuilder::Send() waiting for the socket write to drain.
- phase=scheduled (new): coordinator fiber is blocked in run_barrier_.Wait() inside Transaction::Execute(), waiting for a shard callback to complete. This state covers the window where a transaction has been dispatched to shards but has not yet received the run-barrier signal — e.g. when another transaction holds an exclusive shard lock and the newcomer is queued behind it.

Implementation mirrors SinkReplyBuilder::send_time_ns_:

- Transaction::schedule_time_ns_ is set just before run_barrier_.Wait() and cleared immediately after; read lock-free by CLIENT LIST (same benign-race pattern as send_time_ns_).
- IsWaitingOnShard() exposes the flag.
- Service::GetContextInfo() populates ContextInfo::is_scheduled.
- Connection::GetClientInfoBeforeAfterTid() overrides phase_name when is_scheduled is true (lower priority than "send").

Also add two regression/reproduction tests for #7272:

- test_multi_exec_phantom_connections: ghosts show addr=0.0.0.0 + phase=scheduled, matching the production CLIENT LIST snapshot.
- test_multi_exec_stuck_send_stalls_tx: competing MULTI/SET is stalled until the clogger is RST-closed.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
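A sketch of how a test could poll for the new phase, assuming an admin client whose client_list() returns dicts with name/addr/phase keys as in the snippets above; the polling helper is illustrative, not the repo's assert_eventually decorator:

```python
import asyncio

async def wait_for_scheduled_ghosts(admin, expected: int = 3, attempts: int = 600) -> None:
    """Poll CLIENT LIST until the ghost connections show addr=0.0.0.0 and phase=scheduled."""
    for _ in range(attempts):
        clients = await admin.client_list()
        ghosts = [
            c for c in clients
            if c.get("name", "").startswith("ghost")
            and str(c.get("addr", "")).startswith("0.0.0.0")
            and c.get("phase") == "scheduled"
        ]
        if len(ghosts) >= expected:
            return
        await asyncio.sleep(0.1)
    raise AssertionError("ghost connections never reached phase=scheduled")
```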
The stall test was a subset of the phantom test: both set up the same clogger scenario, and phase=scheduled already implies the shard lock is held. Merge the recovery assertion (competing MULTI/SET unblocks after RST-close) directly into test_multi_exec_phantom_connections and remove the now-redundant test_multi_exec_stuck_send_stalls_tx.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
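The merged recovery assertion could look roughly like this, reusing the compete task from the stall test and the RST-close idea above; the function name and timeout are assumptions:

```python
import asyncio

async def assert_recovery(compete: asyncio.Task, clogger_writer: asyncio.StreamWriter) -> None:
    # Before the abort, the competing MULTI/SET must still be queued behind the shard lock.
    assert not compete.done(), "competing transaction should still be blocked"
    # RST-close the clogger (SO_LINGER=0 already set): the blocked Write() fails,
    # the EXEC fiber resumes and calls UnlockMulti(), releasing the shard lock.
    clogger_writer.transport.abort()
    await asyncio.wait_for(compete, timeout=10)
```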
Summary
Adds two tests reproducing the production incident from issue #7272, where a MULTI/EXEC reply blocked on send holds the shard lock and stalls other transactions.
- test_multi_exec_stuck_send_stalls_tx: clogger floods `MULTI/GET <1 MB key>/EXEC` without reading, filling the TCP send buffer and blocking `Send()` mid-EXEC while the shard lock is held; a competing `MULTI/SET` on the same key is stalled until `send_timeout=3` disconnects the clogger
- test_multi_exec_phantom_connections: reproduces the `CLIENT LIST` diagnostic signal from the incident — ghost connections that RST-close while their main fiber is stuck in `ScheduleSingleHop` appear with `addr=0.0.0.0` and `phase=process`, matching the production snapshot exactly

Root cause (documented)
A 1 MB GET reply inside EXEC overflows the 8 KB reply buffer, triggering `Send()` → `Write()` before `UnlockMulti()` is called. With the client's TCP receive window full, the fiber suspends inside `Write()` while still holding the shard lock. Other connections attempting `ScheduleSingleHop` on the same keys block indefinitely.

Mitigation: `--send_timeout=N` (used in production as `send_timeout=60`) calls `ShutdownSelfBlocking()` on the stuck connection, which fails the blocked `Write()`, resumes the fiber, and allows `UnlockMulti()` to proceed.

Fixes #7272
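For completeness, a hedged sketch of observing that mitigation from a test: with `--send_timeout=N` set on the server, a connection stuck in phase=send should eventually disappear from CLIENT LIST once ShutdownSelfBlocking() tears it down. The helper and the client_list() field names are assumptions matching the snippets above:

```python
import asyncio

async def wait_for_send_timeout_kick(admin, name: str = "clogger", timeout_s: int = 60) -> None:
    """Wait for the server's send_timeout to disconnect a connection stuck in Send()."""
    deadline = asyncio.get_running_loop().time() + timeout_s + 10
    while asyncio.get_running_loop().time() < deadline:
        clients = await admin.client_list()
        if not any(c.get("name") == name for c in clients):
            return  # the stuck connection is gone; UnlockMulti() has run
        await asyncio.sleep(0.5)
    raise AssertionError(f"{name} was not disconnected by send_timeout")
```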