diff --git a/common/enumerable.h b/common/enumerable.h index 70f25a26..ab86c4e7 100644 --- a/common/enumerable.h +++ b/common/enumerable.h @@ -42,7 +42,7 @@ struct Enumerable this->obj = nullptr; } using R = typename std::result_of::type; - R operator*() { return obj ? obj->get() : nullptr; } + R operator*() { return obj ? obj->get() : R{}; } bool operator==(const iterator& rhs) const { return obj == rhs.obj; } bool operator!=(const iterator& rhs) const { return !(*this == rhs); } iterator& operator++() diff --git a/common/objectcachev2.h b/common/objectcachev2.h index 0085e3f9..6c1886ed 100644 --- a/common/objectcachev2.h +++ b/common/objectcachev2.h @@ -10,6 +10,7 @@ #include #include +#include /** @@ -36,154 +37,156 @@ class ObjectCacheV2 { struct Box : public intrusive_list_node { const K key; std::shared_ptr ref; - photon::spinlock boxlock; + // protect ref with mutex + // If multiple threads of execution access the same shared_ptr object + // without synchronization and any of those accesses uses a non-const + // member function of shared_ptr then a data race will occur + // DTOR of V may cause photon thread yield. + photon::mutex reflock; + // prevent create multiple time when borrow + photon::spinlock createlock; + // create timestamp, for cool-down of borrow uint64_t lastcreate = 0; + // reclaim timestamp uint64_t timestamp = 0; - - struct Updater { - Box* box = nullptr; - - ~Updater() {} - - // Returned RefPtr is the old one - // other reader will get new one after updated - - std::shared_ptr update(std::shared_ptr& r, uint64_t ts = 0) { - box->lastcreate = ts; - r = std::atomic_exchange(&box->ref, r); - return r; - } - std::shared_ptr update(V* val, uint64_t ts = 0, - std::shared_ptr* newptr = nullptr) { - auto r = std::shared_ptr(val); - if (newptr) *newptr = r; - return update(r, ts); - } - - explicit Updater(Box* b) : box(b) {} - - Updater(const Updater&) = delete; - Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); } - Updater& operator=(const Updater&) = delete; - Updater& operator=(Updater&& rhs) { - std::swap(box, rhs.box); - return *this; - } - operator bool() const { return box != nullptr; } - }; + // Box reference count + std::atomic rc{0}; Box() = default; template explicit Box(KeyType&& key) : key(std::forward(key)), ref(nullptr) {} - ~Box() {} + std::shared_ptr update(std::shared_ptr r, uint64_t ts = 0) { + SCOPED_LOCK(reflock); + lastcreate = ts; + std::swap(r, ref); + return r; + } + std::shared_ptr reset(uint64_t ts = 0) { + return update({nullptr}, ts); + } + std::shared_ptr reader() { return ref; } - Updater writer() { return Updater(this); } - std::shared_ptr reader() { return std::atomic_load(&ref); } + void acquire() { + timestamp = photon::now; + rc.fetch_add(1, std::memory_order_relaxed); + } + + void release() { + timestamp = photon::now; + // release reference should use stronger order + rc.fetch_sub(1, std::memory_order_seq_cst); + } }; - struct ItemHash { + // Hash and Equal for Box, for unordered_set + // simply use hash/equal of key + struct BoxHash { size_t operator()(const Box& x) const { return std::hash()(x.key); } }; - struct ItemEqual { + struct BoxEqual { bool operator()(const Box& a, const Box& b) const { return a.key == b.key; } }; - photon::thread* reclaimer = nullptr; - photon::common::RingChannel, 4096>> - reclaim_queue; + // protect object cache map photon::spinlock maplock; - std::unordered_set map; - intrusive_list cycle_list; + // protect lru list + std::unordered_set map; + intrusive_list lru_list; uint64_t lifespan; photon::Timer _timer; bool _exit = false; - std::shared_ptr __update(Box& item, std::shared_ptr val) { - auto ret_val = val; - auto old_val = item.writer().update(val, photon::now); - reclaim_queue.template send(old_val); - return ret_val; - } - - std::shared_ptr __update(Box& item, V* val) { - std::shared_ptr ptr(val); - return __update(item, ptr); - } - template - Box& __find_or_create_item(KeyType&& key) { - Box keyitem(key); - Box* item = nullptr; + Box& __find_or_create_box(KeyType&& key) { + Box keybox(key); + Box* box = nullptr; SCOPED_LOCK(maplock); - auto it = map.find(keyitem); + auto it = map.find(keybox); if (it == map.end()) { - // item = new Box(std::forward(key)); - auto rt = map.emplace(key); - if (rt.second) item = (Box*)&*rt.first; + auto rt = map.emplace(std::forward(key)); + if (rt.second) box = (Box*)&*rt.first; } else - item = (Box*)&*it; - assert(item); - item->timestamp = photon::now; - cycle_list.pop(item); - cycle_list.push_back(item); - return *item; + box = (Box*)&*it; + assert(box); + lru_list.pop(box); + box->acquire(); + return *box; } - uint64_t __expire() { - intrusive_list delete_list; +#if __cplusplus < 201703L + std::vector> to_release; +#else + intrusive_list to_release; +#endif uint64_t now = photon::now; + uint64_t reclaim_before = photon::sat_sub(now, lifespan); { SCOPED_LOCK(maplock); - if (cycle_list.empty()) return 0; - if (photon::sat_add(cycle_list.front()->timestamp, lifespan) > - now) { - return photon::sat_add(cycle_list.front()->timestamp, - lifespan) - - now; + if (lru_list.empty()) return 0; + if (lru_list.front()->timestamp > reclaim_before) { + return lru_list.front()->timestamp - reclaim_before; } - auto x = cycle_list.front(); - while (x && (photon::sat_add(x->timestamp, lifespan) < now)) { - cycle_list.pop(x); - __update(*x, nullptr); - map.erase(*x); - x = cycle_list.front(); + auto x = lru_list.front(); + // here requires lock dance for lru_list and kv + while (x && x->timestamp < reclaim_before) { + lru_list.pop(x); + if (x->rc == 0) { + // make vector holds those shared_ptr + // prevent object destroy in critical zone +#if __cplusplus < 201703L + to_release.push_back(x->reset()); + map.erase(*x); +#else + // Here is a hacked implementation + auto node = map.extract(*x); + to_release.push_back(&node.value()); + /// node_handle should be just key&val pointer + // Force make it empty can prevent destroy object + // when node_handle goes to destroy + memset((void*)&node, 0, sizeof(node)); +#endif + } + x = lru_list.front(); } } +#if __cplusplus < 201703L + to_release.clear(); +#else + to_release.delete_all(); +#endif return 0; } public: struct Borrow { ObjectCacheV2* _oc = nullptr; - Box* _item = nullptr; + Box* _box = nullptr; std::shared_ptr _reader; bool _recycle = false; Borrow() : _reader(nullptr) {} - Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr&& reader) - : _oc(oc), - _item(item), - _reader(std::move(reader)), - _recycle(false) {} + Borrow(ObjectCacheV2* oc, Box* box, const std::shared_ptr& reader) + : _oc(oc), _box(box), _reader(reader), _recycle(false) { + _box->acquire(); + } Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); } - Borrow& operator=(Borrow&& rhs) { - std::swap(_oc, rhs._oc); - std::swap(_item, rhs._item); - _reader = std::atomic_exchange(&rhs._reader, _reader); - std::swap(_reader, rhs._reader); - std::swap(_recycle, rhs._recycle); - return *this; - } + Borrow& operator=(Borrow&& rhs) = default; ~Borrow() { if (_recycle) { - _oc->__update(*_item, nullptr); + _box->reset(); + } + _box->release(); + if (_box->rc == 0) { + SCOPED_LOCK(_oc->maplock); + _oc->lru_list.pop(_box); + _oc->lru_list.push_back(_box); } } @@ -198,24 +201,27 @@ class ObjectCacheV2 { template Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) { - auto& item = __find_or_create_item(std::forward(key)); - auto r = item.reader(); + auto& box = __find_or_create_box(std::forward(key)); + DEFER(box.release()); + std::shared_ptr r{}; while (!r) { - if (item.boxlock.try_lock() == 0) { - DEFER(item.boxlock.unlock()); - r = item.reader(); + if (box.createlock.try_lock() == 0) { + DEFER(box.createlock.unlock()); + r = box.reader(); if (!r) { - if (photon::sat_add(item.lastcreate, cooldown) <= + if (photon::sat_add(box.lastcreate, cooldown) <= photon::now) { - r = __update(item, ctor()); + auto r = std::shared_ptr(ctor()); + box.update(r, photon::now); + return Borrow(this, &box, r); } - return Borrow(this, &item, std::move(r)); + return Borrow(this, &box, r); } } photon::thread_yield(); - r = item.reader(); + r = box.reader(); } - return Borrow(this, &item, std::move(r)); + return Borrow(this, &box, r); } template @@ -226,33 +232,22 @@ class ObjectCacheV2 { template Borrow update(KeyType&& key, Ctor&& ctor) { - auto item = __find_or_create_item(std::forward(key)); - auto r = __update(item, ctor()); - return Borrow(this, item, std::move(r)); + auto& box = __find_or_create_box(std::forward(key)); + DEFER(box.release()); + auto r = std::shared_ptr(ctor()); + box.update(r, photon::now); + return Borrow(this, &box, r); } ObjectCacheV2(uint64_t lifespan) : lifespan(lifespan), _timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true, - photon::DEFAULT_STACK_SIZE), - _exit(false) { - reclaimer = photon::thread_create11([this] { - while (!_exit) { - reclaim_queue.recv(); - } - }); - photon::thread_enable_join(reclaimer); - } + photon::DEFAULT_STACK_SIZE) {} ~ObjectCacheV2() { _timer.stop(); - if (reclaimer) { - _exit = true; - reclaim_queue.template send(nullptr); - photon::thread_join((photon::join_handle*)reclaimer); - reclaimer = nullptr; - } - SCOPED_LOCK(maplock); - cycle_list.node = nullptr; + // Should be no other access during dtor. + // modify lru_list do not need a lock. + lru_list.node = nullptr; } }; \ No newline at end of file diff --git a/common/ring.cpp b/common/ring.cpp index b87eafba..2ff5eac6 100644 --- a/common/ring.cpp +++ b/common/ring.cpp @@ -60,7 +60,7 @@ ssize_t RingBuffer::do_read(void *buf, size_t count) ssize_t RingBuffer::readv(const struct iovec *iov, int iovcnt) { ssize_t size = 0; - scoped_lock lock(m_read_lock); + photon::scoped_lock lock(m_read_lock); for (auto& x: ptr_array(iov, iovcnt)) { auto ret = do_read(x.iov_base, x.iov_len); @@ -110,7 +110,7 @@ ssize_t RingBuffer::do_write(const void* buf, size_t count) ssize_t RingBuffer::writev(const struct iovec *iov, int iovcnt) { ssize_t size = 0; - scoped_lock lock(m_write_lock); + photon::scoped_lock lock(m_write_lock); for (auto& x: ptr_array(iov, iovcnt)) { auto ret = do_write(x.iov_base, x.iov_len); diff --git a/common/test/CMakeLists.txt b/common/test/CMakeLists.txt index 1a30526c..6fe657bc 100644 --- a/common/test/CMakeLists.txt +++ b/common/test/CMakeLists.txt @@ -6,6 +6,7 @@ target_link_libraries(test-objcache PRIVATE photon_shared) add_test(NAME test-objcache COMMAND $) add_executable(perf-objcache perf_objcache.cpp) +set_target_properties(perf-objcache PROPERTIES CXX_STANDARD 17) target_link_libraries(perf-objcache PRIVATE photon_shared) add_test(NAME perf-objcache COMMAND $) diff --git a/common/test/perf_objcache.cpp b/common/test/perf_objcache.cpp index 71a7e558..64e8c103 100644 --- a/common/test/perf_objcache.cpp +++ b/common/test/perf_objcache.cpp @@ -40,7 +40,7 @@ void *task(void *arg) { for (const auto &x : k) { auto strx = std::to_string(x); auto b = oc->borrow(strx, [&strx] { - // photon::thread_usleep(1 * 1000); + photon::thread_usleep(1 * 1000); // LOG_INFO("CTOR `", photon::now); return new std::string(strx); });