diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 8e10ea3718..04d4d30a6b 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -254,6 +254,7 @@ public:
     capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
     capacity_t grab_capacity(capacity_t cap) noexcept;
     clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
+    void refund_tokens(capacity_t) noexcept;
     void replenish_capacity(clock_type::time_point now) noexcept;
     void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;
 
@@ -344,13 +345,12 @@ private:
      * in the middle of the waiting
      */
     struct pending {
-        capacity_t head;
-        capacity_t cap;
-
-        pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
+        capacity_t head = 0;
+        capacity_t cap = 0;
     };
 
-    std::optional<pending> _pending;
+    pending _pending;
+    capacity_t _queued_cap = 0;
 
     void push_priority_class(priority_class_data& pc) noexcept;
     void push_priority_class_from_idle(priority_class_data& pc) noexcept;
@@ -360,7 +360,7 @@ private:
 
     enum class grab_result { grabbed, cant_preempt, pending };
     grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
-    grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
+    capacity_t reap_pending_capacity(bool& contact) noexcept;
 public:
     /// Constructs a fair queue with configuration parameters \c cfg.
     ///
diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh
index ee0d6513e8..9a2f87fd35 100644
--- a/include/seastar/util/shared_token_bucket.hh
+++ b/include/seastar/util/shared_token_bucket.hh
@@ -160,6 +160,10 @@ public:
         _rovers.release(tokens);
     }
 
+    void refund(T tokens) noexcept {
+        fetch_add(_rovers.head, tokens);
+    }
+
     void replenish(typename Clock::time_point now) noexcept {
         auto ts = _replenished.load(std::memory_order_relaxed);
 
diff --git a/include/seastar/util/tracer.hh b/include/seastar/util/tracer.hh
index c46b410f2e..0a6127793e 100644
--- a/include/seastar/util/tracer.hh
+++ b/include/seastar/util/tracer.hh
@@ -72,6 +72,7 @@ enum class trace_events {
     IO_QUEUE,
     IO_DISPATCH,
     IO_COMPLETE,
+    IO_CANCEL,
     MONITORING_SCRAPE,
     COUNT,
 };
@@ -143,6 +144,10 @@ inline void tracepoint_io_complete(uint64_t io_id) {
     tracepoint_unary(trace_events::IO_COMPLETE, io_id);
 }
 
+inline void tracepoint_io_cancel(uint64_t io_id) {
+    tracepoint_unary(trace_events::IO_CANCEL, io_id);
+}
+
 inline void tracepoint_grab_capacity(uint64_t cap, uint64_t want_head, uint64_t head) {
     auto p = reinterpret_cast<char*>(g_tracer.write(12 + 24));
     p = seastar::write_le(p, static_cast<uint32_t>(trace_events::GRAB_CAPACITY));
@@ -168,8 +173,8 @@ inline void tracepoint_dispatch_queue(uint8_t id) {
     tracepoint_unary(trace_events::DISPATCH_QUEUE, id);
 }
 
-inline void tracepoint_dispatch_requests() {
-    tracepoint_nullary(trace_events::DISPATCH_REQUESTS);
+inline void tracepoint_dispatch_requests(uint64_t queued) {
+    tracepoint_unary(trace_events::DISPATCH_REQUESTS, queued);
 }
 
 inline void tracepoint_monitoring_scrape() {
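Note (reviewer sketch, not part of the patch): shared_token_bucket tracks two wrapping rovers: tail advances as capacity is grabbed, head advances as it is replenished, and a reservation becomes usable once head catches up with the grabber's want_head. The new refund() advances head directly, so it acts like replenish() without the timestamp bookkeeping. A minimal, self-contained model of that arithmetic (bucket_model, wrap_diff and all values here are illustrative, not Seastar APIs):

    #include <cassert>
    #include <cstdint>

    using capacity_t = uint64_t;

    // How far 'a' is ahead of 'b' on a wrapping counter, 0 if not ahead
    // (plays the role of internal::wrapping_difference in the patch).
    static capacity_t wrap_diff(capacity_t a, capacity_t b) {
        int64_t d = static_cast<int64_t>(a - b);
        return d > 0 ? static_cast<capacity_t>(d) : 0;
    }

    struct bucket_model {
        capacity_t tail = 0;  // grabbed (reserved) so far
        capacity_t head = 0;  // replenished (usable) so far

        capacity_t grab(capacity_t cap) { tail += cap; return tail; } // returns want_head
        void replenish(capacity_t t) { head += t; }
        void refund(capacity_t t) { head += t; } // same effect, no timestamp involved

        capacity_t deficiency(capacity_t want_head) const { return wrap_diff(want_head, head); }
    };

    int main() {
        bucket_model b;
        capacity_t want = b.grab(10);    // reserve 10 tokens
        b.replenish(4);
        assert(b.deficiency(want) == 6); // still waiting for 6
        b.refund(6);                     // unused tokens returned by another queue
        assert(b.deficiency(want) == 0); // reservation now fully usable
    }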
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 2be38e6c32..dc2d7f175e 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -118,6 +118,11 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
     tracepoint_replenish(_token_bucket.head());
 }
 
+void fair_group::refund_tokens(capacity_t cap) noexcept {
+    _token_bucket.refund(cap);
+    tracepoint_replenish(_token_bucket.head());
+}
+
 void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
     auto now = clock_type::now();
     auto extra = _token_bucket.accumulated_in(now - local_ts);
@@ -229,38 +234,29 @@ void fair_queue::unplug_class(class_id cid) noexcept {
     unplug_priority_class(*_priority_classes[cid]);
 }
 
-auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
-    _group.maybe_replenish_capacity(_group_replenish);
-
-    capacity_t head = _group.head();
-    tracepoint_grab_capacity(ent._capacity, _pending->head, head);
-    if (internal::wrapping_difference(_pending->head, head)) {
-        return grab_result::pending;
-    }
-
-    capacity_t cap = ent._capacity;
-    if (cap > _pending->cap) {
-        return grab_result::cant_preempt;
+auto fair_queue::reap_pending_capacity(bool& contact) noexcept -> capacity_t {
+    capacity_t result = 0;
+    contact = true;
+    if (_pending.cap) {
+        _group.maybe_replenish_capacity(_group_replenish);
+        capacity_t head = _group.head();
+        // tracepoint_grab_capacity(ent._capacity, _pending->head, head);
+        auto diff = internal::wrapping_difference(_pending.head, head);
+        contact = diff <= _pending.cap;
+        if (diff < _pending.cap) {
+            result = _pending.cap - diff;
+            _pending.cap = diff;
+        }
     }
-
-    _pending.reset();
-    return grab_result::grabbed;
+    return result;
 }
 
 auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
-    if (_pending) {
-        return grab_pending_capacity(ent);
-    }
-
     capacity_t cap = ent._capacity;
     capacity_t want_head = _group.grab_capacity(cap);
     capacity_t head = _group.head();
     tracepoint_grab_capacity(ent._capacity, want_head, head);
-    if (internal::wrapping_difference(want_head, head)) {
-        _pending.emplace(want_head, cap);
-        return grab_result::pending;
-    }
-
+    _pending = pending{want_head, cap};
     return grab_result::grabbed;
 }
 
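Note (reviewer sketch, not part of the patch): reap_pending_capacity() replaces the all-or-nothing grab_pending_capacity(). Rather than waiting until the whole reservation is covered, it peels off whatever part of the reservation the bucket head has already passed and leaves the rest pending; the contact flag reports whether the head has reached the reserved window at all. A worked, self-contained model of that calculation (reap() and the example values are illustrative):

    #include <cassert>
    #include <cstdint>

    using capacity_t = uint64_t;

    static capacity_t wrap_diff(capacity_t a, capacity_t b) {
        int64_t d = static_cast<int64_t>(a - b);
        return d > 0 ? static_cast<capacity_t>(d) : 0;
    }

    struct pending { capacity_t head = 0; capacity_t cap = 0; };

    // Mirrors the patch: returns the tokens already covered by the bucket
    // head, shrinks the outstanding reservation, and reports contact.
    static capacity_t reap(pending& p, capacity_t bucket_head, bool& contact) {
        capacity_t result = 0;
        contact = true;
        if (p.cap) {
            capacity_t diff = wrap_diff(p.head, bucket_head);
            contact = diff <= p.cap;   // head is inside the reserved window
            if (diff < p.cap) {
                result = p.cap - diff; // usable right now
                p.cap = diff;          // still owed by the bucket
            }
        }
        return result;
    }

    int main() {
        // Reservation for tokens (90, 100]: becomes usable as head -> 100.
        pending p{100, 10};
        bool contact = false;
        assert(reap(p, 96, contact) == 6 && contact && p.cap == 4);  // partial
        assert(reap(p, 100, contact) == 4 && contact && p.cap == 0); // drained
        assert(reap(p, 100, contact) == 0 && contact);               // nothing pending
    }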
@@ -307,17 +303,19 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
         push_priority_class_from_idle(pc);
     }
     pc._queue.push_back(ent);
+    _queued_cap += ent.capacity();
 }
 
 void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept {
 }
 
 void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
+    _queued_cap -= ent._capacity;
     ent._capacity = 0;
 }
 
 fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
-    if (_pending) {
+    if (_pending.cap) {
         /*
          * We expect the disk to release the ticket within some time,
          * but it's ... OK if it doesn't -- the pending wait still
          * counts. Instead, if disk release the ticket "too early", we
          * can withdraw it once the capacity is replenished, direct
@@ -328,7 +326,7 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
          * which's sub-optimal. The expectation is that we think disk
          * works faster, than it really does.
          */
-        auto over = _group.capacity_deficiency(_pending->head);
+        auto over = _group.capacity_deficiency(_pending.head);
         auto ticks = _group.capacity_duration(over);
         return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(ticks);
     }
@@ -337,11 +335,15 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
 }
 
 void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
-    capacity_t dispatched = 0;
-    boost::container::small_vector<priority_class_data*, 2> preempt;
+    tracepoint_dispatch_requests(_queued_cap);
+    _pending.cap = std::min(_pending.cap, _queued_cap);
+    bool contact = false;
+    capacity_t available = reap_pending_capacity(contact);
+    capacity_t recycled = 0;
+    uint64_t can_grab_this_tick = _group.per_tick_grab_threshold();
 
     // tracepoint_dispatch_requests();
-    while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
+    while (!_handles.empty()) {
         priority_class_data& h = *_handles.top();
         tracepoint_dispatch_queue(h._id);
         if (h._queue.empty() || !h._plugged) {
@@ -350,16 +352,29 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         }
 
         auto& req = h._queue.front();
-        auto gr = grab_capacity(req);
-        if (gr == grab_result::pending) {
+        if (req._capacity <= available) {
+            // pass
+        } else if (req._capacity <= available + _pending.cap) {
+            _pending.cap += available;
+            available = 0;
+            break;
+        } else if (contact) {
+            can_grab_this_tick += available + _pending.cap;
+            recycled = available + _pending.cap;
+            _pending.cap = 0;
+            available = 0;
+            capacity_t grab_amount = std::min(can_grab_this_tick, _queued_cap);
+            grab_capacity(grab_amount);
+            can_grab_this_tick -= grab_amount;
+            available += reap_pending_capacity(contact);
+            if (req._capacity > available) {
+                break;
+            }
+        } else {
             break;
         }
 
-        if (gr == grab_result::cant_preempt) {
-            pop_priority_class(h);
-            preempt.emplace_back(&h);
-            continue;
-        }
+        available -= req._capacity;
 
         _last_accumulated = std::max(h._accumulated, _last_accumulated);
         pop_priority_class(h);
@@ -386,7 +401,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         }
         h._accumulated += req_cost;
         h._pure_accumulated += req_cap;
-        dispatched += req_cap;
+        _queued_cap -= req_cap;
 
         cb(req);
 
@@ -395,8 +410,11 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         }
     }
 
-    for (auto&& h : preempt) {
-        push_priority_class(*h);
+    if (_pending.cap == 0 && available) {
+        grab_capacity(available);
+    }
+    if (available + recycled) {
+        _group.refund_tokens(available + recycled);
     }
 }
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index dad0cfd02d..f79236695b 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -259,6 +259,7 @@ class io_desc_read_write final : public io_completion {
     }
 
     void cancel() noexcept {
+        tracepoint_io_cancel(_io_id);
         _pclass.on_cancel();
         _pr.set_exception(std::make_exception_ptr(default_io_exception_factory::cancelled()));
         delete this;
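Note (reviewer sketch, not part of the patch): the new tail of dispatch_requests() appears to work as follows: when nothing is pending, grab_capacity(available) re-registers the unused leftover as a fresh reservation, keeping the queue's place in the rover order, while refund_tokens(available + recycled) hands the same tokens back to the group so other shards can use them until this queue reaps them again. A tiny check of that bookkeeping under assumed rover positions (all numbers made up):

    #include <cassert>
    #include <cstdint>

    using capacity_t = uint64_t;

    int main() {
        // Rover positions before the tail of dispatch_requests() runs.
        capacity_t head = 90, tail = 100;  // 10 tokens still owed to reservations
        capacity_t available = 6;          // reaped but unused this tick

        // grab_capacity(available): re-reserve the leftover at the tail...
        capacity_t want_head = (tail += available);
        // ...then refund_tokens(available): give the tokens back to the group.
        head += available;

        // Net effect: the head-to-tail debt is unchanged, and this queue
        // holds a reservation for exactly its leftover.
        assert(tail - head == 10);
        assert(want_head == 106);
    }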