diff --git a/PROCESSORS.md b/PROCESSORS.md index 97ade1f37a..b1ea31cb31 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -22,7 +22,6 @@ limitations under the License. - [BinFiles](#BinFiles) - [CaptureRTSPFrame](#CaptureRTSPFrame) - [CollectKubernetesPodMetrics](#CollectKubernetesPodMetrics) -- [CollectorInitiatedSubscription](#CollectorInitiatedSubscription) - [CompressContent](#CompressContent) - [ConsumeJournald](#ConsumeJournald) - [ConsumeKafka](#ConsumeKafka) @@ -286,39 +285,6 @@ In the list below, the names of required properties appear in bold. Any other pr | success | All flow files produced are routed to Success. | -## CollectorInitiatedSubscription - -### Description - -Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. - -### Properties - -In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. - -| Name | Default Value | Allowable Values | Description | -|------------------------------------|-----------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Subscription Name** | | | The name of the subscription. The value provided for this parameter should be unique within the computer's scope.
**Supports Expression Language: true** | -| **Subscription Description** | | | A description of the subscription.
**Supports Expression Language: true** | -| **Source Address** | | | The IP address or fully qualified domain name (FQDN) of the local or remote computer (event source) from which the events are collected.
**Supports Expression Language: true** | -| **Source User Name** | | | The user name, which is used by the remote computer (event source) to authenticate the user.
**Supports Expression Language: true** | -| **Source Password** | | | The password, which is used by the remote computer (event source) to authenticate the user.
**Sensitive Property: true**
**Supports Expression Language: true** | -| **Source Channels** | | | The Windows Event Log Channels (on domain computer(s)) from which events are transferred.
**Supports Expression Language: true** | -| **Max Delivery Items** | 1000 | | Determines the maximum number of items that will forwarded from an event source for each request. | -| **Delivery MaxLatency Time** | 10 min | | How long, in milliseconds, the event source should wait before sending events. | -| **Heartbeat Interval** | 10 min | | Time interval, in milliseconds, which is observed between the sent heartbeat messages. The event collector uses this property to determine the interval between queries to the event source. | -| **Channel** | ForwardedEvents | | The Windows Event Log Channel (on local machine) to which events are transferred.
**Supports Expression Language: true** | -| **Query** | * | | XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)
**Supports Expression Language: true** | -| **Max Buffer Size** | 1 MB | | The individual Event Log XMLs are rendered to a buffer. This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.) | -| **Inactive Duration To Reconnect** | 10 min | | If no new event logs are processed for the specified time period, this processor will try reconnecting to recover from a state where any further messages cannot be consumed. Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned. Setting no duration, e.g. '0 ms' disables auto-reconnection. | - -### Relationships - -| Name | Description | -|---------|------------------------------------------------| -| success | Relationship for successfully consumed events. | - - ## CompressContent ### Description diff --git a/README.md b/README.md index 0fc786fe1a..f325315e09 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte | SQL | [ExecuteSQL](PROCESSORS.md#executesql)
[PutSQL](PROCESSORS.md#putsql)
[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)
| -DENABLE_SQL=ON | | Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)
[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus) | -DENABLE_SPLUNK=ON | | Systemd (Linux) | [ConsumeJournald](PROCESSORS.md#consumejournald) | -DENABLE_SYSTEMD=ON | -| Windows Event Log (Windows) | [CollectorInitiatedSubscription](PROCESSORS.md#collectorinitiatedsubscription)
[ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)
[TailEventLog](PROCESSORS.md#taileventlog) | -DENABLE_WEL=ON | +| Windows Event Log (Windows) | [ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)
[TailEventLog](PROCESSORS.md#taileventlog) | -DENABLE_WEL=ON | Please see our [Python guide](extensions/python/PYTHON.md) on how to write Python processors and use them within MiNiFi C++. diff --git a/extensions/lua/LuaProcessSession.cpp b/extensions/lua/LuaProcessSession.cpp index daaa997314..3a7f3a48dd 100644 --- a/extensions/lua/LuaProcessSession.cpp +++ b/extensions/lua/LuaProcessSession.cpp @@ -22,16 +22,12 @@ namespace org::apache::nifi::minifi::extensions::lua { -LuaProcessSession::LuaProcessSession(std::shared_ptr session) - : session_(std::move(session)) { +LuaProcessSession::LuaProcessSession(core::ProcessSession& session) + : session_(session) { } std::shared_ptr LuaProcessSession::get() { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - - auto flow_file = session_->get(); + auto flow_file = session_.get(); if (flow_file == nullptr) { return nullptr; @@ -43,34 +39,26 @@ std::shared_ptr LuaProcessSession::get() { return result; } -void LuaProcessSession::transfer(const std::shared_ptr &script_flow_file, +void LuaProcessSession::transfer(const std::shared_ptr& script_flow_file, const core::Relationship& relationship) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - - auto flow_file = script_flow_file->getFlowFile(); + const auto flow_file = script_flow_file->getFlowFile(); if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } - session_->transfer(flow_file, relationship); + session_.transfer(flow_file, relationship); } void LuaProcessSession::read(const std::shared_ptr &script_flow_file, sol::table input_stream_callback) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - - auto flow_file = script_flow_file->getFlowFile(); + const auto flow_file = script_flow_file->getFlowFile(); if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } - session_->read(flow_file, [&input_stream_callback](const std::shared_ptr& input_stream) -> int64_t { + session_.read(flow_file, [&input_stream_callback](const std::shared_ptr& input_stream) -> int64_t { sol::function callback = input_stream_callback["process"]; return callback(input_stream_callback, std::make_shared(input_stream)); }); @@ -78,43 +66,31 @@ void LuaProcessSession::read(const std::shared_ptr &script_fl void LuaProcessSession::write(const std::shared_ptr &script_flow_file, sol::table output_stream_callback) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - auto flow_file = script_flow_file->getFlowFile(); if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } - session_->write(flow_file, [&output_stream_callback](const std::shared_ptr& output_stream) -> int64_t { + session_.write(flow_file, [&output_stream_callback](const std::shared_ptr& output_stream) -> int64_t { sol::function callback = output_stream_callback["process"]; return callback(output_stream_callback, std::make_shared(output_stream)); }); } std::shared_ptr LuaProcessSession::create() { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - - auto result = std::make_shared(session_->create()); + auto result = std::make_shared(session_.create()); flow_files_.push_back(result); return result; } std::shared_ptr LuaProcessSession::create(const std::shared_ptr &flow_file) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - std::shared_ptr result; if (flow_file == nullptr) { - result = std::make_shared(session_->create()); + result = std::make_shared(session_.create()); } else { - result = std::make_shared(session_->create(flow_file->getFlowFile().get())); + result = std::make_shared(session_.create(flow_file->getFlowFile().get())); } flow_files_.push_back(result); @@ -127,22 +103,16 @@ void LuaProcessSession::releaseCoreResources() { flow_file->releaseFlowFile(); } } - - session_.reset(); } void LuaProcessSession::remove(const std::shared_ptr& script_flow_file) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - auto flow_file = script_flow_file->getFlowFile(); if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } - session_->remove(flow_file); + session_.remove(flow_file); } } // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/LuaProcessSession.h b/extensions/lua/LuaProcessSession.h index de5508890d..bebd13f68e 100644 --- a/extensions/lua/LuaProcessSession.h +++ b/extensions/lua/LuaProcessSession.h @@ -31,15 +31,15 @@ namespace org::apache::nifi::minifi::extensions::lua { class LuaProcessSession { public: - explicit LuaProcessSession(std::shared_ptr session); + explicit LuaProcessSession(core::ProcessSession& session); std::shared_ptr get(); std::shared_ptr create(); - std::shared_ptr create(const std::shared_ptr &flow_file); - void transfer(const std::shared_ptr &flow_file, const core::Relationship& relationship); - void read(const std::shared_ptr &script_flow_file, sol::table input_stream_callback); - void write(const std::shared_ptr &flow_file, sol::table output_stream_callback); - void remove(const std::shared_ptr& flow_file); + std::shared_ptr create(const std::shared_ptr& script_flow_file); + void transfer(const std::shared_ptr& script_flow_file, const core::Relationship& relationship); + void read(const std::shared_ptr& script_flow_file, sol::table input_stream_callback); + void write(const std::shared_ptr& script_flow_file, sol::table output_stream_callback); + void remove(const std::shared_ptr& script_flow_file); /** * Sometimes we want to release shared pointers to core resources when @@ -53,7 +53,7 @@ class LuaProcessSession { private: std::vector> flow_files_; - std::shared_ptr session_; + core::ProcessSession& session_; }; } // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/LuaScriptEngine.cpp b/extensions/lua/LuaScriptEngine.cpp index 0981a9edc4..0806b7f773 100644 --- a/extensions/lua/LuaScriptEngine.cpp +++ b/extensions/lua/LuaScriptEngine.cpp @@ -131,7 +131,6 @@ class TriggerSession { TriggerSession& operator=(const TriggerSession&) = delete; ~TriggerSession() { - script_context_->releaseProcessContext(); lua_session_->releaseCoreResources(); } @@ -141,7 +140,7 @@ class TriggerSession { }; } // namespace -void LuaScriptEngine::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { +void LuaScriptEngine::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { auto script_context = std::make_shared(context, lua_); auto lua_session = std::make_shared(session); TriggerSession trigger_session(script_context, lua_session); diff --git a/extensions/lua/LuaScriptEngine.h b/extensions/lua/LuaScriptEngine.h index 4dcc6d2cde..f5ddf12ade 100644 --- a/extensions/lua/LuaScriptEngine.h +++ b/extensions/lua/LuaScriptEngine.h @@ -41,7 +41,7 @@ class LuaScriptEngine { void eval(const std::string& script); void evalFile(const std::filesystem::path& file_name); - void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session); + void onTrigger(core::ProcessContext& context, core::ProcessSession& session); void initialize(const core::Relationship& success, const core::Relationship& failure, const std::shared_ptr& logger); void setModulePaths(std::vector module_paths) { diff --git a/extensions/lua/LuaScriptExecutor.cpp b/extensions/lua/LuaScriptExecutor.cpp index 22a40e391a..665d98d988 100644 --- a/extensions/lua/LuaScriptExecutor.cpp +++ b/extensions/lua/LuaScriptExecutor.cpp @@ -26,8 +26,8 @@ namespace org::apache::nifi::minifi::extensions::lua { LuaScriptExecutor::LuaScriptExecutor(std::string_view name, const utils::Identifier& uuid) : script::ScriptExecutor(name, uuid) {} -void LuaScriptExecutor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { - auto lua_script_engine = lua_script_engine_queue_->getResource(); +void LuaScriptExecutor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto lua_script_engine = lua_script_engine_queue_->getResource(); gsl_Expects(std::holds_alternative(script_to_run_) || std::holds_alternative(script_to_run_)); if (module_directory_) { diff --git a/extensions/lua/LuaScriptExecutor.h b/extensions/lua/LuaScriptExecutor.h index d001058c27..3137ed9123 100644 --- a/extensions/lua/LuaScriptExecutor.h +++ b/extensions/lua/LuaScriptExecutor.h @@ -21,7 +21,6 @@ #include #include #include -#include #include "../script/ScriptExecutor.h" #include "LuaScriptEngine.h" #include "utils/ResourceQueue.h" @@ -32,7 +31,7 @@ class LuaScriptExecutor : public script::ScriptExecutor { public: explicit LuaScriptExecutor(std::string_view name, const utils::Identifier& uuid = {}); - void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize(std::filesystem::path script_file, std::string script_body, std::optional module_directory, diff --git a/extensions/lua/LuaScriptProcessContext.cpp b/extensions/lua/LuaScriptProcessContext.cpp index 38a1775c3b..14c7a5ec8a 100644 --- a/extensions/lua/LuaScriptProcessContext.cpp +++ b/extensions/lua/LuaScriptProcessContext.cpp @@ -24,22 +24,19 @@ namespace org::apache::nifi::minifi::extensions::lua { -LuaScriptProcessContext::LuaScriptProcessContext(std::shared_ptr context, sol::state& sol_state) - : context_(std::move(context)), sol_state_(sol_state) { +LuaScriptProcessContext::LuaScriptProcessContext(core::ProcessContext& context, sol::state& sol_state) + : context_(context), sol_state_(sol_state) { } std::string LuaScriptProcessContext::getProperty(const std::string &name) { std::string value; - context_->getProperty(name, value); + context_.getProperty(name, value); return value; } -void LuaScriptProcessContext::releaseProcessContext() { - context_.reset(); -} LuaScriptStateManager LuaScriptProcessContext::getStateManager() { - return LuaScriptStateManager(context_->getStateManager(), sol_state_); + return LuaScriptStateManager(context_.getStateManager(), sol_state_); } } // namespace org::apache::nifi::minifi::extensions::lua diff --git a/extensions/lua/LuaScriptProcessContext.h b/extensions/lua/LuaScriptProcessContext.h index c5eeb1974d..98fa3c9491 100644 --- a/extensions/lua/LuaScriptProcessContext.h +++ b/extensions/lua/LuaScriptProcessContext.h @@ -28,15 +28,14 @@ namespace org::apache::nifi::minifi::extensions::lua { class LuaScriptProcessContext { public: - explicit LuaScriptProcessContext(std::shared_ptr context, sol::state& sol_state); + explicit LuaScriptProcessContext(core::ProcessContext& context, sol::state& sol_state); std::string getProperty(const std::string &name); - void releaseProcessContext(); LuaScriptStateManager getStateManager(); private: - std::shared_ptr context_; + core::ProcessContext& context_; sol::state& sol_state_; }; diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index e3b0a3a14e..115760e40b 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -75,7 +75,7 @@ void ExecutePythonProcessor::initalizeThroughScriptEngine() { } } -void ExecutePythonProcessor::onScheduleSharedPtr(const std::shared_ptr &context, const std::shared_ptr& /*sessionFactory*/) { +void ExecutePythonProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*sessionFactory*/) { addAutoTerminatedRelationship(Original); if (!processor_initialized_) { loadScript(); @@ -95,7 +95,7 @@ void ExecutePythonProcessor::onScheduleSharedPtr(const std::shared_ptr &context, const std::shared_ptr &session) { +void ExecutePythonProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { reloadScriptIfUsingScriptFileProperty(); if (script_to_exec_.empty()) { throw std::runtime_error("Neither Script Body nor Script File is available to execute"); diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index c13e85bcef..59b5613271 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -88,8 +88,8 @@ class ExecutePythonProcessor : public core::Processor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void initialize() override; - void onScheduleSharedPtr(const std::shared_ptr &context, const std::shared_ptr &sessionFactory) override; - void onTriggerSharedPtr(const std::shared_ptr &context, const std::shared_ptr &session) override; + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void setSupportsDynamicProperties() { python_dynamic_ = true; diff --git a/extensions/python/PythonScriptEngine.cpp b/extensions/python/PythonScriptEngine.cpp index ccf6156934..10893c195b 100644 --- a/extensions/python/PythonScriptEngine.cpp +++ b/extensions/python/PythonScriptEngine.cpp @@ -90,20 +90,20 @@ void PythonScriptEngine::describe(core::Processor* proc) { } } -void PythonScriptEngine::onSchedule(const std::shared_ptr &context) { +void PythonScriptEngine::onSchedule(core::ProcessContext& context) { if (processor_instance_.get() != nullptr) { - callProcessorObjectMethod("onSchedule", std::weak_ptr(context)); + callProcessorObjectMethod("onSchedule", &context); } else { - call("onSchedule", std::weak_ptr(context)); + call("onSchedule", &context); } } -void PythonScriptEngine::onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) { +void PythonScriptEngine::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { auto py_session = std::make_shared(session); if (processor_instance_.get() != nullptr) { - callProcessorObjectMethod("onTrigger", std::weak_ptr(context), std::weak_ptr(py_session)); + callProcessorObjectMethod("onTrigger", &context, std::weak_ptr(py_session)); } else { - call("onTrigger", std::weak_ptr(context), std::weak_ptr(py_session)); + call("onTrigger", &context, std::weak_ptr(py_session)); } } diff --git a/extensions/python/PythonScriptEngine.h b/extensions/python/PythonScriptEngine.h index b11c7fa649..dd02485166 100644 --- a/extensions/python/PythonScriptEngine.h +++ b/extensions/python/PythonScriptEngine.h @@ -149,8 +149,8 @@ class PythonScriptEngine { void onInitialize(core::Processor* proc); void describe(core::Processor* proc); - void onSchedule(const std::shared_ptr& context); - void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session); + void onSchedule(core::ProcessContext& context); + void onTrigger(core::ProcessContext& context, core::ProcessSession& session); void initialize(const core::Relationship& success, const core::Relationship& failure, const core::Relationship& original, const std::shared_ptr& logger); void initializeProcessorObject(const std::string& python_class_name); std::vector getCustomPythonRelationships(); diff --git a/extensions/python/PythonScriptExecutor.cpp b/extensions/python/PythonScriptExecutor.cpp index 7bf14bc5d4..1dbe57ecb6 100644 --- a/extensions/python/PythonScriptExecutor.cpp +++ b/extensions/python/PythonScriptExecutor.cpp @@ -27,10 +27,10 @@ namespace org::apache::nifi::minifi::extensions::python { -PythonScriptExecutor::PythonScriptExecutor(std::string_view name, const utils::Identifier& uuid) : script::ScriptExecutor(name, uuid) {} +PythonScriptExecutor::PythonScriptExecutor(const std::string_view name, const utils::Identifier& uuid) : script::ScriptExecutor(name, uuid) {} -void PythonScriptExecutor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { +void PythonScriptExecutor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { gsl_Expects(python_script_engine_); gsl_Expects(std::holds_alternative(script_to_run_) || std::holds_alternative(script_to_run_)); diff --git a/extensions/python/PythonScriptExecutor.h b/extensions/python/PythonScriptExecutor.h index 69fc96d00b..80bf7096f1 100644 --- a/extensions/python/PythonScriptExecutor.h +++ b/extensions/python/PythonScriptExecutor.h @@ -29,7 +29,7 @@ class PythonScriptExecutor : public script::ScriptExecutor { public: explicit PythonScriptExecutor(std::string_view name, const utils::Identifier& uuid = {}); - void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize(std::filesystem::path script_file, std::string script_body, std::optional module_directory, diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 540a74c48e..5dd9628ff7 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -65,7 +65,7 @@ int PyProcessContext::init(PyProcessContext* self, PyObject* args, PyObject*) { } PyObject* PyProcessContext::getProperty(PyProcessContext* self, PyObject* args) { - auto context = self->process_context_.lock(); + auto context = self->process_context_; if (!context) { PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); return nullptr; @@ -100,7 +100,7 @@ PyObject* PyProcessContext::getProperty(PyProcessContext* self, PyObject* args) } PyObject* PyProcessContext::getStateManager(PyProcessContext* self, PyObject*) { - auto context = self->process_context_.lock(); + auto context = self->process_context_; if (!context) { PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); return nullptr; @@ -110,7 +110,7 @@ PyObject* PyProcessContext::getStateManager(PyProcessContext* self, PyObject*) { } PyObject* PyProcessContext::getControllerService(PyProcessContext* self, PyObject* args) { - auto context = self->process_context_.lock(); + auto context = self->process_context_; if (!context) { PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); return nullptr; @@ -134,7 +134,7 @@ PyObject* PyProcessContext::getControllerService(PyProcessContext* self, PyObjec } PyObject* PyProcessContext::getName(PyProcessContext* self, PyObject*) { - auto context = self->process_context_.lock(); + auto context = self->process_context_; if (!context) { PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); return nullptr; @@ -144,7 +144,7 @@ PyObject* PyProcessContext::getName(PyProcessContext* self, PyObject*) { } PyObject* PyProcessContext::getProperties(PyProcessContext* self, PyObject*) { - auto context = self->process_context_.lock(); + auto context = self->process_context_; if (!context) { PyErr_SetString(PyExc_AttributeError, "tried reading process context outside 'on_trigger'"); return nullptr; diff --git a/extensions/python/types/PyProcessContext.h b/extensions/python/types/PyProcessContext.h index 26aba677d0..00c6a04e69 100644 --- a/extensions/python/types/PyProcessContext.h +++ b/extensions/python/types/PyProcessContext.h @@ -26,7 +26,7 @@ namespace org::apache::nifi::minifi::extensions::python { struct PyProcessContext { PyProcessContext() {} - using HeldType = std::weak_ptr; + using HeldType = core::ProcessContext*; static constexpr const char* HeldTypeName = "PyProcessContext::HeldType"; PyObject_HEAD diff --git a/extensions/python/types/PyProcessSession.cpp b/extensions/python/types/PyProcessSession.cpp index 876c68bf75..00b9bf8a5b 100644 --- a/extensions/python/types/PyProcessSession.cpp +++ b/extensions/python/types/PyProcessSession.cpp @@ -29,15 +29,11 @@ namespace org::apache::nifi::minifi::extensions::python { namespace core = org::apache::nifi::minifi::core; -PyProcessSession::PyProcessSession(std::shared_ptr session) - : session_(std::move(session)) { +PyProcessSession::PyProcessSession(core::ProcessSession& session) + : session_(gsl::make_not_null(&session)) { } std::shared_ptr PyProcessSession::get() { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - auto flow_file = session_->get(); if (flow_file == nullptr) { @@ -51,10 +47,6 @@ std::shared_ptr PyProcessSession::get() { void PyProcessSession::transfer(const std::shared_ptr& flow_file, const core::Relationship& relationship) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } @@ -63,10 +55,6 @@ void PyProcessSession::transfer(const std::shared_ptr& flow_file } void PyProcessSession::transferToCustomRelationship(const std::shared_ptr& flow_file, const std::string& relationship_name) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } @@ -75,10 +63,6 @@ void PyProcessSession::transferToCustomRelationship(const std::shared_ptr& flow_file, BorrowedObject input_stream_callback) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } @@ -89,10 +73,6 @@ void PyProcessSession::read(const std::shared_ptr& flow_file, Bo } void PyProcessSession::write(const std::shared_ptr& flow_file, BorrowedObject output_stream_callback) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } @@ -103,10 +83,6 @@ void PyProcessSession::write(const std::shared_ptr& flow_file, B } std::shared_ptr PyProcessSession::create(const std::shared_ptr& flow_file) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - auto result = session_->create(flow_file.get()); flow_files_.push_back(result); @@ -114,10 +90,6 @@ std::shared_ptr PyProcessSession::create(const std::shared_ptr PyProcessSession::clone(const std::shared_ptr& flow_file) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - if (!flow_file) { throw std::runtime_error("Flow file to clone is nullptr"); } @@ -129,19 +101,11 @@ std::shared_ptr PyProcessSession::clone(const std::shared_ptr& flow_file) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - session_->remove(flow_file); flow_files_.erase(ranges::remove_if(flow_files_, [&flow_file](const auto& ff)-> bool { return ff == flow_file; }), flow_files_.end()); } std::string PyProcessSession::getContentsAsString(const std::shared_ptr& flow_file) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - if (!flow_file) { throw std::runtime_error("Access of FlowFile after it has been released"); } @@ -155,10 +119,6 @@ std::string PyProcessSession::getContentsAsString(const std::shared_ptr& flow_file, std::string_view key, const std::string& value) { - if (!session_) { - throw std::runtime_error("Access of ProcessSession after it has been released"); - } - session_->putAttribute(*flow_file, key, value); } diff --git a/extensions/python/types/PyProcessSession.h b/extensions/python/types/PyProcessSession.h index a78bb91078..1a88966793 100644 --- a/extensions/python/types/PyProcessSession.h +++ b/extensions/python/types/PyProcessSession.h @@ -27,7 +27,7 @@ namespace org::apache::nifi::minifi::extensions::python { class PyProcessSession { public: - explicit PyProcessSession(std::shared_ptr session); + explicit PyProcessSession(core::ProcessSession& session); std::shared_ptr get(); std::shared_ptr create(const std::shared_ptr& flow_file = nullptr); @@ -42,7 +42,7 @@ class PyProcessSession { private: std::vector> flow_files_; - std::shared_ptr session_; + gsl::not_null session_; }; struct PyProcessSessionObject { diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp index 8e1080027f..7b9fd16588 100644 --- a/extensions/script/ExecuteScript.cpp +++ b/extensions/script/ExecuteScript.cpp @@ -74,7 +74,7 @@ void ExecuteScript::onSchedule(core::ProcessContext& context, core::ProcessSessi script_executor_->initialize(std::move(script_file), std::move(script_body), std::move(module_directory), getMaxConcurrentTasks(), Success, Failure, Original, logger_); } -void ExecuteScript::onTriggerSharedPtr(const std::shared_ptr& context, const std::shared_ptr& session) { +void ExecuteScript::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { gsl_Expects(script_executor_); script_executor_->onTrigger(context, session); } diff --git a/extensions/script/ExecuteScript.h b/extensions/script/ExecuteScript.h index e5ef777398..56378e280c 100644 --- a/extensions/script/ExecuteScript.h +++ b/extensions/script/ExecuteScript.h @@ -93,7 +93,7 @@ class ExecuteScript : public core::Processor { void initialize() override; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; - void onTriggerSharedPtr(const std::shared_ptr &context, const std::shared_ptr &session) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; private: std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(uuid_); diff --git a/extensions/script/ScriptExecutor.h b/extensions/script/ScriptExecutor.h index 49f82b4bf0..b685993b70 100644 --- a/extensions/script/ScriptExecutor.h +++ b/extensions/script/ScriptExecutor.h @@ -17,8 +17,6 @@ #pragma once -#include -#include #include #include @@ -30,9 +28,9 @@ namespace org::apache::nifi::minifi::extensions::script { class ScriptExecutor : public minifi::core::CoreComponent { public: - ScriptExecutor(std::string_view name, const utils::Identifier& uuid) : core::CoreComponent(name, uuid) {} + ScriptExecutor(const std::string_view name, const utils::Identifier& uuid) : core::CoreComponent(name, uuid) {} - virtual void onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) = 0; + virtual void onTrigger(core::ProcessContext& context, core::ProcessSession& session) = 0; virtual void initialize(std::filesystem::path script_file, std::string script_body, std::optional module_directory, diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp deleted file mode 100644 index b699b0af5d..0000000000 --- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp +++ /dev/null @@ -1,572 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "CollectorInitiatedSubscription.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "io/BufferStream.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessSessionFactory.h" -#include "core/Resource.h" -#include "utils/gsl.h" -#include "utils/OptionalUtils.h" -#include "utils/OsUtils.h" -#include "utils/UnicodeConversion.h" - -#pragma comment(lib, "wevtapi.lib") -#pragma comment(lib, "Wecapi.lib") - -using namespace std::literals::chrono_literals; - -namespace org::apache::nifi::minifi::processors { - -#define LOG_SUBSCRIPTION_ERROR(error) logError(__LINE__, error) -#define LOG_SUBSCRIPTION_WINDOWS_ERROR(info) logWindowsError(__LINE__, info) - -CollectorInitiatedSubscription::CollectorInitiatedSubscription(const std::string& name, const utils::Identifier& uuid) - : core::Processor(name, uuid), logger_(core::logging::LoggerFactory::getLogger(uuid_)) { - char buff[MAX_COMPUTERNAME_LENGTH + 1]; - DWORD size = sizeof(buff); - if (GetComputerName(buff, &size)) { - computerName_ = buff; - } else { - LOG_SUBSCRIPTION_WINDOWS_ERROR("GetComputerName"); - } -} - -void CollectorInitiatedSubscription::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void CollectorInitiatedSubscription::onScheduleSharedPtr(const std::shared_ptr &context, const std::shared_ptr &sessionFactory) { - gsl_Expects(context); - - if (subscriptionHandle_) { - logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe."); - } else { - sessionFactory_ = sessionFactory; - - if (!createSubscription(*context)) - return; - if (!checkSubscriptionRuntimeStatus()) - return; - - subscribe(*context); - } - - subscription_name_ = utils::to_wstring(context->getProperty(SubscriptionName).value()); - max_buffer_size_ = context->getProperty(MaxBufferSize).value(); -} - -void CollectorInitiatedSubscription::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { - if (!subscriptionHandle_) { - if (!subscribe(context)) { - context.yield(); - return; - } - } - - checkSubscriptionRuntimeStatus(); - - const auto flowFileCount = processQueue(session); - - const auto now = GetTickCount64(); - - if (flowFileCount > 0) { - lastActivityTimestamp_ = now; - } else if (auto inactive_duration_to_reconnect_ms = context.getProperty(InactiveDurationToReconnect) - | utils::transform([](const auto& time_period_value) { return time_period_value.getMilliseconds().count(); }); - inactive_duration_to_reconnect_ms && *inactive_duration_to_reconnect_ms > 0) { - if ((now - lastActivityTimestamp_) > gsl::narrow(*inactive_duration_to_reconnect_ms)) { - logger_->log_info("Exceeds configured 'inactive duration to reconnect' {} ms. Unsubscribe to reconnect..", *inactive_duration_to_reconnect_ms); - unsubscribe(); - } - } -} - -void CollectorInitiatedSubscription::logInvalidSubscriptionPropertyType(int line, DWORD type) { - logError(line, "Invalid property type: " + std::to_string(type)); -} - -bool CollectorInitiatedSubscription::checkSubscriptionRuntimeStatus() { - EC_HANDLE hSubscription = EcOpenSubscription(subscription_name_.c_str(), EC_READ_ACCESS, EC_OPEN_EXISTING); - if (!hSubscription) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcOpenSubscription"); - return false; - } - const auto guard_hSubscription = gsl::finally([hSubscription]() { EcClose(hSubscription); }); - - PEC_VARIANT vProperty = NULL; - std::vector buffer; - if (!getSubscriptionProperty(hSubscription, EcSubscriptionEventSources, 0, buffer, vProperty)) { - return false; - } - - // Ensure that we have obtained handle to the Array Property. - if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarObjectArrayPropertyHandle) { - logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type); - return false; - } - - if (vProperty->Type == EcVarTypeNull) { - LOG_SUBSCRIPTION_ERROR("!hArray"); - return false; - } - - const EC_OBJECT_ARRAY_PROPERTY_HANDLE hArray = vProperty->PropertyHandleVal; - const auto guard_hArray = gsl::finally([hArray]() { EcClose(hArray); }); - - // Get the EventSources array size (number of elements). - DWORD dwEventSourceCount{}; - if (!EcGetObjectArraySize(hArray, &dwEventSourceCount)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArraySize"); - return false; - } - - auto getArrayProperty = [this](EC_OBJECT_ARRAY_PROPERTY_HANDLE hArray, EC_SUBSCRIPTION_PROPERTY_ID propID, DWORD arrayIndex, DWORD flags, std::vector& buffer, PEC_VARIANT& vProperty) -> bool { - buffer.clear(); - buffer.resize(sizeof(EC_VARIANT)); - DWORD dwBufferSizeUsed{}; - if (!EcGetObjectArrayProperty(hArray, propID, arrayIndex, flags, static_cast(buffer.size()), reinterpret_cast(&buffer[0]), &dwBufferSizeUsed)) { - if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) { - buffer.resize(dwBufferSizeUsed); - if (!EcGetObjectArrayProperty(hArray, propID, arrayIndex, flags, static_cast(buffer.size()), reinterpret_cast(&buffer[0]), &dwBufferSizeUsed)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArrayProperty"); - return false; - } - } else { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArrayProperty"); - return false; - } - } - - vProperty = reinterpret_cast(&buffer[0]); - - return true; - }; - - auto getStatus = [this](const std::wstring& eventSource, EC_SUBSCRIPTION_RUNTIME_STATUS_INFO_ID statusInfoID, DWORD flags, std::vector& buffer, PEC_VARIANT& vStatus) -> bool { - buffer.clear(); - buffer.resize(sizeof(EC_VARIANT)); - DWORD dwBufferSize{}; - if (!EcGetSubscriptionRunTimeStatus( - subscription_name_.c_str(), - statusInfoID, - eventSource.c_str(), - flags, - static_cast(buffer.size()), - reinterpret_cast(&buffer[0]), - &dwBufferSize)) { - if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) { - buffer.resize(dwBufferSize); - if (!EcGetSubscriptionRunTimeStatus(subscription_name_.c_str(), - statusInfoID, - eventSource.c_str(), - flags, - static_cast(buffer.size()), - reinterpret_cast(&buffer[0]), - &dwBufferSize)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionRunTimeStatus"); - } - } else { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionRunTimeStatus"); - } - } - - vStatus = reinterpret_cast(&buffer[0]); - - return true; - }; - - for (DWORD i = 0; i < dwEventSourceCount; i++) { - std::vector eventSourceBuffer; - PEC_VARIANT vProperty = NULL; - if (!getArrayProperty(hArray, EcSubscriptionEventSourceAddress, i, 0, eventSourceBuffer, vProperty)) { - return false; - } - - if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarTypeString) { - logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type); - return false; - } - - if (vProperty->Type == EcVarTypeNull) - continue; - - const std::wstring eventSource = vProperty->StringVal; - - if (!getStatus(eventSource.c_str(), EcSubscriptionRunTimeStatusActive, 0, buffer, vProperty)) { - return false; - } - - if (vProperty->Type != EcVarTypeUInt32) { - logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type); - return false; - } - - const auto runtimeStatus = vProperty->UInt32Val; - - std::wstring strRuntimeStatus; - - switch (runtimeStatus) { - case EcRuntimeStatusActiveStatusActive: - strRuntimeStatus = L"Active"; - break; - case EcRuntimeStatusActiveStatusDisabled: - strRuntimeStatus = L"Disabled"; - break; - case EcRuntimeStatusActiveStatusInactive: - strRuntimeStatus = L"Inactive"; - break; - case EcRuntimeStatusActiveStatusTrying: - strRuntimeStatus = L"Trying"; - break; - default: - strRuntimeStatus = L"Unknown"; - break; - } - - // Get Subscription Last Error. - if (!getStatus(eventSource, EcSubscriptionRunTimeStatusLastError, 0, buffer, vProperty)) { - return false; - } - - if (vProperty->Type != EcVarTypeUInt32) { - logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type); - return false; - } - - const auto lastError = vProperty->UInt32Val; - - if (lastError == 0 && (runtimeStatus == EcRuntimeStatusActiveStatusActive || runtimeStatus == EcRuntimeStatusActiveStatusTrying)) { - logger_->log_info("Subscription '{}': status '{}', no error.", utils::to_string(subscription_name_), utils::to_string(strRuntimeStatus)); - return true; - } - - // Obtain the associated Error Message. - if (!getStatus(eventSource, EcSubscriptionRunTimeStatusLastErrorMessage, 0, buffer, vProperty)) { - return false; - } - - if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarTypeString) { - logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type); - return false; - } - - std::wstring lastErrorMessage; - if (vProperty->Type == EcVarTypeString) { - lastErrorMessage = vProperty->StringVal; - } - - logger_->log_error("Runtime status: {}, last error: {}, last error message: {}", - utils::to_string(strRuntimeStatus), - lastError, - utils::to_string(lastErrorMessage)); - - return false; - } - - return true; -} - -bool CollectorInitiatedSubscription::getSubscriptionProperty(EC_HANDLE hSubscription, EC_SUBSCRIPTION_PROPERTY_ID propID, DWORD flags, std::vector& buffer, PEC_VARIANT& vProperty) { - buffer.clear(); - buffer.resize(sizeof(EC_VARIANT)); - DWORD dwBufferSize{}; - if (!EcGetSubscriptionProperty(hSubscription, propID, flags, static_cast(buffer.size()), reinterpret_cast(&buffer[0]), &dwBufferSize)) { - if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) { - buffer.resize(dwBufferSize); - if (!EcGetSubscriptionProperty(hSubscription, propID, flags, static_cast(buffer.size()), reinterpret_cast(&buffer[0]), &dwBufferSize)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionProperty"); - return false; - } - } else { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionProperty"); - return false; - } - } - - vProperty = reinterpret_cast(&buffer[0]); - - return true; -} - -bool CollectorInitiatedSubscription::createSubscription(core::ProcessContext& context) { - // If subcription already exists, delete it. - EC_HANDLE hSubscription = EcOpenSubscription(subscription_name_.c_str(), EC_READ_ACCESS, EC_OPEN_EXISTING); - if (hSubscription) { - EcClose(hSubscription); - if (!EcDeleteSubscription(subscription_name_.c_str(), 0)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcDeleteSubscription"); - return false; - } - } - - // Create subscription. - hSubscription = EcOpenSubscription(subscription_name_.c_str(), EC_READ_ACCESS | EC_WRITE_ACCESS, EC_CREATE_NEW); - if (!hSubscription) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcOpenSubscription"); - return false; - } - const auto guard_hSubscription = gsl::finally([hSubscription]() { EcClose(hSubscription); }); - - struct SubscriptionProperty { - SubscriptionProperty(EC_SUBSCRIPTION_PROPERTY_ID propId, const std::wstring& val) { - propId_ = propId; - - prop_.Type = EcVarTypeString; - prop_.StringVal = val.c_str(); - } - - SubscriptionProperty(EC_SUBSCRIPTION_PROPERTY_ID propId, uint32_t val) { - propId_ = propId; - - prop_.Type = EcVarTypeUInt32; - prop_.UInt32Val = val; - } - - SubscriptionProperty(EC_SUBSCRIPTION_PROPERTY_ID propId, bool val) { - propId_ = propId; - - prop_.Type = EcVarTypeBoolean; - prop_.BooleanVal = val; - } - - EC_SUBSCRIPTION_PROPERTY_ID propId_; - EC_VARIANT prop_; - }; - - const auto subscription_description = utils::to_wstring(context.getProperty(SubscriptionDescription).value()); - const auto source_channels = utils::to_wstring(context.getProperty(SourceChannels).value()); - const auto channel = utils::to_wstring(context.getProperty(Channel).value()); - const auto max_delivery_items = context.getProperty(MaxDeliveryItems).value().getValue(); - const auto delivery_max_latency_time = context.getProperty(DeliveryMaxLatencyTime).value().getMilliseconds().count(); - const auto heartbeat_interval = context.getProperty(HeartbeatInterval).value().getMilliseconds().count(); - const auto source_user_name = utils::to_wstring(context.getProperty(SourceUserName).value()); - const auto source_password = utils::to_wstring(context.getProperty(SourcePassword).value()); - - std::vector listProperty = { - {EcSubscriptionDescription, subscription_description}, - {EcSubscriptionURI, std::wstring(L"http://schemas.microsoft.com/wbem/wsman/1/windows/EventLog")}, - {EcSubscriptionQuery, L""}, - {EcSubscriptionLogFile, channel}, - {EcSubscriptionConfigurationMode, static_cast(EcConfigurationModeCustom)}, - {EcSubscriptionDeliveryMode, static_cast(EcDeliveryModePull)}, - {EcSubscriptionDeliveryMaxItems, static_cast(max_delivery_items)}, - {EcSubscriptionDeliveryMaxLatencyTime, static_cast(delivery_max_latency_time)}, - {EcSubscriptionHeartbeatInterval, static_cast(heartbeat_interval)}, - {EcSubscriptionContentFormat, static_cast(EcContentFormatRenderedText)}, - {EcSubscriptionCredentialsType, static_cast(EcSubscriptionCredDefault)}, - {EcSubscriptionEnabled, true}, - {EcSubscriptionCommonUserName, source_user_name}, - {EcSubscriptionCommonPassword, source_password} - }; - for (auto& prop : listProperty) { - if (!EcSetSubscriptionProperty(hSubscription, prop.propId_, 0, &prop.prop_)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcSetSubscriptionProperty id: " + std::to_string(prop.propId_)); - return false; - } - } - - // Get the EventSources array so a new event source can be added for the specified target. - std::vector buffer; - PEC_VARIANT vProperty = NULL; - if (!getSubscriptionProperty(hSubscription, EcSubscriptionEventSources, 0, buffer, vProperty)) - return false; - - // Event Sources is a collection. Ensure that we have obtained handle to the Array Property. - if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarObjectArrayPropertyHandle) { - logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type); - return false; - } - - if (vProperty->Type == EcVarTypeNull) { - LOG_SUBSCRIPTION_ERROR("!hArray"); - return false; - } - - const EC_OBJECT_ARRAY_PROPERTY_HANDLE hArray = vProperty->PropertyHandleVal; - const auto guard_hArray = gsl::finally([hArray]() { EcClose(hArray); }); - - DWORD dwEventSourceCount{}; - if (!EcGetObjectArraySize(hArray, &dwEventSourceCount)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArraySize"); - return false; - } - - // Add a new EventSource to the EventSources array object. - if (!EcInsertObjectArrayElement(hArray, dwEventSourceCount)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcInsertObjectArrayElement"); - return false; - } - - const auto source_address = utils::to_wstring(context.getProperty(SourceAddress).value()); - for (auto& prop : std::vector{{EcSubscriptionEventSourceAddress, source_address}, {EcSubscriptionEventSourceEnabled, true}}) { - if (!EcSetObjectArrayProperty(hArray, prop.propId_, dwEventSourceCount, 0, &prop.prop_)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcSetObjectArrayProperty id: " + std::to_string(prop.propId_)); - return false; - } - } - - if (!EcSaveSubscription(hSubscription, NULL)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcSaveSubscription"); - return false; - } - - return true; -} - -bool CollectorInitiatedSubscription::subscribe(core::ProcessContext& context) { - logger_->log_debug("CollectorInitiatedSubscription: MaxBufferSize {}", max_buffer_size_.getValue()); - - const auto channel = context.getProperty(Channel).value(); - const auto query = context.getProperty(Query).value(); - provenanceUri_ = "winlog://" + computerName_ + "/" + channel + "?" + query; - - const auto channel_ws = utils::to_wstring(context.getProperty(Channel).value()); - const auto query_ws = utils::to_wstring(context.getProperty(Query).value()); - - const EVT_SUBSCRIBE_CALLBACK callback = [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent) { - auto pCollectorInitiatedSubscription = static_cast(pContext); - - auto& logger = pCollectorInitiatedSubscription->logger_; - - if (action == EvtSubscribeActionError) { - if (ERROR_EVT_QUERY_RESULT_STALE == reinterpret_cast(hEvent)) { - logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size."); - } else { - logger->log_error("Received the following Win32 error: {:#x}", reinterpret_cast(hEvent)); - } - } else if (action == EvtSubscribeActionDeliver) { - DWORD size = 0; - DWORD used = 0; - DWORD propertyCount = 0; - - if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) { - if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) { - if (used > pCollectorInitiatedSubscription->max_buffer_size_.getValue()) { - logger->log_error("Dropping event {} because it couldn't be rendered within {} bytes.", hEvent, pCollectorInitiatedSubscription->max_buffer_size_.getValue()); - return 0UL; - } - - size = used; - std::vector buf(size/2); - if (EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) { - auto xml = utils::to_string(&buf[0]); - - pCollectorInitiatedSubscription->renderedXMLs_.enqueue(std::move(xml)); - } else { - logger->log_error("EvtRender returned the following error code: {}.", GetLastError()); - } - } - } - } - - return 0UL; - }; - - subscriptionHandle_ = EvtSubscribe( - NULL, - NULL, - channel_ws.c_str(), - query_ws.c_str(), - NULL, - this, - callback, - EvtSubscribeToFutureEvents | EvtSubscribeStrict); - - if (!subscriptionHandle_) { - logger_->log_error("Unable to subscribe with provided parameters, received the following error code: {}", GetLastError()); - return false; - } - - lastActivityTimestamp_ = GetTickCount64(); - - return true; -} - -void CollectorInitiatedSubscription::unsubscribe() { - if (subscriptionHandle_) { - EvtClose(subscriptionHandle_); - subscriptionHandle_ = 0; - } -} - -int CollectorInitiatedSubscription::processQueue(core::ProcessSession& session) { - int flowFileCount = 0; - - std::string xml; - while (renderedXMLs_.try_dequeue(xml)) { - auto flowFile = session.create(); - - session.writeBuffer(flowFile, xml); - session.putAttribute(*flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml"); - session.getProvenanceReporter()->receive(*flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms); - session.transfer(flowFile, Success); - - flowFileCount++; - } - - return flowFileCount; -} - -void CollectorInitiatedSubscription::notifyStop() { - if (!EcDeleteSubscription(subscription_name_.c_str(), 0)) { - LOG_SUBSCRIPTION_WINDOWS_ERROR("EcDeleteSubscription"); - } - - unsubscribe(); - - if (renderedXMLs_.size_approx() != 0) { - auto session = sessionFactory_->createSession(); - if (session) { - logger_->log_info("Finishing processing leftover events"); - - processQueue(*session); - } else { - logger_->log_error( - "Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. " - "Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, " - "receiving events and stopping before the onTrigger happens. The messages in the internal queue cannot finish processing until " - "the processor is triggered to run."); - } - } -} - -void CollectorInitiatedSubscription::logError(int line, const std::string& error) { - logger_->log_error("Line {}: {}\n", line, error); -} - -void CollectorInitiatedSubscription::logWindowsError(int line, const std::string& info) { - auto last_error = utils::OsUtils::windowsErrorToErrorCode(GetLastError()); - logger_->log_error("Line {}: '{}': error {}: {}", line, info, last_error, last_error.message()); -} - -REGISTER_RESOURCE(CollectorInitiatedSubscription, Processor); - -} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.h b/extensions/windows-event-log/CollectorInitiatedSubscription.h deleted file mode 100644 index d44ae55ed7..0000000000 --- a/extensions/windows-event-log/CollectorInitiatedSubscription.h +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include -#include -#include - -#include "core/Core.h" -#include "FlowFileRecord.h" -#include "concurrentqueue.h" -#include "core/Processor.h" -#include "core/ProcessSession.h" -#include "core/PropertyDefinition.h" -#include "core/PropertyDefinitionBuilder.h" -#include "core/PropertyType.h" -#include "core/RelationshipDefinition.h" - -namespace org::apache::nifi::minifi::processors { - -class CollectorInitiatedSubscription : public core::Processor { - public: - explicit CollectorInitiatedSubscription(const std::string& name, const utils::Identifier& uuid = {}); - virtual ~CollectorInitiatedSubscription() = default; - - EXTENSIONAPI static constexpr const char* Description = "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows."; - - EXTENSIONAPI static constexpr auto SubscriptionName = core::PropertyDefinitionBuilder<>::createProperty("Subscription Name") - .isRequired(true) - .withDescription("The name of the subscription. The value provided for this parameter should be unique within the computer's scope.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto SubscriptionDescription = core::PropertyDefinitionBuilder<>::createProperty("Subscription Description") - .isRequired(true) - .withDescription("A description of the subscription.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto SourceAddress = core::PropertyDefinitionBuilder<>::createProperty("Source Address") - .isRequired(true) - .withDescription("The IP address or fully qualified domain name (FQDN) of the local or remote computer (event source) from which the events are collected.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto SourceUserName = core::PropertyDefinitionBuilder<>::createProperty("Source User Name") - .isRequired(true) - .withDescription("The user name, which is used by the remote computer (event source) to authenticate the user.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto SourcePassword = core::PropertyDefinitionBuilder<>::createProperty("Source Password") - .isRequired(true) - .withDescription("The password, which is used by the remote computer (event source) to authenticate the user.") - .supportsExpressionLanguage(true) - .isSensitive(true) - .build(); - EXTENSIONAPI static constexpr auto SourceChannels = core::PropertyDefinitionBuilder<>::createProperty("Source Channels") - .isRequired(true) - .withDescription("The Windows Event Log Channels (on domain computer(s)) from which events are transferred.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto MaxDeliveryItems = core::PropertyDefinitionBuilder<>::createProperty("Max Delivery Items") - .isRequired(true) - .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE) - .withDefaultValue("1000") - .withDescription("Determines the maximum number of items that will forwarded from an event source for each request.") - .build(); - EXTENSIONAPI static constexpr auto DeliveryMaxLatencyTime = core::PropertyDefinitionBuilder<>::createProperty("Delivery MaxLatency Time") - .isRequired(true) - .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) - .withDefaultValue("10 min") - .withDescription("How long, in milliseconds, the event source should wait before sending events.") - .build(); - EXTENSIONAPI static constexpr auto HeartbeatInterval = core::PropertyDefinitionBuilder<>::createProperty("Heartbeat Interval") - .isRequired(true) - .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) - .withDefaultValue("10 min") - .withDescription( - "Time interval, in milliseconds, which is observed between the sent heartbeat messages." - " The event collector uses this property to determine the interval between queries to the event source.") - .build(); - EXTENSIONAPI static constexpr auto Channel = core::PropertyDefinitionBuilder<>::createProperty("Channel") - .isRequired(true) - .withDefaultValue("ForwardedEvents") - .withDescription("The Windows Event Log Channel (on local machine) to which events are transferred.") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto Query = core::PropertyDefinitionBuilder<>::createProperty("Query") - .isRequired(true) - .withDefaultValue("*") - .withDescription("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)") - .supportsExpressionLanguage(true) - .build(); - EXTENSIONAPI static constexpr auto MaxBufferSize = core::PropertyDefinitionBuilder<>::createProperty("Max Buffer Size") - .isRequired(true) - .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE) - .withDefaultValue("1 MB") - .withDescription( - "The individual Event Log XMLs are rendered to a buffer." - " This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)") - .build(); - EXTENSIONAPI static constexpr auto InactiveDurationToReconnect = core::PropertyDefinitionBuilder<>::createProperty("Inactive Duration To Reconnect") - .isRequired(true) - .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) - .withDefaultValue("10 min") - .withDescription( - "If no new event logs are processed for the specified time period," - " this processor will try reconnecting to recover from a state where any further messages cannot be consumed." - " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned." - " Setting no duration, e.g. '0 ms' disables auto-reconnection.") - .build(); - EXTENSIONAPI static constexpr auto Properties = std::array{ - SubscriptionName, - SubscriptionDescription, - SourceAddress, - SourceUserName, - SourcePassword, - SourceChannels, - MaxDeliveryItems, - DeliveryMaxLatencyTime, - HeartbeatInterval, - Channel, - Query, - MaxBufferSize, - InactiveDurationToReconnect - }; - - - EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Relationship for successfully consumed events."}; - EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; - - EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; - EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; - EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void onScheduleSharedPtr(const std::shared_ptr &context, const std::shared_ptr &sessionFactory) override; - void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; - void initialize(void) override; - void notifyStop() override; - - protected: - bool createSubscription(core::ProcessContext& context); - bool subscribe(core::ProcessContext& context); - void unsubscribe(); - int processQueue(core::ProcessSession& session); - void logError(int line, const std::string& error); - void logWindowsError(int line, const std::string& info); - void logInvalidSubscriptionPropertyType(int line, DWORD type); - bool getSubscriptionProperty(EC_HANDLE hSubscription, EC_SUBSCRIPTION_PROPERTY_ID propID, DWORD flags, std::vector& buffer, PEC_VARIANT& vProperty); - bool checkSubscriptionRuntimeStatus(); - - private: - std::shared_ptr logger_; - moodycamel::ConcurrentQueue renderedXMLs_; - std::string provenanceUri_; - std::string computerName_; - EVT_HANDLE subscriptionHandle_{}; - uint64_t lastActivityTimestamp_{}; - std::shared_ptr sessionFactory_; - std::wstring subscription_name_; - core::DataSizeValue max_buffer_size_; -}; - -} // namespace org::apache::nifi::minifi::processors diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 299b183ff5..f720733e8d 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -190,14 +190,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state virtual void triggerAndCommit(const std::shared_ptr& context, const std::shared_ptr& session_factory); void trigger(const std::shared_ptr& context, const std::shared_ptr& process_session); - virtual void onTriggerSharedPtr(const std::shared_ptr& context, const std::shared_ptr& session) { - onTrigger(*context, *session); - } virtual void onTrigger(ProcessContext&, ProcessSession&) {} - virtual void onScheduleSharedPtr(const std::shared_ptr& context, const std::shared_ptr& session_factory) { - onSchedule(*context, *session_factory); - } virtual void onSchedule(ProcessContext&, ProcessSessionFactory&) {} // Hook executed when onSchedule fails (throws). Configuration should be reset in this diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 27064d21b3..d0f8a9c284 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -86,7 +86,7 @@ void ThreadedSchedulingAgent::schedule(core::Processor* processor) { auto session_factory = std::make_shared(process_context); - processor->onScheduleSharedPtr(process_context, session_factory); + processor->onSchedule(*process_context, *session_factory); std::vector threads; diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 79d8e43f26..b5cf616e23 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -200,7 +200,7 @@ void Processor::triggerAndCommit(const std::shared_ptr& context, void Processor::trigger(const std::shared_ptr& context, const std::shared_ptr& process_session) { ++metrics_->iterations; const auto start = std::chrono::steady_clock::now(); - onTriggerSharedPtr(context, process_session); + onTrigger(*context, *process_session); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast(std::chrono::steady_clock::now() - start)); } diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index 0cf9efa5e6..b83c1509c4 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -474,7 +474,7 @@ void TestPlan::scheduleProcessor(const std::shared_ptr& // Ordering on factories and list of configured processors do not matter const auto factory = std::make_shared(context); factories_.push_back(factory); - processor->onScheduleSharedPtr(context, factory); + processor->onSchedule(*context, *factory); configured_processors_.push_back(processor); } }