Skip to content

Commit

Permalink
Merge pull request #531 from Coldwings/objcache_v2
Browse files Browse the repository at this point in the history
Some modify and fix in ObjectCacheV2
  • Loading branch information
lihuiba authored Aug 8, 2024
2 parents 277231c + 9cf989c commit 33e3251
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 131 deletions.
2 changes: 1 addition & 1 deletion common/enumerable.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct Enumerable
this->obj = nullptr;
}
using R = typename std::result_of<decltype(&T::get)(T)>::type;
R operator*() { return obj ? obj->get() : nullptr; }
R operator*() { return obj ? obj->get() : R{}; }
bool operator==(const iterator& rhs) const { return obj == rhs.obj; }
bool operator!=(const iterator& rhs) const { return !(*this == rhs); }
iterator& operator++()
Expand Down
249 changes: 122 additions & 127 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <memory>
#include <unordered_set>
#include <vector>

/**
Expand All @@ -36,154 +37,156 @@ class ObjectCacheV2 {
struct Box : public intrusive_list_node<Box> {
const K key;
std::shared_ptr<V> ref;
photon::spinlock boxlock;
// protect ref with mutex
// If multiple threads of execution access the same shared_ptr object
// without synchronization and any of those accesses uses a non-const
// member function of shared_ptr then a data race will occur
// DTOR of V may cause photon thread yield.
photon::mutex reflock;
// prevent create multiple time when borrow
photon::spinlock createlock;
// create timestamp, for cool-down of borrow
uint64_t lastcreate = 0;
// reclaim timestamp
uint64_t timestamp = 0;

struct Updater {
Box* box = nullptr;

~Updater() {}

// Returned RefPtr is the old one
// other reader will get new one after updated

std::shared_ptr<V> update(std::shared_ptr<V>& r, uint64_t ts = 0) {
box->lastcreate = ts;
r = std::atomic_exchange(&box->ref, r);
return r;
}
std::shared_ptr<V> update(V* val, uint64_t ts = 0,
std::shared_ptr<V>* newptr = nullptr) {
auto r = std::shared_ptr<V>(val);
if (newptr) *newptr = r;
return update(r, ts);
}

explicit Updater(Box* b) : box(b) {}

Updater(const Updater&) = delete;
Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); }
Updater& operator=(const Updater&) = delete;
Updater& operator=(Updater&& rhs) {
std::swap(box, rhs.box);
return *this;
}
operator bool() const { return box != nullptr; }
};
// Box reference count
std::atomic<uint64_t> rc{0};

Box() = default;
template <typename KeyType>
explicit Box(KeyType&& key)
: key(std::forward<KeyType>(key)), ref(nullptr) {}

~Box() {}
std::shared_ptr<V> update(std::shared_ptr<V> r, uint64_t ts = 0) {
SCOPED_LOCK(reflock);
lastcreate = ts;
std::swap(r, ref);
return r;
}
std::shared_ptr<V> reset(uint64_t ts = 0) {
return update({nullptr}, ts);
}
std::shared_ptr<V> reader() { return ref; }

Updater writer() { return Updater(this); }
std::shared_ptr<V> reader() { return std::atomic_load(&ref); }
void acquire() {
timestamp = photon::now;
rc.fetch_add(1, std::memory_order_relaxed);
}

void release() {
timestamp = photon::now;
// release reference should use stronger order
rc.fetch_sub(1, std::memory_order_seq_cst);
}
};
struct ItemHash {
// Hash and Equal for Box, for unordered_set
// simply use hash/equal of key
struct BoxHash {
size_t operator()(const Box& x) const { return std::hash<K>()(x.key); }
};
struct ItemEqual {
struct BoxEqual {
bool operator()(const Box& a, const Box& b) const {
return a.key == b.key;
}
};

photon::thread* reclaimer = nullptr;
photon::common::RingChannel<LockfreeMPMCRingQueue<std::shared_ptr<V>, 4096>>
reclaim_queue;
// protect object cache map
photon::spinlock maplock;
std::unordered_set<Box, ItemHash, ItemEqual> map;
intrusive_list<Box> cycle_list;
// protect lru list
std::unordered_set<Box, BoxHash, BoxEqual> map;
intrusive_list<Box> lru_list;
uint64_t lifespan;
photon::Timer _timer;
bool _exit = false;

std::shared_ptr<V> __update(Box& item, std::shared_ptr<V> val) {
auto ret_val = val;
auto old_val = item.writer().update(val, photon::now);
reclaim_queue.template send<PhotonPause>(old_val);
return ret_val;
}

std::shared_ptr<V> __update(Box& item, V* val) {
std::shared_ptr<V> ptr(val);
return __update(item, ptr);
}

template <typename KeyType>
Box& __find_or_create_item(KeyType&& key) {
Box keyitem(key);
Box* item = nullptr;
Box& __find_or_create_box(KeyType&& key) {
Box keybox(key);
Box* box = nullptr;
SCOPED_LOCK(maplock);
auto it = map.find(keyitem);
auto it = map.find(keybox);
if (it == map.end()) {
// item = new Box(std::forward<KeyType>(key));
auto rt = map.emplace(key);
if (rt.second) item = (Box*)&*rt.first;
auto rt = map.emplace(std::forward<KeyType>(key));
if (rt.second) box = (Box*)&*rt.first;
} else
item = (Box*)&*it;
assert(item);
item->timestamp = photon::now;
cycle_list.pop(item);
cycle_list.push_back(item);
return *item;
box = (Box*)&*it;
assert(box);
lru_list.pop(box);
box->acquire();
return *box;
}

uint64_t __expire() {
intrusive_list<Box> delete_list;
#if __cplusplus < 201703L
std::vector<std::shared_ptr<V>> to_release;
#else
intrusive_list<Box> to_release;
#endif
uint64_t now = photon::now;
uint64_t reclaim_before = photon::sat_sub(now, lifespan);
{
SCOPED_LOCK(maplock);
if (cycle_list.empty()) return 0;
if (photon::sat_add(cycle_list.front()->timestamp, lifespan) >
now) {
return photon::sat_add(cycle_list.front()->timestamp,
lifespan) -
now;
if (lru_list.empty()) return 0;
if (lru_list.front()->timestamp > reclaim_before) {
return lru_list.front()->timestamp - reclaim_before;
}
auto x = cycle_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
cycle_list.pop(x);
__update(*x, nullptr);
map.erase(*x);
x = cycle_list.front();
auto x = lru_list.front();
// here requires lock dance for lru_list and kv
while (x && x->timestamp < reclaim_before) {
lru_list.pop(x);
if (x->rc == 0) {
// make vector holds those shared_ptr
// prevent object destroy in critical zone
#if __cplusplus < 201703L
to_release.push_back(x->reset());
map.erase(*x);
#else
// Here is a hacked implementation
auto node = map.extract(*x);
to_release.push_back(&node.value());
/// node_handle should be just key&val pointer
// Force make it empty can prevent destroy object
// when node_handle goes to destroy
memset((void*)&node, 0, sizeof(node));
#endif
}
x = lru_list.front();
}
}
#if __cplusplus < 201703L
to_release.clear();
#else
to_release.delete_all();
#endif
return 0;
}

public:
struct Borrow {
ObjectCacheV2* _oc = nullptr;
Box* _item = nullptr;
Box* _box = nullptr;
std::shared_ptr<V> _reader;
bool _recycle = false;

Borrow() : _reader(nullptr) {}

Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr<V>&& reader)
: _oc(oc),
_item(item),
_reader(std::move(reader)),
_recycle(false) {}
Borrow(ObjectCacheV2* oc, Box* box, const std::shared_ptr<V>& reader)
: _oc(oc), _box(box), _reader(reader), _recycle(false) {
_box->acquire();
}

Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); }

Borrow& operator=(Borrow&& rhs) {
std::swap(_oc, rhs._oc);
std::swap(_item, rhs._item);
_reader = std::atomic_exchange(&rhs._reader, _reader);
std::swap(_reader, rhs._reader);
std::swap(_recycle, rhs._recycle);
return *this;
}
Borrow& operator=(Borrow&& rhs) = default;

~Borrow() {
if (_recycle) {
_oc->__update(*_item, nullptr);
_box->reset();
}
_box->release();
if (_box->rc == 0) {
SCOPED_LOCK(_oc->maplock);
_oc->lru_list.pop(_box);
_oc->lru_list.push_back(_box);
}
}

Expand All @@ -198,24 +201,27 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) {
auto& item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item.reader();
auto& box = __find_or_create_box(std::forward<KeyType>(key));
DEFER(box.release());
std::shared_ptr<V> r{};
while (!r) {
if (item.boxlock.try_lock() == 0) {
DEFER(item.boxlock.unlock());
r = item.reader();
if (box.createlock.try_lock() == 0) {
DEFER(box.createlock.unlock());
r = box.reader();
if (!r) {
if (photon::sat_add(item.lastcreate, cooldown) <=
if (photon::sat_add(box.lastcreate, cooldown) <=
photon::now) {
r = __update(item, ctor());
auto r = std::shared_ptr<V>(ctor());
box.update(r, photon::now);
return Borrow(this, &box, r);
}
return Borrow(this, &item, std::move(r));
return Borrow(this, &box, r);
}
}
photon::thread_yield();
r = item.reader();
r = box.reader();
}
return Borrow(this, &item, std::move(r));
return Borrow(this, &box, r);
}

template <typename KeyType>
Expand All @@ -226,33 +232,22 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow update(KeyType&& key, Ctor&& ctor) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = __update(item, ctor());
return Borrow(this, item, std::move(r));
auto& box = __find_or_create_box(std::forward<KeyType>(key));
DEFER(box.release());
auto r = std::shared_ptr<V>(ctor());
box.update(r, photon::now);
return Borrow(this, &box, r);
}

ObjectCacheV2(uint64_t lifespan)
: lifespan(lifespan),
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true,
photon::DEFAULT_STACK_SIZE),
_exit(false) {
reclaimer = photon::thread_create11([this] {
while (!_exit) {
reclaim_queue.recv();
}
});
photon::thread_enable_join(reclaimer);
}
photon::DEFAULT_STACK_SIZE) {}

~ObjectCacheV2() {
_timer.stop();
if (reclaimer) {
_exit = true;
reclaim_queue.template send<PhotonPause>(nullptr);
photon::thread_join((photon::join_handle*)reclaimer);
reclaimer = nullptr;
}
SCOPED_LOCK(maplock);
cycle_list.node = nullptr;
// Should be no other access during dtor.
// modify lru_list do not need a lock.
lru_list.node = nullptr;
}
};
4 changes: 2 additions & 2 deletions common/ring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ ssize_t RingBuffer::do_read(void *buf, size_t count)
ssize_t RingBuffer::readv(const struct iovec *iov, int iovcnt)
{
ssize_t size = 0;
scoped_lock lock(m_read_lock);
photon::scoped_lock lock(m_read_lock);
for (auto& x: ptr_array(iov, iovcnt))
{
auto ret = do_read(x.iov_base, x.iov_len);
Expand Down Expand Up @@ -110,7 +110,7 @@ ssize_t RingBuffer::do_write(const void* buf, size_t count)
ssize_t RingBuffer::writev(const struct iovec *iov, int iovcnt)
{
ssize_t size = 0;
scoped_lock lock(m_write_lock);
photon::scoped_lock lock(m_write_lock);
for (auto& x: ptr_array(iov, iovcnt))
{
auto ret = do_write(x.iov_base, x.iov_len);
Expand Down
1 change: 1 addition & 0 deletions common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ target_link_libraries(test-objcache PRIVATE photon_shared)
add_test(NAME test-objcache COMMAND $<TARGET_FILE:test-objcache>)

add_executable(perf-objcache perf_objcache.cpp)
set_target_properties(perf-objcache PROPERTIES CXX_STANDARD 17)
target_link_libraries(perf-objcache PRIVATE photon_shared)
add_test(NAME perf-objcache COMMAND $<TARGET_FILE:test-objcache>)

Expand Down
2 changes: 1 addition & 1 deletion common/test/perf_objcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void *task(void *arg) {
for (const auto &x : k) {
auto strx = std::to_string(x);
auto b = oc->borrow(strx, [&strx] {
// photon::thread_usleep(1 * 1000);
photon::thread_usleep(1 * 1000);
// LOG_INFO("CTOR `", photon::now);
return new std::string(strx);
});
Expand Down

0 comments on commit 33e3251

Please sign in to comment.