Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
michoecho committed Dec 19, 2024
1 parent e2f29cb commit bd5aa72
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 6 deletions.
3 changes: 3 additions & 0 deletions include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include <cstring>
#include <memory>
#include <string_view>
#include <map>
#include <unordered_map>
#include <vector>
#include <unistd.h>
Expand Down Expand Up @@ -281,6 +282,8 @@ private:
};

boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
public:
std::vector<std::tuple<int, std::reference_wrapper<const sstring>, float>> list_groups();
internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data;
int64_t _last_vruntime = 0;
task_queue_list _active_task_queues;
Expand Down
11 changes: 7 additions & 4 deletions include/seastar/util/tracer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct tracer {
size_t _cur_pos = 0;

tracer() {
for (int i = 0; i < 80; ++i) {
for (int i = 0; i < 480; ++i) {
_old.push_back(std::vector<std::byte>());
}
_current.resize(buffer_size);
Expand Down Expand Up @@ -72,6 +72,7 @@ enum class trace_events {
IO_QUEUE,
IO_DISPATCH,
IO_COMPLETE,
MONITORING_SCRAPE,
COUNT,
};

Expand Down Expand Up @@ -164,14 +165,16 @@ inline void tracepoint_replenish(uint64_t new_head) {
}

inline void tracepoint_dispatch_queue(uint8_t id) {
auto p = reinterpret_cast<char*>(g_tracer.write(5));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(trace_events::DISPATCH_QUEUE));
p = seastar::write_le<uint8_t>(p, id);
tracepoint_unary<uint8_t>(trace_events::DISPATCH_QUEUE, id);
}

inline void tracepoint_dispatch_requests() {
tracepoint_nullary(trace_events::DISPATCH_REQUESTS);
}

inline void tracepoint_monitoring_scrape() {
tracepoint_nullary(trace_events::MONITORING_SCRAPE);
}


} // namespace seastar
2 changes: 1 addition & 1 deletion src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;

tracepoint_dispatch_requests();
// tracepoint_dispatch_requests();
while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
priority_class_data& h = *_handles.top();
tracepoint_dispatch_queue(h._id);
Expand Down
2 changes: 1 addition & 1 deletion src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class io_desc_read_write final : public io_completion {
io_log.trace("dev {} : req {} queue len {} capacity {}", _ioq.dev_id(), fmt::ptr(this), _dnl.length(), _fq_capacity);
static thread_local uint64_t io_id = 0;
_io_id = io_id++;
tracepoint_io_queue(_fq_capacity, dnl.rw_idx(), _io_id);
tracepoint_io_queue(dnl.rw_idx(), _fq_capacity, _io_id);
}

virtual void set_exception(std::exception_ptr eptr) noexcept override {
Expand Down
1 change: 1 addition & 0 deletions src/core/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ const value_map& get_value_map() {
}

foreign_ptr<values_reference> get_values() {
tracepoint_monitoring_scrape();
shared_ptr<values_copy> res_ref = ::seastar::make_shared<values_copy>();
auto& res = *(res_ref.get());
auto& mv = res.values;
Expand Down
10 changes: 10 additions & 0 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ shard_id reactor::cpu_id() const {
return _id;
}

std::vector<std::tuple<int, std::reference_wrapper<const sstring>, float>> reactor::list_groups() {
std::vector<std::tuple<int, std::reference_wrapper<const sstring>, float>> result;
for (int i = 0; i < _task_queues.size(); ++i) {
if (_task_queues[i]) {
result.emplace_back(i, _task_queues[i]->_name, _task_queues[i]->_shares);
}
}
return result;
}

void reactor::update_shares_for_queues(internal::priority_class pc, uint32_t shares) {
for (auto&& q : _io_queues) {
q.second->update_shares_for_class(pc, shares);
Expand Down

0 comments on commit bd5aa72

Please sign in to comment.