Skip to content

Commit

Permalink
introduce waker and wait_queue abstractions. fixes boostorg#251, boos…
Browse files Browse the repository at this point in the history
…torg#259

* remove context::wait_hook_ and context::twstatus in flavor to waker_epoch_ and waker class
* this avoids data races in case of wait_until() operations, when the context
  could be timeouted and rescheduled on the other OS thread. In this case could
  be data races with context::wait_hook_ and inconsistences context::twstatus
  states.
* using context::waker_epoch_ introduces mechanism when the old wakers become
  outdated and waker::wake() is just no op. This fixes data races explained in
  the previous point
* fibers waiting queue with timeouts and notification mechanisms are incapsulated into
  wait_queue class. This introduces simple abstraction level to be used in
  different synchronization primitives
  • Loading branch information
Dmitry Khominich committed Oct 14, 2020
1 parent e440623 commit f1ab9c7
Show file tree
Hide file tree
Showing 20 changed files with 311 additions and 680 deletions.
1 change: 1 addition & 0 deletions build/Jamfile.v2
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lib boost_fiber
condition_variable.cpp
context.cpp
fiber.cpp
waker.cpp
future.cpp
mutex.cpp
properties.cpp
Expand Down
286 changes: 23 additions & 263 deletions include/boost/fiber/buffered_channel.hpp

Large diffs are not rendered by default.

29 changes: 5 additions & 24 deletions include/boost/fiber/condition_variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -44,10 +45,8 @@ enum class cv_status {

class BOOST_FIBERS_DECL condition_variable_any {
private:
using wait_queue_t = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_t wait_queue_{};
wait_queue wait_queue_{};

public:
condition_variable_any() = default;
Expand All @@ -69,13 +68,9 @@ class BOOST_FIBERS_DECL condition_variable_any {
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
active_ctx->suspend( lk);
wait_queue_.suspend_and_wait( lk, active_ctx);

// relock external again before returning
try {
lt.lock();
Expand All @@ -86,8 +81,6 @@ class BOOST_FIBERS_DECL condition_variable_any {
} catch (...) {
std::terminate();
}
// post-conditions
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}

template< typename LockType, typename Pred >
Expand All @@ -105,20 +98,10 @@ class BOOST_FIBERS_DECL condition_variable_any {
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
if ( ! wait_queue_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
status = cv_status::timeout;
// relock local lk
lk.lock();
// remove from waiting-queue
wait_queue_.remove( * active_ctx);
// unlock local lk
lk.unlock();
}
// relock external again before returning
try {
Expand All @@ -130,8 +113,6 @@ class BOOST_FIBERS_DECL condition_variable_any {
} catch (...) {
std::terminate();
}
// post-conditions
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
return status;
}

Expand Down
14 changes: 12 additions & 2 deletions include/boost/fiber/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <boost/fiber/properties.hpp>
#include <boost/fiber/segmented_stack.hpp>
#include <boost/fiber/type.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand Down Expand Up @@ -178,12 +179,13 @@ class BOOST_FIBERS_DECL context {
public:
detail::wait_hook wait_hook_{};
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
std::atomic< std::intptr_t > twstatus{ 0 };
std::atomic<size_t> waker_epoch_{ 0 };
#endif
private:
scheduler * scheduler_{ nullptr };
fss_data_t fss_data_{};
detail::sleep_hook sleep_hook_{};
waker sleep_waker_{};
detail::ready_hook ready_hook_{};
detail::terminated_hook terminated_hook_{};
detail::worker_hook worker_hook_{};
Expand Down Expand Up @@ -305,7 +307,15 @@ class BOOST_FIBERS_DECL context {

bool wait_until( std::chrono::steady_clock::time_point const&) noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
detail::spinlock_lock &,
waker &&) noexcept;

bool wake(const size_t) noexcept;

waker create_waker() noexcept {
// this operation makes all previously created wakers to be outdated
return { this, ++waker_epoch_ };
}

void schedule( context *) noexcept;

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -33,10 +34,8 @@ class BOOST_FIBERS_DECL mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };

public:
Expand Down
2 changes: 0 additions & 2 deletions include/boost/fiber/operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ template< typename Clock, typename Duration >
void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) {
std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_);
fibers::context * active_ctx = fibers::context::active();
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
active_ctx->wait_until( sleep_time);
}

template< typename Rep, typename Period >
void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) {
fibers::context * active_ctx = fibers::context::active();
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
active_ctx->wait_until( std::chrono::steady_clock::now() + timeout_duration);
}

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/recursive_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -37,10 +38,8 @@ class BOOST_FIBERS_DECL recursive_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/recursive_timed_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -39,10 +40,8 @@ class BOOST_FIBERS_DECL recursive_timed_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };

Expand Down
4 changes: 3 additions & 1 deletion include/boost/fiber/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ class BOOST_FIBERS_DECL scheduler {

bool wait_until( context *,
std::chrono::steady_clock::time_point const&) noexcept;

bool wait_until( context *,
std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
detail::spinlock_lock &,
waker &&) noexcept;

void suspend() noexcept;
void suspend( detail::spinlock_lock &) noexcept;
Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/timed_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -35,10 +36,8 @@ class BOOST_FIBERS_DECL timed_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };

bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept;
Expand Down
Loading

0 comments on commit f1ab9c7

Please sign in to comment.