From d73db67a5fe406f4b8507c5fac516902a048043c Mon Sep 17 00:00:00 2001 From: dentiny Date: Sat, 11 Jan 2025 00:58:06 +0000 Subject: [PATCH 1/4] get or create for lru cache Signed-off-by: dentiny --- .../benchmarks/distributed/test_many_tasks.py | 2 +- src/ray/core_worker/core_worker.cc | 17 ++-- src/ray/core_worker/core_worker.h | 3 + src/ray/util/shared_lru.h | 78 ++++++++++++++++++- src/ray/util/tests/shared_lru_test.cc | 37 +++++++++ 5 files changed, 127 insertions(+), 10 deletions(-) diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 6f524cf4e3ea5..941795d19b27b 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -18,7 +18,7 @@ def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 - @ray.remote(num_cpus=cpus_per_task) + @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) def task(): time.sleep(sleep_time) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5c64acbaeb9d6..a4075a3159e6c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2308,19 +2308,22 @@ json CoreWorker::OverrideRuntimeEnv(const json &child, return result_runtime_env; } +std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvInfo( + const std::string &serialized_runtime_env_info) const { + auto factory = [this](const std::string &serialized_runtime_env_info) { + return OverrideTaskOrActorRuntimeEnvInfoImpl(serialized_runtime_env_info); + }; + return runtime_env_json_serialization_cache_.GetOrCreate(serialized_runtime_env_info, + std::move(factory)); +} + // TODO(hjiang): Current implementation is not the most ideal version, since it acquires a // global lock for all operations; it's acceptable for now since no heavy-lifted operation // is involved (considering the overall scheduling overhead is single-digit millisecond // magnitude). But a better solution is LRU cache native providing a native support for // sharding and `GetOrCreate` API. -std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvInfo( +std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvInfoImpl( const std::string &serialized_runtime_env_info) const { - if (auto cached_runtime_env_info = - runtime_env_json_serialization_cache_.Get(serialized_runtime_env_info); - cached_runtime_env_info != nullptr) { - return cached_runtime_env_info; - } - // TODO(Catch-Bull,SongGuyang): task runtime env not support the field eager_install // yet, we will overwrite the filed eager_install when it did. std::shared_ptr parent = nullptr; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a2546ca244b93..74ea9e0aad548 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1384,6 +1384,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::shared_ptr OverrideTaskOrActorRuntimeEnvInfo( const std::string &serialized_runtime_env_info) const; + std::shared_ptr OverrideTaskOrActorRuntimeEnvInfoImpl( + const std::string &serialized_runtime_env_info) const; + void BuildCommonTaskSpec( TaskSpecBuilder &builder, const JobID &job_id, diff --git a/src/ray/util/shared_lru.h b/src/ray/util/shared_lru.h index ec570b383162c..9858dba236d8a 100644 --- a/src/ray/util/shared_lru.h +++ b/src/ray/util/shared_lru.h @@ -27,11 +27,11 @@ // // Check and consume `val`. // // TODO(hjiang): -// 1. Add a `GetOrCreate` interface, which takes factory function to creation value. -// 2. For thread-safe cache, add a sharded container wrapper to reduce lock contention. +// For thread-safe cache, add a sharded container wrapper to reduce lock contention. #pragma once +#include #include #include #include @@ -194,6 +194,69 @@ class ThreadSafeSharedLruCache final { return cache_.Get(std::forward(key)); } + // Get or creation for cached key-value pairs. + // + // WARNING: Currently factory cannot have exception thrown. + // TODO(hjiang): [factory] should support template. + template + std::shared_ptr GetOrCreate(KeyLike &&key, + std::function(Key)> factory) { + std::shared_ptr creation_token; + + { + std::unique_lock lck(mu_); + auto cached_val = cache_.Get(key); + if (cached_val != nullptr) { + return cached_val; + } + + auto creation_iter = ongoing_creation_.find(key); + + // Another thread has requested for the same key-value pair, simply wait for its + // completion. + if (creation_iter != ongoing_creation_.end()) { + creation_token = creation_iter->second; + ++creation_token->count; + creation_token->cv.wait(lck, [creation_token = creation_token.get()]() { + return creation_token->val != nullptr; + }); + + // Creation finished. + auto val = creation_token->val; + --creation_token->count; + if (creation_token->count == 0) { + // [creation_iter] could be invalidated here due to new insertion/deletion. + ongoing_creation_.erase(key); + } + return val; + } + + // Current thread is the first one to request for the key-value pair, perform + // factory function. + creation_iter = + ongoing_creation_.emplace(key, std::make_shared()).first; + creation_token = creation_iter->second; + creation_token->count = 1; + } + + // Place factory out of critical section. + std::shared_ptr val = factory(key); + + { + std::lock_guard lck(mu_); + cache_.Put(key, val); + creation_token->val = val; + creation_token->cv.notify_all(); + int new_count = --creation_token->count; + if (new_count == 0) { + // [creation_iter] could be invalidated here due to new insertion/deletion. + ongoing_creation_.erase(key); + } + } + + return val; + } + // Clear the cache. void Clear() { std::lock_guard lck(mu_); @@ -204,8 +267,19 @@ class ThreadSafeSharedLruCache final { size_t max_entries() const { return cache_.max_entries(); } private: + struct CreationToken { + std::condition_variable cv; + // Nullptr indicate creation unfinished. + std::shared_ptr val; + // Counter for ongoing creation. + int count = 0; + }; + std::mutex mu_; SharedLruCache cache_; + + // Ongoing creation. + absl::flat_hash_map> ongoing_creation_; }; // Same interfaces as `SharedLruCache`, but all cached values are diff --git a/src/ray/util/tests/shared_lru_test.cc b/src/ray/util/tests/shared_lru_test.cc index 4bfba9cfe68de..ff90fa3321d07 100644 --- a/src/ray/util/tests/shared_lru_test.cc +++ b/src/ray/util/tests/shared_lru_test.cc @@ -16,7 +16,9 @@ #include +#include #include +#include #include namespace ray::utils::container { @@ -80,6 +82,41 @@ TEST(SharedLruCache, SameKeyTest) { EXPECT_EQ(2, *val); } +TEST(SharedLruCache, FactoryTest) { + using CacheType = ThreadSafeSharedLruCache; + + std::atomic invoked = {false}; // Used to check only invoke once. + auto factory = [&invoked](const std::string &key) -> std::shared_ptr { + EXPECT_FALSE(invoked.exchange(true)); + // Sleep for a while so multiple threads could kick in and get blocked. + std::this_thread::sleep_for(std::chrono::seconds(3)); + return std::make_shared(key); + }; + + CacheType cache{1}; + + constexpr size_t kFutureNum = 100; + std::vector>> futures; + futures.reserve(kFutureNum); + + const std::string key = "key"; + for (size_t idx = 0; idx < kFutureNum; ++idx) { + futures.emplace_back(std::async(std::launch::async, [&cache, &key, &factory]() { + return cache.GetOrCreate(key, factory); + })); + } + for (auto &fut : futures) { + auto val = fut.get(); + ASSERT_NE(val, nullptr); + ASSERT_EQ(*val, key); + } + + // After we're sure key-value pair exists in cache, make one more call. + auto cached_val = cache.GetOrCreate(key, factory); + ASSERT_NE(cached_val, nullptr); + ASSERT_EQ(*cached_val, key); +} + TEST(SharedLruConstCache, TypeAliasAssertion) { static_assert( std::is_same_v, SharedLruCache>); From 3df76db5b3a0d91cd6fbc9ca294f40f487eb7be6 Mon Sep 17 00:00:00 2001 From: dentiny Date: Sat, 8 Feb 2025 19:40:50 +0000 Subject: [PATCH 2/4] remove todo Signed-off-by: dentiny --- release/benchmarks/distributed/test_many_tasks.py | 2 +- src/ray/core_worker/core_worker.cc | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 941795d19b27b..6f524cf4e3ea5 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -18,7 +18,7 @@ def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 - @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) + @ray.remote(num_cpus=cpus_per_task) def task(): time.sleep(sleep_time) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a4075a3159e6c..146cd344b2ed1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2317,11 +2317,6 @@ std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvIn std::move(factory)); } -// TODO(hjiang): Current implementation is not the most ideal version, since it acquires a -// global lock for all operations; it's acceptable for now since no heavy-lifted operation -// is involved (considering the overall scheduling overhead is single-digit millisecond -// magnitude). But a better solution is LRU cache native providing a native support for -// sharding and `GetOrCreate` API. std::shared_ptr CoreWorker::OverrideTaskOrActorRuntimeEnvInfoImpl( const std::string &serialized_runtime_env_info) const { // TODO(Catch-Bull,SongGuyang): task runtime env not support the field eager_install From 6c5b7b23896a8c285dff55ad094e74603f2ec9d4 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 10 Feb 2025 00:37:23 +0000 Subject: [PATCH 3/4] address comment Signed-off-by: dentiny --- src/ray/util/shared_lru.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ray/util/shared_lru.h b/src/ray/util/shared_lru.h index 9858dba236d8a..afb52f1575249 100644 --- a/src/ray/util/shared_lru.h +++ b/src/ray/util/shared_lru.h @@ -199,8 +199,8 @@ class ThreadSafeSharedLruCache final { // WARNING: Currently factory cannot have exception thrown. // TODO(hjiang): [factory] should support template. template - std::shared_ptr GetOrCreate(KeyLike &&key, - std::function(Key)> factory) { + std::shared_ptr GetOrCreate( + KeyLike &&key, std::function(const Key &)> factory) { std::shared_ptr creation_token; { @@ -222,13 +222,12 @@ class ThreadSafeSharedLruCache final { }); // Creation finished. - auto val = creation_token->val; --creation_token->count; if (creation_token->count == 0) { // [creation_iter] could be invalidated here due to new insertion/deletion. ongoing_creation_.erase(key); } - return val; + return creation_token->val; } // Current thread is the first one to request for the key-value pair, perform From 250a58178a64411e402ae1dd996d985e14755556 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 10 Feb 2025 20:20:54 +0000 Subject: [PATCH 4/4] comment Signed-off-by: dentiny --- src/ray/core_worker/core_worker.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 74ea9e0aad548..bbb1bc2123edc 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1384,6 +1384,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::shared_ptr OverrideTaskOrActorRuntimeEnvInfo( const std::string &serialized_runtime_env_info) const; + // Used as the factory function for [OverrideTaskOrActorRuntimeEnvInfo] to create in LRU + // cache. std::shared_ptr OverrideTaskOrActorRuntimeEnvInfoImpl( const std::string &serialized_runtime_env_info) const;