Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
michoecho committed Dec 17, 2024
1 parent b68efd3 commit e2f29cb
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 164 deletions.
3 changes: 2 additions & 1 deletion include/seastar/core/byteorder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ read_le(const char* p) noexcept {

template <typename T>
inline
void
char*
write_le(char* p, T datum) noexcept {
datum = cpu_to_le(datum);
std::copy_n(reinterpret_cast<const char*>(&datum), sizeof(T), p);
return p + sizeof(T);
}

template <typename T>
Expand Down
3 changes: 2 additions & 1 deletion include/seastar/core/execution_stage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ private:
auto wi_ready = std::move(wi._ready);
_queue.pop_front();
{
auto st = switch_task(11, wi._task_id);
tracepoint_run_execution_stage_task(wi._task_id);
auto st = switch_task(wi._task_id);
futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
}
_stats.function_calls_executed++;
Expand Down
1 change: 1 addition & 0 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public:
void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

capacity_t capacity_deficiency(capacity_t from) const noexcept;
capacity_t head() const noexcept;

std::chrono::duration<double> rate_limit_duration() const noexcept {
std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
Expand Down
48 changes: 2 additions & 46 deletions include/seastar/core/task.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <seastar/core/scheduling.hh>
#include <seastar/util/backtrace.hh>
#include <seastar/util/tracer.hh>

#ifndef SEASTAR_MODULE
#include <utility>
Expand All @@ -32,46 +33,6 @@ namespace seastar {

SEASTAR_MODULE_EXPORT

inline int64_t rdtsc() {
uint64_t rax, rdx;
asm volatile ( "rdtsc" : "=a" (rax), "=d" (rdx) );
return (int64_t)(( rdx << 32 ) + rax);
}

struct tracer {
struct entry {
uint64_t event;
uint64_t id;
uint64_t arg;
int64_t ts;
};

tracer();
static constexpr size_t buffer_size = (32 * 1024);
std::vector<entry> _buf;
size_t _head = 0;
size_t _tail = buffer_size - 1;
void add(uint64_t event, uint64_t id, uint64_t arg) {
if (_head == _tail) {
return;
}
_buf[_head++] = entry{.event = event, .id = id, .arg = arg, .ts = rdtsc()};
if (_head % (buffer_size / 2) == 0) {
commit();
if (_head == buffer_size) {
_head = 0;
}
}
}

struct impl;
std::unique_ptr<impl> _impl;
void commit();
void start();
future<> stop();
};
extern thread_local tracer g_tracer;

extern thread_local uint64_t fresh_task_id;
extern thread_local uint64_t current_task_id;

Expand All @@ -83,19 +44,14 @@ struct task_id {

struct [[nodiscard]] switch_task {
task_id _prev;
switch_task(uint64_t event, uint64_t id) {
switch_task(uint64_t id) {
current_task_id = id;
g_tracer.add(event, _prev, current_task_id);
}
~switch_task() {
current_task_id = _prev;
}
};

inline void task_event(uint64_t event, uint64_t arg, uint64_t id = current_task_id) {
g_tracer.add(event, id, arg);
}

class task {
public:
task_id _id;
Expand Down
3 changes: 2 additions & 1 deletion include/seastar/util/shared_token_bucket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ class shared_token_bucket {
rovers_t _rovers;

T tail() const noexcept { return _rovers.tail.load(std::memory_order_relaxed); }
public:
T head() const noexcept { return _rovers.head.load(std::memory_order_relaxed); }

private:
/*
* Need to make sure that the multiplication in accumulated_in() doesn't
* overflow. Not to introduce an extra branch there, define that the
Expand Down
177 changes: 177 additions & 0 deletions include/seastar/util/tracer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#pragma once

#include <deque>
#include <vector>
#include <cstdint>
#include <cstring>
#include <seastar/core/byteorder.hh>

namespace seastar {

struct tracer {
static constexpr size_t buffer_size = (128 * 1024);
std::deque<std::vector<std::byte>> _old;
std::vector<std::byte> _current;
size_t _cur_pos = 0;

tracer() {
for (int i = 0; i < 80; ++i) {
_old.push_back(std::vector<std::byte>());
}
_current.resize(buffer_size);
}

void rotate() {
_current.resize(_cur_pos);
_old.push_back(std::move(_current));
_current = std::move(_old.front());
_old.pop_front();
_current.resize(buffer_size);
_cur_pos = 0;
}

std::byte* write(size_t n) {
if (_current.size() - _cur_pos < n) [[unlikely]] {
rotate();
}
auto result = &_current[_cur_pos];
_cur_pos += n;
return result;
}

std::deque<std::vector<std::byte>> snapshot() {
auto result = _old;
auto cur = _current;
cur.resize(_cur_pos);
result.push_back(cur);
return result;
}

uint64_t rdtsc() {
uint64_t rax, rdx;
asm volatile ( "rdtsc" : "=a" (rax), "=d" (rdx) );
return (uint64_t)(( rdx << 32 ) + rax);
}
};
extern thread_local tracer g_tracer;


enum class trace_events {
POLL,
SLEEP,
WAKEUP,
RUN_TASK_QUEUE,
RUN_TASK_QUEUE_END,
RUN_TASK,
RUN_EXECUTION_STAGE_TASK,
GRAB_CAPACITY,
GRAB_CAPACITY_PENDING,
DISPATCH_REQUESTS,
DISPATCH_QUEUE,
REPLENISH,
IO_QUEUE,
IO_DISPATCH,
IO_COMPLETE,
COUNT,
};

[[gnu::always_inline]]
inline void tracepoint_nullary(trace_events header) {
auto p = reinterpret_cast<char*>(g_tracer.write(12));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(header));
p = seastar::write_le<uint64_t>(p, g_tracer.rdtsc());
}

template <typename T>
[[gnu::always_inline]]
inline void tracepoint_unary(uint32_t header, T arg) {
auto p = reinterpret_cast<char*>(g_tracer.write(12 + sizeof(T)));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(header));
p = seastar::write_le<uint64_t>(p, g_tracer.rdtsc());
p = seastar::write_le<T>(p, arg);
}

template <typename T>
[[gnu::always_inline]]
inline void tracepoint_unary(trace_events header, T arg) {
tracepoint_unary(static_cast<uint32_t>(header), arg);
}

inline void tracepoint_poll() {
tracepoint_nullary(trace_events::POLL);
}

inline void tracepoint_sleep() {
tracepoint_nullary(trace_events::SLEEP);
}

inline void tracepoint_wakeup() {
tracepoint_nullary(trace_events::WAKEUP);
}

inline void tracepoint_run_task_queue(uint8_t sg) {
tracepoint_unary<uint8_t>(trace_events::RUN_TASK_QUEUE, sg);
}

inline void tracepoint_run_task_queue_end() {
tracepoint_nullary(trace_events::RUN_TASK_QUEUE_END);
}

inline void tracepoint_run_task(int64_t task_id) {
tracepoint_unary<uint64_t>(trace_events::RUN_TASK, task_id);
}

inline void tracepoint_run_execution_stage_task(int64_t task_id) {
tracepoint_unary<uint64_t>(trace_events::RUN_EXECUTION_STAGE_TASK, task_id);
}

inline void tracepoint_io_queue(uint8_t direction, uint64_t tokens, uint64_t io_id) {
auto p = reinterpret_cast<char*>(g_tracer.write(12 + 17));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(trace_events::IO_QUEUE));
p = seastar::write_le<uint64_t>(p, g_tracer.rdtsc());
p = seastar::write_le<uint8_t>(p, direction);
p = seastar::write_le<uint64_t>(p, tokens);
p = seastar::write_le<uint64_t>(p, io_id);
}

inline void tracepoint_io_dispatch(uint64_t io_id) {
tracepoint_unary<uint64_t>(trace_events::IO_DISPATCH, io_id);
}

inline void tracepoint_io_complete(uint64_t io_id) {
tracepoint_unary<uint64_t>(trace_events::IO_COMPLETE, io_id);
}

inline void tracepoint_grab_capacity(uint64_t cap, uint64_t want_head, uint64_t head) {
auto p = reinterpret_cast<char*>(g_tracer.write(12 + 24));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(trace_events::GRAB_CAPACITY));
p = seastar::write_le<uint64_t>(p, g_tracer.rdtsc());
p = seastar::write_le<uint64_t>(p, cap);
p = seastar::write_le<uint64_t>(p, want_head);
p = seastar::write_le<uint64_t>(p, head);
}

inline void tracepoint_grab_capacity_pending(uint64_t cap, uint64_t head) {
auto p = reinterpret_cast<char*>(g_tracer.write(12 + 16));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(trace_events::GRAB_CAPACITY_PENDING));
p = seastar::write_le<uint64_t>(p, g_tracer.rdtsc());
p = seastar::write_le<uint64_t>(p, cap);
p = seastar::write_le<uint64_t>(p, head);
}

inline void tracepoint_replenish(uint64_t new_head) {
tracepoint_unary<uint64_t>(trace_events::REPLENISH, new_head);
}

inline void tracepoint_dispatch_queue(uint8_t id) {
auto p = reinterpret_cast<char*>(g_tracer.write(5));
p = seastar::write_le<uint32_t>(p, static_cast<uint32_t>(trace_events::DISPATCH_QUEUE));
p = seastar::write_le<uint8_t>(p, id);
}

inline void tracepoint_dispatch_requests() {
tracepoint_nullary(trace_events::DISPATCH_REQUESTS);
}


} // namespace seastar
20 changes: 16 additions & 4 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {

void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
_token_bucket.replenish(now);
tracepoint_replenish(_token_bucket.head());
}

void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
Expand All @@ -131,6 +132,10 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity
return _token_bucket.deficiency(from);
}

auto fair_group::head() const noexcept -> capacity_t {
return _token_bucket.head();
}

// Priority class, to be used with a given fair_queue
class fair_queue::priority_class_data {
friend class fair_queue;
Expand All @@ -141,9 +146,10 @@ class fair_queue::priority_class_data {
bool _queued = false;
bool _plugged = true;
uint32_t _activations = 0;
fair_queue::class_id _id;

public:
explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
explicit priority_class_data(uint32_t shares, fair_queue::class_id id) noexcept : _shares(std::max(shares, 1u)), _id(id) {}
priority_class_data(const priority_class_data&) = delete;
priority_class_data(priority_class_data&&) = delete;

Expand Down Expand Up @@ -226,7 +232,9 @@ void fair_queue::unplug_class(class_id cid) noexcept {
auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
_group.maybe_replenish_capacity(_group_replenish);

if (_group.capacity_deficiency(_pending->head)) {
capacity_t head = _group.head();
tracepoint_grab_capacity(ent._capacity, _pending->head, head);
if (internal::wrapping_difference(_pending->head, head)) {
return grab_result::pending;
}

Expand All @@ -246,7 +254,9 @@ auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_res

capacity_t cap = ent._capacity;
capacity_t want_head = _group.grab_capacity(cap);
if (_group.capacity_deficiency(want_head)) {
capacity_t head = _group.head();
tracepoint_grab_capacity(ent._capacity, want_head, head);
if (internal::wrapping_difference(want_head, head)) {
_pending.emplace(want_head, cap);
return grab_result::pending;
}
Expand All @@ -262,7 +272,7 @@ void fair_queue::register_priority_class(class_id id, uint32_t shares) {
}

_handles.reserve(_nr_classes + 1);
_priority_classes[id] = std::make_unique<priority_class_data>(shares);
_priority_classes[id] = std::make_unique<priority_class_data>(shares, id);
_nr_classes++;
}

Expand Down Expand Up @@ -330,8 +340,10 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;

tracepoint_dispatch_requests();
while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
priority_class_data& h = *_handles.top();
tracepoint_dispatch_queue(h._id);
if (h._queue.empty() || !h._plugged) {
pop_priority_class(h);
continue;
Expand Down
Loading

0 comments on commit e2f29cb

Please sign in to comment.