diff --git a/common/objectcachev2.h b/common/objectcachev2.h index de07cd4c..dc9c88d6 100644 --- a/common/objectcachev2.h +++ b/common/objectcachev2.h @@ -36,110 +36,118 @@ class ObjectCacheV2 { struct Box : public intrusive_list_node { const K key; std::shared_ptr ref; - photon::spinlock boxlock; + // protect ref with mutex + // If multiple threads of execution access the same shared_ptr object + // without synchronization and any of those accesses uses a non-const + // member function of shared_ptr then a data race will occur + // DTOR of V may cause photon thread yield. + photon::mutex reflock; + // prevent create multiple time when borrow + photon::spinlock createlock; + // create timestamp, for cool-down of borrow uint64_t lastcreate = 0; + // reclaim timestamp uint64_t timestamp = 0; - - struct Updater { - Box* box = nullptr; - - ~Updater() {} - - // Returned RefPtr is the old one - // other reader will get new one after updated - - std::shared_ptr update(std::shared_ptr& r, uint64_t ts = 0) { - box->lastcreate = ts; - r = std::atomic_exchange(&box->ref, r); - return r; - } - std::shared_ptr update(V* val, uint64_t ts = 0, - std::shared_ptr* newptr = nullptr) { - auto r = std::shared_ptr(val); - if (newptr) *newptr = r; - return update(r, ts); - } - - explicit Updater(Box* b) : box(b) {} - - Updater(const Updater&) = delete; - Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); } - Updater& operator=(const Updater&) = delete; - Updater& operator=(Updater&& rhs) { - std::swap(box, rhs.box); - return *this; - } - operator bool() const { return box != nullptr; } - }; + // Box reference count + std::atomic rc{0}; Box() = default; template explicit Box(KeyType&& key) : key(std::forward(key)), ref(nullptr) {} - ~Box() {} + ~Box() { + assert(rc == 0); + } + + std::shared_ptr update(std::shared_ptr r, uint64_t ts = 0) { + SCOPED_LOCK(reflock); + lastcreate = ts; + std::swap(r, ref); + return r; + } + void clear(uint64_t ts = 0) { + SCOPED_LOCK(reflock); + lastcreate = ts; + ref.reset(); + } + std::shared_ptr reader() { return ref; } + + void acquire() { + timestamp = photon::now; + rc.fetch_add(1, std::memory_order_relaxed); + } - Updater writer() { return Updater(this); } - std::shared_ptr reader() { return std::atomic_load(&ref); } + void release() { + timestamp = photon::now; + // release reference should use stronger order + rc.fetch_sub(1, std::memory_order_seq_cst); + } }; - struct ItemHash { + // Hash and Equal for Box, for unordered_set + // simply use hash/equal of key + struct BoxHash { size_t operator()(const Box& x) const { return std::hash()(x.key); } }; - struct ItemEqual { + struct BoxEqual { bool operator()(const Box& a, const Box& b) const { return a.key == b.key; } }; - std::unordered_set map; + // protect object cache map + photon::spinlock maplock; + // protect lru list + photon::spinlock listlock; + std::unordered_set map; intrusive_list lru_list; uint64_t lifespan; photon::Timer _timer; - photon::spinlock maplock; - - std::shared_ptr __update(Box& item, std::shared_ptr val) { - auto ret_val = val; - item.writer().update(val, photon::now); - return ret_val; - } - - std::shared_ptr __update(Box& item, V* val) { - return __update(item, std::shared_ptr(val)); - } + bool _exit = false; template - Box& __find_or_create_item(KeyType&& key) { - Box keyitem(key); - Box* item = nullptr; - SCOPED_LOCK(maplock); - auto it = map.find(keyitem); - if (it == map.end()) { - auto rt = map.emplace(std::forward(key)); - if (rt.second) item = (Box*)&*rt.first; - } else { - item = (Box*)&*it; + Box& __find_or_create_box(KeyType&& key) { + Box keybox(key); + Box* box = nullptr; + { + SCOPED_LOCK(maplock); + auto it = map.find(keybox); + if (it == map.end()) { + auto rt = map.emplace(std::forward(key)); + if (rt.second) box = (Box*)&*rt.first; + } else + box = (Box*)&*it; } - assert(item); - item->timestamp = photon::now; - lru_list.pop(item); - lru_list.push_back(item); - return *item; + assert(box); + box->acquire(); + return *box; } - uint64_t __expire() { uint64_t now = photon::now; - SCOPED_LOCK(maplock); - if (lru_list.empty()) return 0; - if (photon::sat_add(lru_list.front()->timestamp, lifespan) > now) { - return photon::sat_add(lru_list.front()->timestamp, lifespan) - - now; - } - auto x = lru_list.front(); - while (x && (photon::sat_add(x->timestamp, lifespan) < now)) { - lru_list.pop(x); - __update(*x, nullptr); - map.erase(*x); - x = lru_list.front(); + { + SCOPED_LOCK(listlock); + if (lru_list.empty()) return 0; + if (photon::sat_add(lru_list.front()->timestamp, lifespan) > now) { + return photon::sat_add(lru_list.front()->timestamp, lifespan) - + now; + } + auto x = lru_list.front(); + // here requires lock dance for lru_list and kv + while (x && (photon::sat_add(x->timestamp, lifespan) < now)) { + lru_list.pop(x); + // modify map have to hold map lock + // but holding both lock may cause deadlock + if (x->rc == 0) { + listlock.unlock(); + x->clear(); + { + SCOPED_LOCK(maplock); + map.erase(*x); + } + listlock.lock(); + } + x = lru_list.front(); + } } return 0; } @@ -147,28 +155,29 @@ class ObjectCacheV2 { public: struct Borrow { ObjectCacheV2* _oc = nullptr; - Box* _item = nullptr; + Box* _box = nullptr; std::shared_ptr _reader; bool _recycle = false; Borrow() : _reader(nullptr) {} - Borrow(ObjectCacheV2* oc, Box* item, const std::shared_ptr& reader) - : _oc(oc), _item(item), _reader(reader), _recycle(false) {} + Borrow(ObjectCacheV2* oc, Box* box, const std::shared_ptr& reader) + : _oc(oc), _box(box), _reader(reader), _recycle(false) { + _box->acquire(); + } Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); } - Borrow& operator=(Borrow&& rhs) { - std::swap(_oc, rhs._oc); - std::swap(_item, rhs._item); - std::swap(_recycle, rhs._recycle); - rhs._reader = std::atomic_exchange(&_reader, std::move(rhs._reader)); - return *this; - } + Borrow& operator=(Borrow&& rhs) = default; ~Borrow() { if (_recycle) { - _oc->__update(*_item, nullptr); + _box->clear(); + } + _box->release(); + if (_box->rc == 0) { + SCOPED_LOCK(_oc->listlock); + _oc->lru_list.push_back(_box); } } @@ -183,24 +192,27 @@ class ObjectCacheV2 { template Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) { - auto& item = __find_or_create_item(std::forward(key)); - auto r = item.reader(); + auto& box = __find_or_create_box(std::forward(key)); + DEFER(box.release()); + std::shared_ptr r{}; while (!r) { - if (item.boxlock.try_lock() == 0) { - DEFER(item.boxlock.unlock()); - r = item.reader(); + if (box.createlock.try_lock() == 0) { + DEFER(box.createlock.unlock()); + r = box.reader(); if (!r) { - if (photon::sat_add(item.lastcreate, cooldown) <= + if (photon::sat_add(box.lastcreate, cooldown) <= photon::now) { - r = __update(item, ctor()); + auto r = std::shared_ptr(ctor()); + box.update(r, photon::now); + return Borrow(this, &box, r); } - return Borrow(this, &item, r); + return Borrow(this, &box, r); } } photon::thread_yield(); - r = item.reader(); + r = box.reader(); } - return Borrow(this, &item, r); + return Borrow(this, &box, r); } template @@ -211,9 +223,11 @@ class ObjectCacheV2 { template Borrow update(KeyType&& key, Ctor&& ctor) { - auto item = __find_or_create_item(std::forward(key)); - auto r = __update(item, ctor()); - return Borrow(this, item, r); + auto box = __find_or_create_box(std::forward(key)); + DEFER(box.release()); + auto r = ctor(); + box.update(r, photon::now); + return Borrow(this, box, r); } ObjectCacheV2(uint64_t lifespan) @@ -223,7 +237,7 @@ class ObjectCacheV2 { ~ObjectCacheV2() { _timer.stop(); - SCOPED_LOCK(maplock); + SCOPED_LOCK(listlock); lru_list.node = nullptr; } }; \ No newline at end of file