feat(facade): bound IoLoopV2 dispatch_q_ quota to prevent starvation #7234
glevkovich wants to merge 1 commit into main
Conversation
🤖 Augment PR Summary
Summary: This PR prevents PubSub/control-path floods from starving pipelined command execution in IoLoopV2.
Pull request overview
Adds fairness to the V2 connection I/O loop by bounding how many control-path (dispatch queue) messages are processed per iteration, preventing PubSub floods from starving pipelined command execution; expands Python integration tests to run key cases against both IoLoop V1 and V2.
Changes:
- Bound IoLoopV2 dispatch-queue draining using FLAGS_async_dispatch_quota, falling through to the data path when the quota is hit.
- Update/parameterize existing connection tests to run with experimental_io_loop_v2 enabled and disabled.
- Add a V2-focused regression test to ensure conditional flushing does not stall replies.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/dragonfly/connection_test.py | Parameterizes tests across V1/V2, adjusts reply-count expectations, and adds a V2 conditional-flush regression test. |
| src/facade/dragonfly_connection.cc | Implements quota-bounded dispatch queue draining in IoLoopV2 to prevent pipeline starvation under heavy control-path load. |
Previously, IoLoopV2 drained dispatch_q_ with an unbounded while loop. Under a PubSub flood, this trapped the fiber in the control path, starving pipelined commands (GET/SET) and causing client timeouts.

Key changes:
- Bounded dispatch: process at most FLAGS_async_dispatch_quota messages per iteration; if the quota is hit, fall through to the data path so pipeline commands get a turn. Mirrors V1's async_dispatch_quota / prefer_pipeline_execution mechanism in AsyncFiber.
- Deferred flush: the quota-hit path falls through to ParseLoop, which reaches the idle-await flush, coalescing PubSub and command replies into a single sendmsg syscall.
- Batched backpressure: pubsub_ec.notifyAll() is now called once per quota chunk instead of once per message.
- Testing: parameterized test_pubsub_pipeline_starvation for both V1 and V2 to prevent regressions.

Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
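A minimal sketch of the bounded drain and batched backpressure described above, assuming hypothetical names (Msg, HandleDispatch, EventCount, kAsyncDispatchQuota) in place of Dragonfly's actual types and flags:

```cpp
// Sketch only: illustrates the bounded-drain idea from the commit message.
// Msg, HandleDispatch(), EventCount and kAsyncDispatchQuota are illustrative
// placeholders, not Dragonfly's real identifiers.
#include <cstdint>
#include <deque>

struct Msg {};

struct EventCount {
  void notifyAll() {}  // stand-in for the pubsub_ec backpressure wakeup
};

constexpr uint32_t kAsyncDispatchQuota = 32;  // stands in for FLAGS_async_dispatch_quota

void HandleDispatch(const Msg&) {}  // placeholder for per-message processing

// Drains at most `quota` messages and wakes blocked publishers once per chunk.
// Returns true when the quota was hit with messages still queued, i.e. the
// caller should fall through to the data path instead of looping back.
bool DrainDispatchQueue(std::deque<Msg>& dispatch_q, EventCount& pubsub_ec,
                        uint32_t quota) {
  uint32_t dispatched = 0;
  while (!dispatch_q.empty() && dispatched < quota) {
    HandleDispatch(dispatch_q.front());
    dispatch_q.pop_front();
    ++dispatched;
  }
  pubsub_ec.notifyAll();  // batched backpressure: once per quota chunk
  return !dispatch_q.empty();
}
```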
// at the top of the loop, allowing PubSub and command replies to be coalesced into
// one sendmsg syscall.
if (!quota_reached) {
  continue;
I do not understand the flow here.
If the quota was not reached, why go back to the top instead of falling through? I see the continue existed before, but I still do not understand what it does.
The continue is not just an optimization; it is needed for correctness and to keep latency low. I'll explain:
Reason 1:
When we finish processing the dispatch_q here, it took time, in some cases a long time, and during that time new client messages (e.g. a GET/SET command) may have arrived on the socket. Since we haven't called ReadPendingInput (worst-case scenario), io_buf_ is completely empty. If we just fall through to the data path, it checks io_buf_.InputLen(), concludes there is no data from the client, and skips parsing entirely, making a decision based on stale information.
When we jump back to the top, these are the only places in the hot path where we flush and read (ignoring the special-case flushes further down), so we read and pull in more data:
if (pending_input_) {
ReadPendingInput();
}
Now, when the loop eventually reaches the data path, it is working with fresh, up-to-date network data.
Reason 2:
When we process PubSub messages, their replies accumulate in reply_builder_. Flush() only happens at the top of the loop (in the idle-await block). If we fall through, we traverse the entire data path section doing nothing useful, then loop back to the top to flush. The continue skips that dead code and reaches the flush immediately - one fewer loop iteration, lower latency.
In short: We use continue to guarantee low latency and fresh socket reads when the queue is naturally empty. We only use the fall-through as an emergency case (when quota_reached is true) to force the data path to run so it doesn't get starved by a never-ending flood of PubSub messages.
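To make the two paths concrete, here is a rough skeleton of the loop shape this explanation assumes. It is a sketch only: Flush, ReadPendingInput, ParseAndExecuteInput, DrainDispatchQueue and the member names are placeholders, not the actual IoLoopV2 code.

```cpp
// Simplified shape of the loop under discussion. Every identifier is a
// placeholder for the real IoLoopV2 members; this is not Dragonfly's code.
#include <cstdint>
#include <deque>

struct LoopSketch {
  std::deque<int> dispatch_q;
  bool pending_input = false;
  bool reply_pending = false;
  bool should_exit = false;
  static constexpr uint32_t kQuota = 32;  // stand-in for FLAGS_async_dispatch_quota

  void Flush() {}                 // stand-in: sends accumulated replies (one sendmsg)
  void ReadPendingInput() {}      // stand-in: refills io_buf_ from the socket
  void ParseAndExecuteInput() {}  // stand-in: the "data path"

  // Pops at most kQuota control messages; true means the quota was hit.
  bool DrainDispatchQueue() {
    uint32_t dispatched = 0;
    while (!dispatch_q.empty() && dispatched < kQuota) {
      dispatch_q.pop_front();  // stand-in for handling one control message
      ++dispatched;
    }
    return !dispatch_q.empty();
  }

  void IoLoop() {
    while (!should_exit) {
      // Top of the loop: flush accumulated replies and read fresh socket data.
      if (reply_pending) Flush();
      if (pending_input) ReadPendingInput();

      if (!dispatch_q.empty()) {
        bool quota_reached = DrainDispatchQueue();
        if (!quota_reached) {
          // Queue drained: loop back so replies are flushed and the socket is
          // re-read before the data path inspects the input buffer.
          continue;
        }
        // Quota hit: fall through so pipelined commands get a turn.
      }

      // Data path: parse and execute whatever input is currently buffered.
      ParseAndExecuteInput();
    }
  }
};
```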
Ok, then the comment above focuses only on the "quota_reached" path, but it's not clear why the continue is needed in the first place. I would add an additional comment around the continue to explain why it's needed. Maybe structuring the code differently would provide a more natural flow, but I might be wrong too, and a comment will at least close the gap.
// - This mirrors V1's async_dispatch_quota / prefer_pipeline_execution mechanism in AsyncFiber.
if (!dispatch_q_.empty()) {
  uint32_t dispatched{};
  bool quota_reached = false;
Another suggestion: consider extracting this inner while loop into a helper function, e.g.:
bool quota_reached = ProcessControlCommands(async_dispatch_quota);