diff --git a/common/rcuptr.h b/common/rcuptr.h
new file mode 100644
index 00000000..9f1ee7b1
--- /dev/null
+++ b/common/rcuptr.h
@@ -0,0 +1,166 @@
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <photon/common/callback.h>
+#include <photon/common/intrusive_list.h>
+#include <photon/thread/thread-local.h>
+
+#include <photon/thread/thread11.h>
+
+/***
+RCUPtr
+
+Makes an RCU-protected pointer.
+
+Readers of this pointer should call quiescent() after a read is done.
+
+Writers of this pointer should use update(T*) to publish a new object, then
+either
+* use `synchronize(domain)` to wait until all readers have finished reading
+the old object, then delete it, or
+* use `rcu_call(old)` to create a standalone photon thread that waits for
+readers to finish and then reclaims it.
+
+CAUTION: RCUPtr is a mechanism for protecting highly concurrent reads with
+occasional writes. This implementation is based on userspace QSBR RCU; the
+basic interface resembles kernel RCU, with a few limitations. Photon threads,
+not system threads, act as the readers and writers.
+
+If you don't know how to use it, or are not sure whether you need it, just
+ignore it.
+
+***/
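+
+// A minimal usage sketch of the protocol above (illustration only; `Config`
+// and its `value` member are hypothetical, not part of this header):
+//
+//   RCUPtr<Config> cfg;
+//   cfg.update(new Config);             // initial publish (old value is null)
+//
+//   // reader side (a photon thread)
+//   auto v = cfg->value;                // acquire-load dereference
+//   cfg.quiescent();                    // declare a quiescent state
+//
+//   // writer side (a photon thread)
+//   auto old = cfg.update(new Config);  // atomically swap in the new object
+//   cfg.synchronize();                  // wait out the grace period
+//   delete old;                         // no reader can still hold `old`
+//
+//   // teardown: ~RCUPtr() asserts the pointer was cleared first
+//   auto last = cfg.update(nullptr);
+//   cfg.synchronize();
+//   delete last;
+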
+struct RCUDomain;
+
+// Per-photon-thread quiescent-state counter
+struct RCUReader : public intrusive_list_node<RCUReader> {
+    std::atomic<uint64_t> ctr;
+    RCUDomain *domain;
+
+    explicit RCUReader(RCUDomain *domain);
+    ~RCUReader();
+
+    uint64_t get_gp() { return ctr.load(std::memory_order_acquire); }
+
+    void quiescent();
+
+    void offline() { ctr.store(0, std::memory_order_release); }
+
+    void online() { quiescent(); }
+};
+
+// Global grace-period counter
+struct RCUDomain {
+    std::atomic<uint64_t> ctr;
+    intrusive_list<RCUReader> tlist;
+    photon::mutex tlock;
+    photon::thread_local_ptr<RCUReader, RCUDomain *> reader;
+
+    RCUDomain() : ctr(0), tlock(), reader(this) {}
+
+    ~RCUDomain() { tlist.delete_all(); }
+
+    uint64_t get_gp() { return ctr.load(std::memory_order_acquire); }
+
+    void register_reader(RCUReader *x) {
+        SCOPED_LOCK(tlock);
+        tlist.push_back(x);
+    }
+
+    void unregister_reader(RCUReader *x) {
+        SCOPED_LOCK(tlock);
+        tlist.pop(x);
+    }
+
+    void online_current() { reader->online(); }
+
+    void offline_current() { reader->offline(); }
+
+    void __do_synchronize(uint64_t ts) {
+        for (;;) {
+            {
+                reader->quiescent();
+                SCOPED_LOCK(tlock);
+                auto all_release = true;
+                auto self = &*reader;
+                for (auto t : tlist) {
+                    if (t == self) continue;
+                    // check the iterated reader, not the calling thread's own
+                    auto x = t->get_gp();
+                    // 0 means offline; otherwise the reader must have passed
+                    // a quiescent state at or after grace period `ts`
+                    if (x && (x < ts)) {
+                        all_release = false;
+                        break;
+                    }
+                }
+                if (all_release) return;
+            }
+            photon::thread_yield();
+        }
+    }
+
+    void synchronize() {
+        // wait for the incremented grace period (old value + 1), so every
+        // reader must quiesce after this point before we return
+        auto x = ctr.fetch_add(1, std::memory_order_acq_rel) + 1;
+        __do_synchronize(x);
+    }
+
+    template <typename T>
+    void call_rcu(T *data, Delegate<void, T *> func) {
+        auto x = ctr.fetch_add(1, std::memory_order_acq_rel) + 1;
+        photon::thread_create11([x, data, func, this]() {
+            __do_synchronize(x);
+            func(data);
+        });
+    }
+};
+
+inline RCUReader::RCUReader(RCUDomain *domain) : ctr(0), domain(domain) {
+    domain->register_reader(this);
+}
+
+inline RCUReader::~RCUReader() { domain->unregister_reader(this); }
+
+inline void RCUReader::quiescent() {
+    ctr.store(domain->get_gp(), std::memory_order_release);
+}
+
+inline RCUDomain *global_rcu_domain() {
+    static RCUDomain domain;
+    return &domain;
+}
+
+template <typename T>
+struct RCUPtr {
+    std::atomic<T *> ptr;
+
+    RCUPtr() : ptr(nullptr) {}
+    ~RCUPtr() { assert(ptr.load() == nullptr); }
+
+    static void __default_deleter(void *, T *x) { delete x; }
+
+    // read_lock/read_unlock are no-ops: QSBR needs no explicit read-side
+    // critical section
+    void read_lock(RCUDomain *domain = nullptr) {}
+    void read_unlock(RCUDomain *domain = nullptr) {}
+
+    // Readers should call quiescent() every few rounds of reads; calling it
+    // declares that this thread no longer holds a reference to the old object.
+    void quiescent(RCUDomain *domain = nullptr) {
+        if (!domain) domain = global_rcu_domain();
+        domain->reader->quiescent();
+    }
+    // An atomic pointer swap for writers; returns the old object
+    T *update(T *new_ptr, RCUDomain *domain = nullptr) {
+        return ptr.exchange(new_ptr, std::memory_order_acq_rel);
+    }
+    // Synchronize with the domain: block until the grace period has passed
+    void synchronize(RCUDomain *domain = nullptr) {
+        if (!domain) domain = global_rcu_domain();
+        domain->synchronize();
+    }
+    // Asynchronously wait for the grace period, then reclaim `old`
+    void rcu_call(T *old, RCUDomain *domain = nullptr,
+                  Delegate<void, T *> func = {nullptr,
+                                              &RCUPtr::__default_deleter}) {
+        if (!domain) domain = global_rcu_domain();
+        domain->call_rcu(old, func);
+    }
+    T *dereference() const { return ptr.load(std::memory_order_acquire); }
+    T &operator*() const { return *dereference(); }
+    T *operator->() const { return &*dereference(); }
+};
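+
+// A sketch of the asynchronous reclamation path (illustration only; `Config`
+// and `recycle` are hypothetical). rcu_call() returns immediately; a helper
+// photon thread waits out the grace period and then runs the deleter:
+//
+//   auto old = cfg.update(new Config);
+//   cfg.rcu_call(old);            // default deleter: `delete old`
+//
+//   // with a custom deleter Delegate, mirroring __default_deleter's shape:
+//   static void recycle(void *, Config *p) { /* return p to a pool */ }
+//   cfg.rcu_call(old, nullptr, {nullptr, &recycle});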
diff --git a/common/test/CMakeLists.txt b/common/test/CMakeLists.txt
index a4febc05..71871cc0 100644
--- a/common/test/CMakeLists.txt
+++ b/common/test/CMakeLists.txt
@@ -28,3 +28,6 @@ add_test(NAME test-lockfree COMMAND $<TARGET_FILE:test-lockfree>)
 add_executable(test-alog test_alog.cpp x.cpp)
 target_link_libraries(test-alog PRIVATE photon_shared)
 add_test(NAME test-alog COMMAND $<TARGET_FILE:test-alog>)
+
+add_executable(perf-rcuptr perf_rcuptr.cpp)
+target_link_libraries(perf-rcuptr PRIVATE photon_shared)
diff --git a/common/test/perf_rcuptr.cpp b/common/test/perf_rcuptr.cpp
new file mode 100644
index 00000000..ea791564
--- /dev/null
+++ b/common/test/perf_rcuptr.cpp
@@ -0,0 +1,127 @@
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include <photon/common/alog.h>
+#include <photon/common/rcuptr.h>
+#include <photon/photon.h>
+
+struct IntObj {
+    int val;
+    IntObj(int val) : val(val) {}
+    ~IntObj() {}
+};
+
+int main() {
+    photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE,
+                 {.libaio_queue_depth = 32,
+                  .use_pooled_stack_allocator = true,
+                  .bypass_threadpool = true});
+    DEFER(photon::fini());
+    photon::semaphore sem(0);
+    std::atomic<int> cntr(1);
+    {
+        RCUPtr<IntObj> ptr;
+        // ptr.write_lock();
+        auto n = new IntObj(cntr.fetch_add(1, std::memory_order_acq_rel));
+        auto old = ptr.update(n);
+        // ptr.write_unlock();
+        ptr.synchronize();
+        delete old;
+        std::vector<std::thread> ths;
+        for (int th = 0; th < 10; th++) {
+            ths.emplace_back([&] {
+                photon::vcpu_init();
+                DEFER(photon::vcpu_fini());
+                for (int i = 0; i < 100; i++) {
+                    photon::thread_create11([&cntr, &ptr, &sem]() {
+                        for (int j = 0; j < 1000; j++) {
+                            auto x = rand() % 10;
+                            if (x) {  // 90% reader
+                                // ptr.read_lock();
+                                auto v = ptr->val;
+                                (void)v;
+                                ptr.quiescent();
+                                // ptr.read_unlock();
+                                photon::thread_yield();
+                            } else {  // 10% writer
+                                // ptr.write_lock();
+                                auto n = new IntObj(cntr.fetch_add(
+                                    1, std::memory_order_acq_rel));
+                                auto old = ptr.update(n);
+                                // ptr.write_unlock();
+                                // ptr.rcu_call(old);
+                                ptr.synchronize();
+                                delete old;
+                                photon::thread_yield();
+                            }
+                        }
+                        sem.signal(1);
+                    });
+                }
+            });
+        }
+
+        auto start = std::chrono::high_resolution_clock::now();
+        sem.wait(1000);
+        auto done = std::chrono::high_resolution_clock::now();
+
+        for (auto &x : ths) x.join();
+
+        // ptr.write_lock();
+        auto last = ptr.update(nullptr);
+        // ptr.write_unlock();
+        ptr.synchronize();
+        delete last;
+
+        LOG_INFO(VALUE(
+            std::chrono::duration_cast<std::chrono::milliseconds>(done - start)
+                .count()));
+    }
+    LOG_INFO("!!!!!!!!!!");
+    {
+        photon::rwlock rwlock;
+        std::atomic<IntObj *> ptr(
+            new IntObj(cntr.fetch_add(1, std::memory_order_acq_rel)));
+        std::vector<std::thread> ths;
+        for (int th = 0; th < 10; th++) {
+            ths.emplace_back([&] {
+                photon::vcpu_init();
+                DEFER(photon::vcpu_fini());
+                for (int i = 0; i < 100; i++) {
+                    photon::thread_create11([&cntr, &ptr, &sem, &rwlock]() {
+                        for (int j = 0; j < 1000; j++) {
+                            auto x = rand() % 10;
+                            if (x) {  // 90% reader
+                                photon::scoped_rwlock _(rwlock,
+                                                        photon::RLOCK);
+                                (void)ptr.load()->val;
+                            } else {  // 10% writer
+                                photon::scoped_rwlock _(rwlock,
+                                                        photon::WLOCK);
+                                auto n = new IntObj(cntr.fetch_add(
+                                    1, std::memory_order_acq_rel));
+                                auto old = ptr.exchange(n);
+                                delete old;
+                            }
+                        }
+                        sem.signal(1);
+                    });
+                }
+            });
+        }
+
+        auto start = std::chrono::high_resolution_clock::now();
+        sem.wait(1000);
+        auto done = std::chrono::high_resolution_clock::now();
+
+        for (auto &x : ths) x.join();
+
+        delete ptr.load();
+
+        LOG_INFO(VALUE(
+            std::chrono::duration_cast<std::chrono::milliseconds>(done - start)
+                .count()));
+    }
+
+    return 0;
+}
diff --git a/include/photon/common/rcuptr.h b/include/photon/common/rcuptr.h
new file mode 120000
index 00000000..97297609
--- /dev/null
+++ b/include/photon/common/rcuptr.h
@@ -0,0 +1 @@
+../../../common/rcuptr.h
\ No newline at end of file