Skip to content

Commit

Permalink
MINIFICPP-2377 Support process group level controller services
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Oct 16, 2024
1 parent 6c0979f commit 2fec604
Show file tree
Hide file tree
Showing 114 changed files with 982 additions and 801 deletions.
4 changes: 2 additions & 2 deletions extensions/aws/processors/DeleteS3Object.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


template<typename T>
class S3TestsFixture;
class FlowProcessorS3TestsFixture;

namespace org::apache::nifi::minifi::aws::processors {

Expand Down Expand Up @@ -74,7 +74,7 @@ class DeleteS3Object : public S3Processor {
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

private:
friend class ::S3TestsFixture<DeleteS3Object>;
friend class ::FlowProcessorS3TestsFixture<DeleteS3Object>;

explicit DeleteS3Object(std::string_view name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
: S3Processor(name, uuid, core::logging::LoggerFactory<DeleteS3Object>::getLogger(uuid), std::move(s3_request_sender)) {
Expand Down
4 changes: 2 additions & 2 deletions extensions/aws/processors/FetchS3Object.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include "utils/GeneralUtils.h"

template<typename T>
class S3TestsFixture;
class FlowProcessorS3TestsFixture;

namespace org::apache::nifi::minifi::aws::processors {

Expand Down Expand Up @@ -85,7 +85,7 @@ class FetchS3Object : public S3Processor {
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

private:
friend class ::S3TestsFixture<FetchS3Object>;
friend class ::FlowProcessorS3TestsFixture<FetchS3Object>;

explicit FetchS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
: S3Processor(name, uuid, core::logging::LoggerFactory<FetchS3Object>::getLogger(uuid), std::move(s3_request_sender)) {
Expand Down
4 changes: 2 additions & 2 deletions extensions/aws/processors/PutS3Object.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#include "utils/Id.h"

template<typename T>
class S3TestsFixture;
class FlowProcessorS3TestsFixture;

namespace org::apache::nifi::minifi::aws::processors {

Expand Down Expand Up @@ -181,7 +181,7 @@ class PutS3Object : public S3Processor {
static constexpr uint64_t MIN_PART_SIZE = 5_MiB;
static constexpr uint64_t MAX_UPLOAD_SIZE = 5_GiB;

friend class ::S3TestsFixture<PutS3Object>;
friend class ::FlowProcessorS3TestsFixture<PutS3Object>;

explicit PutS3Object(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
: S3Processor(name, uuid, core::logging::LoggerFactory<PutS3Object>::getLogger(uuid), std::move(s3_request_sender)) {
Expand Down
4 changes: 2 additions & 2 deletions extensions/aws/processors/S3Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromContr
return std::nullopt;
}

std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name);
std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name, getUUID());
if (!service) {
logger_->log_error("AWS credentials service with name: '{}' could not be found", service_name);
return std::nullopt;
Expand Down Expand Up @@ -121,7 +121,7 @@ void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSession

if (auto communications_timeout = context.getProperty<core::TimePeriodValue>(CommunicationsTimeout)) {
logger_->log_debug("S3Processor: Communications Timeout {}", communications_timeout->getMilliseconds());
client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->getMilliseconds().count()); // NOLINT(runtime/int)
client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->getMilliseconds().count()); // NOLINT(runtime/int,google-runtime-int)
} else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/aws/tests/PutS3ObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PutS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::

class PutS3ObjectLimitChanged : public minifi::aws::processors::PutS3Object {
protected:
friend class ::S3TestsFixture<PutS3ObjectLimitChanged>;
friend class ::FlowProcessorS3TestsFixture<PutS3ObjectLimitChanged>;

explicit PutS3ObjectLimitChanged(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<minifi::aws::s3::S3RequestSender> s3_request_sender)
: PutS3Object(name, uuid, std::move(s3_request_sender)) {
Expand Down
23 changes: 15 additions & 8 deletions extensions/aws/tests/S3TestsFixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ class S3TestsFixture {

// Build MiNiFi processing graph
plan = test_controller.createPlan();
mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(mock_s3_request_sender_ptr);
s3_processor = std::shared_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
aws_credentials_service = plan->addController("AWSCredentialsService", "AWSCredentialsService");
}

Expand Down Expand Up @@ -121,8 +118,8 @@ class S3TestsFixture {
TestController test_controller;
std::shared_ptr<TestPlan> plan;
MockS3RequestSender* mock_s3_request_sender_ptr;
std::shared_ptr<core::Processor> s3_processor;
std::shared_ptr<core::Processor> update_attribute;
core::Processor* s3_processor;
core::Processor* update_attribute;
std::shared_ptr<core::controller::ControllerServiceNode> aws_credentials_service;
};

Expand All @@ -136,6 +133,11 @@ class FlowProcessorS3TestsFixture : public S3TestsFixture<T> {
LogTestController::getInstance().setTrace<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>();

this->mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(this->mock_s3_request_sender_ptr);
auto s3_processor_unique_ptr = std::unique_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
this->s3_processor = s3_processor_unique_ptr.get();

auto input_dir = this->test_controller.createTempDirectory();
std::ofstream input_file_stream(input_dir / INPUT_FILENAME);
input_file_stream << INPUT_DATA;
Expand All @@ -149,7 +151,7 @@ class FlowProcessorS3TestsFixture : public S3TestsFixture<T> {
core::Relationship("success", "d"),
true);
this->plan->addProcessor(
this->s3_processor,
std::move(s3_processor_unique_ptr),
"S3Processor",
core::Relationship("success", "d"),
true);
Expand Down Expand Up @@ -186,15 +188,20 @@ class FlowProcessorS3TestsFixture : public S3TestsFixture<T> {
}

protected:
std::shared_ptr<core::Processor> update_attribute;
core::Processor* update_attribute;
};

template<typename T>
class FlowProducerS3TestsFixture : public S3TestsFixture<T> {
public:
FlowProducerS3TestsFixture() {
this->mock_s3_request_sender_ptr = new MockS3RequestSender();
std::unique_ptr<minifi::aws::s3::S3RequestSender> mock_s3_request_sender(this->mock_s3_request_sender_ptr);
auto s3_processor_unique_ptr = std::unique_ptr<T>(new T("S3Processor", utils::Identifier(), std::move(mock_s3_request_sender)));
this->s3_processor = s3_processor_unique_ptr.get();

this->plan->addProcessor(
this->s3_processor,
std::move(s3_processor_unique_ptr),
"S3Processor");
auto log_attribute = this->plan->addProcessor(
"LogAttribute",
Expand Down
2 changes: 1 addition & 1 deletion extensions/azure/processors/AzureStorageProcessorBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ std::tuple<AzureStorageProcessorBase::GetCredentialsFromControllerResult, std::o
return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_EMPTY, std::nullopt);
}

std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name);
std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name, getUUID());
if (nullptr == service) {
logger_->log_error("Azure Storage credentials service with name: '{}' could not be found", service_name);
return std::make_tuple(GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID, std::nullopt);
Expand Down
15 changes: 8 additions & 7 deletions extensions/azure/tests/AzureBlobStorageTestsFixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class AzureBlobStorageTestsFixture {
plan_ = test_controller_.createPlan();
auto mock_blob_storage = std::make_unique<MockBlobStorage>();
mock_blob_storage_ptr_ = mock_blob_storage.get();
azure_blob_storage_processor_ = std::shared_ptr<ProcessorType>(
auto azure_blob_storage_processor_unique_ptr = std::unique_ptr<ProcessorType>(
new ProcessorType("AzureBlobStorageProcessor", utils::Identifier(), std::move(mock_blob_storage)));
azure_blob_storage_processor_ = azure_blob_storage_processor_unique_ptr.get();
auto input_dir = test_controller_.createTempDirectory();
std::ofstream input_file_stream(input_dir / GET_FILE_NAME);
input_file_stream << TEST_DATA;
Expand All @@ -71,7 +72,7 @@ class AzureBlobStorageTestsFixture {
plan_->setProperty(get_file_processor_, minifi::processors::GetFile::Directory, input_dir.string());
plan_->setProperty(get_file_processor_, minifi::processors::GetFile::KeepSourceFile, "false");
update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true);
plan_->addProcessor(azure_blob_storage_processor_, "AzureBlobStorageProcessor", { {"success", "d"} }, true);
plan_->addProcessor(std::move(azure_blob_storage_processor_unique_ptr), "AzureBlobStorageProcessor", { {"success", "d"} }, true);
auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
success_putfile_ = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
plan_->addConnection(logattribute, {"success", "d"}, success_putfile_);
Expand Down Expand Up @@ -122,11 +123,11 @@ class AzureBlobStorageTestsFixture {
TestController test_controller_;
std::shared_ptr<TestPlan> plan_;
MockBlobStorage* mock_blob_storage_ptr_;
std::shared_ptr<core::Processor> azure_blob_storage_processor_;
std::shared_ptr<core::Processor> get_file_processor_;
std::shared_ptr<core::Processor> update_attribute_processor_;
std::shared_ptr<core::Processor> success_putfile_;
std::shared_ptr<core::Processor> failure_putfile_;
core::Processor* azure_blob_storage_processor_;
core::Processor* get_file_processor_;
core::Processor* update_attribute_processor_;
core::Processor* success_putfile_;
core::Processor* failure_putfile_;
std::filesystem::path failure_output_dir_;
std::filesystem::path success_output_dir_;
};
15 changes: 8 additions & 7 deletions extensions/azure/tests/AzureDataLakeStorageTestsFixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ class AzureDataLakeStorageTestsFixture {
plan_ = test_controller_.createPlan();
auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>();
mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
azure_data_lake_storage_ = std::shared_ptr<AzureDataLakeStorageProcessor>(
auto azure_data_lake_storage_unique_ptr = std::unique_ptr<AzureDataLakeStorageProcessor>(
new AzureDataLakeStorageProcessor("AzureDataLakeStorageProcessor", utils::Identifier(), std::move(mock_data_lake_storage_client)));
azure_data_lake_storage_ = azure_data_lake_storage_unique_ptr.get();
auto input_dir = test_controller_.createTempDirectory();
minifi::test::utils::putFileToDir(input_dir, GETFILE_FILE_NAME, TEST_DATA);

Expand All @@ -70,7 +71,7 @@ class AzureDataLakeStorageTestsFixture {
plan_->setProperty(get_file_, minifi::processors::GetFile::KeepSourceFile, "false");

update_attribute_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true);
plan_->addProcessor(azure_data_lake_storage_, "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true);
plan_->addProcessor(std::move(azure_data_lake_storage_unique_ptr), "AzureDataLakeStorageProcessor", { {"success", "d"}, {"failure", "d"} }, true);
auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);

success_putfile_ = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false);
Expand Down Expand Up @@ -127,11 +128,11 @@ class AzureDataLakeStorageTestsFixture {
TestController test_controller_;
std::shared_ptr<TestPlan> plan_;
MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
std::shared_ptr<core::Processor> azure_data_lake_storage_;
std::shared_ptr<core::Processor> get_file_;
std::shared_ptr<core::Processor> update_attribute_;
std::shared_ptr<core::Processor> success_putfile_;
std::shared_ptr<core::Processor> failure_putfile_;
core::Processor* azure_data_lake_storage_;
core::Processor* get_file_;
core::Processor* update_attribute_;
core::Processor* success_putfile_;
core::Processor* failure_putfile_;
std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
std::filesystem::path failure_output_dir_;
std::filesystem::path success_output_dir_;
Expand Down
7 changes: 4 additions & 3 deletions extensions/azure/tests/ListAzureBlobStorageTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ class ListAzureBlobStorageTestsFixture {
plan_ = test_controller_.createPlan();
auto mock_blob_storage = std::make_unique<MockBlobStorage>();
mock_blob_storage_ptr_ = mock_blob_storage.get();
list_azure_blob_storage_ = std::make_shared<minifi::azure::processors::ListAzureBlobStorage>("ListAzureBlobStorage", std::move(mock_blob_storage));
auto list_azure_blob_storage_unique_ptr = std::make_unique<minifi::azure::processors::ListAzureBlobStorage>("ListAzureBlobStorage", std::move(mock_blob_storage));
list_azure_blob_storage_ = list_azure_blob_storage_unique_ptr.get();

plan_->addProcessor(list_azure_blob_storage_, "ListAzureBlobStorage", { {"success", "d"} });
plan_->addProcessor(std::move(list_azure_blob_storage_unique_ptr), "ListAzureBlobStorage", { {"success", "d"} });
auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
plan_->setProperty(logattribute, minifi::processors::LogAttribute::FlowFilesToLog, "0");

Expand All @@ -72,7 +73,7 @@ class ListAzureBlobStorageTestsFixture {
TestController test_controller_;
std::shared_ptr<TestPlan> plan_;
MockBlobStorage* mock_blob_storage_ptr_;
std::shared_ptr<core::Processor> list_azure_blob_storage_;
core::Processor* list_azure_blob_storage_;
std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
};

Expand Down
7 changes: 4 additions & 3 deletions extensions/azure/tests/ListAzureDataLakeStorageTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ class ListAzureDataLakeStorageTestsFixture {
plan_ = test_controller_.createPlan();
auto mock_data_lake_storage_client = std::make_unique<MockDataLakeStorageClient>();
mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
list_azure_data_lake_storage_ = std::shared_ptr<minifi::azure::processors::ListAzureDataLakeStorage>(
auto list_azure_data_lake_storage_unique_ptr = std::unique_ptr<minifi::azure::processors::ListAzureDataLakeStorage>(
new minifi::azure::processors::ListAzureDataLakeStorage("ListAzureDataLakeStorage", utils::Identifier(), std::move(mock_data_lake_storage_client)));
list_azure_data_lake_storage_ = list_azure_data_lake_storage_unique_ptr.get();

plan_->addProcessor(list_azure_data_lake_storage_, "ListAzureDataLakeStorage", { {"success", "d"} });
plan_->addProcessor(std::move(list_azure_data_lake_storage_unique_ptr), "ListAzureDataLakeStorage", { {"success", "d"} });
auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true);
plan_->setProperty(logattribute, minifi::processors::LogAttribute::FlowFilesToLog, "0");

Expand Down Expand Up @@ -73,7 +74,7 @@ class ListAzureDataLakeStorageTestsFixture {
TestController test_controller_;
std::shared_ptr<TestPlan> plan_;
MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
std::shared_ptr<core::Processor> list_azure_data_lake_storage_;
core::Processor* list_azure_data_lake_storage_;
std::shared_ptr<core::controller::ControllerServiceNode> azure_storage_cred_service_;
};

Expand Down
8 changes: 4 additions & 4 deletions extensions/bustache/tests/ApplyTemplateTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,16 @@ TEST_CASE("Test usage of ApplyTemplate", "[ApplyTemplateTest]") {
REQUIRE_FALSE(template_source_dir.empty());
REQUIRE_FALSE(put_file_destination_dir.empty());

std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getFile");
auto getfile = plan->addProcessor("GetFile", "getFile");
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory, get_file_source_dir.string());
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile, "true");

std::shared_ptr<core::Processor> extract_text = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
auto extract_text = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
plan->setProperty(extract_text, org::apache::nifi::minifi::processors::ExtractText::Attribute, TEST_ATTR);

std::shared_ptr<core::Processor> apply_template = plan->addProcessor("ApplyTemplate", "testApplyTemplate", core::Relationship("success", "description"), true);
auto apply_template = plan->addProcessor("ApplyTemplate", "testApplyTemplate", core::Relationship("success", "description"), true);

std::shared_ptr<core::Processor> put_file = plan->addProcessor("PutFile", "put_file", core::Relationship("success", "description"), true);
auto put_file = plan->addProcessor("PutFile", "put_file", core::Relationship("success", "description"), true);
plan->setProperty(put_file, org::apache::nifi::minifi::processors::PutFile::Directory, put_file_destination_dir.string());
plan->setProperty(put_file, org::apache::nifi::minifi::processors::PutFile::ConflictResolution, magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::replace));

Expand Down
14 changes: 7 additions & 7 deletions extensions/civetweb/tests/ListenHTTPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class ListenHTTPTestsFixture {
plan->runNextProcessor(); // UpdateAttribute
plan->runNextProcessor(); // ListenHTTP

auto raw_ptr = dynamic_cast<org::apache::nifi::minifi::processors::ListenHTTP*>(listen_http.get());
auto raw_ptr = dynamic_cast<org::apache::nifi::minifi::processors::ListenHTTP*>(listen_http);
std::string protocol = std::string("http") + (raw_ptr->isSecure() ? "s" : "");
std::string portstr = raw_ptr->getPort();
REQUIRE(LogTestController::getInstance().contains("Listening on port " + portstr));
Expand Down Expand Up @@ -228,10 +228,10 @@ class ListenHTTPTestsFixture {
std::filesystem::path tmp_dir;
TestController testController;
std::shared_ptr<TestPlan> plan;
std::shared_ptr<core::Processor> get_file;
std::shared_ptr<core::Processor> update_attribute;
std::shared_ptr<core::Processor> listen_http;
std::shared_ptr<core::Processor> log_attribute;
core::Processor* get_file = nullptr;
core::Processor* update_attribute = nullptr;
core::Processor* listen_http = nullptr;
core::Processor* log_attribute = nullptr;

std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service;
HttpRequestMethod method = HttpRequestMethod::GET;
Expand Down Expand Up @@ -672,8 +672,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL version", "[https]")

TEST_CASE("ListenHTTP bored yield", "[listenhttp][bored][yield]") {
using processors::ListenHTTP;
const auto listen_http = std::make_shared<ListenHTTP>("listenhttp");
SingleProcessorTestController controller{listen_http};
SingleProcessorTestController controller{std::make_unique<ListenHTTP>("listenhttp")};
auto listen_http = controller.getProcessor();
listen_http->setProperty(ListenHTTP::Port, "0");

REQUIRE(!listen_http->isYield());
Expand Down
Loading

0 comments on commit 2fec604

Please sign in to comment.