Skip to content

Commit

Permalink
fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
michoecho committed Dec 22, 2024
1 parent bd5aa72 commit b0ec97d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 47 deletions.
12 changes: 6 additions & 6 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public:
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
capacity_t grab_capacity(capacity_t cap) noexcept;
clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
void refund_tokens(capacity_t) noexcept;
void replenish_capacity(clock_type::time_point now) noexcept;
void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

Expand Down Expand Up @@ -344,13 +345,12 @@ private:
* in the middle of the waiting
*/
struct pending {
capacity_t head;
capacity_t cap;

pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
capacity_t head = 0;
capacity_t cap = 0;
};

std::optional<pending> _pending;
pending _pending;
capacity_t _queued_cap = 0;

void push_priority_class(priority_class_data& pc) noexcept;
void push_priority_class_from_idle(priority_class_data& pc) noexcept;
Expand All @@ -360,7 +360,7 @@ private:

enum class grab_result { grabbed, cant_preempt, pending };
grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
capacity_t reap_pending_capacity(bool& contact) noexcept;
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
Expand Down
4 changes: 4 additions & 0 deletions include/seastar/util/shared_token_bucket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ public:
_rovers.release(tokens);
}

// Return tokens that were grabbed but turned out not to be needed,
// making them immediately available again by advancing the head rover
// (the same rover replenish() advances — see replenish() below).
// NOTE(review): unlike release(), this bypasses the tail/release path;
// presumably safe because the tokens were never actually consumed — confirm.
void refund(T tokens) noexcept {
fetch_add(_rovers.head, tokens);
}

void replenish(typename Clock::time_point now) noexcept {
auto ts = _replenished.load(std::memory_order_relaxed);

Expand Down
9 changes: 7 additions & 2 deletions include/seastar/util/tracer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ enum class trace_events {
IO_QUEUE,
IO_DISPATCH,
IO_COMPLETE,
IO_CANCEL,
MONITORING_SCRAPE,
COUNT,
};
Expand Down Expand Up @@ -143,6 +144,10 @@ inline void tracepoint_io_complete(uint64_t io_id) {
tracepoint_unary<uint64_t>(trace_events::IO_COMPLETE, io_id);
}

// Emit an IO_CANCEL trace event for the request identified by io_id,
// mirroring tracepoint_io_complete() above.
inline void tracepoint_io_cancel(uint64_t io_id) {
tracepoint_unary<uint64_t>(trace_events::IO_CANCEL, io_id);
}

inline void tracepoint_grab_capacity(uint64_t cap, uint64_t want_head, uint64_t head) {
auto p = reinterpret_cast<char*>(g_tracer.write(12 + 24));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(trace_events::GRAB_CAPACITY));
Expand All @@ -168,8 +173,8 @@ inline void tracepoint_dispatch_queue(uint8_t id) {
tracepoint_unary<uint8_t>(trace_events::DISPATCH_QUEUE, id);
}

inline void tracepoint_dispatch_requests() {
tracepoint_nullary(trace_events::DISPATCH_REQUESTS);
// Emit a DISPATCH_REQUESTS trace event. `queued` carries the total
// capacity currently queued, sampled at the start of a dispatch round.
inline void tracepoint_dispatch_requests(uint64_t queued) {
tracepoint_unary<uint64_t>(trace_events::DISPATCH_REQUESTS, queued);
}

inline void tracepoint_monitoring_scrape() {
Expand Down
96 changes: 57 additions & 39 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
tracepoint_replenish(_token_bucket.head());
}

// Hand capacity that was grabbed but not dispatched back to the shared
// token bucket. Reuses the replenish tracepoint since, like a replenish,
// a refund advances the bucket's head.
void fair_group::refund_tokens(capacity_t cap) noexcept {
_token_bucket.refund(cap);
tracepoint_replenish(_token_bucket.head());
}

void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
auto now = clock_type::now();
auto extra = _token_bucket.accumulated_in(now - local_ts);
Expand Down Expand Up @@ -229,38 +234,29 @@ void fair_queue::unplug_class(class_id cid) noexcept {
unplug_priority_class(*_priority_classes[cid]);
}

auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
_group.maybe_replenish_capacity(_group_replenish);

capacity_t head = _group.head();
tracepoint_grab_capacity(ent._capacity, _pending->head, head);
if (internal::wrapping_difference(_pending->head, head)) {
return grab_result::pending;
}

capacity_t cap = ent._capacity;
if (cap > _pending->cap) {
return grab_result::cant_preempt;
auto fair_queue::reap_pending_capacity(bool& contact) noexcept -> capacity_t {
capacity_t result = 0;
contact = true;
if (_pending.cap) {
_group.maybe_replenish_capacity(_group_replenish);
capacity_t head = _group.head();
// tracepoint_grab_capacity(ent._capacity, _pending->head, head);
auto diff = internal::wrapping_difference(_pending.head, head);
contact = diff <= _pending.cap;
if (diff < _pending.cap) {
result = _pending.cap - diff;
_pending.cap = diff;
}
}

_pending.reset();
return grab_result::grabbed;
return result;
}

auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
if (_pending) {
return grab_pending_capacity(ent);
}

capacity_t cap = ent._capacity;
capacity_t want_head = _group.grab_capacity(cap);
capacity_t head = _group.head();
tracepoint_grab_capacity(ent._capacity, want_head, head);
if (internal::wrapping_difference(want_head, head)) {
_pending.emplace(want_head, cap);
return grab_result::pending;
}

_pending = pending{want_head, cap};
return grab_result::grabbed;
}

Expand Down Expand Up @@ -307,17 +303,19 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
push_priority_class_from_idle(pc);
}
pc._queue.push_back(ent);
_queued_cap += ent.capacity();
}

void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept {
    // Intentionally a no-op: request completion does not feed back into
    // the fair-queue accounting in this revision.
}

void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
// Remove the cancelled request's cost from the queued-capacity total,
// then zero the entry so a later dispatch of it grabs no capacity.
_queued_cap -= ent._capacity;
ent._capacity = 0;
}

fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
if (_pending) {
if (_pending.cap) {
/*
* We expect the disk to release the ticket within some time,
* but it's ... OK if it doesn't -- the pending wait still
Expand All @@ -328,7 +326,7 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
* which is sub-optimal. The expectation is that we think the disk
* works faster than it really does.
*/
auto over = _group.capacity_deficiency(_pending->head);
auto over = _group.capacity_deficiency(_pending.head);
auto ticks = _group.capacity_duration(over);
return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::microseconds>(ticks);
}
Expand All @@ -337,11 +335,15 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
}

void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;
tracepoint_dispatch_requests(_queued_cap);
_pending.cap = std::min(_pending.cap, _queued_cap);
bool contact = false;
capacity_t available = reap_pending_capacity(contact);
capacity_t recycled = 0;
uint64_t can_grab_this_tick = _group.per_tick_grab_threshold();

// tracepoint_dispatch_requests();
while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
while (!_handles.empty()) {
priority_class_data& h = *_handles.top();
tracepoint_dispatch_queue(h._id);
if (h._queue.empty() || !h._plugged) {
Expand All @@ -350,16 +352,29 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}

auto& req = h._queue.front();
auto gr = grab_capacity(req);
if (gr == grab_result::pending) {
if (req._capacity <= available) {
// pass
} else if (req._capacity <= available + _pending.cap) {
_pending.cap += available;
available = 0;
break;
} else if (contact) {
can_grab_this_tick += available + _pending.cap;
recycled = available + _pending.cap;
_pending.cap = 0;
available = 0;
capacity_t grab_amount = std::min<capacity_t>(can_grab_this_tick, _queued_cap);
grab_capacity(grab_amount);
can_grab_this_tick -= grab_amount;
available += reap_pending_capacity(contact);
if (req._capacity > available) {
break;
}
} else {
break;
}

if (gr == grab_result::cant_preempt) {
pop_priority_class(h);
preempt.emplace_back(&h);
continue;
}
available -= req._capacity;

_last_accumulated = std::max(h._accumulated, _last_accumulated);
pop_priority_class(h);
Expand All @@ -386,7 +401,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}
h._accumulated += req_cost;
h._pure_accumulated += req_cap;
dispatched += req_cap;
_queued_cap -= req_cap;

cb(req);

Expand All @@ -395,8 +410,11 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}
}

for (auto&& h : preempt) {
push_priority_class(*h);
if (_pending.cap == 0 && available) {
grab_capacity(available);
}
if (available + recycled) {
_group.refund_tokens(available + recycled);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class io_desc_read_write final : public io_completion {
}

void cancel() noexcept {
tracepoint_io_cancel(_io_id);
_pclass.on_cancel();
_pr.set_exception(std::make_exception_ptr(default_io_exception_factory::cancelled()));
delete this;
Expand Down

0 comments on commit b0ec97d

Please sign in to comment.