diff --git a/include/seastar/core/internal/pollable_fd.hh b/include/seastar/core/internal/pollable_fd.hh index fb4403433fc..c49cb3e2fb0 100644 --- a/include/seastar/core/internal/pollable_fd.hh +++ b/include/seastar/core/internal/pollable_fd.hh @@ -237,6 +237,7 @@ public: writeable_eventfd(writeable_eventfd&&) = default; readable_eventfd read_side(); void signal(size_t nr); + std::optional consume(); int get_read_fd() { return _fd.get(); } private: explicit writeable_eventfd(file_desc&& fd) : _fd(std::move(fd)) {} diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 83c792316cf..ae090b2a969 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -223,7 +223,7 @@ private: std::shared_ptr _smp; alien::instance& _alien; reactor_config _cfg; - file_desc _notify_eventfd; + writeable_eventfd _notify; file_desc _task_quota_timer; std::unique_ptr _backend; sigset_t _active_sigmask; // holds sigmask while sleeping with sig disabled diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 74fa943bb38..0182ead0e12 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1026,12 +1026,11 @@ reactor::reactor(std::shared_ptr smp, alien::instance& alien, unsi : _smp(std::move(smp)) , _alien(alien) , _cfg(std::move(cfg)) - , _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC)) , _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC)) , _id(id) , _cpu_stall_detector(internal::make_cpu_stall_detector()) , _cpu_sched(nullptr, 0) - , _thread_pool(std::make_unique(seastar::format("syscall-{}", id), _notify_eventfd)) { + , _thread_pool(std::make_unique(seastar::format("syscall-{}", id), _notify)) { /* * The _backend assignment is here, not on the initialization list as * the chosen backend constructor may want to handle signals and thus @@ -2953,10 +2952,7 @@ reactor::wakeup() { // We are free to clear it, because we're sending a signal now _sleeping.store(false, std::memory_order_relaxed); - - uint64_t one = 1; - auto res = ::write(_notify_eventfd.get(), &one, sizeof(one)); - SEASTAR_ASSERT(res == sizeof(one) && "write(2) failed on _reactor._notify_eventfd"); + _notify.signal(1); } void reactor::start_aio_eventfd_loop() { @@ -3748,6 +3744,11 @@ void writeable_eventfd::signal(size_t count) { SEASTAR_ASSERT(r == sizeof(c)); } +std::optional writeable_eventfd::consume() { + uint64_t c; + return _fd.read(&c, sizeof(c)); +} + writeable_eventfd readable_eventfd::write_side() { return writeable_eventfd(_fd.get_file_desc().dup()); } diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc index 7b386e68e3b..e2e41bdedff 100644 --- a/src/core/reactor_backend.cc +++ b/src/core/reactor_backend.cc @@ -390,9 +390,10 @@ task_quota_aio_completion::task_quota_aio_completion(file_desc& fd) : fd_kernel_completion(fd) , completion_with_iocb(fd.get(), POLLIN, this) {} -smp_wakeup_aio_completion::smp_wakeup_aio_completion(file_desc& fd) - : fd_kernel_completion(fd) - , completion_with_iocb(fd.get(), POLLIN, this) {} +smp_wakeup_aio_completion::smp_wakeup_aio_completion(writeable_eventfd& fd) + : completion_with_iocb(fd.get_read_fd(), POLLIN, this) + , _efd(fd) +{} void hrtimer_aio_completion::complete_with(ssize_t ret) { @@ -413,8 +414,7 @@ task_quota_aio_completion::complete_with(ssize_t ret) { void smp_wakeup_aio_completion::complete_with(ssize_t ret) { - uint64_t ignore = 0; - (void)_fd.read(&ignore, 8); + _efd.consume(); completion_with_iocb::completed(); } @@ -526,7 +526,7 @@ reactor_backend_aio::reactor_backend_aio(reactor& r) , _preempting_io(_r, _r._task_quota_timer, _hrtimer_timerfd) , _polling_io(_r._cfg.max_networking_aio_io_control_blocks) , _hrtimer_poll_completion(_r, _hrtimer_timerfd) - , _smp_wakeup_aio_completion(_r._notify_eventfd) + , _smp_wakeup_aio_completion(_r._notify) { // Protect against spurious wakeups - if we get notified that the timer has // expired when it really hasn't, we don't want to block in read(tfd, ...). @@ -727,7 +727,7 @@ reactor_backend_epoll::reactor_backend_epoll(reactor& r) ::epoll_event event; event.events = EPOLLIN; event.data.ptr = nullptr; - auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify_eventfd.get(), &event); + auto ret = ::epoll_ctl(_epollfd.get(), EPOLL_CTL_ADD, _r._notify.get_read_fd(), &event); throw_system_error_on(ret == -1); event.events = EPOLLIN; event.data.ptr = &_steady_clock_timer_reactor_thread; @@ -854,8 +854,7 @@ reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigm auto& evt = eevt[i]; auto pfd = reinterpret_cast(evt.data.ptr); if (!pfd) { - char dummy[8]; - _r._notify_eventfd.read(dummy, 8); + _r._notify.consume(); continue; } if (evt.data.ptr == &_steady_clock_timer_reactor_thread) { @@ -1227,13 +1226,13 @@ class reactor_backend_uring final : public reactor_backend { }; // eventfd and timerfd both need an 8-byte read after completion - class recurring_eventfd_or_timerfd_completion : public fd_kernel_completion { + class recurring_eventfd_or_timerfd_completion final : public kernel_completion { + writeable_eventfd& _efd; bool _armed = false; public: - explicit recurring_eventfd_or_timerfd_completion(file_desc& fd) : fd_kernel_completion(fd) {} + explicit recurring_eventfd_or_timerfd_completion(writeable_eventfd& efd) noexcept : _efd(efd) {} virtual void complete_with(ssize_t res) override { - char garbage[8]; - auto ret = _fd.read(garbage, 8); + auto ret = _efd.consume(); // Note: for hrtimer_completion we can have spurious wakeups, // since we wait for this using both _preempt_io_context and the // ring. So don't assert that we read anything. @@ -1433,7 +1432,7 @@ class reactor_backend_uring final : public reactor_backend { , _hrtimer_timerfd(make_timerfd()) , _preempt_io_context(_r, _r._task_quota_timer, _hrtimer_timerfd) , _hrtimer_completion(_r, _hrtimer_timerfd) - , _smp_wakeup_completion(_r._notify_eventfd) { + , _smp_wakeup_completion(_r._notify) { // Protect against spurious wakeups - if we get notified that the timer has // expired when it really hasn't, we don't want to block in read(tfd, ...). auto tfd = _r._task_quota_timer.get(); diff --git a/src/core/reactor_backend.hh b/src/core/reactor_backend.hh index 9c6d80f9473..c14df020c46 100644 --- a/src/core/reactor_backend.hh +++ b/src/core/reactor_backend.hh @@ -140,9 +140,12 @@ struct task_quota_aio_completion : public fd_kernel_completion, virtual void complete_with(ssize_t value) override; }; -struct smp_wakeup_aio_completion : public fd_kernel_completion, +struct smp_wakeup_aio_completion final : public kernel_completion, public completion_with_iocb { - smp_wakeup_aio_completion(file_desc& fd); +private: + writeable_eventfd& _efd; +public: + smp_wakeup_aio_completion(writeable_eventfd& fd); virtual void complete_with(ssize_t value) override; }; diff --git a/src/core/thread_pool.cc b/src/core/thread_pool.cc index 818cab826ee..3f8d6e624ab 100644 --- a/src/core/thread_pool.cc +++ b/src/core/thread_pool.cc @@ -39,7 +39,7 @@ module seastar; namespace seastar { -thread_pool::thread_pool(sstring name, file_desc& notify) : _notify_eventfd(notify), _worker_thread([this, name] { work(name); }) { +thread_pool::thread_pool(sstring name, writeable_eventfd& notify) : _notify(notify), _worker_thread([this, name] { work(name); }) { } void thread_pool::work(sstring name) { @@ -68,9 +68,7 @@ void thread_pool::work(sstring name) { // Prevent the following load of _main_thread_idle to be hoisted before the writes to _completed above. std::atomic_thread_fence(std::memory_order_seq_cst); if (_main_thread_idle.load(std::memory_order_relaxed)) { - uint64_t one = 1; - auto res = ::write(_notify_eventfd.get(), &one, 8); - SEASTAR_ASSERT(res == 8 && "write(2) failed on _reactor._notify_eventfd"); + _notify.signal(1); } } } diff --git a/src/core/thread_pool.hh b/src/core/thread_pool.hh index d6e49e72d61..c393da8f5b6 100644 --- a/src/core/thread_pool.hh +++ b/src/core/thread_pool.hh @@ -53,14 +53,14 @@ public: } // namespace internal class thread_pool { - file_desc& _notify_eventfd; + writeable_eventfd& _notify; internal::submit_metrics metrics; syscall_work_queue inter_thread_wq; posix_thread _worker_thread; std::atomic _stopped = { false }; std::atomic _main_thread_idle = { false }; public: - explicit thread_pool(sstring thread_name, file_desc& notify); + explicit thread_pool(sstring thread_name, writeable_eventfd& notify); ~thread_pool(); template future submit(internal::thread_pool_submit_reason reason, Func func) noexcept {