Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2129 Refactor threadpool #1606

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions extensions/rocksdb-repos/FlowFileLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ FlowFileLoader::~FlowFileLoader() {
std::future<FlowFileLoader::FlowFilePtrVec> FlowFileLoader::load(std::vector<SwappedFlowFile> flow_files) {
auto promise = std::make_shared<std::promise<FlowFilePtrVec>>();
std::future<FlowFilePtrVec> future = promise->get_future();
utils::Worker<utils::TaskRescheduleInfo> task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] {
utils::Worker task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] {
return loadImpl(flow_files, promise);
},
"", // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task
std::make_unique<utils::ComplexMonitor>()};
""}; // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task

// the dummy_future is for the return value of the Worker's lambda, rerunning this lambda
// depends on run_determinant + result
// we could create a custom run_determinant to instead determine if/when it should be rerun
Expand Down
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/FlowFileLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class FlowFileLoader {
private:
utils::TaskRescheduleInfo loadImpl(const std::vector<SwappedFlowFile>& flow_files, const std::shared_ptr<std::promise<FlowFilePtrVec>>& output);

utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_{thread_count_, false, nullptr, "FlowFileLoaderThreadPool"};
utils::ThreadPool thread_pool_{thread_count_, nullptr, "FlowFileLoaderThreadPool"};

gsl::not_null<minifi::internal::RocksDatabase*> db_;

Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/CronDrivenSchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent {
std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo>& thread_pool)
utils::ThreadPool& thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) {
}

Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/EventDrivenSchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
public:
EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
utils::ThreadPool &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
using namespace std::literals::chrono_literals;

Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/FlowController.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FlowController>::getLogger();

// Thread pool for schedulers
utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
utils::ThreadPool thread_pool_;
};

} // namespace org::apache::nifi::minifi
4 changes: 2 additions & 2 deletions libminifi/include/SchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace org::apache::nifi::minifi {
class SchedulingAgent {
public:
SchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool& thread_pool)
: admin_yield_duration_(),
bored_yield_duration_(0),
configure_(configuration),
Expand Down Expand Up @@ -122,7 +122,7 @@ class SchedulingAgent {
std::shared_ptr<core::Repository> flow_repo_;

std::shared_ptr<core::ContentRepository> content_repo_;
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
utils::ThreadPool& thread_pool_;
gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_;

private:
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/ThreadedSchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
public:
ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
std::shared_ptr<Configure> configuration, utils::ThreadPool &thread_pool)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
}
~ThreadedSchedulingAgent() override = default;
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/TimerDrivenSchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent {
public:
TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
utils::ThreadPool &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) {
}

Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/c2/C2Agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class C2Agent : public state::UpdateController {

std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<C2Agent>::getLogger();

utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
utils::ThreadPool thread_pool_;

std::vector<utils::Identifier> task_ids_;

Expand Down
1 change: 1 addition & 0 deletions libminifi/include/core/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class Processor : public Connectable, public ConfigurableComponent, public state

void clearYield();

std::chrono::steady_clock::time_point getYieldExpirationTime() const { return yield_expiration_; }
std::chrono::steady_clock::duration getYieldTime() const;
// Whether flow file queue full in any of the outgoing connection
bool flowFilesOutGoingFull() const;
Expand Down
82 changes: 21 additions & 61 deletions libminifi/include/utils/Monitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,90 +23,50 @@
#if defined(WIN32)
#include <future> // This is required to work around a VS2017 bug, see the details below
#endif
#include "utils/gsl.h"

namespace org::apache::nifi::minifi::utils {

/**
* Worker task helper that determines
* whether or not we will run
*/
template<typename T>
class AfterExecute {
class TaskRescheduleInfo {
public:
virtual ~AfterExecute() = default;

AfterExecute() = default;
AfterExecute(AfterExecute&& /*other*/) noexcept = default;
virtual bool isFinished(const T &result) = 0;
virtual bool isCancelled(const T &result) = 0;
/**
* Time to wait before re-running this task if necessary
* @return milliseconds since epoch after which we are eligible to re-run this task.
*/
virtual std::chrono::steady_clock::duration wait_time() = 0;
};

/**
* Uses the wait time for a given worker to determine if it is eligible to run
*/
TaskRescheduleInfo(bool result, std::chrono::steady_clock::time_point next_execution_time)
: next_execution_time_(next_execution_time), finished_(result) {}

struct TaskRescheduleInfo {
TaskRescheduleInfo(bool result, std::chrono::steady_clock::duration wait_time)
: wait_time_(wait_time), finished_(result) {
gsl_Expects(wait_time >= std::chrono::milliseconds(0));
static TaskRescheduleInfo Done() {
return {true, std::chrono::steady_clock::time_point::min()};
}

std::chrono::steady_clock::duration wait_time_;
bool finished_;
static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point next_execution_time) {
return {false, next_execution_time};
}

static TaskRescheduleInfo Done() {
return {true, std::chrono::steady_clock::duration(0)};
static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration duration) {
return {false, std::chrono::steady_clock::now()+duration};
}

static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration interval) {
return {false, interval};
static TaskRescheduleInfo RetryImmediately() {
return {false, std::chrono::steady_clock::time_point::min()};
}

static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point time_point) {
auto interval = std::max(time_point - std::chrono::steady_clock::now(), std::chrono::steady_clock::duration(0));
return {false, interval};
std::chrono::steady_clock::time_point getNextExecutionTime() const {
return next_execution_time_;
}

static TaskRescheduleInfo RetryImmediately() {
return {false, std::chrono::steady_clock::duration(0)};
bool isFinished() const {
return finished_;
}

private:
std::chrono::steady_clock::time_point next_execution_time_;
bool finished_;

#if defined(WIN32)
// https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
// Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
private:
TaskRescheduleInfo() : wait_time_(std::chrono::steady_clock::duration(0)), finished_(true) {}
TaskRescheduleInfo() : next_execution_time_(std::chrono::steady_clock::time_point::min()), finished_(true) {}
friend class std::_Associated_state<TaskRescheduleInfo>;
#endif
};

class ComplexMonitor : public utils::AfterExecute<TaskRescheduleInfo> {
public:
ComplexMonitor() = default;

bool isFinished(const TaskRescheduleInfo &result) override {
if (result.finished_) {
return true;
}
current_wait_.store(result.wait_time_);
return false;
}
bool isCancelled(const TaskRescheduleInfo& /*result*/) override {
return false;
}

std::chrono::steady_clock::duration wait_time() override {
return current_wait_.load();
}

private:
std::atomic<std::chrono::steady_clock::duration> current_wait_ {std::chrono::steady_clock::duration(0)};
};

} // namespace org::apache::nifi::minifi::utils
Loading
Loading