Skip to content

Commit

Permalink
RCUPtr for photon thread environment
Browse files Browse the repository at this point in the history
  • Loading branch information
Coldwings committed Jul 18, 2024
1 parent ac06081 commit 95901ce
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 0 deletions.
166 changes: 166 additions & 0 deletions common/rcuptr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#pragma once

#include <assert.h>
#include <photon/common/callback.h>
#include <photon/thread/list.h>
#include <photon/thread/thread-local.h>
#include <photon/thread/thread11.h>

#include <atomic>

/***
RCUPtr<T>
Makes an RCU-protected pointer.
Readers of this pointer should call quiescent() after a read is done.
Writers of this pointer should use update(T*) to publish a new object, then
* use `synchronize(domain)` to wait until all readers have finished reading
the old object before deleting it, or
* use `rcu_call(old)` to create a standalone photon thread that waits for
readers to finish reading and then reclaims it.
CAUTION: RCUPtr is a mechanism to protect highly concurrent reads with few
writes. This implementation is based on userspace QSBR RCU; its basic
interface resembles kernel RCU, but with a few limitations. Photon threads
(not OS threads) act as the readers and writers.
If you don't know how to use it, or are not sure you need it, just ignore it.
***/

struct RCUDomain;

// photon thread local counter
// Per-photon-thread grace-period counter. One RCUReader is created lazily
// per thread via RCUDomain::reader (thread_local_ptr) and is linked into the
// domain's reader list so writers can poll it during synchronize().
struct RCUReader : public intrusive_list_node<RCUReader> {
    std::atomic<uint64_t> ctr;  // last grace period this thread announced; 0 means offline
    RCUDomain *domain;          // owning domain (non-owning back pointer)

    explicit RCUReader(RCUDomain *domain);
    ~RCUReader();

    // Grace period this reader last announced (0 while offline).
    uint64_t get_gp() { return ctr.load(std::memory_order_acquire); }

    // Announce a quiescent state: this thread currently holds no
    // RCU-protected references (defined out-of-line below).
    void quiescent();

    // Going offline (ctr == 0) excludes this reader from grace-period waits.
    void offline() { ctr.store(0, std::memory_order_release); }

    // Coming back online is just an immediate quiescent state.
    void online() { quiescent(); }
};

// Global gp counter
struct RCUDomain {
std::atomic<uint64_t> ctr;
intrusive_list<RCUReader> tlist;
photon::mutex tlock;
photon::thread_local_ptr<RCUReader, RCUDomain *> reader;

RCUDomain() : ctr(0), tlock(), reader(this) {}

~RCUDomain() { tlist.delete_all(); }

uint64_t get_gp() { return ctr.load(std::memory_order_acquire); }

void register_reader(RCUReader *x) {
SCOPED_LOCK(tlock);
tlist.push_back(x);
}

void unregister_reader(RCUReader *x) {
SCOPED_LOCK(tlock);
tlist.pop(x);
}

void online_cuurent() { reader->online(); }

void offline_current() { reader->offline(); }

void __do_synchronize(uint64_t ts) {
for (;;) {
{
reader->quiescent();
SCOPED_LOCK(tlock);
auto all_release = true;
auto self = &*reader;
for (auto t : tlist) {
if (t == self) continue;
auto x = reader->get_gp();
if (x && (x < ts)) {
all_release = false;
break;
}
}
if (all_release) return;
}
photon::thread_yield();
}
}

void synchronize() {
auto x = ctr.fetch_add(1, std::memory_order_acq_rel);
__do_synchronize(x);
}

template <typename T>
void call_rcu(T *data, Delegate<void, T *> func) {
auto x = ctr.fetch_add(1, std::memory_order_acq_rel);
photon::thread_create11([x, data, func, this]() {
__do_synchronize(x);
func(data);
});
}
};

// A freshly created per-thread reader registers itself with its domain.
inline RCUReader::RCUReader(RCUDomain *d) : ctr(0), domain(d) {
    domain->register_reader(this);
}

// On thread exit the reader drops itself from the domain's list.
inline RCUReader::~RCUReader() {
    domain->unregister_reader(this);
}

// Announce a quiescent state: copy the domain's current grace period into
// this reader's counter so writers waiting on older periods can proceed.
inline void RCUReader::quiescent() {
    auto current = domain->get_gp();
    ctr.store(current, std::memory_order_release);
}

// Process-wide default domain, constructed on first use (Meyers singleton);
// used by RCUPtr methods whenever the caller passes no explicit domain.
inline RCUDomain *global_rcu_domain() {
    static RCUDomain domain;
    return &domain;
}

// RCU-protected pointer wrapper. Readers dereference without any lock and
// periodically announce quiescent states; writers atomically swap the
// pointer and reclaim the old object only after a grace period.
template <typename T>
struct RCUPtr {
    std::atomic<T *> ptr;

    RCUPtr() : ptr(nullptr) {}
    // The owner must update(nullptr) and reclaim before destruction.
    ~RCUPtr() { assert(ptr.load() == nullptr); }

    static void __default_deleter(void *, T *x) { delete x; }

    // QSBR needs no per-read critical section; these no-ops exist only to
    // mirror the kernel-style RCU API.
    void read_lock(RCUDomain *domain = nullptr) {}
    void read_unlock(RCUDomain *domain = nullptr) {}

    // Readers must call this every few reads; it promises that this thread
    // holds no reference obtained from an earlier dereference().
    void quiescent(RCUDomain *domain = nullptr) {
        if (domain == nullptr) domain = global_rcu_domain();
        domain->reader->quiescent();
    }
    // Atomically publish a new object; returns the previous one, which the
    // caller must reclaim after a grace period.
    T *update(T *new_ptr, RCUDomain *domain = nullptr) {
        T *previous = ptr.exchange(new_ptr, std::memory_order_acq_rel);
        return previous;
    }
    // Block until the current grace period has elapsed on `domain`.
    void synchronize(RCUDomain *domain = nullptr) {
        if (domain == nullptr) domain = global_rcu_domain();
        domain->synchronize();
    }
    // Asynchronously reclaim `old` once the grace period has elapsed;
    // by default simply deletes it.
    void rcu_call(T *old, RCUDomain *domain = nullptr,
                  Delegate<void, T *> func = {nullptr,
                                              &RCUPtr::__default_deleter}) {
        if (domain == nullptr) domain = global_rcu_domain();
        domain->call_rcu(old, func);
    }
    // Lock-free read of the current object.
    T *dereference() const { return ptr.load(std::memory_order_acquire); }
    T &operator*() const { return *dereference(); }
    T *operator->() const { return dereference(); }
};
3 changes: 3 additions & 0 deletions common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ add_test(NAME test-lockfree COMMAND $<TARGET_FILE:test-lockfree>)
add_executable(test-alog test_alog.cpp x.cpp)
target_link_libraries(test-alog PRIVATE photon_shared)
add_test(NAME test-alog COMMAND $<TARGET_FILE:test-alog>)

# RCUPtr vs. rwlock benchmark (common/test/perf_rcuptr.cpp)
add_executable(perf-rcuptr perf_rcuptr.cpp)
target_link_libraries(perf-rcuptr PRIVATE photon_shared)
127 changes: 127 additions & 0 deletions common/test/perf_rcuptr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#include <photon/common/alog.h>
#include <photon/common/rcuptr.h>
#include <photon/photon.h>
#include <photon/thread/thread11.h>

#include <chrono>
#include <thread>
#include <vector>

// Minimal heap payload used as the RCU-protected object in this benchmark.
struct IntObj {
    int val;
    IntObj(int v) : val(v) {}
    ~IntObj() {}
};

// Benchmark driver: compares RCUPtr (userspace QSBR RCU) against a
// photon::rwlock-protected atomic pointer under the same workload —
// 10 OS threads, each hosting 100 photon threads doing 1000 iterations
// of a ~90% read / ~10% write mix. Prints elapsed microseconds per phase.
int main() {
    photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE,
                 {.libaio_queue_depth = 32,
                  .use_pooled_stack_allocator = true,
                  .bypass_threadpool = true});
    DEFER(photon::fini());
    photon::semaphore sem(0);   // counts finished photon threads (10*100 per phase)
    std::atomic<int> cntr(1);   // monotonically increasing payload value
    {
        // --- Phase 1: RCUPtr ---
        RCUPtr<IntObj> ptr;
        // ptr.write_lock();
        auto n = new IntObj(cntr.fetch_add(1, std::memory_order_acq_rel));
        auto old = ptr.update(n);
        // ptr.write_unlock();
        ptr.synchronize();
        delete old;  // first update: old is nullptr, deleting null is a no-op
        std::vector<std::thread> ths;
        for (int th = 0; th < 10; th++) {
            ths.emplace_back([&] {
                photon::vcpu_init();
                DEFER(photon::vcpu_fini());
                for (int i = 0; i < 100; i++) {
                    photon::thread_create11([&cntr, &ptr, &sem]() {
                        for (int j = 0; j < 1000; j++) {
                            auto x = rand() % 10;
                            if (x) { // ~90% reader (rand() % 10 != 0)
                                // ptr.read_lock();
                                auto x = ptr->val;
                                (void)x;
                                ptr.quiescent();
                                // ptr.read_unlock();
                                photon::thread_yield();
                            } else { // ~10% writer
                                // ptr.write_lock();
                                auto n = new IntObj(cntr.fetch_add(
                                    1, std::memory_order_acq_rel));
                                auto old = ptr.update(n);
                                // ptr.write_unlock();
                                // ptr.rcu_call(old);
                                ptr.synchronize();
                                delete old;
                                photon::thread_yield();
                            }
                        }
                        sem.signal(1);
                    });
                }
            });
        }

        auto start = std::chrono::high_resolution_clock::now();
        sem.wait(1000);  // 10 OS threads * 100 photon threads
        auto done = std::chrono::high_resolution_clock::now();

        for (auto &x : ths) x.join();

        // Drain the pointer so RCUPtr's destructor assertion holds.
        // ptr.write_lock();
        auto last = ptr.update(nullptr);
        // ptr.write_unlock();
        ptr.synchronize();
        delete last;

        LOG_INFO(VALUE(
            std::chrono::duration_cast<std::chrono::microseconds>(done - start)
                .count()));
    }
    LOG_INFO("!!!!!!!!!!");
    {
        // --- Phase 2: baseline with photon::rwlock + std::atomic pointer ---
        photon::rwlock rwlock;
        std::atomic<IntObj *> ptr(
            new IntObj(cntr.fetch_add(1, std::memory_order_acq_rel)));
        std::vector<std::thread> ths;
        for (int th = 0; th < 10; th++) {
            ths.emplace_back([&] {
                photon::vcpu_init();
                DEFER(photon::vcpu_fini());
                for (int i = 0; i < 100; i++) {
                    photon::thread_create11([&cntr, &ptr, &sem, &rwlock]() {
                        for (int j = 0; j < 1000; j++) {
                            auto x = rand() % 10;
                            if (x) { // 90% reader
                                photon::scoped_rwlock _(rwlock, photon::RLOCK);
                                (void)ptr.load()->val;
                            } else { // 10% writer
                                photon::scoped_rwlock _(rwlock, photon::WLOCK);
                                auto n = new IntObj(cntr.fetch_add(
                                    1, std::memory_order_acq_rel));
                                auto old = ptr.exchange(n);
                                delete old;
                            }
                        }
                        sem.signal(1);
                    });
                }
            });
        }

        auto start = std::chrono::high_resolution_clock::now();
        sem.wait(1000);
        auto done = std::chrono::high_resolution_clock::now();

        for (auto &x : ths) x.join();

        // std::atomic<T*> implicitly converts to T*: deletes the final object.
        delete ptr;

        LOG_INFO(VALUE(
            std::chrono::duration_cast<std::chrono::microseconds>(done - start)
                .count()));
    }

    return 0;
}
1 change: 1 addition & 0 deletions include/photon/common/rcuptr.h

0 comments on commit 95901ce

Please sign in to comment.