Skip to content

Commit

Permalink
MINIFICPP-2429 Remove onTriggerSharedPtr/onScheduleSharedPtr
Browse files Browse the repository at this point in the history
Closes #1843

Signed-off-by: Marton Szasz <[email protected]>
  • Loading branch information
martinzink authored and szaszm committed Aug 13, 2024
1 parent 15e9a96 commit 2d3a41b
Show file tree
Hide file tree
Showing 29 changed files with 64 additions and 937 deletions.
34 changes: 0 additions & 34 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ limitations under the License.
- [BinFiles](#BinFiles)
- [CaptureRTSPFrame](#CaptureRTSPFrame)
- [CollectKubernetesPodMetrics](#CollectKubernetesPodMetrics)
- [CollectorInitiatedSubscription](#CollectorInitiatedSubscription)
- [CompressContent](#CompressContent)
- [ConsumeJournald](#ConsumeJournald)
- [ConsumeKafka](#ConsumeKafka)
Expand Down Expand Up @@ -286,39 +285,6 @@ In the list below, the names of required properties appear in bold. Any other pr
| success | All flow files produced are routed to Success. |


## CollectorInitiatedSubscription

### Description

Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|------------------------------------|-----------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Subscription Name** | | | The name of the subscription. The value provided for this parameter should be unique within the computer's scope.<br/>**Supports Expression Language: true** |
| **Subscription Description** | | | A description of the subscription.<br/>**Supports Expression Language: true** |
| **Source Address** | | | The IP address or fully qualified domain name (FQDN) of the local or remote computer (event source) from which the events are collected.<br/>**Supports Expression Language: true** |
| **Source User Name** | | | The user name, which is used by the remote computer (event source) to authenticate the user.<br/>**Supports Expression Language: true** |
| **Source Password** | | | The password, which is used by the remote computer (event source) to authenticate the user.<br/>**Sensitive Property: true**<br/>**Supports Expression Language: true** |
| **Source Channels** | | | The Windows Event Log Channels (on domain computer(s)) from which events are transferred.<br/>**Supports Expression Language: true** |
| **Max Delivery Items** | 1000 | | Determines the maximum number of items that will forwarded from an event source for each request. |
| **Delivery MaxLatency Time** | 10 min | | How long, in milliseconds, the event source should wait before sending events. |
| **Heartbeat Interval** | 10 min | | Time interval, in milliseconds, which is observed between the sent heartbeat messages. The event collector uses this property to determine the interval between queries to the event source. |
| **Channel** | ForwardedEvents | | The Windows Event Log Channel (on local machine) to which events are transferred.<br/>**Supports Expression Language: true** |
| **Query** | * | | XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)<br/>**Supports Expression Language: true** |
| **Max Buffer Size** | 1 MB | | The individual Event Log XMLs are rendered to a buffer. This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.) |
| **Inactive Duration To Reconnect** | 10 min | | If no new event logs are processed for the specified time period, this processor will try reconnecting to recover from a state where any further messages cannot be consumed. Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned. Setting no duration, e.g. '0 ms' disables auto-reconnection. |

### Relationships

| Name | Description |
|---------|------------------------------------------------|
| success | Relationship for successfully consumed events. |


## CompressContent

### Description
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
| SQL | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/> | -DENABLE_SQL=ON |
| Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)<br/>[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus) | -DENABLE_SPLUNK=ON |
| Systemd (Linux) | [ConsumeJournald](PROCESSORS.md#consumejournald) | -DENABLE_SYSTEMD=ON |
| Windows Event Log (Windows) | [CollectorInitiatedSubscription](PROCESSORS.md#collectorinitiatedsubscription)<br/>[ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)<br/>[TailEventLog](PROCESSORS.md#taileventlog) | -DENABLE_WEL=ON |
| Windows Event Log (Windows) | [ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)<br/>[TailEventLog](PROCESSORS.md#taileventlog) | -DENABLE_WEL=ON |

Please see our [Python guide](extensions/python/PYTHON.md) on how to write Python processors and use them within MiNiFi C++.

Expand Down
56 changes: 13 additions & 43 deletions extensions/lua/LuaProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@

namespace org::apache::nifi::minifi::extensions::lua {

LuaProcessSession::LuaProcessSession(std::shared_ptr<core::ProcessSession> session)
: session_(std::move(session)) {
LuaProcessSession::LuaProcessSession(core::ProcessSession& session)
: session_(session) {
}

std::shared_ptr<LuaScriptFlowFile> LuaProcessSession::get() {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

auto flow_file = session_->get();
auto flow_file = session_.get();

if (flow_file == nullptr) {
return nullptr;
Expand All @@ -43,78 +39,58 @@ std::shared_ptr<LuaScriptFlowFile> LuaProcessSession::get() {
return result;
}

void LuaProcessSession::transfer(const std::shared_ptr<LuaScriptFlowFile> &script_flow_file,
void LuaProcessSession::transfer(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file,
const core::Relationship& relationship) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

auto flow_file = script_flow_file->getFlowFile();
const auto flow_file = script_flow_file->getFlowFile();

if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->transfer(flow_file, relationship);
session_.transfer(flow_file, relationship);
}

void LuaProcessSession::read(const std::shared_ptr<LuaScriptFlowFile> &script_flow_file,
sol::table input_stream_callback) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

auto flow_file = script_flow_file->getFlowFile();
const auto flow_file = script_flow_file->getFlowFile();

if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
session_.read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
sol::function callback = input_stream_callback["process"];
return callback(input_stream_callback, std::make_shared<LuaInputStream>(input_stream));
});
}

void LuaProcessSession::write(const std::shared_ptr<LuaScriptFlowFile> &script_flow_file,
sol::table output_stream_callback) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

auto flow_file = script_flow_file->getFlowFile();

if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
session_.write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
sol::function callback = output_stream_callback["process"];
return callback(output_stream_callback, std::make_shared<LuaOutputStream>(output_stream));
});
}

std::shared_ptr<LuaScriptFlowFile> LuaProcessSession::create() {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

auto result = std::make_shared<LuaScriptFlowFile>(session_->create());
auto result = std::make_shared<LuaScriptFlowFile>(session_.create());
flow_files_.push_back(result);
return result;
}

std::shared_ptr<LuaScriptFlowFile> LuaProcessSession::create(const std::shared_ptr<LuaScriptFlowFile> &flow_file) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

std::shared_ptr<LuaScriptFlowFile> result;

if (flow_file == nullptr) {
result = std::make_shared<LuaScriptFlowFile>(session_->create());
result = std::make_shared<LuaScriptFlowFile>(session_.create());
} else {
result = std::make_shared<LuaScriptFlowFile>(session_->create(flow_file->getFlowFile().get()));
result = std::make_shared<LuaScriptFlowFile>(session_.create(flow_file->getFlowFile().get()));
}

flow_files_.push_back(result);
Expand All @@ -127,22 +103,16 @@ void LuaProcessSession::releaseCoreResources() {
flow_file->releaseFlowFile();
}
}

session_.reset();
}

void LuaProcessSession::remove(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file) {
if (!session_) {
throw std::runtime_error("Access of ProcessSession after it has been released");
}

auto flow_file = script_flow_file->getFlowFile();

if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}

session_->remove(flow_file);
session_.remove(flow_file);
}

} // namespace org::apache::nifi::minifi::extensions::lua
14 changes: 7 additions & 7 deletions extensions/lua/LuaProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ namespace org::apache::nifi::minifi::extensions::lua {

class LuaProcessSession {
public:
explicit LuaProcessSession(std::shared_ptr<core::ProcessSession> session);
explicit LuaProcessSession(core::ProcessSession& session);

std::shared_ptr<LuaScriptFlowFile> get();
std::shared_ptr<LuaScriptFlowFile> create();
std::shared_ptr<LuaScriptFlowFile> create(const std::shared_ptr<LuaScriptFlowFile> &flow_file);
void transfer(const std::shared_ptr<LuaScriptFlowFile> &flow_file, const core::Relationship& relationship);
void read(const std::shared_ptr<LuaScriptFlowFile> &script_flow_file, sol::table input_stream_callback);
void write(const std::shared_ptr<LuaScriptFlowFile> &flow_file, sol::table output_stream_callback);
void remove(const std::shared_ptr<LuaScriptFlowFile>& flow_file);
std::shared_ptr<LuaScriptFlowFile> create(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file);
void transfer(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file, const core::Relationship& relationship);
void read(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file, sol::table input_stream_callback);
void write(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file, sol::table output_stream_callback);
void remove(const std::shared_ptr<LuaScriptFlowFile>& script_flow_file);

/**
* Sometimes we want to release shared pointers to core resources when
Expand All @@ -53,7 +53,7 @@ class LuaProcessSession {

private:
std::vector<std::shared_ptr<LuaScriptFlowFile>> flow_files_;
std::shared_ptr<core::ProcessSession> session_;
core::ProcessSession& session_;
};

} // namespace org::apache::nifi::minifi::extensions::lua
3 changes: 1 addition & 2 deletions extensions/lua/LuaScriptEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ class TriggerSession {
TriggerSession& operator=(const TriggerSession&) = delete;

~TriggerSession() {
script_context_->releaseProcessContext();
lua_session_->releaseCoreResources();
}

Expand All @@ -141,7 +140,7 @@ class TriggerSession {
};
} // namespace

void LuaScriptEngine::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
void LuaScriptEngine::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
auto script_context = std::make_shared<LuaScriptProcessContext>(context, lua_);
auto lua_session = std::make_shared<LuaProcessSession>(session);
TriggerSession trigger_session(script_context, lua_session);
Expand Down
2 changes: 1 addition & 1 deletion extensions/lua/LuaScriptEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class LuaScriptEngine {

void eval(const std::string& script);
void evalFile(const std::filesystem::path& file_name);
void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session);
void onTrigger(core::ProcessContext& context, core::ProcessSession& session);
void initialize(const core::Relationship& success, const core::Relationship& failure, const std::shared_ptr<core::logging::Logger>& logger);

void setModulePaths(std::vector<std::filesystem::path> module_paths) {
Expand Down
4 changes: 2 additions & 2 deletions extensions/lua/LuaScriptExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ namespace org::apache::nifi::minifi::extensions::lua {

LuaScriptExecutor::LuaScriptExecutor(std::string_view name, const utils::Identifier& uuid) : script::ScriptExecutor(name, uuid) {}

void LuaScriptExecutor::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
auto lua_script_engine = lua_script_engine_queue_->getResource();
void LuaScriptExecutor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
const auto lua_script_engine = lua_script_engine_queue_->getResource();
gsl_Expects(std::holds_alternative<std::filesystem::path>(script_to_run_) || std::holds_alternative<std::string>(script_to_run_));

if (module_directory_) {
Expand Down
Loading

0 comments on commit 2d3a41b

Please sign in to comment.