Skip to content

Commit

Permalink
MINIFICPP-2159 Add heterogeneous lookup to FlatMap
Browse files Browse the repository at this point in the history
Closes #1612

Signed-off-by: Marton Szasz <[email protected]>
  • Loading branch information
martinzink authored and szaszm committed Aug 2, 2023
1 parent e579692 commit cafc3b1
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 118 deletions.
2 changes: 1 addition & 1 deletion extensions/aws/processors/FetchS3Object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
});

if (result) {
auto putAttributeIfNotEmpty = [&](const std::string& attribute, const std::string& value) {
auto putAttributeIfNotEmpty = [&](std::string_view attribute, const std::string& value) {
if (!value.empty()) {
session->putAttribute(flow_file, attribute, value);
}
Expand Down
12 changes: 6 additions & 6 deletions extensions/gcp/tests/DeleteGCSObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ TEST_F(DeleteGCSObjectTests, MissingBucket) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
}

Expand All @@ -80,8 +80,8 @@ TEST_F(DeleteGCSObjectTests, ServerGivesPermaError) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
}

Expand All @@ -92,8 +92,8 @@ TEST_F(DeleteGCSObjectTests, ServerGivesTransientErrors) {
const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}});
EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
}

Expand Down
8 changes: 4 additions & 4 deletions extensions/gcp/tests/FetchGCSObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ TEST_F(FetchGCSObjectTests, MissingBucket) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
}

Expand All @@ -91,8 +91,8 @@ TEST_F(FetchGCSObjectTests, ServerError) {
const auto& result = test_controller_.trigger("hello world", {{std::string(minifi_gcp::GCS_BUCKET_ATTR), "bucket-from-attribute"}});
EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
}

TEST_F(FetchGCSObjectTests, HappyPath) {
Expand Down
16 changes: 8 additions & 8 deletions extensions/gcp/tests/PutGCSObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ TEST_F(PutGCSObjectTests, MissingBucket) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}

Expand All @@ -111,8 +111,8 @@ TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}

Expand All @@ -123,8 +123,8 @@ TEST_F(PutGCSObjectTests, ServerGivesPermaError) {
const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_DOMAIN)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(std::string(minifi_gcp::GCS_ERROR_REASON)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0]));
}

Expand Down Expand Up @@ -193,8 +193,8 @@ TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) {
const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(std::string(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(std::string(minifi_gcp::GCS_ENCRYPTION_ALGORITHM_ATTR)));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR));
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_ALGORITHM_ATTR));
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
}

Expand Down
16 changes: 8 additions & 8 deletions extensions/http-curl/processors/InvokeHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,10 @@ void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>

int64_t http_code = client.getResponseCode();
const char* content_type = client.getContentType();
flow_file->addAttribute(std::string(STATUS_CODE), std::to_string(http_code));
if (!response_headers.empty()) { flow_file->addAttribute(std::string(STATUS_MESSAGE), response_headers.at(0)); }
flow_file->addAttribute(std::string(REQUEST_URL), client.getURL());
flow_file->addAttribute(std::string(TRANSACTION_ID), transaction_id);
flow_file->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty()) { flow_file->addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
flow_file->addAttribute(REQUEST_URL, client.getURL());
flow_file->addAttribute(TRANSACTION_ID, transaction_id);

bool is_success = ((http_code / 100) == 2);

Expand All @@ -307,10 +307,10 @@ void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>
// if content type isn't returned we should return application/octet-stream
// as per RFC 2046 -- 4.5.1
response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType);
response_flow->addAttribute(std::string(STATUS_CODE), std::to_string(http_code));
if (!response_headers.empty()) { response_flow->addAttribute(std::string(STATUS_MESSAGE), response_headers.at(0)); }
response_flow->addAttribute(std::string(REQUEST_URL), client.getURL());
response_flow->addAttribute(std::string(TRANSACTION_ID), transaction_id);
response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty()) { response_flow->addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
response_flow->addAttribute(REQUEST_URL, client.getURL());
response_flow->addAttribute(TRANSACTION_ID, transaction_id);
io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
// need an import from the data stream.
session->importFrom(stream, response_flow);
Expand Down
2 changes: 1 addition & 1 deletion extensions/libarchive/CompressContent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
std::string attr;
flowFile->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, attr);
if (attr.empty()) {
logger_->log_error("No %s attribute existed for the flow, route to failure", core::SpecialFlowAttribute::MIME_TYPE);
logger_->log_error("No %s attribute existed for the flow, route to failure", std::string(core::SpecialFlowAttribute::MIME_TYPE));
session->transfer(flowFile, Failure);
return;
}
Expand Down
4 changes: 2 additions & 2 deletions extensions/mqtt/processors/ConsumeMQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& /*c
session->remove(flow_file);
} else {
putUserPropertiesAsAttributes(message, flow_file, session);
session->putAttribute(flow_file, std::string(BrokerOutputAttribute.name), uri_);
session->putAttribute(flow_file, std::string(TopicOutputAttribute.name), message.topic);
session->putAttribute(flow_file, BrokerOutputAttribute.name, uri_);
session->putAttribute(flow_file, TopicOutputAttribute.name, message.topic);
fillAttributeFromContentType(message, flow_file, session);
logger_->log_debug("ConsumeMQTT processing success for the flow with UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
session->transfer(flow_file, Success);
Expand Down
8 changes: 4 additions & 4 deletions extensions/python/types/PyScriptFlowFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ PyObject* PyScriptFlowFile::getAttribute(PyScriptFlowFile* self, PyObject* args)
if (!PyArg_ParseTuple(args, "s", &attribute)) {
throw PyException();
}
return object::returnReference(flow_file->getAttribute(std::string(attribute)).value_or(""));
return object::returnReference(flow_file->getAttribute(attribute).value_or(""));
}

PyObject* PyScriptFlowFile::addAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand All @@ -88,7 +88,7 @@ PyObject* PyScriptFlowFile::addAttribute(PyScriptFlowFile* self, PyObject* args)
throw PyException();
}

return object::returnReference(flow_file->addAttribute(std::string(key), std::string(value)));
return object::returnReference(flow_file->addAttribute(key, std::string(value)));
}

PyObject* PyScriptFlowFile::updateAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand All @@ -104,7 +104,7 @@ PyObject* PyScriptFlowFile::updateAttribute(PyScriptFlowFile* self, PyObject* ar
throw PyException();
}

return object::returnReference(flow_file->updateAttribute(std::string(key), std::string(value)));
return object::returnReference(flow_file->updateAttribute(key, std::string(value)));
}

PyObject* PyScriptFlowFile::removeAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand All @@ -118,7 +118,7 @@ PyObject* PyScriptFlowFile::removeAttribute(PyScriptFlowFile* self, PyObject* ar
if (!PyArg_ParseTuple(args, "s", &attribute)) {
throw PyException();
}
return object::returnReference(flow_file->removeAttribute(std::string(attribute)));
return object::returnReference(flow_file->removeAttribute(attribute));
}

PyObject* PyScriptFlowFile::setAttribute(PyScriptFlowFile* self, PyObject* args) {
Expand Down
2 changes: 1 addition & 1 deletion extensions/standard-processors/tests/unit/GetTCPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace org::apache::nifi::minifi::test {

void check_for_attributes(core::FlowFile& flow_file, uint16_t port) {
const auto local_addresses = {"127.0.0.1:" + std::to_string(port), "::ffff:127.0.0.1:" + std::to_string(port), "::1:" + std::to_string(port)};
CHECK(ranges::contains(local_addresses, flow_file.getAttribute(std::string(GetTCP::SourceEndpoint.name))));
CHECK(ranges::contains(local_addresses, flow_file.getAttribute(GetTCP::SourceEndpoint.name)));
}

minifi::utils::net::SslData createSslDataForServer() {
Expand Down
54 changes: 21 additions & 33 deletions libminifi/include/core/FlowFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
#define LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
#pragma once

#include <map>
#include <memory>
Expand All @@ -34,11 +33,7 @@
#include "utils/FlatMap.h"
#include "utils/Export.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace org::apache::nifi::minifi::core {

class Connectable;

Expand Down Expand Up @@ -125,7 +120,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* Sets the lineage start date
* @param date new lineage start date
*/
void setLineageStartDate(const std::chrono::system_clock::time_point date);
void setLineageStartDate(std::chrono::system_clock::time_point date);

void setLineageIdentifiers(const std::vector<utils::Identifier>& lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
Expand All @@ -137,9 +132,9 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* @param value value to set
* @return result of finding key
*/
bool getAttribute(const std::string& key, std::string& value) const;
bool getAttribute(std::string_view key, std::string& value) const;

[[nodiscard]] std::optional<std::string> getAttribute(const std::string& key) const;
[[nodiscard]] std::optional<std::string> getAttribute(std::string_view key) const;

/**
* Updates the value in the attribute map that corresponds
Expand All @@ -148,14 +143,14 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* @param value value to set to attribute name
* @return result of finding key
*/
bool updateAttribute(std::string key, std::string value);
bool updateAttribute(std::string_view key, const std::string& value);

/**
* Removes the attribute
* @param key attribute name to remove
* @return result of finding key
*/
bool removeAttribute(std::string key);
bool removeAttribute(std::string_view key);

/**
* setAttribute, if attribute already there, update it, else, add it
Expand Down Expand Up @@ -184,7 +179,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* adds an attribute if it does not exist
*
*/
bool addAttribute(const std::string& key, const std::string& value);
bool addAttribute(std::string_view key, const std::string& value);

/**
* Set the size of this record.
Expand Down Expand Up @@ -296,36 +291,29 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
// FlowFile Attribute
struct SpecialFlowAttribute {
// The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename
MINIFIAPI static const std::string PATH;
MINIFIAPI static constexpr std::string_view PATH = "path";
// The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename
MINIFIAPI static const std::string ABSOLUTE_PATH;
MINIFIAPI static constexpr std::string_view ABSOLUTE_PATH = "absolute.path";
// The filename of the FlowFile. The filename should not contain any directory structure.
MINIFIAPI static const std::string FILENAME;
MINIFIAPI static constexpr std::string_view FILENAME = "filename";
// A unique UUID assigned to this FlowFile.
MINIFIAPI static const std::string UUID;
MINIFIAPI static constexpr std::string_view UUID = "uuid";
// A numeric value indicating the FlowFile priority
MINIFIAPI static const std::string priority;
MINIFIAPI static constexpr std::string_view priority = "priority";
// The MIME Type of this FlowFile
MINIFIAPI static const std::string MIME_TYPE;
MINIFIAPI static constexpr std::string_view MIME_TYPE = "mime.type";
// Specifies the reason that a FlowFile is being discarded
MINIFIAPI static const std::string DISCARD_REASON;
MINIFIAPI static constexpr std::string_view DISCARD_REASON = "discard.reason";
// Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
MINIFIAPI static const std::string ALTERNATE_IDENTIFIER;
MINIFIAPI static constexpr std::string_view ALTERNATE_IDENTIFIER = "alternate.identifier";
// Flow identifier
MINIFIAPI static const std::string FLOW_ID;
MINIFIAPI static constexpr std::string_view FLOW_ID = "flow.id";

static const auto& getSpecialFlowAttributes() {
static const std::array<std::string_view, 9> SPECIAL_FLOW_ATTRIBUTES {
PATH, ABSOLUTE_PATH, FILENAME, UUID, priority, MIME_TYPE, DISCARD_REASON, ALTERNATE_IDENTIFIER, FLOW_ID
static constexpr std::array<std::string_view, 9> getSpecialFlowAttributes() {
return {
PATH, ABSOLUTE_PATH, FILENAME, UUID, priority, MIME_TYPE, DISCARD_REASON, ALTERNATE_IDENTIFIER, FLOW_ID
};
return SPECIAL_FLOW_ATTRIBUTES;
}
};

} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org

#endif // LIBMINIFI_INCLUDE_CORE_FLOWFILE_H_
} // namespace org::apache::nifi::minifi::core
4 changes: 2 additions & 2 deletions libminifi/include/core/ProcessSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class ProcessSession : public ReferenceContainer {
// Transfer the FlowFile to the relationship
virtual void transfer(const std::shared_ptr<core::FlowFile>& flow, const Relationship& relationship);
// Put Attribute
void putAttribute(const std::shared_ptr<core::FlowFile>& flow, const std::string& key, const std::string& value);
void putAttribute(const std::shared_ptr<core::FlowFile>& flow, std::string_view key, const std::string& value);
// Remove Attribute
void removeAttribute(const std::shared_ptr<core::FlowFile>& flow, const std::string& key);
void removeAttribute(const std::shared_ptr<core::FlowFile>& flow, std::string_view key);
// Remove Flow File
void remove(const std::shared_ptr<core::FlowFile> &flow);
// Access the contents of the flow file as an input stream; returns null if the flow file has no content claim
Expand Down
Loading

0 comments on commit cafc3b1

Please sign in to comment.