Skip to content

Commit

Permalink
Refractoring and Fix reclaiming
Browse files Browse the repository at this point in the history
  • Loading branch information
Coldwings committed Aug 6, 2024
1 parent f77b7c3 commit fbf812b
Showing 1 changed file with 117 additions and 103 deletions.
220 changes: 117 additions & 103 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,139 +36,148 @@ class ObjectCacheV2 {
struct Box : public intrusive_list_node<Box> {
const K key;
std::shared_ptr<V> ref;
photon::spinlock boxlock;
// protect ref with mutex
// If multiple threads of execution access the same shared_ptr object
// without synchronization and any of those accesses uses a non-const
// member function of shared_ptr then a data race will occur
// DTOR of V may cause photon thread yield.
photon::mutex reflock;
// prevent create multiple time when borrow
photon::spinlock createlock;
// create timestamp, for cool-down of borrow
uint64_t lastcreate = 0;
// reclaim timestamp
uint64_t timestamp = 0;

struct Updater {
Box* box = nullptr;

~Updater() {}

// Returned RefPtr is the old one
// other reader will get new one after updated

std::shared_ptr<V> update(std::shared_ptr<V>& r, uint64_t ts = 0) {
box->lastcreate = ts;
r = std::atomic_exchange(&box->ref, r);
return r;
}
std::shared_ptr<V> update(V* val, uint64_t ts = 0,
std::shared_ptr<V>* newptr = nullptr) {
auto r = std::shared_ptr<V>(val);
if (newptr) *newptr = r;
return update(r, ts);
}

explicit Updater(Box* b) : box(b) {}

Updater(const Updater&) = delete;
Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); }
Updater& operator=(const Updater&) = delete;
Updater& operator=(Updater&& rhs) {
std::swap(box, rhs.box);
return *this;
}
operator bool() const { return box != nullptr; }
};
// Box reference count
std::atomic<uint64_t> rc{0};

Box() = default;
template <typename KeyType>
explicit Box(KeyType&& key)
: key(std::forward<KeyType>(key)), ref(nullptr) {}

~Box() {}
~Box() {
assert(rc == 0);
}

std::shared_ptr<V> update(std::shared_ptr<V> r, uint64_t ts = 0) {
SCOPED_LOCK(reflock);
lastcreate = ts;
std::swap(r, ref);
return r;
}
void clear(uint64_t ts = 0) {
SCOPED_LOCK(reflock);
lastcreate = ts;
ref.reset();
}
std::shared_ptr<V> reader() { return ref; }

void acquire() {
timestamp = photon::now;
rc.fetch_add(1, std::memory_order_relaxed);
}

Updater writer() { return Updater(this); }
std::shared_ptr<V> reader() { return std::atomic_load(&ref); }
void release() {
timestamp = photon::now;
// release reference should use stronger order
rc.fetch_sub(1, std::memory_order_seq_cst);
}
};
struct ItemHash {
// Hash and Equal for Box, for unordered_set
// simply use hash/equal of key
struct BoxHash {
size_t operator()(const Box& x) const { return std::hash<K>()(x.key); }
};
struct ItemEqual {
struct BoxEqual {
bool operator()(const Box& a, const Box& b) const {
return a.key == b.key;
}
};

std::unordered_set<Box, ItemHash, ItemEqual> map;
// protect object cache map
photon::spinlock maplock;
// protect lru list
photon::spinlock listlock;
std::unordered_set<Box, BoxHash, BoxEqual> map;
intrusive_list<Box> lru_list;
uint64_t lifespan;
photon::Timer _timer;
photon::spinlock maplock;

std::shared_ptr<V> __update(Box& item, std::shared_ptr<V> val) {
auto ret_val = val;
item.writer().update(val, photon::now);
return ret_val;
}

std::shared_ptr<V> __update(Box& item, V* val) {
return __update(item, std::shared_ptr<V>(val));
}
bool _exit = false;

template <typename KeyType>
Box& __find_or_create_item(KeyType&& key) {
Box keyitem(key);
Box* item = nullptr;
SCOPED_LOCK(maplock);
auto it = map.find(keyitem);
if (it == map.end()) {
auto rt = map.emplace(std::forward<KeyType>(key));
if (rt.second) item = (Box*)&*rt.first;
} else {
item = (Box*)&*it;
Box& __find_or_create_box(KeyType&& key) {
Box keybox(key);
Box* box = nullptr;
{
SCOPED_LOCK(maplock);
auto it = map.find(keybox);
if (it == map.end()) {
auto rt = map.emplace(std::forward<KeyType>(key));
if (rt.second) box = (Box*)&*rt.first;
} else
box = (Box*)&*it;
}
assert(item);
item->timestamp = photon::now;
lru_list.pop(item);
lru_list.push_back(item);
return *item;
assert(box);
box->acquire();
return *box;
}

uint64_t __expire() {
uint64_t now = photon::now;
SCOPED_LOCK(maplock);
if (lru_list.empty()) return 0;
if (photon::sat_add(lru_list.front()->timestamp, lifespan) > now) {
return photon::sat_add(lru_list.front()->timestamp, lifespan) -
now;
}
auto x = lru_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
lru_list.pop(x);
__update(*x, nullptr);
map.erase(*x);
x = lru_list.front();
{
SCOPED_LOCK(listlock);
if (lru_list.empty()) return 0;
if (photon::sat_add(lru_list.front()->timestamp, lifespan) > now) {
return photon::sat_add(lru_list.front()->timestamp, lifespan) -
now;
}
auto x = lru_list.front();
// here requires lock dance for lru_list and kv
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
lru_list.pop(x);
// modify map have to hold map lock
// but holding both lock may cause deadlock
if (x->rc == 0) {
listlock.unlock();
x->clear();
{
SCOPED_LOCK(maplock);
map.erase(*x);
}
listlock.lock();
}
x = lru_list.front();
}
}
return 0;
}

public:
struct Borrow {
ObjectCacheV2* _oc = nullptr;
Box* _item = nullptr;
Box* _box = nullptr;
std::shared_ptr<V> _reader;
bool _recycle = false;

Borrow() : _reader(nullptr) {}

Borrow(ObjectCacheV2* oc, Box* item, const std::shared_ptr<V>& reader)
: _oc(oc), _item(item), _reader(reader), _recycle(false) {}
Borrow(ObjectCacheV2* oc, Box* box, const std::shared_ptr<V>& reader)
: _oc(oc), _box(box), _reader(reader), _recycle(false) {
_box->acquire();
}

Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); }

Borrow& operator=(Borrow&& rhs) {
std::swap(_oc, rhs._oc);
std::swap(_item, rhs._item);
std::swap(_recycle, rhs._recycle);
rhs._reader = std::atomic_exchange(&_reader, std::move(rhs._reader));
return *this;
}
Borrow& operator=(Borrow&& rhs) = default;

~Borrow() {
if (_recycle) {
_oc->__update(*_item, nullptr);
_box->clear();
}
_box->release();
if (_box->rc == 0) {
SCOPED_LOCK(_oc->listlock);
_oc->lru_list.push_back(_box);
}
}

Expand All @@ -183,24 +192,27 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) {
auto& item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item.reader();
auto& box = __find_or_create_box(std::forward<KeyType>(key));
DEFER(box.release());
std::shared_ptr<V> r{};
while (!r) {
if (item.boxlock.try_lock() == 0) {
DEFER(item.boxlock.unlock());
r = item.reader();
if (box.createlock.try_lock() == 0) {
DEFER(box.createlock.unlock());
r = box.reader();
if (!r) {
if (photon::sat_add(item.lastcreate, cooldown) <=
if (photon::sat_add(box.lastcreate, cooldown) <=
photon::now) {
r = __update(item, ctor());
auto r = std::shared_ptr<V>(ctor());
box.update(r, photon::now);
return Borrow(this, &box, r);
}
return Borrow(this, &item, r);
return Borrow(this, &box, r);
}
}
photon::thread_yield();
r = item.reader();
r = box.reader();
}
return Borrow(this, &item, r);
return Borrow(this, &box, r);
}

template <typename KeyType>
Expand All @@ -211,9 +223,11 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow update(KeyType&& key, Ctor&& ctor) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = __update(item, ctor());
return Borrow(this, item, r);
auto box = __find_or_create_box(std::forward<KeyType>(key));
DEFER(box.release());
auto r = ctor();
box.update(r, photon::now);
return Borrow(this, box, r);
}

ObjectCacheV2(uint64_t lifespan)
Expand All @@ -223,7 +237,7 @@ class ObjectCacheV2 {

~ObjectCacheV2() {
_timer.stop();
SCOPED_LOCK(maplock);
SCOPED_LOCK(listlock);
lru_list.node = nullptr;
}
};

0 comments on commit fbf812b

Please sign in to comment.