From b3314fa70bc7bd6ceda5b9ef040e1ac51fb7d3cb Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 8 May 2026 15:18:13 +0300 Subject: [PATCH 1/5] test(connection): add regression tests for MULTI/EXEC tx-queue stall (#7272) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new tests reproduce the incident where a MULTI/EXEC reply blocked on send holds the shard lock, stalling other transactions: - test_multi_exec_stuck_send_stalls_tx: verifies that a clogger connection whose 1 MB EXEC reply fills the TCP send buffer holds the shard lock and blocks a competing MULTI/SET; send_timeout=3 disconnects the clogger and unblocks the queue. - test_multi_exec_phantom_connections: reproduces the CLIENT LIST diagnostic signal from the incident — ghost connections that RST-close while their main fiber is stuck in ScheduleSingleHop appear with addr=0.0.0.0 and phase=process, matching the production snapshot. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- tests/dragonfly/connection_test.py | 153 +++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index f17571eabf76..82275c6521e2 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -2160,6 +2160,159 @@ async def flood(): await writer.wait_closed() +@dfly_args({"proactor_threads": 1}) +async def test_multi_exec_phantom_connections(df_server: DflyInstance): + """Reproduce the addr=0.0.0.0 phantom connections from issue #7272. + + Clogger floods MULTI/GET <1 MB key>/EXEC without reading, blocking Send() mid-EXEC + while holding the shard lock. Ghost connections send MULTI/SET/EXEC one command at a + time (sync-dispatch mode) then RST-close. The main connection fiber is stuck in + ScheduleSingleHop with no separate read fiber, so the io_uring RST event goes + unprocessed: the kernel moves the socket to TCP_CLOSE (addr=0.0.0.0) while + phase stays "process" — matching the production CLIENT LIST snapshot. + """ + import struct + + linger_rst = struct.pack("ii", 1, 0) + admin = df_server.client() + await admin.set("key", "v" * 1024 * 1024) + + # Clogger: floods MULTI/GET/EXEC without reading — fills the TCP send buffer, + # blocking Send() mid-EXEC while the shard lock on "key" is still held. + reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) + writer.get_extra_info("socket").setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger_rst) + writer.write(b"CLIENT SETNAME clogger\r\n") + await writer.drain() + await reader.readline() # +OK + + async def flood(): + cmd = b"MULTI\r\nGET key\r\nEXEC\r\n" + while True: + try: + writer.write(cmd) + await writer.drain() + except Exception: + break + + flood_task = asyncio.create_task(flood()) + + @assert_eventually(times=600) + async def wait_clogger_stuck(): + clients = await admin.client_list() + v = next((c for c in clients if c.get("name") == "clogger"), None) + assert v is not None and v["phase"] == "send" + + await wait_clogger_stuck() + + # Ghost connections: preamble commands sent+ack'd one-by-one (sync-dispatch, + # tot-dispatches stays 0), then EXEC fired without reading. EXEC blocks in + # ScheduleSingleHop on the shard lock held by the clogger. 
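+    # Plain blocking sockets are used for the ghosts (rather than asyncio) so each
+    # preamble reply is read synchronously before the next command is sent.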
+ def recv_line(s): + buf = b"" + while not buf.endswith(b"\r\n"): + buf += s.recv(256) + return buf + + ghosts = [] + for i in range(3): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger_rst) + s.connect(("127.0.0.1", df_server.port)) + s.sendall(f"CLIENT SETNAME ghost{i}\r\n".encode()) + recv_line(s) # +OK + s.sendall(b"MULTI\r\n") + recv_line(s) # +OK + s.sendall(b"SET key v\r\n") + recv_line(s) # +QUEUED + s.sendall(b"EXEC\r\n") # blocked on shard lock; do not read reply + ghosts.append(s) + + await asyncio.sleep(0.1) # let EXEC bytes reach server's receive buffer + + # RST-close: kernel moves server-side sockets to TCP_CLOSE. + # getpeername() → ENOTCONN → addr=0.0.0.0 in CLIENT LIST. + for s in ghosts: + s.close() + + @assert_eventually(times=50) + async def check_phantoms(): + clients = await admin.client_list() + phantoms = [ + c + for c in clients + if c.get("addr", "").startswith("0.0.0.0") and c.get("phase") == "process" + ] + logging.info("phantoms: %s", [(c["name"], c["addr"], c["phase"]) for c in phantoms]) + assert len(phantoms) >= 3, f"got {[(c['addr'], c['phase']) for c in clients]}" + + await check_phantoms() + + flood_task.cancel() + writer.close() + + +@dfly_args({"proactor_threads": 1, "send_timeout": 3}) +async def test_multi_exec_stuck_send_stalls_tx(df_server: DflyInstance): + """Reproduce issue #7272: MULTI/EXEC reply blocked on send holds the shard lock, + stalling concurrent transactions on the same keys. + + A 1 MB value causes each GET reply to overflow the 8 KB reply buffer, so + Send() fires during EXEC execution — before UnlockMulti(). With the socket + send-buffer full (client not reading), the server fiber suspends inside + Send() while still holding the shard lock. A second MULTI/SET on the same + key must block until send_timeout disconnects the clogger. + """ + admin = df_server.client() + await admin.set("mkey", "v" * 1024 * 1024) + + reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) + writer.write(b"CLIENT SETNAME clogger\r\n") + await writer.drain() + await reader.readline() # +OK + + async def flood(): + cmd = b"MULTI\r\nGET mkey\r\nEXEC\r\n" + while True: + try: + writer.write(cmd) + await writer.drain() + except Exception: + break + + flood_task = asyncio.create_task(flood()) + + @assert_eventually(times=600) + async def wait_clogger_in_send(): + clients = await admin.client_list() + v = next((c for c in clients if c.get("name") == "clogger"), None) + assert v is not None and v["phase"] == "send" + + await wait_clogger_in_send() + + # A competing MULTI/SET on the same key must block: the shard lock is still + # held by the clogger's EXEC handler suspended inside Send(). + other = df_server.client() + pipe = other.pipeline(transaction=True) + pipe.set("mkey", "new") + compete = asyncio.create_task(pipe.execute()) + + await asyncio.sleep(0.5) + assert not compete.done(), "Competing MULTI/EXEC should be stuck waiting for the shard lock" + + # send_timeout=3 shuts down the clogger socket via ShutdownSelfBlocking(), which + # fails the blocked Write(), resumes the fiber, calls UnlockMulti(), and lets + # the competing transaction proceed. 
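+    # timeout=15 is deliberately generous: send_timeout needs ~3 s to fire, plus
+    # slack for the disconnect to propagate to the competing transaction.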
+ result = await asyncio.wait_for(compete, timeout=15.0) + assert result == [True] + + info = await admin.info("clients") + assert int(info["timeout_disconnects"]) >= 1 + + assert await admin.ping() + flood_task.cancel() + writer.close() + + async def test_blocking_command_close_eof(df_server: DflyInstance): """Server must drop a connection that is parked on a blocking command when the client closes its socket. From 5cd525bff294aa75deebe35ebdae8d571ce20717 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 8 May 2026 16:10:37 +0300 Subject: [PATCH 2/5] test: remove send_timeout dependency, use transport.abort() for RST Replace the send_timeout=3 mechanism with an explicit RST-close via transport.abort() (SO_LINGER=0 already set). transport.abort() skips asyncio's write-buffer flush, so the RST fires immediately even with a large flood backlog. Both tests now work without any server-side timeout configuration, making the blocked state observable for as long as needed. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- tests/dragonfly/connection_test.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 82275c6521e2..1ea4040a5387 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -2248,10 +2248,10 @@ async def check_phantoms(): await check_phantoms() flood_task.cancel() - writer.close() + writer.transport.abort() -@dfly_args({"proactor_threads": 1, "send_timeout": 3}) +@dfly_args({"proactor_threads": 1}) async def test_multi_exec_stuck_send_stalls_tx(df_server: DflyInstance): """Reproduce issue #7272: MULTI/EXEC reply blocked on send holds the shard lock, stalling concurrent transactions on the same keys. @@ -2260,12 +2260,18 @@ async def test_multi_exec_stuck_send_stalls_tx(df_server: DflyInstance): Send() fires during EXEC execution — before UnlockMulti(). With the socket send-buffer full (client not reading), the server fiber suspends inside Send() while still holding the shard lock. A second MULTI/SET on the same - key must block until send_timeout disconnects the clogger. + key is stalled until the clogger is RST-closed. """ + import struct + admin = df_server.client() await admin.set("mkey", "v" * 1024 * 1024) reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) + # SO_LINGER=0: close() sends RST, immediately unblocking the server's stuck Write(). + writer.get_extra_info("socket").setsockopt( + socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0) + ) writer.write(b"CLIENT SETNAME clogger\r\n") await writer.drain() await reader.readline() # +OK @@ -2299,18 +2305,16 @@ async def wait_clogger_in_send(): await asyncio.sleep(0.5) assert not compete.done(), "Competing MULTI/EXEC should be stuck waiting for the shard lock" - # send_timeout=3 shuts down the clogger socket via ShutdownSelfBlocking(), which - # fails the blocked Write(), resumes the fiber, calls UnlockMulti(), and lets - # the competing transaction proceed. - result = await asyncio.wait_for(compete, timeout=15.0) - assert result == [True] - - info = await admin.info("clients") - assert int(info["timeout_disconnects"]) >= 1 + # RST-close the clogger: the stuck Write() fails immediately, the fiber resumes, + # UnlockMulti() is called, and the competing transaction can proceed. 
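+    # (SO_LINGER=0 turns the close into an RST rather than an orderly FIN handshake.)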
+ # transport.abort() closes without flushing the write buffer (unlike writer.close() + # which would drain the flood backlog first, delaying the RST by seconds). + flood_task.cancel() + writer.transport.abort() + result = await asyncio.wait_for(compete, timeout=5.0) + assert result == [True] assert await admin.ping() - flood_task.cancel() - writer.close() async def test_blocking_command_close_eof(df_server: DflyInstance): From def7e5a03ba2fb27d7c6ba1b3c1c35f6e53025bd Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 8 May 2026 16:42:49 +0300 Subject: [PATCH 3/5] chore: add delay to be able to inspect the test --- tests/dragonfly/connection_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 1ea4040a5387..45e8a6429bae 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -2247,6 +2247,9 @@ async def check_phantoms(): await check_phantoms() + logging.info("Holding stuck state — inspect with: redis-cli -p %d client list", df_server.port) + await asyncio.sleep(300) + flood_task.cancel() writer.transport.abort() @@ -2305,6 +2308,9 @@ async def wait_clogger_in_send(): await asyncio.sleep(0.5) assert not compete.done(), "Competing MULTI/EXEC should be stuck waiting for the shard lock" + logging.info("Holding stuck state — inspect with: redis-cli -p %d client list", df_server.port) + await asyncio.sleep(300) + # RST-close the clogger: the stuck Write() fails immediately, the fiber resumes, # UnlockMulti() is called, and the competing transaction can proceed. # transport.abort() closes without flushing the write buffer (unlike writer.close() From 5e3a2b35324341fed19795b75d300c93a27a3da2 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 8 May 2026 22:14:59 +0300 Subject: [PATCH 4/5] feat(server): add phase=scheduled and phase=send to CLIENT LIST MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend CLIENT LIST with two finer-grained phases to make the issue-#7272 stall scenario immediately identifiable: - phase=send (pre-existing): connection fiber is blocked inside SinkReplyBuilder::Send() waiting for the socket write to drain. - phase=scheduled (new): coordinator fiber is blocked in run_barrier_.Wait() inside Transaction::Execute(), waiting for a shard callback to complete. This state covers the window where a transaction has been dispatched to shards but has not yet received the run-barrier signal — e.g. when another transaction holds an exclusive shard lock and the newcomer is queued behind it. Implementation mirrors SinkReplyBuilder::send_time_ns_: - Transaction::schedule_time_ns_ is set just before run_barrier_.Wait() and cleared immediately after; read lock-free by CLIENT LIST (same benign-race pattern as send_time_ns_). - IsWaitingOnShard() exposes the flag. - Service::GetContextInfo() populates ContextInfo::is_scheduled. - Connection::GetClientInfoBeforeAfterTid() overrides phase_name when is_scheduled is true (lower priority than "send"). Also add two regression/reproduction tests for #7272: - test_multi_exec_phantom_connections: ghosts show addr=0.0.0.0 + phase=scheduled, matching the production CLIENT LIST snapshot. - test_multi_exec_stuck_send_stalls_tx: competing MULTI/SET is stalled until the clogger is RST-closed. 
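
An illustrative CLIENT LIST entry for a stuck ghost, as the phantom test
observes it (field values invented for illustration):

  id=42 addr=0.0.0.0:0 name=ghost0 tot-dispatches=0 phase=scheduled
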
Co-Authored-By: Claude Sonnet 4.6 (1M context)
---
 src/facade/dragonfly_connection.cc |  6 ++--
 src/facade/service_interface.h     |  1 +
 src/server/main_service.cc         |  3 +-
 src/server/transaction.cc          |  2 ++
 src/server/transaction.h           |  9 ++++++
 tests/dragonfly/connection_test.py | 48 +++++++++++++-----------------
 6 files changed, 39 insertions(+), 30 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 6aa6e67efe7a..4ffb0a487cd7 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -1071,12 +1071,14 @@ pair<string, string> Connection::GetClientInfoBeforeAfterTid() const {
                   " tot-dispatches=", local_stats_.dispatch_entries_added);
 
   if (cc_) {
-    string cc_info = service_->GetContextInfo(cc_.get()).Format();
+    auto ctx_info = service_->GetContextInfo(cc_.get());
 
     // reply_builder_ may be null if the connection is in the setup phase, for example.
     if (reply_builder_ && reply_builder_->IsSendActive())
       phase_name = "send";
-    absl::StrAppend(&after, " ", cc_info);
+    else if (ctx_info.is_scheduled)
+      phase_name = "scheduled";
+    absl::StrAppend(&after, " ", ctx_info.Format());
   }
 
   absl::StrAppend(&after, " phase=", phase_name);
diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h
index b4667d440005..c56238da66de 100644
--- a/src/facade/service_interface.h
+++ b/src/facade/service_interface.h
@@ -72,6 +72,7 @@ class ServiceInterface {
 
     unsigned db_index;
     bool async_dispatch, conn_closing, subscribers, blocked;
+    bool is_scheduled;  // coordinator fiber is blocked in run_barrier_.Wait()
   };
 
   virtual ContextInfo GetContextInfo(ConnectionContext* cntx) const {
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 5da081168e1d..3d30fbe01a3b 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -2912,7 +2912,8 @@ Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) co
           .async_dispatch = server_cntx->async_dispatch,
           .conn_closing = server_cntx->conn_closing,
           .subscribers = bool(server_cntx->conn_state.subscribe_info),
-          .blocked = server_cntx->blocked};
+          .blocked = server_cntx->blocked,
+          .is_scheduled = server_cntx->transaction && server_cntx->transaction->IsWaitingOnShard()};
 }
 
 #define HFUNC(x) SetHandler(&Service::x)
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 48121323192d..2b91c3653c69 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -964,7 +964,9 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
   }
 
   DispatchHop();
+  schedule_time_ns_ = ProactorBase::GetMonotonicTimeNs();
   run_barrier_.Wait();
+  schedule_time_ns_ = 0;
   cb_ptr_.reset();
 
   if (coordinator_state_ & COORD_CONCLUDING)
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 6944b9855452..9873c2a2b2e1 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -303,6 +303,12 @@ class Transaction {
     return coordinator_state_ & COORD_SCHED;
   }
 
+  // True while the coordinator fiber is blocked in run_barrier_.Wait().
+  // Used by CLIENT LIST to report phase="scheduled".
+  bool IsWaitingOnShard() const {
+    return schedule_time_ns_ != 0;
+  }
+
   MultiMode GetMultiMode() const {
     return multi_->mode;
   }
@@ -613,6 +619,9 @@
   Namespace* namespace_{nullptr};
   DbIndex db_index_{0};
   uint64_t time_now_ms_{0};
+  // Non-zero while the coordinator fiber is blocked in run_barrier_.Wait() inside Execute().
+  // Read lock-free by CLIENT LIST — same pattern as SinkReplyBuilder::send_time_ns_.
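+  // A stale read at worst mislabels the phase for one snapshot; it never
+  // affects transaction correctness.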
+ uint64_t schedule_time_ns_{0}; std::atomic_uint32_t use_count_{0}; // transaction exists only as an intrusive_ptr diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 45e8a6429bae..c54e70d0ae63 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -2167,9 +2167,9 @@ async def test_multi_exec_phantom_connections(df_server: DflyInstance): Clogger floods MULTI/GET <1 MB key>/EXEC without reading, blocking Send() mid-EXEC while holding the shard lock. Ghost connections send MULTI/SET/EXEC one command at a time (sync-dispatch mode) then RST-close. The main connection fiber is stuck in - ScheduleSingleHop with no separate read fiber, so the io_uring RST event goes - unprocessed: the kernel moves the socket to TCP_CLOSE (addr=0.0.0.0) while - phase stays "process" — matching the production CLIENT LIST snapshot. + run_barrier_.Wait() inside Transaction::Execute(), so the io_uring RST event goes + unprocessed: the kernel moves the socket to TCP_CLOSE (addr=0.0.0.0) while phase + shows "scheduled" (coordinator fiber waiting for shard callback to complete). """ import struct @@ -2185,16 +2185,18 @@ async def test_multi_exec_phantom_connections(df_server: DflyInstance): await writer.drain() await reader.readline() # +OK - async def flood(): - cmd = b"MULTI\r\nGET key\r\nEXEC\r\n" - while True: - try: - writer.write(cmd) - await writer.drain() - except Exception: - break + # Stop asyncio from draining incoming data — otherwise it continuously reads the + # server's replies into its StreamReader buffer, keeping the OS receive window open + # and allowing the server to send all replies before getting stuck. + writer.transport.pause_reading() - flood_task = asyncio.create_task(flood()) + # A few commands are enough: each EXEC reply is ~1 MB; the OS receive buffer + # (~4 MB) fills after 4 replies, TCP window drops to 0, and the server's + # Send() blocks mid-EXEC while still holding the shard lock. 
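+    # Ten iterations add headroom in case the kernel autotunes larger buffers.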
+ cmd = b"MULTI\r\nGET key\r\nEXEC\r\n" + for _ in range(10): + writer.write(cmd) + await writer.drain() @assert_eventually(times=600) async def wait_clogger_stuck(): @@ -2240,7 +2242,7 @@ async def check_phantoms(): phantoms = [ c for c in clients - if c.get("addr", "").startswith("0.0.0.0") and c.get("phase") == "process" + if c.get("addr", "").startswith("0.0.0.0") and c.get("phase") == "scheduled" ] logging.info("phantoms: %s", [(c["name"], c["addr"], c["phase"]) for c in phantoms]) assert len(phantoms) >= 3, f"got {[(c['addr'], c['phase']) for c in clients]}" @@ -2250,8 +2252,7 @@ async def check_phantoms(): logging.info("Holding stuck state — inspect with: redis-cli -p %d client list", df_server.port) await asyncio.sleep(300) - flood_task.cancel() - writer.transport.abort() + writer.transport.abort() # RST-close clogger → unlocks shard → ghosts unblock @dfly_args({"proactor_threads": 1}) @@ -2279,16 +2280,12 @@ async def test_multi_exec_stuck_send_stalls_tx(df_server: DflyInstance): await writer.drain() await reader.readline() # +OK - async def flood(): - cmd = b"MULTI\r\nGET mkey\r\nEXEC\r\n" - while True: - try: - writer.write(cmd) - await writer.drain() - except Exception: - break + writer.transport.pause_reading() # keep OS receive buffer full so server stays stuck - flood_task = asyncio.create_task(flood()) + cmd = b"MULTI\r\nGET mkey\r\nEXEC\r\n" + for _ in range(10): + writer.write(cmd) + await writer.drain() @assert_eventually(times=600) async def wait_clogger_in_send(): @@ -2313,9 +2310,6 @@ async def wait_clogger_in_send(): # RST-close the clogger: the stuck Write() fails immediately, the fiber resumes, # UnlockMulti() is called, and the competing transaction can proceed. - # transport.abort() closes without flushing the write buffer (unlike writer.close() - # which would drain the flood backlog first, delaying the RST by seconds). - flood_task.cancel() writer.transport.abort() result = await asyncio.wait_for(compete, timeout=5.0) From 64e2e7cb643b26f197039671494edfc5f471c501 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 8 May 2026 23:53:11 +0300 Subject: [PATCH 5/5] test: merge recovery assertion into phantom test, drop stall test The stall test was a subset of the phantom test: both set up the same clogger scenario, and phase=scheduled already implies the shard lock is held. Merge the recovery assertion (competing MULTI/SET unblocks after RST-close) directly into test_multi_exec_phantom_connections and remove the now-redundant test_multi_exec_stuck_send_stalls_tx. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- tests/dragonfly/connection_test.py | 55 +++--------------------------- 1 file changed, 4 insertions(+), 51 deletions(-) diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index c54e70d0ae63..2da72c8c37de 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -2249,57 +2249,10 @@ async def check_phantoms(): await check_phantoms() - logging.info("Holding stuck state — inspect with: redis-cli -p %d client list", df_server.port) - await asyncio.sleep(300) - - writer.transport.abort() # RST-close clogger → unlocks shard → ghosts unblock - - -@dfly_args({"proactor_threads": 1}) -async def test_multi_exec_stuck_send_stalls_tx(df_server: DflyInstance): - """Reproduce issue #7272: MULTI/EXEC reply blocked on send holds the shard lock, - stalling concurrent transactions on the same keys. 
- - A 1 MB value causes each GET reply to overflow the 8 KB reply buffer, so - Send() fires during EXEC execution — before UnlockMulti(). With the socket - send-buffer full (client not reading), the server fiber suspends inside - Send() while still holding the shard lock. A second MULTI/SET on the same - key is stalled until the clogger is RST-closed. - """ - import struct - - admin = df_server.client() - await admin.set("mkey", "v" * 1024 * 1024) - - reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) - # SO_LINGER=0: close() sends RST, immediately unblocking the server's stuck Write(). - writer.get_extra_info("socket").setsockopt( - socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0) - ) - writer.write(b"CLIENT SETNAME clogger\r\n") - await writer.drain() - await reader.readline() # +OK - - writer.transport.pause_reading() # keep OS receive buffer full so server stays stuck - - cmd = b"MULTI\r\nGET mkey\r\nEXEC\r\n" - for _ in range(10): - writer.write(cmd) - await writer.drain() - - @assert_eventually(times=600) - async def wait_clogger_in_send(): - clients = await admin.client_list() - v = next((c for c in clients if c.get("name") == "clogger"), None) - assert v is not None and v["phase"] == "send" - - await wait_clogger_in_send() - - # A competing MULTI/SET on the same key must block: the shard lock is still - # held by the clogger's EXEC handler suspended inside Send(). + # A competing MULTI/SET confirms the shard lock is still held while we observe. other = df_server.client() pipe = other.pipeline(transaction=True) - pipe.set("mkey", "new") + pipe.set("key", "new") compete = asyncio.create_task(pipe.execute()) await asyncio.sleep(0.5) @@ -2308,8 +2261,8 @@ async def wait_clogger_in_send(): logging.info("Holding stuck state — inspect with: redis-cli -p %d client list", df_server.port) await asyncio.sleep(300) - # RST-close the clogger: the stuck Write() fails immediately, the fiber resumes, - # UnlockMulti() is called, and the competing transaction can proceed. + # RST-close the clogger: Write() fails, fiber resumes, UnlockMulti() is called, + # the competing transaction unblocks, and phantom connections clean up. writer.transport.abort() result = await asyncio.wait_for(compete, timeout=5.0)
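
For manual experimentation outside the pytest harness, the clogger technique
reduces to a short standalone script. A minimal sketch, assuming a local
instance on port 6379 that already holds a ~1 MB value under "key" (port,
key name, and iteration count are illustrative):

    import socket
    import struct

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_LINGER=0: close() emits an RST instead of a FIN.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
    s.connect(("127.0.0.1", 6379))
    # Pipeline MULTI/GET/EXEC without ever reading replies: the ~1 MB EXEC
    # replies fill the kernel buffers until the server blocks inside Send().
    for _ in range(10):
        s.sendall(b"MULTI\r\nGET key\r\nEXEC\r\n")
    input("Clogged. Run CLIENT LIST elsewhere, then press Enter to RST-close. ")
    s.close()  # RST fails the stuck Write() and releases the shard lock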