Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1071,12 +1071,14 @@ pair<string, string> Connection::GetClientInfoBeforeAfterTid() const {
" tot-dispatches=", local_stats_.dispatch_entries_added);

if (cc_) {
string cc_info = service_->GetContextInfo(cc_.get()).Format();
auto ctx_info = service_->GetContextInfo(cc_.get());

// reply_builder_ may be null if the connection is in the setup phase, for example.
if (reply_builder_ && reply_builder_->IsSendActive())
phase_name = "send";
absl::StrAppend(&after, " ", cc_info);
else if (ctx_info.is_scheduled)
phase_name = "scheduled";
absl::StrAppend(&after, " ", ctx_info.Format());
}
absl::StrAppend(&after, " phase=", phase_name);

Expand Down
1 change: 1 addition & 0 deletions src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ServiceInterface {

unsigned db_index;
bool async_dispatch, conn_closing, subscribers, blocked;
bool is_scheduled; // coordinator fiber is blocked in run_barrier_.Wait()
};

virtual ContextInfo GetContextInfo(ConnectionContext* cntx) const {
Expand Down
3 changes: 2 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2912,7 +2912,8 @@ Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) co
.async_dispatch = server_cntx->async_dispatch,
.conn_closing = server_cntx->conn_closing,
.subscribers = bool(server_cntx->conn_state.subscribe_info),
.blocked = server_cntx->blocked};
.blocked = server_cntx->blocked,
.is_scheduled = server_cntx->transaction && server_cntx->transaction->IsWaitingOnShard()};
}

#define HFUNC(x) SetHandler(&Service::x)
Expand Down
2 changes: 2 additions & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,9 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
}

DispatchHop();
schedule_time_ns_ = ProactorBase::GetMonotonicTimeNs();
run_barrier_.Wait();
schedule_time_ns_ = 0;
cb_ptr_.reset();

if (coordinator_state_ & COORD_CONCLUDING)
Expand Down
9 changes: 9 additions & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ class Transaction {
return coordinator_state_ & COORD_SCHED;
}

// True while the coordinator fiber is blocked in run_barrier_.Wait().
// Used by CLIENT LIST to report phase="scheduled".
// NOTE(review): schedule_time_ns_ is written by the coordinator fiber and read
// here from the CLIENT LIST path without synchronization — presumably acceptable
// for a monitoring-only flag (the member comment cites SinkReplyBuilder::send_time_ns_
// as precedent), but confirm which threads/fibers access it.
bool IsWaitingOnShard() const {
  return schedule_time_ns_ != 0;
}

// Returns the mode of the enclosing multi-transaction.
// NOTE(review): dereferences multi_ unconditionally — presumably callers only
// invoke this when multi_ is set; verify against call sites.
MultiMode GetMultiMode() const {
  return multi_->mode;
}
Expand Down Expand Up @@ -613,6 +619,9 @@ class Transaction {
Namespace* namespace_{nullptr};
DbIndex db_index_{0};
uint64_t time_now_ms_{0};
// Non-zero while the coordinator fiber is blocked in run_barrier_.Wait() inside Execute().
// Read lock-free by CLIENT LIST — same pattern as SinkReplyBuilder::send_time_ns_.
uint64_t schedule_time_ns_{0};

std::atomic_uint32_t use_count_{0}; // transaction exists only as an intrusive_ptr

Expand Down
110 changes: 110 additions & 0 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,116 @@ async def flood():
await writer.wait_closed()


@dfly_args({"proactor_threads": 1})
async def test_multi_exec_phantom_connections(df_server: DflyInstance):
    """Reproduce the addr=0.0.0.0 phantom connections from issue #7272.

    Clogger floods MULTI/GET <1 MB key>/EXEC without reading, blocking Send() mid-EXEC
    while holding the shard lock. Ghost connections send MULTI/SET/EXEC one command at a
    time (sync-dispatch mode) then RST-close. The main connection fiber is stuck in
    run_barrier_.Wait() inside Transaction::Execute(), so the io_uring RST event goes
    unprocessed: the kernel moves the socket to TCP_CLOSE (addr=0.0.0.0) while phase
    shows "scheduled" (coordinator fiber waiting for shard callback to complete).
    """
    import struct

    linger_rst = struct.pack("ii", 1, 0)
    admin = df_server.client()
    await admin.set("key", "v" * 1024 * 1024)

    # Clogger: floods MULTI/GET/EXEC without reading — fills the TCP send buffer,
    # blocking Send() mid-EXEC while the shard lock on "key" is still held.
    reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)
    writer.get_extra_info("socket").setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger_rst)
    writer.write(b"CLIENT SETNAME clogger\r\n")
    await writer.drain()
    await reader.readline()  # +OK

    # Stop asyncio from draining incoming data — otherwise it continuously reads the
    # server's replies into its StreamReader buffer, keeping the OS receive window open
    # and allowing the server to send all replies before getting stuck.
    writer.transport.pause_reading()

    # A few commands are enough: each EXEC reply is ~1 MB; the OS receive buffer
    # (~4 MB) fills after 4 replies, TCP window drops to 0, and the server's
    # Send() blocks mid-EXEC while still holding the shard lock.
    cmd = b"MULTI\r\nGET key\r\nEXEC\r\n"
    for _ in range(10):
        writer.write(cmd)
        await writer.drain()

    @assert_eventually(times=600)
    async def wait_clogger_stuck():
        clients = await admin.client_list()
        v = next((c for c in clients if c.get("name") == "clogger"), None)
        assert v is not None and v["phase"] == "send"

    await wait_clogger_stuck()

    # Ghost connections: preamble commands sent+ack'd one-by-one (sync-dispatch,
    # tot-dispatches stays 0), then EXEC fired without reading. EXEC blocks in
    # ScheduleSingleHop on the shard lock held by the clogger.
    def recv_line(s):
        # Bounded read: the socket timeout plus the EOF check make the test fail
        # fast instead of hanging CI if the server closes the connection or never
        # sends a \r\n-terminated line.
        s.settimeout(5.0)
        buf = b""
        while not buf.endswith(b"\r\n"):
            chunk = s.recv(256)
            if not chunk:
                raise ConnectionError("server closed connection before sending a full line")
            buf += chunk
        return buf

    ghosts = []
    for i in range(3):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger_rst)
        s.connect(("127.0.0.1", df_server.port))
        s.sendall(f"CLIENT SETNAME ghost{i}\r\n".encode())
        recv_line(s)  # +OK
        s.sendall(b"MULTI\r\n")
        recv_line(s)  # +OK
        s.sendall(b"SET key v\r\n")
        recv_line(s)  # +QUEUED
        s.sendall(b"EXEC\r\n")  # blocked on shard lock; do not read reply
        ghosts.append(s)

    await asyncio.sleep(0.1)  # let EXEC bytes reach server's receive buffer

    # RST-close: kernel moves server-side sockets to TCP_CLOSE.
    # getpeername() → ENOTCONN → addr=0.0.0.0 in CLIENT LIST.
    for s in ghosts:
        s.close()

    @assert_eventually(times=50)
    async def check_phantoms():
        clients = await admin.client_list()
        phantoms = [
            c
            for c in clients
            if c.get("addr", "").startswith("0.0.0.0") and c.get("phase") == "scheduled"
        ]
        logging.info("phantoms: %s", [(c["name"], c["addr"], c["phase"]) for c in phantoms])
        assert len(phantoms) >= 3, f"got {[(c['addr'], c['phase']) for c in clients]}"

    await check_phantoms()

    # A competing MULTI/SET confirms the shard lock is still held while we observe.
    other = df_server.client()
    pipe = other.pipeline(transaction=True)
    pipe.set("key", "new")
    compete = asyncio.create_task(pipe.execute())

    await asyncio.sleep(0.5)
    assert not compete.done(), "Competing MULTI/EXEC should be stuck waiting for the shard lock"

    # RST-close the clogger: Write() fails, fiber resumes, UnlockMulti() is called,
    # the competing transaction unblocks, and phantom connections clean up.
    writer.transport.abort()

    result = await asyncio.wait_for(compete, timeout=5.0)
    assert result == [True]
    assert await admin.ping()


async def test_blocking_command_close_eof(df_server: DflyInstance):
"""Server must drop a connection that is parked on a blocking command
when the client closes its socket.
Expand Down
Loading