Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2159 Add heterogeneous lookup to FlatMap #1612

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions/aws/processors/FetchS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
});

if (result) {
auto putAttributeIfNotEmpty = [&](const std::string& attribute, const std::string& value) {
auto putAttributeIfNotEmpty = [&](std::string_view attribute, const std::string& value) {
if (!value.empty()) {
session->putAttribute(flow_file, attribute, value);
}
Expand Down
12 changes: 6 additions & 6 deletions extensions/gcp/tests/DeleteGCSObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ TEST_F(DeleteGCSObjectTests, MissingBucket) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
}

Expand All @@ -80,8 +80,8 @@ TEST_F(DeleteGCSObjectTests, ServerGivesPermaError) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
}

Expand All @@ -92,8 +92,8 @@ TEST_F(DeleteGCSObjectTests, ServerGivesTransientErrors) {
const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}});
EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
}

Expand Down
8 changes: 4 additions & 4 deletions extensions/gcp/tests/FetchGCSObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ TEST_F(FetchGCSObjectTests, MissingBucket) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
}

Expand All @@ -91,8 +91,8 @@ TEST_F(FetchGCSObjectTests, ServerError) {
const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}});
EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
}

TEST_F(FetchGCSObjectTests, HappyPath) {
Expand Down
16 changes: 8 additions & 8 deletions extensions/gcp/tests/PutGCSObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ TEST_F(PutGCSObjectTests, MissingBucket) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}

Expand All @@ -111,8 +111,8 @@ TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}

Expand All @@ -123,8 +123,8 @@ TEST_F(PutGCSObjectTests, ServerGivesPermaError) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}

Expand Down Expand Up @@ -193,8 +193,8 @@ TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) {
const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(std::string(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(std::string(minifi_gcp::GCS_ENCRYPTION_ALGORITHM_ATTR)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_ALGORITHM_ATTR));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}

Expand Down
16 changes: 8 additions & 8 deletions extensions/http-curl/processors/InvokeHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,10 @@ void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>

int64_t http_code = client.getResponseCode();
const char* content_type = client.getContentType();
flow_file->addAttribute(std::string(STATUS_CODE), std::to_string(http_code));
if (!response_headers.empty()) { flow_file->addAttribute(std::string(STATUS_MESSAGE), response_headers.at(0)); }
flow_file->addAttribute(std::string(REQUEST_URL), client.getURL());
flow_file->addAttribute(std::string(TRANSACTION_ID), transaction_id);
flow_file->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty()) { flow_file->addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
flow_file->addAttribute(REQUEST_URL, client.getURL());
flow_file->addAttribute(TRANSACTION_ID, transaction_id);

bool is_success = ((http_code / 100) == 2);

Expand All @@ -307,10 +307,10 @@ void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>
// if content type isn't returned we should return application/octet-stream
// as per RFC 2046 -- 4.5.1
response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType);
response_flow->addAttribute(std::string(STATUS_CODE), std::to_string(http_code));
if (!response_headers.empty()) { response_flow->addAttribute(std::string(STATUS_MESSAGE), response_headers.at(0)); }
response_flow->addAttribute(std::string(REQUEST_URL), client.getURL());
response_flow->addAttribute(std::string(TRANSACTION_ID), transaction_id);
response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty()) { response_flow->addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
response_flow->addAttribute(REQUEST_URL, client.getURL());
response_flow->addAttribute(TRANSACTION_ID, transaction_id);
io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
// need an import from the data stream.
session->importFrom(stream, response_flow);
Expand Down
2 changes: 1 addition & 1 deletion extensions/libarchive/CompressContent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
std::string attr;
flowFile->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, attr);
if (attr.empty()) {
logger_->log_error("No %s attribute existed for the flow, route to failure", core::SpecialFlowAttribute::MIME_TYPE);
logger_->log_error("No %s attribute existed for the flow, route to failure", std::string(core::SpecialFlowAttribute::MIME_TYPE));
session->transfer(flowFile, Failure);
return;
}
Expand Down
4 changes: 2 additions & 2 deletions extensions/mqtt/processors/ConsumeMQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*c
session->remove(flow_file);
} else {
putUserPropertiesAsAttributes(message, flow_file, session);
session->putAttribute(flow_file, std::string(BrokerOutputAttribute.name), uri_);
session->putAttribute(flow_file, std::string(TopicOutputAttribute.name), message.topic);
session->putAttribute(flow_file, BrokerOutputAttribute.name, uri_);
session->putAttribute(flow_file, TopicOutputAttribute.name, message.topic);
fillAttributeFromContentType(message, flow_file, session);
logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
session->transfer(flow_file, Success);
Expand Down
8 changes: 4 additions & 4 deletions extensions/python/types/PyScriptFlowFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ PyObject* PyScriptFlowFile::getAttribute(PyScriptFlowFile* self, PyObject* args)
if (!PyArg_ParseTuple(args, "s", &attribute)) {
throw PyException();
}
return object::returnReference(flow_file->getAttribute(std::string(attribute)).value_or(""));
return object::returnReference(flow_file->getAttribute(attribute).value_or(""));
}

PyObject* PyScriptFlowFile::addAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand All @@ -88,7 +88,7 @@ PyObject* PyScriptFlowFile::addAttribute(PyScriptFlowFile* self, PyObject* args)
throw PyException();
}

return object::returnReference(flow_file->addAttribute(std::string(key), std::string(value)));
return object::returnReference(flow_file->addAttribute(key, std::string(value)));
}

PyObject* PyScriptFlowFile::updateAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand All @@ -104,7 +104,7 @@ PyObject* PyScriptFlowFile::updateAttribute(PyScriptFlowFile* self, PyObject* ar
throw PyException();
}

return object::returnReference(flow_file->updateAttribute(std::string(key), std::string(value)));
return object::returnReference(flow_file->updateAttribute(key, std::string(value)));
}

PyObject* PyScriptFlowFile::removeAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand All @@ -118,7 +118,7 @@ PyObject* PyScriptFlowFile::removeAttribute(PyScriptFlowFile* self, PyObject* ar
if (!PyArg_ParseTuple(args, "s", &attribute)) {
throw PyException();
}
return object::returnReference(flow_file->removeAttribute(std::string(attribute)));
return object::returnReference(flow_file->removeAttribute(attribute));
}

PyObject* PyScriptFlowFile::setAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand Down
2 changes: 1 addition & 1 deletion extensions/standard-processors/tests/unit/GetTCPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace org::apache::nifi::minifi::test {

void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
const auto local_addresses = {"127.0.0.1:" + std::to_string(port), "::ffff:127.0.0.1:" + std::to_string(port), "::1:" + std::to_string(port)};
CHECK(ranges::contains(local_addresses, flow_file.getAttribute(std::string(GetTCP::SourceEndpoint.name))));
CHECK(ranges::contains(local_addresses, flow_file.getAttribute(GetTCP::SourceEndpoint.name)));
}

minifi::utils::net::SslData createSslDataForServer() {
Expand Down
54 changes: 21 additions & 33 deletions libminifi/include/core/FlowFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
#define LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
#pragma once

#include <map>
#include <memory>
Expand All @@ -34,11 +33,7 @@
#include "utils/FlatMap.h"
#include "utils/Export.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace org::apache::nifi::minifi::core {

class Connectable;

Expand Down Expand Up @@ -125,7 +120,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* Sets the lineage start date
* @param date new lineage start date
*/
void setLineageStartDate(const std::chrono::system_clock::time_point date);
void setLineageStartDate(std::chrono::system_clock::time_point date);

void setLineageIdentifiers(const std::vector<utils::Identifier>& lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
Expand All @@ -137,9 +132,9 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* @param value value to set
* @return result of finding key
*/
bool getAttribute(const std::string& key, std::string& value) const;
bool getAttribute(std::string_view key, std::string& value) const;

[[nodiscard]] std::optional<std::string> getAttribute(const std::string& key) const;
[[nodiscard]] std::optional<std::string> getAttribute(std::string_view key) const;

/**
* Updates the value in the attribute map that corresponds
Expand All @@ -148,14 +143,14 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* @param value value to set to attribute name
* @return result of finding key
*/
bool updateAttribute(std::string key, std::string value);
bool updateAttribute(std::string_view key, const std::string& value);

/**
* Removes the attribute
* @param key attribute name to remove
* @return result of finding key
*/
bool removeAttribute(std::string key);
bool removeAttribute(std::string_view key);

/**
* setAttribute, if attribute already there, update it, else, add it
Expand Down Expand Up @@ -184,7 +179,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* adds an attribute if it does not exist
*
*/
bool addAttribute(const std::string& key, const std::string& value);
bool addAttribute(std::string_view key, const std::string& value);

/**
* Set the size of this record.
Expand Down Expand Up @@ -296,36 +291,29 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
// FlowFile Attribute
struct SpecialFlowAttribute {
// The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename
MINIFIAPI static const std::string PATH;
MINIFIAPI static constexpr std::string_view PATH = "path";
// The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename
MINIFIAPI static const std::string ABSOLUTE_PATH;
MINIFIAPI static constexpr std::string_view ABSOLUTE_PATH = "absolute.path";
// The filename of the FlowFile. The filename should not contain any directory structure.
MINIFIAPI static const std::string FILENAME;
MINIFIAPI static constexpr std::string_view FILENAME = "filename";
// A unique UUID assigned to this FlowFile.
MINIFIAPI static const std::string UUID;
MINIFIAPI static constexpr std::string_view UUID = "uuid";
// A numeric value indicating the FlowFile priority
MINIFIAPI static const std::string priority;
MINIFIAPI static constexpr std::string_view priority = "priority";
// The MIME Type of this FlowFile
MINIFIAPI static const std::string MIME_TYPE;
MINIFIAPI static constexpr std::string_view MIME_TYPE = "mime.type";
// Specifies the reason that a FlowFile is being discarded
MINIFIAPI static const std::string DISCARD_REASON;
MINIFIAPI static constexpr std::string_view DISCARD_REASON = "discard.reason";
// Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
MINIFIAPI static const std::string ALTERNATE_IDENTIFIER;
MINIFIAPI static constexpr std::string_view ALTERNATE_IDENTIFIER = "alternate.identifier";
// Flow identifier
MINIFIAPI static const std::string FLOW_ID;
MINIFIAPI static constexpr std::string_view FLOW_ID = "flow.id";

static const auto& getSpecialFlowAttributes() {
static const std::array<std::string_view, 9> SPECIAL_FLOW_ATTRIBUTES {
PATH, ABSOLUTE_PATH, FILENAME, UUID, priority, MIME_TYPE, DISCARD_REASON, ALTERNATE_IDENTIFIER, FLOW_ID
static constexpr std::array<std::string_view, 9> getSpecialFlowAttributes() {
return {
PATH, ABSOLUTE_PATH, FILENAME, UUID, priority, MIME_TYPE, DISCARD_REASON, ALTERNATE_IDENTIFIER, FLOW_ID
};
return SPECIAL_FLOW_ATTRIBUTES;
}
};

} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org

#endif // LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
} // namespace org::apache::nifi::minifi::core
4 changes: 2 additions & 2 deletions libminifi/include/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class ProcessSession : public ReferenceContainer {
// Transfer the FlowFile to the relationship
virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship);
// Put Attribute
void putAttribute(const std::shared_ptr<core::FlowFile>& flow, const std::string& key, const std::string& value);
void putAttribute(const std::shared_ptr<core::FlowFile>& flow, std::string_view key, const std::string& value);
// Remove Attribute
void removeAttribute(const std::shared_ptr<core::FlowFile>& flow, const std::string& key);
void removeAttribute(const std::shared_ptr<core::FlowFile>& flow, std::string_view key);
// Remove Flow File
void remove(const std::shared_ptr<core::FlowFile> &flow);
// Access the contents of the flow file as an input stream; returns null if the flow file has no content claim
Expand Down
Loading
Loading