Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some modify and fix in ObjectCacheV2 #531

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 29 additions & 58 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,20 @@ class ObjectCacheV2 {
}
};

photon::thread* reclaimer = nullptr;
photon::common::RingChannel<LockfreeMPMCRingQueue<std::shared_ptr<V>, 4096>>
reclaim_queue;
photon::spinlock maplock;
std::unordered_set<Box, ItemHash, ItemEqual> map;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use box/Box instead of item/Item

intrusive_list<Box> cycle_list;
intrusive_list<Box> lru_list;
uint64_t lifespan;
photon::Timer _timer;
bool _exit = false;
photon::spinlock maplock;

std::shared_ptr<V> __update(Box& item, std::shared_ptr<V> val) {
auto ret_val = val;
auto old_val = item.writer().update(val, photon::now);
reclaim_queue.template send<PhotonPause>(old_val);
item.writer().update(val, photon::now);
return ret_val;
}

std::shared_ptr<V> __update(Box& item, V* val) {
std::shared_ptr<V> ptr(val);
return __update(item, ptr);
return __update(item, std::shared_ptr<V>(val));
}

template <typename KeyType>
Expand All @@ -120,37 +114,32 @@ class ObjectCacheV2 {
SCOPED_LOCK(maplock);
auto it = map.find(keyitem);
if (it == map.end()) {
// item = new Box(std::forward<KeyType>(key));
auto rt = map.emplace(key);
auto rt = map.emplace(std::forward<KeyType>(key));
if (rt.second) item = (Box*)&*rt.first;
} else
} else {
item = (Box*)&*it;
}
assert(item);
item->timestamp = photon::now;
cycle_list.pop(item);
cycle_list.push_back(item);
lru_list.pop(item);
lru_list.push_back(item);
return *item;
}

uint64_t __expire() {
intrusive_list<Box> delete_list;
uint64_t now = photon::now;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's simpler to use now - lifespan

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if lifespan has been set a extra-big number (like UINT64_MAX, present object in objcache never reclaim

sat_add is necessary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sat_sub(now, lifespan)

{
SCOPED_LOCK(maplock);
if (cycle_list.empty()) return 0;
if (photon::sat_add(cycle_list.front()->timestamp, lifespan) >
now) {
return photon::sat_add(cycle_list.front()->timestamp,
lifespan) -
now;
}
auto x = cycle_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
cycle_list.pop(x);
__update(*x, nullptr);
map.erase(*x);
x = cycle_list.front();
}
SCOPED_LOCK(maplock);
if (lru_list.empty()) return 0;
if (photon::sat_add(lru_list.front()->timestamp, lifespan) > now) {
return photon::sat_add(lru_list.front()->timestamp, lifespan) -
now;
}
auto x = lru_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
lru_list.pop(x);
__update(*x, nullptr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's more performant to realize x->reset(), which simply invokes shared_ptr::reset().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In new implementation still use exchange/swap like stuff instead of reset().

swapped out object still have at lease one reference if result is assigned to another variable, so it could able to control when to perform object destruction.

map.erase(*x);
x = lru_list.front();
}
return 0;
}
Expand All @@ -164,20 +153,16 @@ class ObjectCacheV2 {

Borrow() : _reader(nullptr) {}

Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr<V>&& reader)
: _oc(oc),
_item(item),
_reader(std::move(reader)),
_recycle(false) {}
Borrow(ObjectCacheV2* oc, Box* item, const std::shared_ptr<V>& reader)
: _oc(oc), _item(item), _reader(reader), _recycle(false) {}

Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); }

Borrow& operator=(Borrow&& rhs) {
Copy link
Collaborator

@lihuiba lihuiba Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of swap/exchange, it's more semantically right to:

assert(_oc == rhs._oc);
std::atomic_exchange(&_reader, rhs._reader);
_item = rhs._item;
_recycle = rhs._recycle;
rhs._reader.reset();
rhs._item = nullptr;
rhs._recycle = false;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to cppreference https://en.cppreference.com/w/cpp/memory/shared_ptr

If multiple threads of execution access the same shared_ptr object without synchronization and any of those accesses uses a non-const member function of shared_ptr then a data race will occur

Since atomic<shared_ptr> is not supported by c++11/14/17, an new implementation using photon::mutex helps keep shared_ptr update in critical zone.

std::swap(_oc, rhs._oc);
std::swap(_item, rhs._item);
_reader = std::atomic_exchange(&rhs._reader, _reader);
std::swap(_reader, rhs._reader);
std::swap(_recycle, rhs._recycle);
rhs._reader = std::atomic_exchange(&_reader, std::move(rhs._reader));
return *this;
}

Expand Down Expand Up @@ -209,13 +194,13 @@ class ObjectCacheV2 {
photon::now) {
r = __update(item, ctor());
}
return Borrow(this, &item, std::move(r));
return Borrow(this, &item, r);
}
}
photon::thread_yield();
r = item.reader();
}
return Borrow(this, &item, std::move(r));
return Borrow(this, &item, r);
}

template <typename KeyType>
Expand All @@ -228,31 +213,17 @@ class ObjectCacheV2 {
Borrow update(KeyType&& key, Ctor&& ctor) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = __update(item, ctor());
return Borrow(this, item, std::move(r));
return Borrow(this, item, r);
}

ObjectCacheV2(uint64_t lifespan)
: lifespan(lifespan),
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true,
photon::DEFAULT_STACK_SIZE),
_exit(false) {
reclaimer = photon::thread_create11([this] {
while (!_exit) {
reclaim_queue.recv();
}
});
photon::thread_enable_join(reclaimer);
}
photon::DEFAULT_STACK_SIZE) {}

~ObjectCacheV2() {
_timer.stop();
if (reclaimer) {
_exit = true;
reclaim_queue.template send<PhotonPause>(nullptr);
photon::thread_join((photon::join_handle*)reclaimer);
reclaimer = nullptr;
}
SCOPED_LOCK(maplock);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use maplock only to protect a single assignment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code here want to access intrusive_list only for preventing hit assert of list.

Technically in dtor, other threads should not have any other accesses to the whole object cache, the lock here could be ignored.

I will remove the lock acquiring here

cycle_list.node = nullptr;
lru_list.node = nullptr;
}
};
Loading