39 changes: 31 additions & 8 deletions src/facade/dragonfly_connection.cc
@@ -2948,6 +2948,8 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
// to global pipeline-backpressure relief notifications.
util::fb2::detail::Waiter backpressure_waiter{ioevent_cb};

const uint32_t async_dispatch_quota = GetFlag(FLAGS_async_dispatch_quota);

do {
HandleMigrateRequest();

@@ -2992,12 +2994,29 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
phase_ = PROCESS;
bool reached_capacity = io_buf_.AppendLen() == 0;

// Temporary: Handle dispatch queue items (Control Path) one by one blocking command execution
// Handle dispatch queue items (Control Path) with a bounded quota to prevent
// starvation of the data path:
// - Under a PubSub flood, dispatch_q_ can accumulate thousands of messages.
// - Without a quota, the fiber would drain them all before parsing any new data from the
// socket, starving GET/SET and other pipeline commands.
// - This mirrors V1's async_dispatch_quota / prefer_pipeline_execution mechanism in AsyncFiber.
if (!dispatch_q_.empty()) {
uint32_t dispatched{};
bool quota_reached = false;

Comment thread (marked resolved by glevkovich):
Collaborator: another suggestion - consider extracting this inner while loop into a helper function, aka:
bool quota_reached = ProcessControlCommands(async_dispatch_quota);

while (!dispatch_q_.empty()) {
// Quota reached: stop draining the dispatch queue and fall through to the data path.
if ((async_dispatch_quota > 0) && (dispatched >= async_dispatch_quota)) {
quota_reached = true;
LOG_EVERY_T(INFO, 1) << "[" << id_ << "] V2 dispatch_q_ quota reached (" << dispatched
<< "/" << async_dispatch_quota << "), falling through to data path";
break;
}

auto msg = std::move(dispatch_q_.front());
dispatch_q_.pop_front();
UpdateDispatchStats(msg, false /* subtract */);
dispatched++;

// If a MigrationRequestMessage arrives via the dispatch queue, stop processing
// and let the loop iterate back to HandleMigrateRequest() at the top.
@@ -3006,15 +3025,19 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
}

std::visit(AsyncOperations{reply_builder_.get(), this}, msg.handle);
}

// Note: No flush needed here: the `continue` below re-enters the loop, which either
// hits the data path (ParseLoop flushes via ReplyBatch) or the idle-await block
// (Flush 1), which always flushes before sleeping.
} // while

// TODO: Properly handle backpressure
GetQueueBackpressure().pubsub_ec.notifyAll();
continue;

// Starvation prevention: if quota was hit, fall through to the data path so
// pipelined commands get a turn. Otherwise `continue` back to the top.
//
// No flush here: both paths reach a flush before sleeping via the idle-await block
// at the top of the loop, allowing PubSub and command replies to be coalesced into
// one sendmsg syscall.
if (!quota_reached) {
continue;
Comment thread (marked resolved by glevkovich):

romange (Collaborator): I do not understand the flow here. If quota was not reached, why go back and not fall through? I see the continue existed before, but I still do not understand what it does.

glevkovich (Contributor, Author), Apr 29, 2026: The continue is not just an optimization; it is needed for correctness and to keep latency low. I'll explain:

Reason 1:
Processing the dispatch_q here takes time, in some cases a long time, and during that time we might have received new client messages that are now waiting in the socket (e.g. a GET/SET command). Since we haven't called ReadPendingInput (worst-case scenario), the io_buf_ is completely empty. If we just fall through to the data path, it checks io_buf_.InputLen(), concludes there is no data from the client, and skips parsing entirely (it makes a decision based on stale information).

When we jump to the top, these are the only places in the hot path where we flush and read (ignoring the special-case flushes further down). So we read and pull more data:

if (pending_input_) {
  ReadPendingInput();
}

Now, when the loop eventually reaches the data path, it is working with fresh, up-to-date network data.

Reason 2:
When we process PubSub messages, their replies accumulate in reply_builder_. Flush() only happens at the top of the loop (in the idle-await block). If we fall through, we traverse the entire data-path section doing nothing useful, then loop back to the top to flush. The continue skips that dead code and reaches the flush immediately: one fewer loop iteration, lower latency.

In short: we use continue to guarantee low latency and fresh socket reads when the queue is naturally empty. We only use the fall-through as an emergency measure (when quota_reached is true) to force the data path to run so it doesn't get starved by a never-ending flood of PubSub messages.

romange (Collaborator), Apr 30, 2026: OK, then the comment above focuses only on the "quota_reached" path, but it's not clear why continue is needed in the first place. I would add an additional comment around "continue" to explain why it's needed. Maybe the code structured differently would provide a more natural flow, but I might be wrong too, and a comment will at least close the gap.

}
}

// Handle Parsed Commands Queue (Data Path)
68 changes: 62 additions & 6 deletions tests/dragonfly/connection_test.py
@@ -662,7 +662,10 @@ async def test_keyspace_events_config_set(async_client: aioredis.Redis):
await collect_expiring_events(pclient, keys)


@dfly_args({"max_busy_read_usec": 50000})
@dfly_multi_test_args(
{"max_busy_read_usec": 50000, "experimental_io_loop_v2": "false"},
{"max_busy_read_usec": 50000, "experimental_io_loop_v2": "true"},
)
async def test_reply_count(df_server: DflyInstance):
"""Make sure reply aggregations reduce reply counts for common cases"""

@@ -704,7 +707,7 @@ async def measure(aw):

# MULTI-OK + the EXEC array.
# V1 (dual-fiber) is aggressive (<=2).
# V2 (single-fiber cycle) flushes slightly more often (<=3).
# V2 (single-fiber): MULTI/OK may flush separately before the EXEC batch (<=3).
is_v2 = is_io_loop_v2(df_server)
multi_limit = 3 if is_v2 else 2
assert await measure(e.execute()) <= multi_limit
@@ -715,9 +718,13 @@ async def measure(aw):
p.incr("num-1")

# V1: aggressive squashing across dual fibers (<=2).
# V2: single-fiber loop flushes based on OS scheduling and batch boundaries (<=12).
# V2: conditional flushing defers flush when pending_input_ or io_buf_ has data,
# but fast synchronous commands (INCR) don't yield, so io_uring completions aren't processed
# mid-batch. Task 2 (Epoch Yield) will fix this by yielding before flush. Until then, V2
# flushes based on TCP segment boundaries (<=12).
pipe_limit = 12 if is_v2 else 2
assert await measure(p.execute()) <= pipe_limit
pipe_flushes = await measure(p.execute())
assert pipe_flushes <= pipe_limit

# Script result
assert await measure(async_client.eval('return {1,2,{3,4},5,6,7,8,"nine"}', 0)) == 1
@@ -729,6 +736,45 @@ async def measure(aw):
assert await measure(async_client.ft("i1").search("*")) <= 2


@dfly_args({"experimental_io_loop_v2": "true"})
async def test_v2_conditional_flush_no_stall(df_server: DflyInstance):
"""Verify that V2 conditional flushing never stalls client replies.

When a pipeline arrives fragmented (separate TCP segments), the server must flush
completed replies before sleeping on the socket. If maybe_flush() before the main
await fails to flush, the reply stays trapped in memory and the client hangs.
"""
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)

# Send an isolated command and wait for the server to process and flush.
writer.write(b"PING\r\n")
await writer.drain()

# If conditional flush is broken, PONG is trapped in the reply buffer and this will time out.
res = await asyncio.wait_for(reader.readline(), timeout=1.0)
assert res == b"+PONG\r\n", f"Expected PONG, got {res!r}"

# Send a fragmented pipeline: two commands in separate TCP segments.
writer.write(b"SET foo bar\r\n")
await writer.drain()
await asyncio.sleep(0.1)

writer.write(b"GET foo\r\n")
await writer.drain()

res1 = await asyncio.wait_for(reader.readline(), timeout=1.0)
assert res1 == b"+OK\r\n", f"Expected +OK, got {res1!r}"

# GET reply is a bulk string: $3\r\nbar\r\n
res2_hdr = await asyncio.wait_for(reader.readline(), timeout=1.0)
assert res2_hdr == b"$3\r\n", f"Expected $3, got {res2_hdr!r}"
res2_body = await asyncio.wait_for(reader.readline(), timeout=1.0)
assert res2_body == b"bar\r\n", f"Expected bar, got {res2_body!r}"

writer.close()
await writer.wait_closed()


async def test_big_command(df_server, size=8 * 1024):
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)

@@ -1291,7 +1337,14 @@ async def wait_for_stuck_on_send():
# This test relies on SquashPipeline() memory lifecycle (V1/AsyncFiber only).
# V2's single-fiber ExecuteBatch() releases commands incrementally, so the cache
# is non-empty during execution. Skip when V2 is explicitly enabled.
@dfly_args({"proactor_threads": 1, "pipeline_squash": 9, "max_busy_read_usec": 50000})
@dfly_args(
{
"proactor_threads": 1,
"pipeline_squash": 9,
"max_busy_read_usec": 50000,
"experimental_io_loop_v2": "false",
}
)
async def test_pipeline_cache_only_async_squashed_dispatches(df_factory):
server = df_factory.create()
server.start()
@@ -2094,7 +2147,10 @@ async def delayed_push():
await pusher.aclose()


@dfly_args({"proactor_threads": 2, "async_dispatch_quota": 50})
@dfly_multi_test_args(
{"proactor_threads": 2, "async_dispatch_quota": 50, "experimental_io_loop_v2": "false"},
{"proactor_threads": 2, "async_dispatch_quota": 50, "experimental_io_loop_v2": "true"},
)
async def test_pubsub_pipeline_starvation(df_server: DflyInstance):
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)
# Send subscribe and consume the standard 6-line RESP array reply