Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/seastar/core/internal/pollable_fd.hh
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public:
writeable_eventfd(writeable_eventfd&&) = default;
readable_eventfd read_side();
void signal(size_t nr);
std::optional<size_t> consume();
int get_read_fd() { return _fd.get(); }
private:
explicit writeable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {}
Expand Down
2 changes: 1 addition & 1 deletion include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private:
std::shared_ptr<seastar::smp> _smp;
alien::instance& _alien;
reactor_config _cfg;
file_desc _notify_eventfd;
writeable_eventfd _notify;
file_desc _task_quota_timer;
std::unique_ptr<reactor_backend> _backend;
sigset_t _active_sigmask; // holds sigmask while sleeping with sig disabled
Expand Down
13 changes: 7 additions & 6 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1026,12 +1026,11 @@ reactor::reactor(std::shared_ptr<seastar::smp> smp, alien::instance& alien, unsi
: _smp(std::move(smp))
, _alien(alien)
, _cfg(std::move(cfg))
, _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
, _id(id)
, _cpu_stall_detector(internal::make_cpu_stall_detector())
, _cpu_sched(nullptr, 0)
, _thread_pool(std::make_unique<thread_pool>(seastar::format("syscall-{}", id), _notify_eventfd)) {
, _thread_pool(std::make_unique<thread_pool>(seastar::format("syscall-{}", id), _notify)) {
/*
* The _backend assignment is here, not on the initialization list as
* the chosen backend constructor may want to handle signals and thus
Expand Down Expand Up @@ -2953,10 +2952,7 @@ reactor::wakeup() {

// We are free to clear it, because we're sending a signal now
_sleeping.store(false, std::memory_order_relaxed);

uint64_t one = 1;
auto res = ::write(_notify_eventfd.get(), &one, sizeof(one));
SEASTAR_ASSERT(res == sizeof(one) && "write(2) failed on _reactor._notify_eventfd");
_notify.signal(1);
}

void reactor::start_aio_eventfd_loop() {
Expand Down Expand Up @@ -3748,6 +3744,11 @@ void writeable_eventfd::signal(size_t count) {
SEASTAR_ASSERT(r == sizeof(c));
}

// Issues a single read of sizeof(uint64_t) bytes on the underlying descriptor
// and throws away the value that was read. Whatever file_desc::read() reports
// is handed back to the caller.
// NOTE(review): presumably an empty optional means there was nothing to read
// (non-blocking fd) -- confirm against file_desc::read().
std::optional<size_t> writeable_eventfd::consume() {
    uint64_t counter;
    std::optional<size_t> outcome = _fd.read(&counter, sizeof(counter));
    return outcome;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be part of the read side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The read side uses an awaitable wrapper around pollable_fd, which the users of this new method don't need. Actually, writeable_eventfd and readable_eventfd are supposed to work together as a pair, while the reactor only needs the writeable end. I'll think about it a bit more.


// Produces a writeable_eventfd built on a dup() of this object's file
// descriptor.
writeable_eventfd readable_eventfd::write_side() {
    auto duplicated = _fd.get_file_desc().dup();
    return writeable_eventfd(std::move(duplicated));
}
Expand Down
27 changes: 13 additions & 14 deletions src/core/reactor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,10 @@ task_quota_aio_completion::task_quota_aio_completion(file_desc& fd)
: fd_kernel_completion(fd)
, completion_with_iocb(fd.get(), POLLIN, this) {}

// file_desc overload: passes `fd` to the fd_kernel_completion base and
// initialises completion_with_iocb with fd.get(), POLLIN and a pointer to this
// object.
smp_wakeup_aio_completion::smp_wakeup_aio_completion(file_desc& fd)
: fd_kernel_completion(fd)
, completion_with_iocb(fd.get(), POLLIN, this) {}
// writeable_eventfd overload: initialises completion_with_iocb with
// fd.get_read_fd(), POLLIN and a pointer to this object, and binds `fd` to the
// _efd member.
// _efd is declared as a reference, so `fd` has to outlive this completion.
smp_wakeup_aio_completion::smp_wakeup_aio_completion(writeable_eventfd& fd)
: completion_with_iocb(fd.get_read_fd(), POLLIN, this)
, _efd(fd)
{}

void
hrtimer_aio_completion::complete_with(ssize_t ret) {
Expand All @@ -413,8 +414,7 @@ task_quota_aio_completion::complete_with(ssize_t ret) {

// Completion callback for the smp wakeup notification: performs a read on the
// notification descriptor, discards whatever was read, and then calls
// completion_with_iocb::completed(). The `ret` argument is not inspected.
void
smp_wakeup_aio_completion::complete_with(ssize_t ret) {
uint64_t ignore = 0;
// The value read is irrelevant; the result of the read is deliberately dropped.
(void)_fd.read(&ignore, 8);
// Same drain expressed through the eventfd wrapper; its return value is unused.
_efd.consume();
completion_with_iocb::completed();
}

Expand Down Expand Up @@ -526,7 +526,7 @@ reactor_backend_aio::reactor_backend_aio(reactor& r)
, _preempting_io(_r, _r._task_quota_timer, _hrtimer_timerfd)
, _polling_io(_r._cfg.max_networking_aio_io_control_blocks)
, _hrtimer_poll_completion(_r, _hrtimer_timerfd)
, _smp_wakeup_aio_completion(_r._notify_eventfd)
, _smp_wakeup_aio_completion(_r._notify)
{
// Protect against spurious wakeups - if we get notified that the timer has
// expired when it really hasn't, we don't want to block in read(tfd, ...).
Expand Down Expand Up @@ -727,7 +727,7 @@ reactor_backend_epoll::reactor_backend_epoll(reactor& r)
::epoll_event event;
event.events = EPOLLIN;
event.data.ptr = nullptr;
auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify_eventfd.get(), &event);
auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify.get_read_fd(), &event);
throw_system_error_on(ret == -1);
event.events = EPOLLIN;
event.data.ptr = &_steady_clock_timer_reactor_thread;
Expand Down Expand Up @@ -854,8 +854,7 @@ reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigm
auto& evt = eevt[i];
auto pfd = reinterpret_cast<pollable_fd_state*>(evt.data.ptr);
if (!pfd) {
char dummy[8];
_r._notify_eventfd.read(dummy, 8);
_r._notify.consume();
continue;
}
if (evt.data.ptr == &_steady_clock_timer_reactor_thread) {
Expand Down Expand Up @@ -1227,13 +1226,13 @@ class reactor_backend_uring final : public reactor_backend {
};

// eventfd and timerfd both need an 8-byte read after completion
class recurring_eventfd_or_timerfd_completion : public fd_kernel_completion {
class recurring_eventfd_or_timerfd_completion final : public kernel_completion {
writeable_eventfd& _efd;
bool _armed = false;
public:
explicit recurring_eventfd_or_timerfd_completion(file_desc& fd) : fd_kernel_completion(fd) {}
explicit recurring_eventfd_or_timerfd_completion(writeable_eventfd& efd) noexcept : _efd(efd) {}
virtual void complete_with(ssize_t res) override {
char garbage[8];
auto ret = _fd.read(garbage, 8);
auto ret = _efd.consume();
// Note: for hrtimer_completion we can have spurious wakeups,
// since we wait for this using both _preempt_io_context and the
// ring. So don't assert that we read anything.
Expand Down Expand Up @@ -1433,7 +1432,7 @@ class reactor_backend_uring final : public reactor_backend {
, _hrtimer_timerfd(make_timerfd())
, _preempt_io_context(_r, _r._task_quota_timer, _hrtimer_timerfd)
, _hrtimer_completion(_r, _hrtimer_timerfd)
, _smp_wakeup_completion(_r._notify_eventfd) {
, _smp_wakeup_completion(_r._notify) {
// Protect against spurious wakeups - if we get notified that the timer has
// expired when it really hasn't, we don't want to block in read(tfd, ...).
auto tfd = _r._task_quota_timer.get();
Expand Down
7 changes: 5 additions & 2 deletions src/core/reactor_backend.hh
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,12 @@ struct task_quota_aio_completion : public fd_kernel_completion,
virtual void complete_with(ssize_t value) override;
};

// Completion object for the smp wakeup notification. It derives from a kernel
// completion type and from completion_with_iocb, and overrides complete_with().
struct smp_wakeup_aio_completion : public fd_kernel_completion,
struct smp_wakeup_aio_completion final : public kernel_completion,
public completion_with_iocb {
smp_wakeup_aio_completion(file_desc& fd);
private:
// Reference to the eventfd wrapper; it is not owned by this object, so the
// referenced writeable_eventfd must outlive it.
writeable_eventfd& _efd;
public:
smp_wakeup_aio_completion(writeable_eventfd& fd);
// Called with the completion result value.
virtual void complete_with(ssize_t value) override;
};

Expand Down
6 changes: 2 additions & 4 deletions src/core/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ module seastar;

namespace seastar {

// Binds the notification object passed as `notify` to a reference member and
// constructs _worker_thread with a callable that runs work(name). The lambda
// captures `this` and a copy of `name`.
// NOTE(review): `notify` is held by reference, so it presumably must outlive
// the thread_pool -- confirm against the owning reactor's member order.
thread_pool::thread_pool(sstring name, file_desc& notify) : _notify_eventfd(notify), _worker_thread([this, name] { work(name); }) {
thread_pool::thread_pool(sstring name, writeable_eventfd& notify) : _notify(notify), _worker_thread([this, name] { work(name); }) {
}

void thread_pool::work(sstring name) {
Expand Down Expand Up @@ -68,9 +68,7 @@ void thread_pool::work(sstring name) {
// Prevent the following load of _main_thread_idle to be hoisted before the writes to _completed above.
std::atomic_thread_fence(std::memory_order_seq_cst);
if (_main_thread_idle.load(std::memory_order_relaxed)) {
uint64_t one = 1;
auto res = ::write(_notify_eventfd.get(), &one, 8);
SEASTAR_ASSERT(res == 8 && "write(2) failed on _reactor._notify_eventfd");
_notify.signal(1);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/thread_pool.hh
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ public:
} // namespace internal

class thread_pool {
file_desc& _notify_eventfd;
writeable_eventfd& _notify;
internal::submit_metrics metrics;
syscall_work_queue inter_thread_wq;
posix_thread _worker_thread;
std::atomic<bool> _stopped = { false };
std::atomic<bool> _main_thread_idle = { false };
public:
explicit thread_pool(sstring thread_name, file_desc& notify);
explicit thread_pool(sstring thread_name, writeable_eventfd& notify);
~thread_pool();
template <typename T, typename Func>
future<T> submit(internal::thread_pool_submit_reason reason, Func func) noexcept {
Expand Down