From 4c7455977e3ea4598658e3332f82b0b76d6cbb9b Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Thu, 25 May 2023 12:12:58 +0200 Subject: [PATCH 1/6] MINIFICPP-2129 Refactor threadpool --- extensions/rocksdb-repos/FlowFileLoader.cpp | 6 +- extensions/rocksdb-repos/FlowFileLoader.h | 2 +- libminifi/include/CronDrivenSchedulingAgent.h | 2 +- .../include/EventDrivenSchedulingAgent.h | 2 +- libminifi/include/FlowController.h | 2 +- libminifi/include/SchedulingAgent.h | 4 +- libminifi/include/ThreadedSchedulingAgent.h | 2 +- .../include/TimerDrivenSchedulingAgent.h | 2 +- libminifi/include/c2/C2Agent.h | 2 +- libminifi/include/core/Processor.h | 1 + libminifi/include/utils/Monitors.h | 82 ++++------------ libminifi/include/utils/ThreadPool.h | 71 +++++--------- libminifi/src/CronDrivenSchedulingAgent.cpp | 3 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 2 +- libminifi/src/FlowController.cpp | 2 +- libminifi/src/ThreadedSchedulingAgent.cpp | 17 +--- libminifi/src/TimerDrivenSchedulingAgent.cpp | 2 +- libminifi/src/c2/C2Agent.cpp | 5 +- libminifi/src/utils/ThreadPool.cpp | 82 ++++++---------- libminifi/test/unit/BackTraceTests.cpp | 96 ++++++++----------- libminifi/test/unit/SchedulingAgentTests.cpp | 57 ++++++----- libminifi/test/unit/SocketTests.cpp | 32 ++++--- libminifi/test/unit/ThreadPoolTests.cpp | 64 ++++--------- nanofi/include/cxx/Instance.h | 5 +- 24 files changed, 209 insertions(+), 336 deletions(-) diff --git a/extensions/rocksdb-repos/FlowFileLoader.cpp b/extensions/rocksdb-repos/FlowFileLoader.cpp index 983cae4494..133f7ecda2 100644 --- a/extensions/rocksdb-repos/FlowFileLoader.cpp +++ b/extensions/rocksdb-repos/FlowFileLoader.cpp @@ -41,11 +41,11 @@ FlowFileLoader::~FlowFileLoader() { std::future FlowFileLoader::load(std::vector flow_files) { auto promise = std::make_shared>(); std::future future = promise->get_future(); - utils::Worker task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] { + utils::Worker task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] { return loadImpl(flow_files, promise); }, - "", // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task - std::make_unique()}; + ""}; // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task + // the dummy_future is for the return value of the Worker's lambda, rerunning this lambda // depends on run_determinant + result // we could create a custom run_determinant to instead determine if/when it should be rerun diff --git a/extensions/rocksdb-repos/FlowFileLoader.h b/extensions/rocksdb-repos/FlowFileLoader.h index 9c7ee6d514..2287c7ddfa 100644 --- a/extensions/rocksdb-repos/FlowFileLoader.h +++ b/extensions/rocksdb-repos/FlowFileLoader.h @@ -52,7 +52,7 @@ class FlowFileLoader { private: utils::TaskRescheduleInfo loadImpl(const std::vector& flow_files, const std::shared_ptr>& output); - utils::ThreadPool thread_pool_{thread_count_, false, nullptr, "FlowFileLoaderThreadPool"}; + utils::ThreadPool thread_pool_{thread_count_, nullptr, "FlowFileLoaderThreadPool"}; gsl::not_null db_; diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h index 88ee42e6b4..7d422e7855 100644 --- a/libminifi/include/CronDrivenSchedulingAgent.h +++ b/libminifi/include/CronDrivenSchedulingAgent.h @@ -41,7 +41,7 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent { std::shared_ptr flow_repo, std::shared_ptr content_repo, std::shared_ptr configuration, - utils::ThreadPool& thread_pool) + utils::ThreadPool& thread_pool) : ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) { } diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 07a173059e..f8664cc258 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -37,7 +37,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { public: EventDrivenSchedulingAgent(const gsl::not_null controller_service_provider, std::shared_ptr repo, std::shared_ptr flow_repo, std::shared_ptr content_repo, std::shared_ptr configuration, - utils::ThreadPool &thread_pool) + utils::ThreadPool &thread_pool) : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) { using namespace std::literals::chrono_literals; diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 1d3cbf9e52..84724920f7 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -200,7 +200,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); // Thread pool for schedulers - utils::ThreadPool thread_pool_; + utils::ThreadPool thread_pool_; }; } // namespace org::apache::nifi::minifi diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index fec5e9598f..4911f5e90e 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -53,7 +53,7 @@ namespace org::apache::nifi::minifi { class SchedulingAgent { public: SchedulingAgent(const gsl::not_null controller_service_provider, std::shared_ptr repo, std::shared_ptr flow_repo, - std::shared_ptr content_repo, std::shared_ptr configuration, utils::ThreadPool &thread_pool) + std::shared_ptr content_repo, std::shared_ptr configuration, utils::ThreadPool& thread_pool) : admin_yield_duration_(), bored_yield_duration_(0), configure_(configuration), @@ -122,7 +122,7 @@ class SchedulingAgent { std::shared_ptr flow_repo_; std::shared_ptr content_repo_; - utils::ThreadPool &thread_pool_; + utils::ThreadPool& thread_pool_; gsl::not_null controller_service_provider_; private: diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index e512791544..14a7039481 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -40,7 +40,7 @@ class ThreadedSchedulingAgent : public SchedulingAgent { public: ThreadedSchedulingAgent(const gsl::not_null controller_service_provider, std::shared_ptr repo, std::shared_ptr flow_repo, std::shared_ptr content_repo, - std::shared_ptr configuration, utils::ThreadPool &thread_pool) + std::shared_ptr configuration, utils::ThreadPool &thread_pool) : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) { } ~ThreadedSchedulingAgent() override = default; diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 9e74fec2df..b91522f964 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -32,7 +32,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { public: TimerDrivenSchedulingAgent(const gsl::not_null controller_service_provider, std::shared_ptr repo, std::shared_ptr flow_repo, std::shared_ptr content_repo, std::shared_ptr configure, - utils::ThreadPool &thread_pool) + utils::ThreadPool &thread_pool) : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) { } diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index 1c2e27b510..92d86afb3c 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -227,7 +227,7 @@ class C2Agent : public state::UpdateController { std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); - utils::ThreadPool thread_pool_; + utils::ThreadPool thread_pool_; std::vector task_ids_; diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index f2266ea4ba..ee3141423d 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -176,6 +176,7 @@ class Processor : public Connectable, public ConfigurableComponent, public state void clearYield(); + std::chrono::steady_clock::time_point getYieldExpirationTime() const { return yield_expiration_; } std::chrono::steady_clock::duration getYieldTime() const; // Whether flow file queue full in any of the outgoing connection bool flowFilesOutGoingFull() const; diff --git a/libminifi/include/utils/Monitors.h b/libminifi/include/utils/Monitors.h index aed00ce2d3..10e7411c46 100644 --- a/libminifi/include/utils/Monitors.h +++ b/libminifi/include/utils/Monitors.h @@ -23,90 +23,50 @@ #if defined(WIN32) #include // This is required to work around a VS2017 bug, see the details below #endif -#include "utils/gsl.h" namespace org::apache::nifi::minifi::utils { -/** - * Worker task helper that determines - * whether or not we will run - */ -template -class AfterExecute { +class TaskRescheduleInfo { public: - virtual ~AfterExecute() = default; - - AfterExecute() = default; - AfterExecute(AfterExecute&& /*other*/) noexcept = default; - virtual bool isFinished(const T &result) = 0; - virtual bool isCancelled(const T &result) = 0; - /** - * Time to wait before re-running this task if necessary - * @return milliseconds since epoch after which we are eligible to re-run this task. - */ - virtual std::chrono::steady_clock::duration wait_time() = 0; -}; - -/** - * Uses the wait time for a given worker to determine if it is eligible to run - */ + TaskRescheduleInfo(bool result, std::chrono::steady_clock::time_point next_execution_time) + : next_execution_time_(next_execution_time), finished_(result) {} -struct TaskRescheduleInfo { - TaskRescheduleInfo(bool result, std::chrono::steady_clock::duration wait_time) - : wait_time_(wait_time), finished_(result) { - gsl_Expects(wait_time >= std::chrono::milliseconds(0)); + static TaskRescheduleInfo Done() { + return {true, std::chrono::steady_clock::time_point::min()}; } - std::chrono::steady_clock::duration wait_time_; - bool finished_; + static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point next_execution_time) { + return {false, next_execution_time}; + } - static TaskRescheduleInfo Done() { - return {true, std::chrono::steady_clock::duration(0)}; + static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration duration) { + return {false, std::chrono::steady_clock::now()+duration}; } - static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration interval) { - return {false, interval}; + static TaskRescheduleInfo RetryImmediately() { + return {false, std::chrono::steady_clock::time_point::min()}; } - static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point time_point) { - auto interval = std::max(time_point - std::chrono::steady_clock::now(), std::chrono::steady_clock::duration(0)); - return {false, interval}; + std::chrono::steady_clock::time_point getNextExecutionTime() const { + return next_execution_time_; } - static TaskRescheduleInfo RetryImmediately() { - return {false, std::chrono::steady_clock::duration(0)}; + bool isFinished() const { + return finished_; } + private: + std::chrono::steady_clock::time_point next_execution_time_; + bool finished_; + #if defined(WIN32) // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack. private: - TaskRescheduleInfo() : wait_time_(std::chrono::steady_clock::duration(0)), finished_(true) {} + TaskRescheduleInfo() : next_execution_time_(std::chrono::steady_clock::time_point::min()), finished_(true) {} friend class std::_Associated_state; #endif }; -class ComplexMonitor : public utils::AfterExecute { - public: - ComplexMonitor() = default; - - bool isFinished(const TaskRescheduleInfo &result) override { - if (result.finished_) { - return true; - } - current_wait_.store(result.wait_time_); - return false; - } - bool isCancelled(const TaskRescheduleInfo& /*result*/) override { - return false; - } - - std::chrono::steady_clock::duration wait_time() override { - return current_wait_.load(); - } - - private: - std::atomic current_wait_ {std::chrono::steady_clock::duration(0)}; -}; } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index dde8679095..78c7e0d417 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -50,27 +50,17 @@ using TaskId = std::string; * purpose: Provides a wrapper for the functor * and returns a future based on the template argument. */ -template class Worker { public: - explicit Worker(const std::function &task, const TaskId &identifier, std::unique_ptr> run_determinant) - : identifier_(identifier), + explicit Worker(const std::function &task, TaskId identifier) + : identifier_(std::move(identifier)), next_exec_time_(std::chrono::steady_clock::now()), - task(task), - run_determinant_(std::move(run_determinant)) { - promise = std::make_shared>(); + task(task) { + promise = std::make_shared>(); } - explicit Worker(const std::function &task, const TaskId &identifier) - : identifier_(identifier), - next_exec_time_(std::chrono::steady_clock::now()), - task(task), - run_determinant_(nullptr) { - promise = std::make_shared>(); - } - - explicit Worker(const TaskId& identifier = {}) - : identifier_(identifier), + explicit Worker(TaskId identifier = {}) + : identifier_(std::move(identifier)), next_exec_time_(std::chrono::steady_clock::now()) { } @@ -89,50 +79,40 @@ class Worker { * true == run again */ virtual bool run() { - T result = task(); - if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) { + TaskRescheduleInfo result = task(); + if (result.isFinished()) { promise->set_value(result); return false; } - next_exec_time_ = std::max(next_exec_time_, std::chrono::steady_clock::now() + run_determinant_->wait_time()); - return true; - } - virtual void setIdentifier(const TaskId& identifier) { - identifier_ = identifier; + next_exec_time_ = result.getNextExecutionTime(); + return true; } - virtual std::chrono::steady_clock::time_point getNextExecutionTime() const { + [[nodiscard]] virtual std::chrono::steady_clock::time_point getNextExecutionTime() const { return next_exec_time_; } - std::shared_ptr> getPromise() const; + [[nodiscard]] std::shared_ptr> getPromise() const { return promise; } - const TaskId &getIdentifier() const { + [[nodiscard]] const TaskId &getIdentifier() const { return identifier_; } protected: TaskId identifier_; std::chrono::steady_clock::time_point next_exec_time_; - std::function task; - std::unique_ptr> run_determinant_; - std::shared_ptr> promise; + std::function task; + std::shared_ptr> promise; }; -template class DelayedTaskComparator { public: - bool operator()(Worker &a, Worker &b) { + bool operator()(Worker &a, Worker &b) { return a.getNextExecutionTime() > b.getNextExecutionTime(); } }; -template -std::shared_ptr> Worker::getPromise() const { - return promise; -} - class WorkerThread { public: explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker") @@ -155,16 +135,15 @@ class WorkerThread { * ThreadPoolExecutor * Design: Locked control over a manager thread that controls the worker threads */ -template class ThreadPool { public: - ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, + ThreadPool(int max_worker_threads = 2, core::controller::ControllerServiceProvider* controller_service_provider = nullptr, std::string name = "NamelessPool"); - ThreadPool(const ThreadPool &other) = delete; - ThreadPool& operator=(const ThreadPool &other) = delete; - ThreadPool(ThreadPool &&other) = delete; - ThreadPool& operator=(ThreadPool &&other) = delete; + ThreadPool(const ThreadPool &other) = delete; + ThreadPool& operator=(const ThreadPool &other) = delete; + ThreadPool(ThreadPool &&other) = delete; + ThreadPool& operator=(ThreadPool &&other) = delete; ~ThreadPool() { shutdown(); @@ -177,7 +156,7 @@ class ThreadPool { * the worker task * @param future future to move new promise to */ - void execute(Worker &&task, std::future &future); + void execute(Worker &&task, std::future &future); /** * attempts to stop tasks with the provided identifier. @@ -286,8 +265,6 @@ class ThreadPool { } } -// determines if threads are detached - bool daemon_threads_; std::atomic thread_reduction_count_; // max worker threads int max_worker_threads_; @@ -310,8 +287,8 @@ class ThreadPool { // thread queue for the recently deceased threads. ConcurrentQueue> deceased_thread_queue_; // worker queue of worker objects - ConditionConcurrentQueue> worker_queue_; - std::priority_queue, std::vector>, DelayedTaskComparator> delayed_worker_queue_; + ConditionConcurrentQueue worker_queue_; + std::priority_queue, DelayedTaskComparator> delayed_worker_queue_; // mutex to protect task status and delayed queue std::mutex worker_queue_mutex_; // notification for new delayed tasks that's before the current ones diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp index fdc8857472..905c309edc 100644 --- a/libminifi/src/CronDrivenSchedulingAgent.cpp +++ b/libminifi/src/CronDrivenSchedulingAgent.cpp @@ -32,7 +32,6 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces using namespace std::literals::chrono_literals; using std::chrono::ceil; using std::chrono::seconds; - using std::chrono::milliseconds; using std::chrono::time_point_cast; using std::chrono::system_clock; @@ -58,7 +57,7 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces last_exec_[uuid] = current_time.get_local_time(); if (processor->isYield()) - return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time())) return utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time()); diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index b50d2d6f38..38073fa128 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -43,7 +43,7 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { - return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); } } return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 5f26a95629..451ac2d18c 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -54,7 +54,7 @@ FlowController::FlowController(std::shared_ptr provenance_repo : core::controller::ForwardingControllerServiceProvider(core::className()), running_(false), initialized_(false), - thread_pool_(5, false, nullptr, "Flowcontroller threadpool"), + thread_pool_(5, nullptr, "Flowcontroller threadpool"), configuration_(std::move(configure)), provenance_repo_(std::move(provenance_repo)), flow_file_repo_(std::move(flow_file_repo)), diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index fe6cf46ad5..7f48101623 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -34,17 +34,13 @@ #include "core/ProcessorNode.h" #include "core/ProcessContext.h" #include "core/ProcessContextBuilder.h" -#include "core/ProcessSession.h" #include "core/ProcessSessionFactory.h" #include "utils/ValueParser.h" using namespace std::literals::chrono_literals; -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { void ThreadedSchedulingAgent::schedule(core::Processor* processor) { std::lock_guard lock(mutex_); @@ -103,12 +99,8 @@ void ThreadedSchedulingAgent::schedule(core::Processor* processor) { return agent->run(processor, processContext, sessionFactory); }; - // create a functor that will be submitted to the thread pool. - utils::Worker functor(f_ex, processor->getUUIDStr(), std::make_unique()); - // move the functor into the thread pool. While a future is returned - // we aren't terribly concerned with the result. std::future future; - thread_pool_.execute(std::move(functor), future); + thread_pool_.execute(utils::Worker{f_ex, processor->getUUIDStr()}, future); } logger_->log_debug("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName()); processors_running_.insert(processor->getUUID()); @@ -141,7 +133,4 @@ void ThreadedSchedulingAgent::unschedule(core::Processor* processor) { processors_running_.erase(processor->getUUID()); } -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace org::apache::nifi::minifi diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 113cee8f89..cbe5c2306c 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -31,7 +31,7 @@ utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* proce auto trigger_start_time = std::chrono::steady_clock::now(); this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) - return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); return utils::TaskRescheduleInfo::RetryAfter(trigger_start_time + processor->getSchedulingPeriod()); } diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index b3b6fdef00..9891fb9dee 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -58,7 +58,7 @@ C2Agent::C2Agent(std::shared_ptr configuration, configuration_(std::move(configuration)), node_reporter_(std::move(node_reporter)), filesystem_(std::move(filesystem)), - thread_pool_(2, false, nullptr, "C2 threadpool"), + thread_pool_(2, nullptr, "C2 threadpool"), request_restart_(std::move(request_restart)), last_run_(std::chrono::steady_clock::now()) { if (!configuration_->getAgentClass()) { @@ -102,9 +102,8 @@ void C2Agent::start() { for (const auto& function : functions_) { utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate(); task_ids_.push_back(uuid); - utils::Worker functor(function, uuid.to_string(), std::make_unique()); std::future future; - thread_pool_.execute(std::move(functor), future); + thread_pool_.execute(utils::Worker{function, uuid.to_string()}, future); } controller_running_ = true; thread_pool_.start(); diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp index 06c2eb4e1b..a731b8d595 100644 --- a/libminifi/src/utils/ThreadPool.cpp +++ b/libminifi/src/utils/ThreadPool.cpp @@ -16,28 +16,26 @@ */ #include "utils/ThreadPool.h" -#include "core/state/UpdateController.h" + +using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::utils { -template -ThreadPool::ThreadPool(int max_worker_threads, bool daemon_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name) - : daemon_threads_(daemon_threads), - thread_reduction_count_(0), +ThreadPool::ThreadPool(int max_worker_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name) + : thread_reduction_count_(0), max_worker_threads_(max_worker_threads), adjust_threads_(false), running_(false), controller_service_provider_(controller_service_provider), name_(std::move(name)), - logger_(core::logging::LoggerFactory>::getLogger()) { + logger_(core::logging::LoggerFactory::getLogger()) { current_workers_ = 0; thread_manager_ = nullptr; } -template -void ThreadPool::run_tasks(const std::shared_ptr& thread) { +void ThreadPool::run_tasks(const std::shared_ptr& thread) { thread->is_running_ = true; while (running_.load()) { if (UNLIKELY(thread_reduction_count_ > 0)) { @@ -50,7 +48,7 @@ void ThreadPool::run_tasks(const std::shared_ptr& thread) { } } - Worker task; + Worker task; if (worker_queue_.dequeueWait(task)) { { std::unique_lock lock(worker_queue_mutex_); @@ -101,15 +99,14 @@ void ThreadPool::run_tasks(const std::shared_ptr& thread) { current_workers_--; } -template -void ThreadPool::manage_delayed_queue() { +void ThreadPool::manage_delayed_queue() { while (running_) { std::unique_lock lock(worker_queue_mutex_); // Put the tasks ready to run in the worker queue while (!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) { // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted. - Worker task = std::move(const_cast&>(delayed_worker_queue_.top())); + Worker task = std::move(const_cast(delayed_worker_queue_.top())); delayed_worker_queue_.pop(); worker_queue_.enqueue(std::move(task)); } @@ -122,30 +119,25 @@ void ThreadPool::manage_delayed_queue() { } } -template -void ThreadPool::execute(Worker &&task, std::future &future) { +void ThreadPool::execute(Worker &&task, std::future &future) { { std::unique_lock lock(worker_queue_mutex_); task_status_[task.getIdentifier()] = true; } - future = std::move(task.getPromise()->get_future()); + future = task.getPromise()->get_future(); worker_queue_.enqueue(std::move(task)); } -template -void ThreadPool::manageWorkers() { - for (int i = 0; i < max_worker_threads_; i++) { - std::stringstream thread_name; - thread_name << name_ << " #" << i; - auto worker_thread = std::make_shared(thread_name.str()); - worker_thread->thread_ = createThread([this, worker_thread] { run_tasks(worker_thread); }); - thread_queue_.push_back(worker_thread); - current_workers_++; - } - - if (daemon_threads_) { - for (auto &thread : thread_queue_) { - thread->thread_.detach(); +void ThreadPool::manageWorkers() { + { + std::unique_lock lock(worker_queue_mutex_); + for (int i = 0; i < max_worker_threads_; i++) { + std::stringstream thread_name; + thread_name << name_ << " #" << i; + auto worker_thread = std::make_shared(thread_name.str()); + worker_thread->thread_ = createThread([this, worker_thread] { run_tasks(worker_thread); }); + thread_queue_.push_back(worker_thread); + current_workers_++; } } @@ -171,9 +163,6 @@ void ThreadPool::manageWorkers() { std::unique_lock worker_queue_lock(worker_queue_mutex_); auto worker_thread = std::make_shared(); worker_thread->thread_ = createThread([this, worker_thread] { run_tasks(worker_thread); }); - if (daemon_threads_) { - worker_thread->thread_.detach(); - } thread_queue_.push_back(worker_thread); current_workers_++; } @@ -195,8 +184,7 @@ void ThreadPool::manageWorkers() { } } -template -std::shared_ptr ThreadPool::createThreadManager() const { +std::shared_ptr ThreadPool::createThreadManager() const { if (!controller_service_provider_) { return nullptr; } @@ -213,8 +201,7 @@ std::shared_ptr ThreadPool::createThrea return thread_manager_service; } -template -void ThreadPool::start() { +void ThreadPool::start() { std::lock_guard lock(manager_mutex_); if (!running_) { thread_manager_ = createThreadManager(); @@ -224,22 +211,21 @@ void ThreadPool::start() { manager_thread_ = std::thread(&ThreadPool::manageWorkers, this); std::lock_guard quee_lock(worker_queue_mutex_); - delayed_scheduler_thread_ = std::thread(&ThreadPool::manage_delayed_queue, this); + delayed_scheduler_thread_ = std::thread(&ThreadPool::manage_delayed_queue, this); } } -template -void ThreadPool::stopTasks(const TaskId &identifier) { +void ThreadPool::stopTasks(const TaskId &identifier) { std::unique_lock lock(worker_queue_mutex_); task_status_[identifier] = false; // remove tasks belonging to identifier from worker_queue_ - worker_queue_.remove([&] (const Worker& worker) { return worker.getIdentifier() == identifier; }); + worker_queue_.remove([&] (const Worker& worker) { return worker.getIdentifier() == identifier; }); // also remove from delayed_worker_queue_ decltype(delayed_worker_queue_) new_delayed_worker_queue; while (!delayed_worker_queue_.empty()) { - Worker task = std::move(const_cast&>(delayed_worker_queue_.top())); + Worker task = std::move(const_cast(delayed_worker_queue_.top())); delayed_worker_queue_.pop(); if (task.getIdentifier() != identifier) { new_delayed_worker_queue.push(std::move(task)); @@ -254,22 +240,19 @@ void ThreadPool::stopTasks(const TaskId &identifier) { }); } -template -void ThreadPool::resume() { +void ThreadPool::resume() { if (!worker_queue_.isRunning()) { worker_queue_.start(); } } -template -void ThreadPool::pause() { +void ThreadPool::pause() { if (worker_queue_.isRunning()) { worker_queue_.stop(); } } -template -void ThreadPool::shutdown() { +void ThreadPool::shutdown() { if (running_.load()) { std::lock_guard lock(manager_mutex_); running_.store(false); @@ -307,9 +290,4 @@ void ThreadPool::shutdown() { } } -template class utils::ThreadPool; -template class utils::ThreadPool; -template class utils::ThreadPool; -template class utils::ThreadPool; - } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp index 46363638dd..eccba681fc 100644 --- a/libminifi/test/unit/BackTraceTests.cpp +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -24,76 +24,60 @@ #include "utils/BackTrace.h" #include "utils/Monitors.h" #include "utils/ThreadPool.h" +#include "range/v3/algorithm/any_of.hpp" using namespace std::literals::chrono_literals; -class WorkerNumberExecutions : public utils::AfterExecute { - public: - explicit WorkerNumberExecutions(int tasks) - : tasks(tasks) { - } - - WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept - : runs(other.runs), - tasks(other.tasks) { - } - - bool isFinished(const int &result) override { - return result <= 0 || ++runs >= tasks; - } - bool isCancelled(const int& /*result*/) override { - return false; - } - - std::chrono::steady_clock::duration wait_time() override { - return 50ms; - } - - protected: - int runs{0}; - int tasks; -}; - TEST_CASE("BT1", "[TPT1]") { const BackTrace trace = TraceResolver::getResolver().getBackTrace("BT1"); #ifdef HAS_EXECINFO - REQUIRE(!trace.getTraces().empty()); + CHECK(!trace.getTraces().empty()); #endif } -TEST_CASE("BT2", "[TPT2]") { - std::atomic counter = 0; - utils::ThreadPool pool(4); - pool.start(); - std::this_thread::sleep_for(std::chrono::milliseconds(150)); - for (int i = 0; i < 3; i++) { - std::unique_ptr> after_execute = std::unique_ptr>(new WorkerNumberExecutions(5)); - utils::Worker functor([&counter]() { return ++counter; }, "id", std::move(after_execute)); +void inner_function(std::atomic_flag& ready_to_check, std::atomic_flag& checking_done) { + ready_to_check.test_and_set(); + ready_to_check.notify_all(); + checking_done.wait(false); +} - std::future fut; - pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) - } +void outer_function(std::atomic_flag& ready_to_check, std::atomic_flag& checking_done) { + inner_function(ready_to_check, checking_done); +} - std::unique_ptr> after_execute = std::unique_ptr>(new WorkerNumberExecutions(5)); - utils::Worker functor([&counter]() { return ++counter; }, "id", std::move(after_execute)); - std::future fut; - pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) +TEST_CASE("BT2", "[TPT2]") { + std::atomic_flag ready_for_checking; + std::atomic_flag done_with_checking; - std::vector traces = pool.getTraces(); - for (const auto &trace : traces) { - std::cerr << "Thread name: " << trace.getName() << std::endl; - const auto &trace_strings = trace.getTraces(); -#ifdef HAS_EXECINFO - REQUIRE(trace_strings.size() > 2); - for (const auto& trace_string : trace_strings) { - std::cerr << " - " << trace_string << std::endl; - } - if (trace_strings.at(0).find("sleep_for") != std::string::npos) { - REQUIRE(trace_strings.at(1).find("counterFunction") != std::string::npos); + constexpr std::string_view thread_pool_name = "Winnie the pool"; + constexpr size_t number_of_worker_threads = 3; + utils::ThreadPool pool(number_of_worker_threads, nullptr, thread_pool_name.data()); + utils::Worker worker([&]() -> utils::TaskRescheduleInfo { + outer_function(ready_for_checking, done_with_checking); + return utils::TaskRescheduleInfo::Done(); + }, "id"); + std::future future; + pool.execute(std::move(worker), future); + + pool.start(); + { + ready_for_checking.wait(false); + std::vector traces = pool.getTraces(); + CHECK(traces.size() <= number_of_worker_threads); + for (const auto &trace : traces) { + CHECK(trace.getName().starts_with(thread_pool_name)); } +#ifdef HAS_EXECINFO + auto first_worker_trace = traces.front(); + CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& trace_line) { return trace_line.find("run_tasks") != trace_line.npos; })); +#ifdef DEBUG + CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& trace_line) { return trace_line.find("outer_function") != trace_line.npos; })); + CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& trace_line) { return trace_line.find("inner_function") != trace_line.npos; })); #endif +#endif + done_with_checking.test_and_set(); + done_with_checking.notify_all(); } - fut.wait(); + future.wait(); } - diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp index c3cf6638af..b3ffed29dd 100644 --- a/libminifi/test/unit/SchedulingAgentTests.cpp +++ b/libminifi/test/unit/SchedulingAgentTests.cpp @@ -63,7 +63,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto controller_services_ = std::make_shared(); auto configuration = std::make_shared(); auto controller_services_provider_ = std::make_shared(controller_services_, configuration); - utils::ThreadPool thread_pool; + utils::ThreadPool thread_pool; auto count_proc = std::make_shared("count_proc"); count_proc->incrementActiveTasks(); count_proc->setScheduledState(core::RUNNING); @@ -79,21 +79,21 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto timer_driven_agent = std::make_shared(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); timer_driven_agent->start(); auto first_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.finished_); - CHECK(first_task_reschedule_info.wait_time_ <= 125ms); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 125ms); CHECK(count_proc->getNumberOfTriggers() == 1); count_proc->setOnTriggerDuration(50ms); auto second_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.finished_); - CHECK(second_task_reschedule_info.wait_time_ <= 75ms); + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 75ms); CHECK(count_proc->getNumberOfTriggers() == 2); count_proc->setOnTriggerDuration(150ms); auto third_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); - CHECK(!third_task_reschedule_info.finished_); - CHECK(third_task_reschedule_info.wait_time_ == 0ms); + CHECK(!third_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); CHECK(count_proc->getNumberOfTriggers() == 3); } @@ -101,33 +101,42 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto event_driven_agent = std::make_shared(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); event_driven_agent->start(); auto first_task_reschedule_info = event_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.finished_); - CHECK(first_task_reschedule_info.wait_time_ == 0ms); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); auto count_num_after_one_schedule = count_proc->getNumberOfTriggers(); CHECK(count_num_after_one_schedule > 100); auto second_task_reschedule_info = event_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.finished_); - CHECK(second_task_reschedule_info.wait_time_ == 0ms); + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(second_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); auto count_num_after_two_schedule = count_proc->getNumberOfTriggers(); CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100); } SECTION("Cron Driven every year") { +#ifdef WIN32 + date::set_install(TZ_DATA_DIR); +#endif count_proc->setCronPeriod("0 0 0 1 1 ?"); auto cron_driven_agent = std::make_shared(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); cron_driven_agent->start(); auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.finished_); - if (first_task_reschedule_info.wait_time_ > 1min) { // To avoid possibly failing around dec 31 23:59:59 - auto next_run_time_point = std::chrono::round(std::chrono::system_clock::now() + first_task_reschedule_info.wait_time_); - CHECK(next_run_time_point == std::chrono::ceil(std::chrono::system_clock::now())); + CHECK(!first_task_reschedule_info.isFinished()); + if (first_task_reschedule_info.getNextExecutionTime() > std::chrono::steady_clock::now() + 1min) { // To avoid possibly failing around dec 31 23:59:59 + auto wait_time_till_next_execution_time = std::chrono::round(first_task_reschedule_info.getNextExecutionTime() - std::chrono::steady_clock::now()); + + auto current_time = date::make_zoned(date::current_zone(), std::chrono::time_point_cast(std::chrono::system_clock::now())); + auto current_year_month_day = date::year_month_day(date::floor(current_time.get_local_time())); + auto new_years_day = date::make_zoned(date::current_zone(), date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1}); + + auto time_until_new_years_day = new_years_day.get_local_time() - current_time.get_local_time(); + + CHECK(std::chrono::round(time_until_new_years_day - wait_time_till_next_execution_time) == 0min); CHECK(count_proc->getNumberOfTriggers() == 0); auto second_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.finished_); - next_run_time_point = std::chrono::round(std::chrono::system_clock::now() + first_task_reschedule_info.wait_time_); - CHECK(next_run_time_point == std::chrono::ceil(std::chrono::system_clock::now())); + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(std::chrono::round(first_task_reschedule_info.getNextExecutionTime() - second_task_reschedule_info.getNextExecutionTime()) == 0min); CHECK(count_proc->getNumberOfTriggers() == 0); } } @@ -137,14 +146,14 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto cron_driven_agent = std::make_shared(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); cron_driven_agent->start(); auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.finished_); - CHECK(first_task_reschedule_info.wait_time_ <= 1s); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s); CHECK(count_proc->getNumberOfTriggers() == 0); - std::this_thread::sleep_for(first_task_reschedule_info.wait_time_ + 1ms); + std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime()); auto second_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.finished_); - CHECK(second_task_reschedule_info.wait_time_ <= 1s); + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(second_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s); CHECK(count_proc->getNumberOfTriggers() == 1); } @@ -153,7 +162,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto cron_driven_agent = std::make_shared(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); cron_driven_agent->start(); auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(first_task_reschedule_info.finished_); + CHECK(first_task_reschedule_info.isFinished()); } } } // namespace org::apache::nifi::minifi::testing diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index 5c23e236ce..0c5ac92bb0 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -27,9 +27,13 @@ #include "../Catch.h" #include "io/StreamFactory.h" #include "io/Sockets.h" -#include "utils/ThreadPool.h" #include "properties/Configuration.h" +#include +#include +#include +#include + namespace minifi = org::apache::nifi::minifi; namespace io = minifi::io; using io::Socket; @@ -163,7 +167,7 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket7]") { #ifdef OPENSSL_SUPPORT std::atomic counter; std::mt19937_64 seed { std::random_device { }() }; -bool createSocket() { +asio::awaitable createSocket() { counter++; std::shared_ptr configuration = std::make_shared(); @@ -175,7 +179,7 @@ bool createSocket() { socketA->initialize(); } - return true; + co_return true; } /** * MINIFI-320 was created to address reallocations within TLSContext @@ -183,22 +187,20 @@ bool createSocket() { * to ensure we no longer see the segfaults. */ TEST_CASE("TestTLSContextCreation", "[TestSocket8]") { - utils::ThreadPool pool(20, true); + constexpr size_t number_of_threads = 20; + asio::thread_pool pool(number_of_threads); std::vector> futures; - for (int i = 0; i < 20; i++) { - std::function f_ex = createSocket; - utils::Worker functor(f_ex, "id"); - std::future fut; - pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) - futures.push_back(std::move(fut)); + futures.reserve(number_of_threads); + for (size_t i = 0; i < number_of_threads; i++) { + futures.push_back(asio::co_spawn(pool, createSocket(), asio::use_future)); } - pool.start(); + pool.join(); for (auto &&future : futures) { - future.wait(); + CHECK(future.valid()); } - REQUIRE(20 == counter.load()); + REQUIRE(number_of_threads == counter.load()); } /** @@ -211,7 +213,7 @@ TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") { auto factory = io::StreamFactory::getInstance(configure); std::string host = Socket::getMyHostName(); Socket *socket = factory->createSocket(host, 10001).release(); - io::TLSSocket *tls = dynamic_cast(socket); + auto *tls = dynamic_cast(socket); REQUIRE(tls == nullptr); } @@ -225,7 +227,7 @@ TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket10]") { auto factory = io::StreamFactory::getInstance(configure); std::string host = Socket::getMyHostName(); io::Socket *socket = factory->createSecureSocket(host, 10001, nullptr).release(); - io::TLSSocket *tls = dynamic_cast(socket); + auto *tls = dynamic_cast(socket); REQUIRE(tls == nullptr); } #endif // OPENSSL_SUPPORT diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index 948fc73e54..cbf0547b5a 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -25,67 +25,43 @@ using namespace std::literals::chrono_literals; -class WorkerNumberExecutions : public utils::AfterExecute { - public: - explicit WorkerNumberExecutions(int tasks) - : tasks(tasks) { - } - - explicit WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept - : runs(other.runs), - tasks(other.tasks) { - } - - bool isFinished(const int &result) override { - return result <= 0 || ++runs >= tasks; - } - bool isCancelled(const int& /*result*/) override { - return false; - } - - std::chrono::steady_clock::duration wait_time() override { - return 50ms; - } - - protected: - int runs{0}; - int tasks; -}; - TEST_CASE("ThreadPoolTest1", "[TPT1]") { - utils::ThreadPool pool(5); - utils::Worker functor([](){ return true; }, "id"); + utils::ThreadPool pool(5); + utils::Worker functor([](){ return utils::TaskRescheduleInfo::Done(); }, "id"); pool.start(); - std::future fut; + std::future fut; pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) - fut.wait(); - REQUIRE(true == fut.get()); + CHECK(fut.get().isFinished()); } TEST_CASE("ThreadPoolTest2", "[TPT2]") { - std::atomic counter = 0; - utils::ThreadPool pool(5); - std::unique_ptr> after_execute = std::unique_ptr>(new WorkerNumberExecutions(20)); - utils::Worker functor([&counter]() { return ++counter; }, "id", std::move(after_execute)); + constexpr size_t max_counter = 20; + std::atomic counter = 0; + utils::ThreadPool pool(5); + utils::Worker functor([&counter](){ + if (++counter == max_counter) + return utils::TaskRescheduleInfo::Done(); + + return utils::TaskRescheduleInfo::RetryImmediately();}, "id"); pool.start(); - std::future fut; - pool.execute(std::move(functor), fut); - fut.wait(); - REQUIRE(20 == fut.get()); + std::future fut; + pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) + CHECK(fut.get().isFinished()); + REQUIRE(20 == counter); } TEST_CASE("Worker wait time should be relative to the last run") { std::vector worker_execution_time_points; - utils::ThreadPool pool(1); + utils::ThreadPool pool(1); auto wait_time_between_tasks = 10ms; - utils::Worker worker([&]()->utils::TaskRescheduleInfo { + utils::Worker worker([&]()->utils::TaskRescheduleInfo { worker_execution_time_points.push_back(std::chrono::steady_clock::now()); if (worker_execution_time_points.size() == 2) { return utils::TaskRescheduleInfo::Done(); } else { return utils::TaskRescheduleInfo::RetryIn(wait_time_between_tasks); } - }, "id", std::make_unique()); + }, "id"); std::this_thread::sleep_for(wait_time_between_tasks + 1ms); // Pre-waiting should not matter std::future task_future; @@ -94,7 +70,7 @@ TEST_CASE("Worker wait time should be relative to the last run") { auto final_task_reschedule_info = task_future.get(); - CHECK(final_task_reschedule_info.finished_); + CHECK(final_task_reschedule_info.isFinished()); REQUIRE(worker_execution_time_points.size() == 2); CHECK(worker_execution_time_points[1] - worker_execution_time_points[0] >= wait_time_between_tasks); } diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h index 427eed117f..c17130371b 100644 --- a/nanofi/include/cxx/Instance.h +++ b/nanofi/include/cxx/Instance.h @@ -148,9 +148,8 @@ class Instance { // run all functions independently for (auto function : functions) { - utils::Worker functor(function, "listeners"); std::future future; - listener_thread_pool_.execute(std::move(functor), future); + listener_thread_pool_.execute(utils::Worker{function, "listeners"}, future); } } @@ -169,7 +168,7 @@ class Instance { std::string url_; std::shared_ptr configure_; - utils::ThreadPool listener_thread_pool_; + utils::ThreadPool listener_thread_pool_; }; } // namespace org::apache::nifi::minifi From 2cb98d02aa7572ba346f3a9167903253798aaba6 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 17 Jul 2023 16:42:44 +0200 Subject: [PATCH 2/6] replaced DelayedTaskComparator with decltype removed unneccesary comments on ThreadPool members removed duplicate using chrono_literals from ThreadPool.cpp --- libminifi/include/utils/ThreadPool.h | 38 +++------------------------- libminifi/src/utils/ThreadPool.cpp | 2 -- 2 files changed, 4 insertions(+), 36 deletions(-) diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 78c7e0d417..22627610fe 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -106,13 +106,6 @@ class Worker { std::shared_ptr> promise; }; -class DelayedTaskComparator { - public: - bool operator()(Worker &a, Worker &b) { - return a.getNextExecutionTime() > b.getNextExecutionTime(); - } -}; - class WorkerThread { public: explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker") @@ -266,56 +259,33 @@ class ThreadPool { } std::atomic thread_reduction_count_; -// max worker threads int max_worker_threads_; -// current worker tasks. std::atomic current_workers_; -// thread queue std::vector> thread_queue_; -// manager thread std::thread manager_thread_; -// the thread responsible for putting delayed tasks to the worker queue when they had to be put std::thread delayed_scheduler_thread_; -// conditional that's used to adjust the threads std::atomic adjust_threads_; -// atomic running boolean std::atomic running_; -// controller service provider core::controller::ControllerServiceProvider* controller_service_provider_; -// integrated power manager std::shared_ptr thread_manager_; - // thread queue for the recently deceased threads. ConcurrentQueue> deceased_thread_queue_; -// worker queue of worker objects ConditionConcurrentQueue worker_queue_; - std::priority_queue, DelayedTaskComparator> delayed_worker_queue_; -// mutex to protect task status and delayed queue + std::priority_queue, + /* comparator: */ decltype([](const Worker& lhs, const Worker& rhs) noexcept { return lhs.getNextExecutionTime() > rhs.getNextExecutionTime(); }) + > delayed_worker_queue_; std::mutex worker_queue_mutex_; -// notification for new delayed tasks that's before the current ones std::condition_variable delayed_task_available_; -// map to identify if a task should be std::map task_status_; -// manager mutex std::recursive_mutex manager_mutex_; - // thread pool name std::string name_; - // count of running tasks by ID std::unordered_map running_task_count_by_id_; - // variable to signal task running completion std::condition_variable task_run_complete_; std::shared_ptr logger_; - /** - * Call for the manager to start worker threads - */ - void manageWorkers(); - /** - * Runs worker tasks - */ + void manageWorkers(); void run_tasks(const std::shared_ptr& thread); - void manage_delayed_queue(); }; diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp index a731b8d595..b6343aaa9f 100644 --- a/libminifi/src/utils/ThreadPool.cpp +++ b/libminifi/src/utils/ThreadPool.cpp @@ -19,8 +19,6 @@ using namespace std::literals::chrono_literals; -using namespace std::literals::chrono_literals; - namespace org::apache::nifi::minifi::utils { ThreadPool::ThreadPool(int max_worker_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name) From 0f0a3396bfec16a0454ddeae4ff8018f56fd7d35 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Wed, 19 Jul 2023 14:46:40 +0200 Subject: [PATCH 3/6] reverted decltype change in ThreadPool.h due to warning --- libminifi/include/utils/ThreadPool.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 22627610fe..cafcb2b94e 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -106,6 +106,13 @@ class Worker { std::shared_ptr> promise; }; +class DelayedTaskComparator { + public: + bool operator()(Worker &a, Worker &b) { + return a.getNextExecutionTime() > b.getNextExecutionTime(); + } +}; + class WorkerThread { public: explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker") @@ -270,9 +277,7 @@ class ThreadPool { std::shared_ptr thread_manager_; ConcurrentQueue> deceased_thread_queue_; ConditionConcurrentQueue worker_queue_; - std::priority_queue, - /* comparator: */ decltype([](const Worker& lhs, const Worker& rhs) noexcept { return lhs.getNextExecutionTime() > rhs.getNextExecutionTime(); }) - > delayed_worker_queue_; + std::priority_queue, DelayedTaskComparator> delayed_worker_queue_; std::mutex worker_queue_mutex_; std::condition_variable delayed_task_available_; std::map task_status_; From 73845754887d2d7c132528d20494deb379aa7b90 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 14 Aug 2023 11:13:23 +0200 Subject: [PATCH 4/6] fix indentation --- libminifi/test/unit/BackTraceTests.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp index eccba681fc..c1ecfeb37b 100644 --- a/libminifi/test/unit/BackTraceTests.cpp +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -76,8 +76,8 @@ TEST_CASE("BT2", "[TPT2]") { CHECK(ranges::any_of(first_worker_trace.getTraces(), [](const std::string& trace_line) { return trace_line.find("inner_function") != trace_line.npos; })); #endif #endif - done_with_checking.test_and_set(); - done_with_checking.notify_all(); + done_with_checking.test_and_set(); + done_with_checking.notify_all(); } future.wait(); } From fa2f86b2f9df02ff2cb806040f9cc7faf5aaf00f Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 14 Aug 2023 11:15:21 +0200 Subject: [PATCH 5/6] remove old comments from SocketTests.cpp --- libminifi/test/unit/SocketTests.cpp | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index 0c5ac92bb0..a8d91f44e6 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -181,11 +181,7 @@ asio::awaitable createSocket() { co_return true; } -/** - * MINIFI-320 was created to address reallocations within TLSContext - * This test will create 20 threads that attempt to create contexts - * to ensure we no longer see the segfaults. - */ + TEST_CASE("TestTLSContextCreation", "[TestSocket8]") { constexpr size_t number_of_threads = 20; asio::thread_pool pool(number_of_threads); @@ -203,10 +199,6 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket8]") { REQUIRE(number_of_threads == counter.load()); } -/** - * MINIFI-329 was created in regards to an option existing but not - * being properly evaluated. - */ TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") { std::shared_ptr configure = std::make_shared(); configure->set(minifi::Configuration::nifi_remote_input_secure, "false"); @@ -217,10 +209,6 @@ TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") { REQUIRE(tls == nullptr); } -/** - * MINIFI-329 was created in regards to an option existing but not - * being properly evaluated. - */ TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket10]") { std::shared_ptr configure = std::make_shared(); configure->set(minifi::Configuration::nifi_remote_input_secure, "false"); From 975ca26943d34af162a44871b04eb272a9598487 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 14 Aug 2023 15:16:16 +0200 Subject: [PATCH 6/6] REQUIRE(!traces.empty()); --- libminifi/test/unit/BackTraceTests.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp index c1ecfeb37b..87840b13ef 100644 --- a/libminifi/test/unit/BackTraceTests.cpp +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -65,6 +65,7 @@ TEST_CASE("BT2", "[TPT2]") { ready_for_checking.wait(false); std::vector traces = pool.getTraces(); CHECK(traces.size() <= number_of_worker_threads); + REQUIRE(!traces.empty()); for (const auto &trace : traces) { CHECK(trace.getName().starts_with(thread_pool_name)); }