Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objcachev2 #522

Merged
merged 5 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ limitations under the License.
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <memory>
#include <thread>
#include <utility>
#ifndef __aarch64__
Expand Down Expand Up @@ -90,16 +90,23 @@ struct PhotonPause : PauseBase {
}
};

template <typename T>
struct is_shared_ptr : std::false_type {};
template <typename T>
struct is_shared_ptr<std::shared_ptr<T>> : std::true_type {};

template <typename T, size_t N>
class LockfreeRingQueueBase {
public:
#if __cplusplus < 201402L
static_assert(std::has_trivial_copy_constructor<T>::value &&
std::has_trivial_copy_assign<T>::value,
static_assert((std::has_trivial_copy_constructor<T>::value &&
std::has_trivial_copy_assign<T>::value) ||
is_shared_ptr<T>::value,
"T should be trivially copyable");
#else
static_assert(std::is_trivially_copy_constructible<T>::value &&
std::is_trivially_copy_assignable<T>::value,
static_assert((std::is_trivially_copy_constructible<T>::value &&
std::is_trivially_copy_assignable<T>::value) ||
is_shared_ptr<T>::value,
"T should be trivially copyable");
#endif

Expand Down
258 changes: 258 additions & 0 deletions common/objectcachev2.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#pragma once

#include <photon/common/alog.h>
#include <photon/common/lockfree_queue.h>
#include <photon/photon.h>
#include <photon/thread/list.h>
#include <photon/thread/thread11.h>
#include <photon/thread/timer.h>
#include <photon/thread/workerpool.h>

#include <memory>
#include <unordered_set>

/**
A simpler but effective object cache implementation.
`ObjectCacheV2` shows better performance compare to `ObjectCache`, with several
improvements:
1. Less lock in both read and write.
2. Object destruction always goes in background photon thread called reclaimer.
3. Self adjustable timeout for reclaimer. No needs to set cycler timer.
4. New `update` API, able to immediately substitute objects.
5. `ObjectCacheV2` no longer support acquire/release API.
It should work as `ObjectCache` in code do not depends on acquire/release API.
**/

template <typename K, typename VPtr>
class ObjectCacheV2 {
protected:
using V = std::remove_pointer_t<VPtr>;

struct Box : public intrusive_list_node<Box> {
const K key;
std::shared_ptr<V> ref;
photon::spinlock boxlock;
uint64_t lastcreate = 0;
uint64_t timestamp = 0;

struct Updater {
Box* box = nullptr;

~Updater() {}

// Returned RefPtr is the old one
// other reader will get new one after updated

std::shared_ptr<V> update(std::shared_ptr<V>& r, uint64_t ts = 0) {
box->lastcreate = ts;
r = std::atomic_exchange(&box->ref, r);
return r;
}
std::shared_ptr<V> update(V* val, uint64_t ts = 0,
std::shared_ptr<V>* newptr = nullptr) {
auto r = std::shared_ptr<V>(val);
if (newptr) *newptr = r;
return update(r, ts);
}

explicit Updater(Box* b) : box(b) {}

Updater(const Updater&) = delete;
Updater(Updater&& rhs) : box(nullptr) { *this = std::move(rhs); }
Updater& operator=(const Updater&) = delete;
Updater& operator=(Updater&& rhs) {
std::swap(box, rhs.box);
return *this;
}
operator bool() const { return box != nullptr; }
};

Box() = default;
template <typename KeyType>
explicit Box(KeyType&& key)
: key(std::forward<KeyType>(key)), ref(nullptr) {}

~Box() {}

Updater writer() { return Updater(this); }
std::shared_ptr<V> reader() { return std::atomic_load(&ref); }
};
struct ItemHash {
size_t operator()(const Box& x) const { return std::hash<K>()(x.key); }
};
struct ItemEqual {
bool operator()(const Box& a, const Box& b) const {
return a.key == b.key;
}
};

photon::thread* reclaimer = nullptr;
photon::common::RingChannel<LockfreeMPMCRingQueue<std::shared_ptr<V>, 4096>>
reclaim_queue;
photon::spinlock maplock;
std::unordered_set<Box, ItemHash, ItemEqual> map;
intrusive_list<Box> cycle_list;
uint64_t lifespan;
photon::Timer _timer;
bool _exit = false;

std::shared_ptr<V> __update(Box& item, std::shared_ptr<V> val) {
auto ret_val = val;
auto old_val = item.writer().update(val, photon::now);
reclaim_queue.template send<PhotonPause>(old_val);
return ret_val;
}

std::shared_ptr<V> __update(Box& item, V* val) {
std::shared_ptr<V> ptr(val);
return __update(item, ptr);
}

template <typename KeyType>
Box& __find_or_create_item(KeyType&& key) {
Box keyitem(key);
Box* item = nullptr;
SCOPED_LOCK(maplock);
auto it = map.find(keyitem);
if (it == map.end()) {
// item = new Box(std::forward<KeyType>(key));
auto rt = map.emplace(key);
if (rt.second) item = (Box*)&*rt.first;
} else
item = (Box*)&*it;
assert(item);
item->timestamp = photon::now;
cycle_list.pop(item);
cycle_list.push_back(item);
return *item;
}

uint64_t __expire() {
intrusive_list<Box> delete_list;
uint64_t now = photon::now;
{
SCOPED_LOCK(maplock);
if (cycle_list.empty()) return 0;
if (photon::sat_add(cycle_list.front()->timestamp, lifespan) >
now) {
return photon::sat_add(cycle_list.front()->timestamp,
lifespan) -
now;
}
auto x = cycle_list.front();
while (x && (photon::sat_add(x->timestamp, lifespan) < now)) {
cycle_list.pop(x);
__update(*x, nullptr);
map.erase(*x);
x = cycle_list.front();
}
}
return 0;
}

public:
struct Borrow {
ObjectCacheV2* _oc = nullptr;
Box* _item = nullptr;
std::shared_ptr<V> _reader;
bool _recycle = false;

Borrow() : _reader(nullptr) {}

Borrow(ObjectCacheV2* oc, Box* item, std::shared_ptr<V>&& reader)
: _oc(oc),
_item(item),
_reader(std::move(reader)),
_recycle(false) {}

Borrow(Borrow&& rhs) : _reader(nullptr) { *this = std::move(rhs); }

Borrow& operator=(Borrow&& rhs) {
std::swap(_oc, rhs._oc);
std::swap(_item, rhs._item);
_reader = std::atomic_exchange(&rhs._reader, _reader);
std::swap(_reader, rhs._reader);
std::swap(_recycle, rhs._recycle);
return *this;
}

~Borrow() {
if (_recycle) {
_oc->__update(*_item, nullptr);
}
}

bool recycle() { return _recycle; }

bool recycle(bool x) { return _recycle = x; }

V& operator*() const { return *_reader; }
V* operator->() const { return &*_reader; }
operator bool() const { return (bool)_reader; }
};

template <typename KeyType, typename Ctor>
Borrow borrow(KeyType&& key, Ctor&& ctor, uint64_t cooldown = 0UL) {
auto& item = __find_or_create_item(std::forward<KeyType>(key));
auto r = item.reader();
while (!r) {
if (item.boxlock.try_lock() == 0) {
DEFER(item.boxlock.unlock());
r = item.reader();
if (!r) {
if (photon::sat_add(item.lastcreate, cooldown) <=
photon::now) {
r = __update(item, ctor());
}
return Borrow(this, &item, std::move(r));
}
}
photon::thread_yield();
r = item.reader();
}
return Borrow(this, &item, std::move(r));
}

template <typename KeyType>
Borrow borrow(KeyType&& key) {
return borrow(std::forward<KeyType>(key),
[&]() { return std::make_shared<V>(); });
}

template <typename KeyType, typename Ctor>
Borrow update(KeyType&& key, Ctor&& ctor) {
auto item = __find_or_create_item(std::forward<KeyType>(key));
auto r = __update(item, ctor());
return Borrow(this, item, std::move(r));
}

ObjectCacheV2(uint64_t lifespan)
: lifespan(lifespan),
_timer(1UL * 1000 * 1000, {this, &ObjectCacheV2::__expire}, true,
photon::DEFAULT_STACK_SIZE),
_exit(false) {
reclaimer = photon::thread_create11([this] {
while (!_exit) {
reclaim_queue.recv();
}
});
photon::thread_enable_join(reclaimer);
}

~ObjectCacheV2() {
_timer.stop();
if (reclaimer) {
_exit = true;
reclaim_queue.template send<PhotonPause>(nullptr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it necessary to use the template keyword?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary.

Calling template member function needs a template prefix. The send<P> member function has default template argument ThreadPause, but here has to call with argument PhotonPause, makes here have to be with a template prefix.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have following code in RPC's exmaple:

    ExampleServer()
        : skeleton(photon::rpc::new_skeleton()),
          server(photon::net::new_tcp_socket_server()) {
        skeleton->register_service<Testrun, Heartbeat, Echo, ReadBuffer,
                                   WriteBuffer>(this);
    }

We call the template member function, register_service<...> without a template keyword.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but those code in RPC is a part of normal class.

Here is in template define.

ignore that template prefix will leads to erros

error: expected primary-expression before ‘>’ token

in GCC and

Use 'template' keyword to treat 'send' as a dependent template name (fix available)clang(missing_dependent_template_keyword)

in clang

photon::thread_join((photon::join_handle*)reclaimer);
reclaimer = nullptr;
}
SCOPED_LOCK(maplock);
cycle_list.node = nullptr;
}
};
4 changes: 4 additions & 0 deletions common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ add_executable(test-objcache test_objcache.cpp)
target_link_libraries(test-objcache PRIVATE photon_shared)
add_test(NAME test-objcache COMMAND $<TARGET_FILE:test-objcache>)

add_executable(perf-objcache perf_objcache.cpp)
target_link_libraries(perf-objcache PRIVATE photon_shared)
add_test(NAME perf-objcache COMMAND $<TARGET_FILE:test-objcache>)

add_executable(test-common test.cpp)
target_link_libraries(test-common PRIVATE photon_shared)
add_test(NAME test-common COMMAND $<TARGET_FILE:test-common>)
Expand Down
Loading
Loading