diff --git a/PROCESSORS.md b/PROCESSORS.md
index 293c317e41..f27e13d75e 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -2833,6 +2833,37 @@ In the list below, the names of required properties appear in bold. Any other pr
| success | All files, containing log events, are routed to success |
+## SegmentContent
+
+### Description
+
+Segments a FlowFile into multiple smaller segments on byte boundaries.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|------------------|---------------|------------------|--------------------------------------------------------------------------------------------|
+| **Segment Size** | |
| The maximum data size in bytes for each segment
**Supports Expression Language: true** |
+
+### Relationships
+
+| Name | Description |
+|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| original | The original FlowFile will be sent to this relationship |
+| segments | All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------|
+| fragment.identifier | | All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute |
+| fragment.index | | A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile |
+| fragment.count | | The number of segments generated from the parent FlowFile |
+| segment.original.filename | | The filename of the parent FlowFile |
+
+
## SplitText
### Description
diff --git a/README.md b/README.md
index f325315e09..d2295fd0cb 100644
--- a/README.md
+++ b/README.md
@@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors:
The following table lists the base set of processors.
-| Extension Set | Processors |
-|---------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[AttributesToJSON](PROCESSORS.md#attributestojson)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[InvokeHTTP](PROCESSORS.md#invokehttp)
[ListenSyslog](PROCESSORS.md#listensyslog)
[ListenTCP](PROCESSORS.md#listentcp)
[ListenUDP](PROCESSORS.md#listenudp)
[ListFile](PROCESSORS.md#listfile)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutTCP](PROCESSORS.md#puttcp)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[SplitText](PROCESSORS.md#splittext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) |
+| Extension Set | Processors |
+|---------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[AttributesToJSON](PROCESSORS.md#attributestojson)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[InvokeHTTP](PROCESSORS.md#invokehttp)
[ListenSyslog](PROCESSORS.md#listensyslog)
[ListenTCP](PROCESSORS.md#listentcp)
[ListenUDP](PROCESSORS.md#listenudp)
[ListFile](PROCESSORS.md#listfile)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutTCP](PROCESSORS.md#puttcp)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[SegmentContent](PROCESSORS.md#segmentcontent)
[SplitText](PROCESSORS.md#splittext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) |
The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as RocksDB ), can be disabled with the respective CMAKE flag on the command line.
diff --git a/extensions/standard-processors/processors/SegmentContent.cpp b/extensions/standard-processors/processors/SegmentContent.cpp
new file mode 100644
index 0000000000..5c37776d86
--- /dev/null
+++ b/extensions/standard-processors/processors/SegmentContent.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SegmentContent.h"
+
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "range/v3/view/split.hpp"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+constexpr size_t BUFFER_TARGET_SIZE = 1024;
+
+void SegmentContent::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+}
+
+void SegmentContent::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) {}
+
+namespace {
+std::shared_ptr createSegment(core::ProcessSession& session) {
+ auto first_split = session.create();
+ if (!first_split) { throw Exception(PROCESSOR_EXCEPTION, "Couldn't create FlowFile"); }
+ return first_split;
+}
+
+void updateSplitAttributesAndTransfer(core::ProcessSession& session, const std::vector>& splits, const core::FlowFile& original) {
+ const std::string fragment_identifier_ = utils::IdGenerator::getIdGenerator()->generate().to_string();
+ for (size_t split_i = 0; split_i < splits.size(); ++split_i) {
+ const auto& split = splits[split_i];
+ split->setAttribute(SegmentContent::FragmentCountOutputAttribute.name, std::to_string(splits.size()));
+ split->setAttribute(SegmentContent::FragmentIndexOutputAttribute.name, std::to_string(split_i + 1)); // One based indexing
+ split->setAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name, fragment_identifier_);
+ split->setAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name, original.getAttribute(core::SpecialFlowAttribute::FILENAME).value_or(""));
+ session.transfer(split, SegmentContent::Segments);
+ }
+}
+} // namespace
+
+void SegmentContent::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+ const auto original = session.get();
+ if (!original) {
+ context.yield();
+ return;
+ }
+
+ size_t max_segment_size{};
+ const auto segment_size_str = context.getProperty(SegmentSize, original.get());
+ if (!segment_size_str || !core::DataSizeValue::StringToInt(*segment_size_str, max_segment_size)) {
+ throw Exception(PROCESSOR_EXCEPTION, fmt::format("Invalid Segment Size {}", segment_size_str));
+ }
+
+ const auto ff_content_stream = session.getFlowFileContentStream(*original);
+ if (!ff_content_stream) {
+ throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string()));
+ }
+
+ std::vector buffer;
+ std::vector> segments{};
+
+ size_t current_segment_size = 0;
+ segments.push_back(createSegment(session));
+ size_t ret{};
+ bool needs_new_segment = false;
+ while (true) {
+ const size_t segment_remaining_size = max_segment_size - current_segment_size;
+ const size_t buffer_size = std::min(BUFFER_TARGET_SIZE, segment_remaining_size);
+ buffer.resize(buffer_size);
+ ret = ff_content_stream->read(buffer);
+ if (io::isError(ret)) {
+ logger_->log_error("Error while reading from {}", original->getUUID().to_string());
+ break;
+ }
+ if (ret == 0) { // No more data
+ break;
+ }
+ if (needs_new_segment) {
+ segments.push_back(createSegment(session));
+ needs_new_segment = false;
+ }
+ buffer.resize(ret);
+ session.appendBuffer(segments.back(), buffer);
+ current_segment_size += ret;
+ if (current_segment_size >= max_segment_size) { // Defensive >= (read shouldn't read larger than requested size)
+ needs_new_segment = true;
+ current_segment_size = 0;
+ }
+ };
+
+ updateSplitAttributesAndTransfer(session, segments, *original);
+ session.transfer(original, Original);
+}
+
+REGISTER_RESOURCE(SegmentContent, Processor);
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/SegmentContent.h b/extensions/standard-processors/processors/SegmentContent.h
new file mode 100644
index 0000000000..15b6f46b88
--- /dev/null
+++ b/extensions/standard-processors/processors/SegmentContent.h
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "FlowFileRecord.h"
+#include "core/ProcessSession.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/RelationshipDefinition.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class SegmentContent final : public core::Processor {
+ public:
+ explicit SegmentContent(const std::string_view name, const utils::Identifier& uuid = {}) : Processor(name, uuid) {}
+
+ EXTENSIONAPI static constexpr auto Description = "Segments a FlowFile into multiple smaller segments on byte boundaries.";
+
+ EXTENSIONAPI static constexpr auto SegmentSize =
+ core::PropertyDefinitionBuilder<2>::createProperty("Segment Size")
+ .withDescription("The maximum data size in bytes for each segment")
+ .isRequired(true)
+ .supportsExpressionLanguage(true)
+ .build();
+
+ EXTENSIONAPI static constexpr auto Properties = std::to_array({SegmentSize});
+
+ EXTENSIONAPI static constexpr auto Segments = core::RelationshipDefinition{
+ "segments", "All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original"};
+ EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original FlowFile will be sent to this relationship"};
+ EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Segments};
+
+ EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute =
+ core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"};
+ EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute =
+ core::OutputAttributeDefinition<0>{"fragment.index", {}, "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile"};
+ EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of segments generated from the parent FlowFile"};
+ EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = core::OutputAttributeDefinition<0>{"segment.original.filename", {}, "The filename of the parent FlowFile"};
+ EXTENSIONAPI static constexpr auto OutputAttributes =
+ std::to_array({FragmentIdentifierOutputAttribute, FragmentIndexOutputAttribute, FragmentCountOutputAttribute, SegmentOriginalFilenameOutputAttribute});
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+ EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+ void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+ void initialize() override;
+
+ private:
+ std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(uuid_);
+};
+
+} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/SegmentContentTests.cpp b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp
new file mode 100644
index 0000000000..923a3ab271
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp
@@ -0,0 +1,214 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+#include "FlowFileRecord.h"
+#include "catch2/generators/catch_generators.hpp"
+#include "processors/SegmentContent.h"
+#include "range/v3/algorithm/equal.hpp"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::processors::test {
+
+std::vector generateRandomData(const size_t n) {
+ std::vector bytes(n);
+ std::mt19937 gen(std::random_device{}()); // NOLINT (linter wants a space before '{') [whitespace/braces]
+ std::uniform_int_distribution dist;
+
+ for (auto& byte: bytes) {
+ byte = static_cast(dist(gen)); // Assign random byte
+ }
+
+ return bytes;
+}
+
+std::string_view calcExpectedSegment(const std::string_view original_content, size_t segment_i, size_t segment_size) {
+ const auto start_pos = segment_i * segment_size;
+ const auto end_pos = std::min(start_pos + segment_size, original_content.length());
+ const auto actual_size = std::min(segment_size, end_pos - start_pos);
+ return original_content.substr(segment_i * segment_size, std::min(segment_size, actual_size));
+}
+
+std::span calcExpectedSegment(const std::span original_content, size_t segment_i, size_t segment_size) {
+ const auto start_pos = segment_i * segment_size;
+ const auto end_pos = std::min(start_pos + segment_size, original_content.size());
+ const auto actual_size = std::min(segment_size, end_pos - start_pos);
+ return original_content.subspan(segment_i * segment_size, std::min(segment_size, actual_size));
+}
+
+template
+std::vector createByteVector(Bytes... bytes) {
+ return {static_cast(bytes)...};
+}
+
+TEST_CASE("SegmentContent with different sized text input") {
+ const auto segment_content = std::make_shared("SegmentContent");
+ minifi::test::SingleProcessorTestController controller{segment_content};
+
+ auto [original_size, segment_size] = GENERATE(
+ std::make_tuple(size_t{1020}, size_t{30}),
+ std::make_tuple(1020, 31),
+ std::make_tuple(1020, 1),
+ std::make_tuple(2000, 30),
+ std::make_tuple(2000, 1010),
+ std::make_tuple(2000, 1050),
+ std::make_tuple(100, 100),
+ std::make_tuple(99, 100),
+ std::make_tuple(100, 99));
+
+ const std::string original_content = utils::string::repeat("a", original_size);
+
+ segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size));
+
+ auto trigger_results = controller.trigger(original_content);
+
+ auto original = trigger_results.at(processors::SegmentContent::Original);
+ auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+ auto expected_segment_size = gsl::narrow(std::ceil(static_cast(original_size) / static_cast(segment_size)));
+ REQUIRE(segments.size() == expected_segment_size);
+ REQUIRE(original.size() == 1);
+
+ size_t segment_size_sum = 0;
+ for (size_t segment_i = 0; segment_i < expected_segment_size; ++segment_i) {
+ auto segment_str = controller.plan->getContent(segments[segment_i]);
+ CHECK(segment_str == calcExpectedSegment(original_content, segment_i, segment_size));
+ segment_size_sum += segment_str.length();
+ }
+ CHECK(original_size == segment_size_sum);
+}
+
+TEST_CASE("SegmentContent with different sized byte input") {
+ const auto segment_content = std::make_shared("SegmentContent");
+ minifi::test::SingleProcessorTestController controller{segment_content};
+
+ auto [original_size, segment_size] = GENERATE(
+ std::make_tuple(size_t{1020}, size_t{30}),
+ std::make_tuple(1020, 31),
+ std::make_tuple(1020, 1),
+ std::make_tuple(2000, 30),
+ std::make_tuple(2000, 1010),
+ std::make_tuple(2000, 1050),
+ std::make_tuple(100, 100),
+ std::make_tuple(99, 100),
+ std::make_tuple(100, 99));
+
+ const auto input_data = generateRandomData(original_size);
+ std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+ segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size));
+
+ auto trigger_results = controller.trigger(input);
+
+ auto original = trigger_results.at(processors::SegmentContent::Original);
+ auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+ auto expected_segment_size = gsl::narrow(std::ceil(static_cast(original_size) / static_cast(segment_size)));
+ REQUIRE(segments.size() == expected_segment_size);
+ REQUIRE(original.size() == 1);
+
+ size_t segment_size_sum = 0;
+ for (size_t segment_i = 0; segment_i < expected_segment_size; ++segment_i) {
+ auto segment_bytes = controller.plan->getContentAsBytes(*segments[segment_i]);
+ CHECK(ranges::equal(segment_bytes, calcExpectedSegment(input_data, segment_i, segment_size)));
+ segment_size_sum += segment_bytes.size();
+ }
+ CHECK(original_size == segment_size_sum);
+}
+
+TEST_CASE("SimpleTest", "[NiFi]") {
+ const auto segment_content = std::make_shared("SegmentContent");
+ minifi::test::SingleProcessorTestController controller{segment_content};
+
+ segment_content->setProperty(SegmentContent::SegmentSize, "4 B");
+
+ const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+ std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+ auto trigger_results = controller.trigger(input);
+
+ auto original = trigger_results.at(processors::SegmentContent::Original);
+ auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+ REQUIRE(segments.size() == 3);
+ REQUIRE(original.size() == 1);
+
+ auto expected_segment_1 = createByteVector(1, 2, 3, 4);
+ auto expected_segment_2 = createByteVector(5, 6, 7, 8);
+ auto expected_segment_3 = createByteVector(9);
+
+ CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+ CHECK(controller.plan->getContentAsBytes(*segments[0]) == expected_segment_1);
+ CHECK(controller.plan->getContentAsBytes(*segments[1]) == expected_segment_2);
+ CHECK(controller.plan->getContentAsBytes(*segments[2]) == expected_segment_3);
+}
+
+TEST_CASE("TransferSmall", "[NiFi]") {
+ const auto segment_content = std::make_shared("SegmentContent");
+ minifi::test::SingleProcessorTestController controller{segment_content};
+
+ segment_content->setProperty(SegmentContent::SegmentSize, "4 KB");
+
+ const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+ std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+ auto trigger_results = controller.trigger(input);
+
+ auto original = trigger_results.at(processors::SegmentContent::Original);
+ auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+ REQUIRE(segments.size() == 1);
+ REQUIRE(original.size() == 1);
+
+ CHECK(controller.plan->getContentAsBytes(*segments[0]) == input_data);
+ CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+}
+
+TEST_CASE("ExpressionLanguageSupport", "[NiFi]") {
+ const auto segment_content = std::make_shared("SegmentContent");
+ minifi::test::SingleProcessorTestController controller{segment_content};
+
+ segment_content->setProperty(SegmentContent::SegmentSize, "${segmentSize}");
+
+ const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+ std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+ std::unordered_map attributes = {};
+ attributes["segmentSize"] = "4 B";
+ auto trigger_results = controller.trigger(input, std::move(attributes));
+
+ auto original = trigger_results.at(processors::SegmentContent::Original);
+ auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+ REQUIRE(segments.size() == 3);
+ REQUIRE(original.size() == 1);
+
+ auto expected_segment_1 = createByteVector(1, 2, 3, 4);
+ auto expected_segment_2 = createByteVector(5, 6, 7, 8);
+ auto expected_segment_3 = createByteVector(9);
+
+ CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+ CHECK(controller.plan->getContentAsBytes(*segments[0]) == expected_segment_1);
+ CHECK(controller.plan->getContentAsBytes(*segments[1]) == expected_segment_2);
+ CHECK(controller.plan->getContentAsBytes(*segments[2]) == expected_segment_3);
+}
+
+} // namespace org::apache::nifi::minifi::processors::test
diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp
index b83c1509c4..c254b38eaf 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -672,11 +672,20 @@ void TestPlan::validateAnnotations() const {
}
}
+std::vector TestPlan::getContentAsBytes(const core::FlowFile& flow_file) const {
+ const auto content_claim = flow_file.getResourceClaim();
+ const auto content_stream = content_repo_->read(*content_claim);
+ const auto output_stream = std::make_shared();
+ std::ignore = minifi::InputStreamPipe{*output_stream}(content_stream);
+ auto content = output_stream->getBuffer().subspan(flow_file.getOffset(), flow_file.getSize());
+ return ranges::to(content);
+}
+
std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
- auto content_claim = file.getResourceClaim();
- auto content_stream = content_repo_->read(*content_claim);
- auto output_stream = std::make_shared();
- minifi::InputStreamPipe{*output_stream}(content_stream);
+ const auto content_claim = file.getResourceClaim();
+ const auto content_stream = content_repo_->read(*content_claim);
+ const auto output_stream = std::make_shared();
+ std::ignore = minifi::InputStreamPipe{*output_stream}(content_stream);
return utils::span_to(minifi::utils::as_span(output_stream->getBuffer()).subspan(file.getOffset(), file.getSize()));
}
diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h
index 05826b2b65..94c90e9be1 100644
--- a/libminifi/test/libtest/unit/TestBase.h
+++ b/libminifi/test/libtest/unit/TestBase.h
@@ -285,6 +285,7 @@ class TestPlan {
return state_storage_;
}
+ std::vector getContentAsBytes(const core::FlowFile& flow_file) const;
std::string getContent(const std::shared_ptr& file) const { return getContent(*file); }
std::string getContent(const minifi::core::FlowFile& file) const;