diff --git a/src/job_cache/daemon_cache.cpp b/src/job_cache/daemon_cache.cpp index 4595c7e2c..5da3289ff 100644 --- a/src/job_cache/daemon_cache.cpp +++ b/src/job_cache/daemon_cache.cpp @@ -653,8 +653,9 @@ struct CacheDbImpl { OutputSymlinks output_symlinks; Transaction transact; SelectMatchingJobs matching_jobs; + std::unique_ptr policy; - CacheDbImpl(const std::string &_dir) + CacheDbImpl(EvictionConfig config, const std::string &_dir) : db(std::make_unique(_dir)), jobs(db), input_files(db), @@ -663,7 +664,21 @@ struct CacheDbImpl { output_dirs(db), output_symlinks(db), transact(db), - matching_jobs(db) {} + matching_jobs(db) { + switch (config.type) { + case EvictionPolicyType::TTL: + wcl::log::info("Using TTL eviction policy, seconds_to_live = %lu", + config.ttl.seconds_to_live)(); + policy = std::make_unique(config.ttl.seconds_to_live); + break; + case EvictionPolicyType::LRU: + wcl::log::info("Using LRU eviction policy, low = %lu, max = %lu", config.lru.low_size, + config.lru.max_size)(); + policy = std::make_unique(config.lru.low_size, config.lru.max_size); + break; + } + policy->init(db, _dir); + } }; DaemonCache::DaemonCache(std::string dir, std::string bulk_dir, EvictionConfig config) @@ -679,9 +694,7 @@ DaemonCache::DaemonCache(std::string dir, std::string bulk_dir, EvictionConfig c key = rng.unique_name(); listen_socket_fd = create_cache_socket(".", key); - impl = std::make_unique("."); - - launch_evict_loop(); + impl = std::make_unique(config, "."); } int DaemonCache::run() { @@ -983,17 +996,7 @@ FindJobResponse DaemonCache::read(const FindJobRequest &find_request) { redirect_path(input_dir); } - EvictionCommand cmd(EvictionCommandType::Read, job_id); - - std::string msg = cmd.serialize(); - msg += '\0'; - - wcl::log::info("Sending Read command to eviction loop")(); - if (write(evict_stdin, msg.data(), msg.size()) == -1) { - wcl::log::warning("Failed to send eviction update: %s", strerror(errno))(); - } else { - wcl::log::info("Successfully sent eviction the job")(); - } + impl->policy->read(job_id); return FindJobResponse(wcl::make_some(std::move(result))); } @@ -1078,102 +1081,10 @@ void DaemonCache::add(const AddJobRequest &add_request) { std::string job_dir = wcl::join_paths(job_group_dir, std::to_string(job_id)); rename_no_fail(tmp_job_dir.c_str(), job_dir.c_str()); - EvictionCommand cmd(EvictionCommandType::Write, job_id); - - std::string msg = cmd.serialize(); - msg += '\0'; - - wcl::log::info("Sending Write command to eviction loop")(); - if (write(evict_stdin, msg.data(), msg.size()) == -1) { - wcl::log::warning("Failed to send eviction update: %s", strerror(errno))(); - } else { - wcl::log::info("Sucessfully sent eviction add update")(); - } -} - -void DaemonCache::launch_evict_loop() { - const size_t read_side = 0; - const size_t write_side = 1; - - int stdinPipe[2]; - int stdoutPipe[2]; - - if (pipe(stdinPipe) < 0) { - wcl::log::error("Failed to allocate eviction pipe: %s", strerror(errno)).urgent()(); - exit(1); - } - - if (pipe(stdoutPipe) < 0) { - wcl::log::error("Failed to allocate eviction pipe: %s", strerror(errno)).urgent()(); - exit(1); - } - - int pid = fork(); - - // error forking - if (pid < 0) { - wcl::log::error("Failed to fork eviction process: %s", strerror(errno)).urgent()(); - exit(1); - } - - // child - if (pid == 0) { - if (dup2(stdinPipe[read_side], STDIN_FILENO) == -1) { - wcl::log::error("Failed to dup2 stdin pipe for eviction process: %s", strerror(errno)) - .urgent()(); - exit(1); - } - - if (dup2(stdoutPipe[write_side], STDOUT_FILENO) == -1) { - wcl::log::error("Failed to dup2 stdin pipe for eviction process: %s", strerror(errno)) - .urgent()(); - exit(1); - } - - close(stdinPipe[read_side]); - close(stdinPipe[write_side]); - close(stdoutPipe[read_side]); - close(stdoutPipe[write_side]); - - wcl::log::info("Launching eviction loop")(); - - // Finally enter the eviction loop, if it exits cleanly - // go ahead and exit with its result. - std::unique_ptr policy; - switch (config.type) { - case EvictionPolicyType::TTL: - wcl::log::info("Using TTL eviction policy, seconds_to_live = %lu", - config.ttl.seconds_to_live)(); - policy = std::make_unique(config.ttl.seconds_to_live); - break; - case EvictionPolicyType::LRU: - wcl::log::info("Using LRU eviction policy, low = %lu, max = %lu", config.lru.low_size, - config.lru.max_size)(); - policy = std::make_unique(config.lru.low_size, config.lru.max_size); - break; - } - int result = eviction_loop(".", std::move(policy)); - exit(result); - } - - // parent - if (pid > 0) { - close(stdinPipe[read_side]); - close(stdoutPipe[write_side]); - - evict_pid = pid; - evict_stdin = stdinPipe[write_side]; - evict_stdout = stdoutPipe[read_side]; - } -} - -void DaemonCache::reap_evict_loop() { - close(evict_stdin); - close(evict_stdout); - waitpid(evict_pid, nullptr, 0); + impl->policy->write(job_id); } -DaemonCache::~DaemonCache() { reap_evict_loop(); } +DaemonCache::~DaemonCache() {} void DaemonCache::handle_new_client() { // Accept the new client socket. We accept as non-blocking so that we can diff --git a/src/job_cache/daemon_cache.h b/src/job_cache/daemon_cache.h index 7f70f2461..c2ede8df3 100644 --- a/src/job_cache/daemon_cache.h +++ b/src/job_cache/daemon_cache.h @@ -52,9 +52,6 @@ class DaemonCache { std::unordered_map message_senders; bool exit_now = false; - void launch_evict_loop(); - void reap_evict_loop(); - FindJobResponse read(const FindJobRequest &find_request); void add(const AddJobRequest &add_request); void remove_corrupt_job(int64_t job_id); diff --git a/src/job_cache/eviction_policy.cpp b/src/job_cache/eviction_policy.cpp index 6f3785b0b..94a3de30c 100644 --- a/src/job_cache/eviction_policy.cpp +++ b/src/job_cache/eviction_policy.cpp @@ -398,13 +398,10 @@ static void garbage_collect_orphan_folders(std::shared_ptr } } -void LRUEvictionPolicy::init(const std::string& cache_dir) { - std::shared_ptr db = std::make_unique(cache_dir); +void LRUEvictionPolicy::init(std::shared_ptr db, + const std::string& cache_dir) { impl = std::make_unique(cache_dir, db); - - // To keep this thread alive, we assign it to a static thread object. - // This starts the collection but if the programs ends so too will this thread. - gc_thread = std::thread(garbage_collect_orphan_folders, db); + garbage_collect_orphan_folders(db); } void LRUEvictionPolicy::read(int job_id) { impl->mark_new_use(job_id); } @@ -427,14 +424,11 @@ LRUEvictionPolicy::LRUEvictionPolicy(uint64_t low, uint64_t max) LRUEvictionPolicy::~LRUEvictionPolicy() {} -void TTLEvictionPolicy::init(const std::string& cache_dir) { - std::shared_ptr db = std::make_unique(cache_dir); +void TTLEvictionPolicy::init(std::shared_ptr db, + const std::string& cache_dir) { impl = std::make_unique(cache_dir, db); impl->cleanup(seconds_to_live); - - // To keep this thread alive, we assign it to a static thread object. - // This starts the collection but if the programs ends so too will this thread. - gc_thread = std::thread(garbage_collect_orphan_folders, db); + garbage_collect_orphan_folders(db); } void TTLEvictionPolicy::read(int job_id) {} @@ -453,36 +447,4 @@ TTLEvictionPolicy::TTLEvictionPolicy(uint64_t seconds_to_live) : seconds_to_live TTLEvictionPolicy::~TTLEvictionPolicy() {} -int eviction_loop(const std::string& cache_dir, std::unique_ptr policy) { - policy->init(cache_dir); - - // Famous last words "a timeout of a year is plenty" - MessageParser msg_parser(STDIN_FILENO, 60 * 60 * 24 * 30 * 12); - MessageParserState state; - do { - std::vector msgs; - state = msg_parser.read_messages(msgs); - - for (const auto& m : msgs) { - auto cmd = EvictionCommand::parse(m); - if (!cmd) { - exit(EXIT_FAILURE); - } - - switch (cmd->type) { - case EvictionCommandType::Read: - policy->read(cmd->job_id); - break; - case EvictionCommandType::Write: - policy->write(cmd->job_id); - break; - default: - exit(EXIT_FAILURE); - } - } - } while (state == MessageParserState::Continue); - - exit(state == MessageParserState::StopSuccess ? EXIT_SUCCESS : EXIT_FAILURE); -} - } // namespace job_cache diff --git a/src/job_cache/eviction_policy.h b/src/job_cache/eviction_policy.h index 8714972c2..14213d7fd 100644 --- a/src/job_cache/eviction_policy.h +++ b/src/job_cache/eviction_policy.h @@ -24,19 +24,21 @@ #include #include #include -#include + +#include "job_cache/db_helpers.h" namespace job_cache { struct EvictionPolicy { - virtual void init(const std::string& cache_dir) = 0; + virtual void init(std::shared_ptr db, const std::string& cache_dir) = 0; virtual void read(int id) = 0; virtual void write(int id) = 0; virtual ~EvictionPolicy() {} }; struct NilEvictionPolicy : EvictionPolicy { - virtual void init(const std::string& cache_dir) override { + virtual void init(std::shared_ptr db, + const std::string& cache_dir) override { std::cerr << "NilEvictionPolicy::init()" << std::endl; } @@ -56,7 +58,6 @@ class LRUEvictionPolicy : public EvictionPolicy { std::unique_ptr impl; uint64_t max_cache_size; uint64_t low_cache_size; - std::thread gc_thread; public: explicit LRUEvictionPolicy(uint64_t low_cache_size, uint64_t max_cache_size); @@ -65,7 +66,7 @@ class LRUEvictionPolicy : public EvictionPolicy { LRUEvictionPolicy(LRUEvictionPolicy&&) = delete; virtual ~LRUEvictionPolicy(); - virtual void init(const std::string& cache_dir) override; + virtual void init(std::shared_ptr db, const std::string& cache_dir) override; virtual void read(int id) override; @@ -78,7 +79,6 @@ class TTLEvictionPolicy : public EvictionPolicy { // We need to touch the database so we use pimpl to hide the implementation std::unique_ptr impl; uint64_t seconds_to_live; - std::thread gc_thread; public: explicit TTLEvictionPolicy(uint64_t seconds_to_live); @@ -87,13 +87,11 @@ class TTLEvictionPolicy : public EvictionPolicy { TTLEvictionPolicy(TTLEvictionPolicy&&) = delete; virtual ~TTLEvictionPolicy(); - virtual void init(const std::string& cache_dir) override; + virtual void init(std::shared_ptr db, const std::string& cache_dir) override; virtual void read(int id) override; virtual void write(int id) override; }; -int eviction_loop(const std::string& cache_dir, std::unique_ptr policy); - } // namespace job_cache