diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 6aa6e67efe7a..bf7ebd3fd09d 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -2948,6 +2948,8 @@ variant Connection::IoLoopV2() {
   // to global pipeline-backpressure relief notifications.
   util::fb2::detail::Waiter backpressure_waiter{ioevent_cb};

+  const uint32_t async_dispatch_quota = GetFlag(FLAGS_async_dispatch_quota);
+
   do {
     HandleMigrateRequest();

@@ -2992,12 +2994,29 @@ variant Connection::IoLoopV2() {
     phase_ = PROCESS;
     bool reached_capacity = io_buf_.AppendLen() == 0;

-    // Temporary: Handle dispatch queue items (Control Path) one by one blocking command execution
+    // Handle dispatch queue items (Control Path) with a bounded quota to prevent
+    // starvation of the data path:
+    // - Under a PubSub flood, dispatch_q_ can accumulate thousands of messages.
+    // - Without a quota, the fiber would drain them all before parsing any new data from the
+    //   socket, starving GET/SET and other pipeline commands.
+    // - This mirrors V1's async_dispatch_quota / prefer_pipeline_execution mechanism in AsyncFiber.
     if (!dispatch_q_.empty()) {
+      uint32_t dispatched{};
+      bool quota_reached = false;
+
       while (!dispatch_q_.empty()) {
+        // Quota reached: stop draining the dispatch queue and fall through to the data path.
+        if ((async_dispatch_quota > 0) && (dispatched >= async_dispatch_quota)) {
+          quota_reached = true;
+          LOG_EVERY_T(INFO, 1) << "[" << id_ << "] V2 dispatch_q_ quota reached (" << dispatched
+                               << "/" << async_dispatch_quota << "), falling through to data path";
+          break;
+        }
+
         auto msg = std::move(dispatch_q_.front());
         dispatch_q_.pop_front();
         UpdateDispatchStats(msg, false /* subtract */);
+        dispatched++;

         // If a MigrationRequestMessage arrives via the dispatch queue, stop processing
         // and let the loop iterate back to HandleMigrateRequest() at the top.
@@ -3006,15 +3025,19 @@ variant Connection::IoLoopV2() {
         }

         std::visit(AsyncOperations{reply_builder_.get(), this}, msg.handle);
-      }
-
-      // Note: No flush needed here: the `continue` below re-enters the loop, which either
-      // hits the data path (ParseLoop flushes via ReplyBatch) or the idle-await block
-      // (Flush 1), which always flushes before sleeping.
+      }  // while

-      // TODO: Properly handle backpressure
      GetQueueBackpressure().pubsub_ec.notifyAll();
-      continue;
+
+      // Starvation prevention: if the quota was hit, fall through to the data path so
+      // pipelined commands get a turn. Otherwise `continue` back to the top.
+      //
+      // No flush here: both paths reach a flush before sleeping via the idle-await block
+      // at the top of the loop, allowing PubSub and command replies to be coalesced into
+      // one sendmsg syscall.
+      if (!quota_reached) {
+        continue;
+      }
     }

     // Handle Parsed Commands Queue (Data Path)
diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py
index f17571eabf76..a45c38be614f 100644
--- a/tests/dragonfly/connection_test.py
+++ b/tests/dragonfly/connection_test.py
@@ -662,7 +662,10 @@ async def test_keyspace_events_config_set(async_client: aioredis.Redis):
     await collect_expiring_events(pclient, keys)


-@dfly_args({"max_busy_read_usec": 50000})
+@dfly_multi_test_args(
+    {"max_busy_read_usec": 50000, "experimental_io_loop_v2": "false"},
+    {"max_busy_read_usec": 50000, "experimental_io_loop_v2": "true"},
+)
 async def test_reply_count(df_server: DflyInstance):
     """Make sure reply aggregations reduce reply counts for common cases"""

@@ -704,7 +707,7 @@ async def measure(aw):

     # MULTI-OK + the EXEC array.
     # V1 (dual-fiber) is aggressive (<=2).
-    # V2 (single-fiber cycle) flushes slightly more often (<=3).
+    # V2 (single-fiber): MULTI/OK may flush separately before the EXEC batch (<=3).
     is_v2 = is_io_loop_v2(df_server)
     multi_limit = 3 if is_v2 else 2
     assert await measure(e.execute()) <= multi_limit
@@ -715,9 +718,13 @@ async def measure(aw):
         p.incr("num-1")

     # V1: aggressive squashing across dual fibers (<=2).
-    # V2: single-fiber loop flushes based on OS scheduling and batch boundaries (<=12).
+    # V2: conditional flushing defers the flush when pending_input_ or io_buf_ has data,
+    # but fast synchronous commands (INCR) don't yield, so io_uring completions aren't processed
+    # mid-batch. Task 2 (Epoch Yield) will fix this by yielding before flushing. Until then, V2
+    # flushes based on TCP segment boundaries (<=12).
     pipe_limit = 12 if is_v2 else 2
-    assert await measure(p.execute()) <= pipe_limit
+    pipe_flushes = await measure(p.execute())
+    assert pipe_flushes <= pipe_limit

     # Script result
     assert await measure(async_client.eval('return {1,2,{3,4},5,6,7,8,"nine"}', 0)) == 1
@@ -729,6 +736,45 @@ async def measure(aw):
     assert await measure(async_client.ft("i1").search("*")) <= 2


+@dfly_args({"experimental_io_loop_v2": "true"})
+async def test_v2_conditional_flush_no_stall(df_server: DflyInstance):
+    """Verify that V2 conditional flushing never stalls client replies.
+
+    When a pipeline arrives fragmented (separate TCP segments), the server must flush
+    completed replies before sleeping on the socket. If maybe_flush() before the main
+    await fails to flush, the reply stays trapped in memory and the client hangs.
+    """
+    reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)
+
+    # Send an isolated command and wait for the server to process and flush.
+    writer.write(b"PING\r\n")
+    await writer.drain()
+
+    # If conditional flush is broken, PONG is trapped in the reply buffer and this will time out.
+    res = await asyncio.wait_for(reader.readline(), timeout=1.0)
+    assert res == b"+PONG\r\n", f"Expected PONG, got {res!r}"
+
+    # Send a fragmented pipeline: two commands in separate TCP segments.
+ writer.write(b"SET foo bar\r\n") + await writer.drain() + await asyncio.sleep(0.1) + + writer.write(b"GET foo\r\n") + await writer.drain() + + res1 = await asyncio.wait_for(reader.readline(), timeout=1.0) + assert res1 == b"+OK\r\n", f"Expected +OK, got {res1!r}" + + # GET reply is a bulk string: $3\r\nbar\r\n + res2_hdr = await asyncio.wait_for(reader.readline(), timeout=1.0) + assert res2_hdr == b"$3\r\n", f"Expected $3, got {res2_hdr!r}" + res2_body = await asyncio.wait_for(reader.readline(), timeout=1.0) + assert res2_body == b"bar\r\n", f"Expected bar, got {res2_body!r}" + + writer.close() + await writer.wait_closed() + + async def test_big_command(df_server, size=8 * 1024): reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) @@ -1291,7 +1337,14 @@ async def wait_for_stuck_on_send(): # This test relies on SquashPipeline() memory lifecycle (V1/AsyncFiber only). # V2's single-fiber ExecuteBatch() releases commands incrementally, so the cache # is non-empty during execution. Skip when V2 is explicitly enabled. -@dfly_args({"proactor_threads": 1, "pipeline_squash": 9, "max_busy_read_usec": 50000}) +@dfly_args( + { + "proactor_threads": 1, + "pipeline_squash": 9, + "max_busy_read_usec": 50000, + "experimental_io_loop_v2": "false", + } +) async def test_pipeline_cache_only_async_squashed_dispatches(df_factory): server = df_factory.create() server.start() @@ -2094,7 +2147,10 @@ async def delayed_push(): await pusher.aclose() -@dfly_args({"proactor_threads": 2, "async_dispatch_quota": 50}) +@dfly_multi_test_args( + {"proactor_threads": 2, "async_dispatch_quota": 50, "experimental_io_loop_v2": "false"}, + {"proactor_threads": 2, "async_dispatch_quota": 50, "experimental_io_loop_v2": "true"}, +) async def test_pubsub_pipeline_starvation(df_server: DflyInstance): reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) # Send subscribe and consume the standard 6-line RESP array reply