Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/seastar/core/io_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public:
void update_shares_for_class_group(unsigned index, size_t new_shares);
future<> update_bandwidth_for_class(internal::priority_class pc, uint64_t new_bandwidth);
void rename_priority_class(internal::priority_class pc, sstring new_name);
void destroy_priority_class(internal::priority_class pc) noexcept;
void throttle_priority_class(const priority_class_data& pc) noexcept;
void unthrottle_priority_class(const priority_class_data& pc) noexcept;

Expand Down
15 changes: 15 additions & 0 deletions src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ class io_queue::priority_class_data {
priority_class_data(const priority_class_data&) = delete;
priority_class_data(priority_class_data&&) = delete;

~priority_class_data() {
SEASTAR_ASSERT(_nr_queued == 0);
SEASTAR_ASSERT(_nr_executing == 0);
}

void on_queue() noexcept {
_nr_queued++;
if (_nr_executing == 0 && _nr_queued == 1) {
Expand Down Expand Up @@ -1140,6 +1145,16 @@ io_queue::rename_priority_class(internal::priority_class pc, sstring new_name) {
}
}

void io_queue::destroy_priority_class(internal::priority_class pc) noexcept {
if (_priority_classes.size() > pc.id() && _priority_classes[pc.id()]) {
auto& pc_ptr = _priority_classes[pc.id()];
for (auto&& s : _streams) {
s.fq.unregister_priority_class(pc_ptr->fq_class());
}
pc_ptr.reset();
}
}

void io_queue::throttle_priority_class(const priority_class_data& pc) noexcept {
for (auto&& s : _streams) {
s.fq.unplug_class(pc.fq_class());
Expand Down
3 changes: 3 additions & 0 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4982,6 +4982,9 @@ reactor::destroy_scheduling_group(scheduling_group sg) noexcept {
}).then( [this, sg] () {
get_sg_data(sg).queue_is_initialized = false;
_task_queues[sg._id].reset();
for (auto&& queue : _io_queues) {
queue.second->destroy_priority_class(internal::priority_class(sg));
}
});

}
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/io_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ struct io_queue_for_tests {
fair_queue& get_fair_queue() {
return queue._streams[0].fq;
}

bool is_class_registered(internal::priority_class pc) const noexcept {
return queue._priority_classes.size() > pc.id() && (queue._priority_classes[pc.id()] != nullptr);
}
};

internal::priority_class get_default_pc() {
Expand Down Expand Up @@ -666,3 +670,28 @@ SEASTAR_THREAD_TEST_CASE(test_nested_priority_classes_basic_linkage) {
BOOST_CHECK(fq.get_parent_index(internal::priority_class(sg1)) == 0);
BOOST_CHECK(fq.get_parent_index(internal::priority_class(sg2)) == 1);
}

SEASTAR_THREAD_TEST_CASE(test_destroy_priority_class_with_requests) {
io_queue_for_tests tio;

auto sg = create_scheduling_group("a", 100).get();
auto pc = internal::priority_class(sg);

auto fx = tio.queue_request(pc,
internal::io_direction_and_length(internal::io_direction_and_length::read_idx, 0),
internal::io_request::make_write(0, 0, nullptr, 1, false),
nullptr, {});

// Push this request through to make io-queue instantiate the priority class
tio.queue.poll_io_queue();
tio.sink.drain([] (const internal::io_request& rq, io_completion* desc) -> bool {
desc->complete_with(1);
return true;
});
BOOST_REQUIRE_EQUAL(fx.get(), 1);

BOOST_REQUIRE(tio.is_class_registered(pc));
tio.queue.destroy_priority_class(internal::priority_class(sg));
destroy_scheduling_group(sg).get();
BOOST_REQUIRE(!tio.is_class_registered(pc));
}