Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some modify and fix in ObjectCacheV2 #531

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/enumerable.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct Enumerable
this->obj = nullptr;
}
using R = typename std::result_of<decltype(&T::get)(T)>::type;
R operator*() { return obj ? obj->get() : nullptr; }
R operator*() { return obj ? obj->get() : R{}; }
bool operator==(const iterator& rhs) const { return obj == rhs.obj; }
bool operator!=(const iterator& rhs) const { return !(*this == rhs); }
iterator& operator++()
Expand Down
249 changes: 122 additions & 127 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <memory>
#include <unordered_set>
#include <vector>

/**

Expand All @@ -36,154 +37,156 @@ class ObjectCacheV2 {
struct Box : public intrusive_list_node<Box> {
const K key;
std::shared_ptr<V> ref;
photon::spinlock boxlock;
// protect ref with mutex
// If multiple threads of execution access the same shared_ptr object
// without synchronization and any of those accesses uses a non-const
// member function of shared_ptr then a data race will occur
// DTOR of V may cause photon thread yield.
photon::mutex reflock;
// prevent create multiple time when borrow
photon::spinlock createlock;
// create timestamp, for cool-down of borrow
uint64_t lastcreate = 0;
// reclaim timestamp
uint64_t timestamp = 0;

struct Updater {
Box* box = nullptr;

~Updater() {}

// Returned RefPtr is the old one
// other reader will get new one after updated

std::shared_ptr<V> update(std::shared_ptr<V>& r, uint64_t ts = 0) {
box->lastcreate = ts;
r = std::atomic_exchange(&box->ref, r);
return r;
}
std::shared_ptr<V> update(V* val, uint64_t ts = 0,
std::shared_ptr<V>* newptr = nullptr) {
auto r = std::shared_ptr<V>(val);
if (newptr) *newptr = r;
return update(r, ts);
}

explicit Updater(Box* b) : box(b) {}

Updater(const Updater&) = delete;
Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); }
Updater& operator=(const Updater&) = delete;
Updater& operator=(Updater&& rhs) {
std::swap(box, rhs.box);
return *this;
}
operator bool() const { return box != nullptr; }
};
// Box reference count
std::atomic<uint64_t> rc{0};

Box() = default;
template <typename KeyType>
explicit Box(KeyType&& key)
: key(std::forward<KeyType>(key)), ref(nullptr) {}

~Box() {}
std::shared_ptr<V> update(std::shared_ptr<V> r, uint64_t ts = 0) {
SCOPED_LOCK(reflock);
lastcreate = ts;
std::swap(r, ref);
return r;
}
std::shared_ptr<V> reset(uint64_t ts = 0) {
return update({nullptr}, ts);
}
std::shared_ptr<V> reader() { return ref; }

Updater writer() { return Updater(this); }
std::shared_ptr<V> reader() { return std::atomic_load(&ref); }
void acquire() {
timestamp = photon::now;
rc.fetch_add(1, std::memory_order_relaxed);
}

void release() {
timestamp = photon::now;
// release reference should use stronger order
rc.fetch_sub(1, std::memory_order_seq_cst);
}
};
struct ItemHash {
// Hash and Equal for Box, for unordered_set
// simply use hash/equal of key
struct BoxHash {
size_t operator()(const Box& x) const { return std::hash<K>()(x.key); }
};
struct ItemEqual {
struct BoxEqual {
bool operator()(const Box& a, const Box& b) const {
return a.key == b.key;
}
};

photon::thread* reclaimer = nullptr;
photon::common::RingChannel<LockfreeMPMCRingQueue<std::shared_ptr<V>, 4096>>
reclaim_queue;
// protect object cache map
photon::spinlock maplock;
std::unordered_set<Box, ItemHash, ItemEqual> map;
intrusive_list<Box> cycle_list;
// protect lru list
std::unordered_set<Box, BoxHash, BoxEqual> map;
intrusive_list<Box> lru_list;
uint64_t lifespan;
photon::Timer _timer;
bool _exit = false;

std::shared_ptr<V> __update(Box& item, std::shared_ptr<V> val) {
auto ret_val = val;
auto old_val = item.writer().update(val, photon::now);
reclaim_queue.template send<PhotonPause>(old_val);
return ret_val;
}

std::shared_ptr<V> __update(Box& item, V* val) {
std::shared_ptr<V> ptr(val);
return __update(item, ptr);
}

template <typename KeyType>
Box& __find_or_create_item(KeyType&& key) {
Box keyitem(key);
Box* item = nullptr;
Box& __find_or_create_box(KeyType&& key) {
Box keybox(key);
Box* box = nullptr;
SCOPED_LOCK(maplock);
auto it = map.find(keyitem);
auto it = map.find(keybox);
if (it == map.end()) {
// item = new Box(std::forward<KeyType>(key));
auto rt = map.emplace(key);
if (rt.second) item = (Box*)&*rt.first;
auto rt = map.emplace(std::forward<KeyType>(key));
if (rt.second) box = (Box*)&*rt.first;
} else
item = (Box*)&*it;
assert(item);
item->timestamp = photon::now;
cycle_list.pop(item);
cycle_list.push_back(item);
return *item;
box = (Box*)&*it;
assert(box);
lru_list.pop(box);
box->acquire();
return *box;
}

uint64_t __expire() {
intrusive_list<Box> delete_list;
#if __cplusplus < 201703L
std::vector<std::shared_ptr<V>> to_release;
#else
intrusive_list<Box> to_release;
#endif
uint64_t now = photon::now;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's simpler to use now - lifespan

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if lifespan has been set a extra-big number (like UINT64_MAX, present object in objcache never reclaim

sat_add is necessary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sat_sub(now, lifespan)

uint64_t reclaim_before = photon::sat_sub(now, lifespan);
{
SCOPED_LOCK(maplock);
if (cycle_list.empty()) return 0;
if (photon::sat_add(cycle_list.front()->timestamp, lifespan) >
now) {
return photon::sat_add(cycle_list.front()->timestamp,
lifespan) -
now;
if (lru_list.empty()) return 0;
if (lru_list.front()->timestamp > reclaim_before) {
return lru_list.front()->timestamp - reclaim_before;
}
auto x = cycle_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
cycle_list.pop(x);
__update(*x, nullptr);
map.erase(*x);
x = cycle_list.front();
auto x = lru_list.front();
// here requires lock dance for lru_list and kv
while (x && x->timestamp < reclaim_before) {
lru_list.pop(x);
if (x->rc == 0) {
// make vector holds those shared_ptr
// prevent object destroy in critical zone
#if __cplusplus < 201703L
to_release.push_back(x->reset());
map.erase(*x);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use map.extract().value() obtain the Box element, so as to use the intrusive_list for to_release.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems add macro to check c++ version to show different implementation for those above C++17 is a good idea.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seem that C++17 node_handle owns the element Box, can not get out the pointer to Box without hacking.

#else
// Here is a hacked implementation
auto node = map.extract(*x);
to_release.push_back(&node.value());
/// node_handle should be just key&val pointer
// Force make it empty can prevent destroy object
// when node_handle goes to destroy
memset((void*)&node, 0, sizeof(node));
#endif
}
x = lru_list.front();
}
}
#if __cplusplus < 201703L
to_release.clear();
#else
to_release.delete_all();
#endif
return 0;
}

public:
struct Borrow {
ObjectCacheV2* _oc = nullptr;
Box* _item = nullptr;
Box* _box = nullptr;
std::shared_ptr<V> _reader;
bool _recycle = false;

Borrow() : _reader(nullptr) {}

Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr<V>&& reader)
: _oc(oc),
_item(item),
_reader(std::move(reader)),
_recycle(false) {}
Borrow(ObjectCacheV2* oc, Box* box, const std::shared_ptr<V>& reader)
: _oc(oc), _box(box), _reader(reader), _recycle(false) {
_box->acquire();
}

Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); }

Borrow& operator=(Borrow&& rhs) {
std::swap(_oc, rhs._oc);
std::swap(_item, rhs._item);
_reader = std::atomic_exchange(&rhs._reader, _reader);
std::swap(_reader, rhs._reader);
std::swap(_recycle, rhs._recycle);
return *this;
}
Borrow& operator=(Borrow&& rhs) = default;

~Borrow() {
if (_recycle) {
_oc->__update(*_item, nullptr);
_box->reset();
}
_box->release();
if (_box->rc == 0) {
SCOPED_LOCK(_oc->maplock);
_oc->lru_list.pop(_box);
_oc->lru_list.push_back(_box);
}
}

Expand All @@ -198,24 +201,27 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) {
auto& item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item.reader();
auto& box = __find_or_create_box(std::forward<KeyType>(key));
DEFER(box.release());
std::shared_ptr<V> r{};
while (!r) {
if (item.boxlock.try_lock() == 0) {
DEFER(item.boxlock.unlock());
r = item.reader();
if (box.createlock.try_lock() == 0) {
DEFER(box.createlock.unlock());
r = box.reader();
if (!r) {
if (photon::sat_add(item.lastcreate, cooldown) <=
if (photon::sat_add(box.lastcreate, cooldown) <=
photon::now) {
r = __update(item, ctor());
auto r = std::shared_ptr<V>(ctor());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctor() may return a shared_ptr?

box.update(r, photon::now);
return Borrow(this, &box, r);
}
return Borrow(this, &item, std::move(r));
return Borrow(this, &box, r);
}
}
photon::thread_yield();
r = item.reader();
r = box.reader();
}
return Borrow(this, &item, std::move(r));
return Borrow(this, &box, r);
}

template <typename KeyType>
Expand All @@ -226,33 +232,22 @@ class ObjectCacheV2 {

template <typename KeyType, typename Ctor>
Borrow update(KeyType&& key, Ctor&& ctor) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = __update(item, ctor());
return Borrow(this, item, std::move(r));
auto& box = __find_or_create_box(std::forward<KeyType>(key));
DEFER(box.release());
auto r = std::shared_ptr<V>(ctor());
box.update(r, photon::now);
return Borrow(this, &box, r);
}

ObjectCacheV2(uint64_t lifespan)
: lifespan(lifespan),
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true,
photon::DEFAULT_STACK_SIZE),
_exit(false) {
reclaimer = photon::thread_create11([this] {
while (!_exit) {
reclaim_queue.recv();
}
});
photon::thread_enable_join(reclaimer);
}
photon::DEFAULT_STACK_SIZE) {}

~ObjectCacheV2() {
_timer.stop();
if (reclaimer) {
_exit = true;
reclaim_queue.template send<PhotonPause>(nullptr);
photon::thread_join((photon::join_handle*)reclaimer);
reclaimer = nullptr;
}
SCOPED_LOCK(maplock);
cycle_list.node = nullptr;
// Should be no other access during dtor.
// modify lru_list do not need a lock.
lru_list.node = nullptr;
}
};
4 changes: 2 additions & 2 deletions common/ring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ ssize_t RingBuffer::do_read(void *buf, size_t count)
ssize_t RingBuffer::readv(const struct iovec *iov, int iovcnt)
{
ssize_t size = 0;
scoped_lock lock(m_read_lock);
photon::scoped_lock lock(m_read_lock);
for (auto& x: ptr_array(iov, iovcnt))
{
auto ret = do_read(x.iov_base, x.iov_len);
Expand Down Expand Up @@ -110,7 +110,7 @@ ssize_t RingBuffer::do_write(const void* buf, size_t count)
ssize_t RingBuffer::writev(const struct iovec *iov, int iovcnt)
{
ssize_t size = 0;
scoped_lock lock(m_write_lock);
photon::scoped_lock lock(m_write_lock);
for (auto& x: ptr_array(iov, iovcnt))
{
auto ret = do_write(x.iov_base, x.iov_len);
Expand Down
1 change: 1 addition & 0 deletions common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ target_link_libraries(test-objcache PRIVATE photon_shared)
add_test(NAME test-objcache COMMAND $<TARGET_FILE:test-objcache>)

add_executable(perf-objcache perf_objcache.cpp)
set_target_properties(perf-objcache PROPERTIES CXX_STANDARD 17)
target_link_libraries(perf-objcache PRIVATE photon_shared)
add_test(NAME perf-objcache COMMAND $<TARGET_FILE:test-objcache>)

Expand Down
2 changes: 1 addition & 1 deletion common/test/perf_objcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void *task(void *arg) {
for (const auto &x : k) {
auto strx = std::to_string(x);
auto b = oc->borrow(strx, [&strx] {
// photon::thread_usleep(1 * 1000);
photon::thread_usleep(1 * 1000);
// LOG_INFO("CTOR `", photon::now);
return new std::string(strx);
});
Expand Down
Loading