From 9c9d2e59b635a879ab4b4ba459a5a0a33d586f1e Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Thu, 3 Oct 2024 14:58:01 +0200 Subject: [PATCH] MINIFICPP-2471 SegmentContent --- PROCESSORS.md | 31 +++ README.md | 6 +- .../processors/SegmentContent.cpp | 115 ++++++++++ .../processors/SegmentContent.h | 78 +++++++ .../tests/unit/SegmentContentTests.cpp | 214 ++++++++++++++++++ libminifi/test/libtest/unit/TestBase.cpp | 17 +- libminifi/test/libtest/unit/TestBase.h | 1 + 7 files changed, 455 insertions(+), 7 deletions(-) create mode 100644 extensions/standard-processors/processors/SegmentContent.cpp create mode 100644 extensions/standard-processors/processors/SegmentContent.h create mode 100644 extensions/standard-processors/tests/unit/SegmentContentTests.cpp diff --git a/PROCESSORS.md b/PROCESSORS.md index 293c317e41..f27e13d75e 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -2833,6 +2833,37 @@ In the list below, the names of required properties appear in bold. Any other pr | success | All files, containing log events, are routed to success | +## SegmentContent + +### Description + +Segments a FlowFile into multiple smaller segments on byte boundaries. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|------------------|---------------|------------------|--------------------------------------------------------------------------------------------| +| **Segment Size** | |
| The maximum data size in bytes for each segment
**Supports Expression Language: true** | + +### Relationships + +| Name | Description | +|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| original | The original FlowFile will be sent to this relationship | +| segments | All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original | + +### Output Attributes + +| Attribute | Relationship | Description | +|---------------------------|--------------|-------------------------------------------------------------------------------------------------------------------------| +| fragment.identifier | | All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute | +| fragment.index | | A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile | +| fragment.count | | The number of segments generated from the parent FlowFile | +| segment.original.filename | | The filename of the parent FlowFile | + + ## SplitText ### Description diff --git a/README.md b/README.md index f325315e09..d2295fd0cb 100644 --- a/README.md +++ b/README.md @@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors: The following table lists the base set of processors. 
-| Extension Set | Processors | -|---------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[AttributesToJSON](PROCESSORS.md#attributestojson)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[InvokeHTTP](PROCESSORS.md#invokehttp)
[ListenSyslog](PROCESSORS.md#listensyslog)
[ListenTCP](PROCESSORS.md#listentcp)
[ListenUDP](PROCESSORS.md#listenudp)
[ListFile](PROCESSORS.md#listfile)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutTCP](PROCESSORS.md#puttcp)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[SplitText](PROCESSORS.md#splittext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) | +| Extension Set | Processors | +|---------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[AttributesToJSON](PROCESSORS.md#attributestojson)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[InvokeHTTP](PROCESSORS.md#invokehttp)
[ListenSyslog](PROCESSORS.md#listensyslog)
[ListenTCP](PROCESSORS.md#listentcp)
[ListenUDP](PROCESSORS.md#listenudp)
[ListFile](PROCESSORS.md#listfile)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutTCP](PROCESSORS.md#puttcp)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[SegmentContent](PROCESSORS.md#segmentcontent)
[SplitText](PROCESSORS.md#splittext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) | The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as RocksDB ), can be disabled with the respective CMAKE flag on the command line. diff --git a/extensions/standard-processors/processors/SegmentContent.cpp b/extensions/standard-processors/processors/SegmentContent.cpp new file mode 100644 index 0000000000..5c37776d86 --- /dev/null +++ b/extensions/standard-processors/processors/SegmentContent.cpp @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "SegmentContent.h" + +#include "core/FlowFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "range/v3/view/split.hpp" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::processors { + +constexpr size_t BUFFER_TARGET_SIZE = 1024; + +void SegmentContent::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void SegmentContent::onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) {} + +namespace { +std::shared_ptr createSegment(core::ProcessSession& session) { + auto first_split = session.create(); + if (!first_split) { throw Exception(PROCESSOR_EXCEPTION, "Couldn't create FlowFile"); } + return first_split; +} + +void updateSplitAttributesAndTransfer(core::ProcessSession& session, const std::vector>& splits, const core::FlowFile& original) { + const std::string fragment_identifier_ = utils::IdGenerator::getIdGenerator()->generate().to_string(); + for (size_t split_i = 0; split_i < splits.size(); ++split_i) { + const auto& split = splits[split_i]; + split->setAttribute(SegmentContent::FragmentCountOutputAttribute.name, std::to_string(splits.size())); + split->setAttribute(SegmentContent::FragmentIndexOutputAttribute.name, std::to_string(split_i + 1)); // One based indexing + split->setAttribute(SegmentContent::FragmentIdentifierOutputAttribute.name, fragment_identifier_); + split->setAttribute(SegmentContent::SegmentOriginalFilenameOutputAttribute.name, original.getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("")); + session.transfer(split, SegmentContent::Segments); + } +} +} // namespace + +void SegmentContent::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto original = session.get(); + if (!original) { + context.yield(); + return; + } + + size_t max_segment_size{}; + const auto segment_size_str = context.getProperty(SegmentSize, original.get()); + 
if (!segment_size_str || !core::DataSizeValue::StringToInt(*segment_size_str, max_segment_size)) { + throw Exception(PROCESSOR_EXCEPTION, fmt::format("Invalid Segment Size {}", segment_size_str)); + } + + const auto ff_content_stream = session.getFlowFileContentStream(*original); + if (!ff_content_stream) { + throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string())); + } + + std::vector buffer; + std::vector> segments{}; + + size_t current_segment_size = 0; + segments.push_back(createSegment(session)); + size_t ret{}; + bool needs_new_segment = false; + while (true) { + const size_t segment_remaining_size = max_segment_size - current_segment_size; + const size_t buffer_size = std::min(BUFFER_TARGET_SIZE, segment_remaining_size); + buffer.resize(buffer_size); + ret = ff_content_stream->read(buffer); + if (io::isError(ret)) { + logger_->log_error("Error while reading from {}", original->getUUID().to_string()); + break; + } + if (ret == 0) { // No more data + break; + } + if (needs_new_segment) { + segments.push_back(createSegment(session)); + needs_new_segment = false; + } + buffer.resize(ret); + session.appendBuffer(segments.back(), buffer); + current_segment_size += ret; + if (current_segment_size >= max_segment_size) { // Defensive >= (read shouldn't read larger than requested size) + needs_new_segment = true; + current_segment_size = 0; + } + }; + + updateSplitAttributesAndTransfer(session, segments, *original); + session.transfer(original, Original); +} + +REGISTER_RESOURCE(SegmentContent, Processor); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/SegmentContent.h b/extensions/standard-processors/processors/SegmentContent.h new file mode 100644 index 0000000000..15b6f46b88 --- /dev/null +++ b/extensions/standard-processors/processors/SegmentContent.h @@ -0,0 +1,78 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) 
under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+// NOTE(review): the four system header names below, and the template arguments of std::to_array,
+// std::shared_ptr and LoggerFactory further down, appear to be missing from this copy - confirm against the original file.
+#include
+#include
+#include
+#include
+
+#include "FlowFileRecord.h"
+#include "core/ProcessSession.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/RelationshipDefinition.h"
+#include "utils/Export.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Processor that cuts the content of an incoming FlowFile into byte segments no larger than "Segment Size".
+// The segments go to the "segments" relationship and the incoming FlowFile goes to "original".
+class SegmentContent final : public core::Processor {
+ public:
+  explicit SegmentContent(const std::string_view name, const utils::Identifier& uuid = {}) : Processor(name, uuid) {}
+
+  EXTENSIONAPI static constexpr auto Description = "Segments a FlowFile into multiple smaller segments on byte boundaries.";
+
+  // Required property; it is evaluated against the incoming FlowFile's attributes on every trigger.
+  EXTENSIONAPI static constexpr auto SegmentSize =
+      core::PropertyDefinitionBuilder<2>::createProperty("Segment Size")
+          .withDescription("The maximum data size in bytes for each segment")
+          .isRequired(true)
+          .supportsExpressionLanguage(true)
+          .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array({SegmentSize});
+
+  EXTENSIONAPI static constexpr auto Segments = core::RelationshipDefinition{
+      "segments", "All segments will be sent to this relationship. If the file was small enough that it was not segmented, a copy of the original is sent to this relationship as well as original"};
+  EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original FlowFile will be sent to this relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Segments};
+
+  // Attributes written on each produced segment. The relationship list of each definition is left empty ({}).
+  EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute =
+      core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"};
+  EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute =
+      core::OutputAttributeDefinition<0>{"fragment.index", {}, "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile"};
+  EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of segments generated from the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = core::OutputAttributeDefinition<0>{"segment.original.filename", {}, "The filename of the parent FlowFile"};
+  EXTENSIONAPI static constexpr auto OutputAttributes =
+      std::to_array({FragmentIdentifierOutputAttribute, FragmentIndexOutputAttribute, FragmentCountOutputAttribute, SegmentOriginalFilenameOutputAttribute});
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  // No-op in the implementation file.
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  // Segments one incoming FlowFile; yields the context when no FlowFile is available.
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+  // Registers Properties and Relationships.
+  void initialize() override;
+
+ private:
+  std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(uuid_);
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/SegmentContentTests.cpp b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp
new file mode 100644
index 0000000000..923a3ab271
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/SegmentContentTests.cpp
@@ -0,0 +1,214 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// NOTE(review): the system header name below and most template arguments in this file
+// (std::vector, std::span, static_cast, std::make_shared, ...) appear to be missing from this copy - confirm against the original file.
+#include
+
+#include "FlowFileRecord.h"
+#include "catch2/generators/catch_generators.hpp"
+#include "processors/SegmentContent.h"
+#include "range/v3/algorithm/equal.hpp"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::processors::test {
+
+// Returns n elements filled from a Mersenne Twister seeded by std::random_device,
+// so the data differs between runs.
+std::vector generateRandomData(const size_t n) {
+  std::vector bytes(n);
+  std::mt19937 gen(std::random_device{}());  // NOLINT (linter wants a space before '{') [whitespace/braces]
+  std::uniform_int_distribution dist;
+
+  for (auto& byte: bytes) {
+    byte = static_cast(dist(gen));  // Assign random byte
+  }
+
+  return bytes;
+}
+
+// Returns the slice of original_content that segment number segment_i (zero based) is expected to hold:
+// it starts at segment_i * segment_size and is at most segment_size long, shorter for the last segment.
+std::string_view calcExpectedSegment(const std::string_view original_content, size_t segment_i, size_t segment_size) {
+  const auto start_pos = segment_i * segment_size;
+  const auto end_pos = std::min(start_pos + segment_size, original_content.length());
+  const auto actual_size = std::min(segment_size, end_pos - start_pos);
+  return original_content.substr(segment_i * segment_size, std::min(segment_size, actual_size));
+}
+
+// Same computation as the string_view overload above, for span input.
+std::span calcExpectedSegment(const std::span original_content, size_t segment_i, size_t segment_size) {
+  const auto start_pos = segment_i * segment_size;
+  const auto end_pos = std::min(start_pos + segment_size, original_content.size());
+  const auto actual_size = std::min(segment_size, end_pos - start_pos);
+  return original_content.subspan(segment_i * segment_size, std::min(segment_size, actual_size));
+}
+
+// Builds a vector from the given values, converting each one with static_cast.
+template
+std::vector createByteVector(Bytes... bytes) {
+  return {static_cast(bytes)...};
+}
+
+// Each generated tuple is (original content size, segment size). The cases cover sizes that divide evenly,
+// sizes that leave a shorter last segment, and a segment size larger than the content.
+TEST_CASE("SegmentContent with different sized text input") {
+  const auto segment_content = std::make_shared("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  auto [original_size, segment_size] = GENERATE(
+    std::make_tuple(size_t{1020}, size_t{30}),
+    std::make_tuple(1020, 31),
+    std::make_tuple(1020, 1),
+    std::make_tuple(2000, 30),
+    std::make_tuple(2000, 1010),
+    std::make_tuple(2000, 1050),
+    std::make_tuple(100, 100),
+    std::make_tuple(99, 100),
+    std::make_tuple(100, 99));
+
+  const std::string original_content = utils::string::repeat("a", original_size);
+
+  segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size));
+
+  auto trigger_results = controller.trigger(original_content);
+
+  auto original = trigger_results.at(processors::SegmentContent::Original);
+  auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+  // Despite its name this is the expected NUMBER of segments: ceil(original_size / segment_size).
+  auto expected_segment_size = gsl::narrow(std::ceil(static_cast(original_size) / static_cast(segment_size)));
+  REQUIRE(segments.size() == expected_segment_size);
+  REQUIRE(original.size() == 1);
+
+  // Every segment must match its slice of the input, and the segment lengths must add up to the input length.
+  size_t segment_size_sum = 0;
+  for (size_t segment_i = 0; segment_i < expected_segment_size; ++segment_i) {
+    auto segment_str = controller.plan->getContent(segments[segment_i]);
+    CHECK(segment_str == calcExpectedSegment(original_content, segment_i, segment_size));
+    segment_size_sum += segment_str.length();
+  }
+  CHECK(original_size == segment_size_sum);
+}
+
+// Same size matrix as the text test above, but the input is random data and the segment contents are
+// compared through getContentAsBytes.
+TEST_CASE("SegmentContent with different sized byte input") {
+  const auto segment_content = std::make_shared("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  auto [original_size, segment_size] = GENERATE(
+    std::make_tuple(size_t{1020}, size_t{30}),
+    std::make_tuple(1020, 31),
+    std::make_tuple(1020, 1),
+    std::make_tuple(2000, 30),
+    std::make_tuple(2000, 1010),
+    std::make_tuple(2000, 1050),
+    std::make_tuple(100, 100),
+    std::make_tuple(99, 100),
+    std::make_tuple(100, 99));
+
+  const auto input_data = generateRandomData(original_size);
+  std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+  segment_content->setProperty(SegmentContent::SegmentSize, std::to_string(segment_size));
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SegmentContent::Original);
+  auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+  // Expected number of segments: ceil(original_size / segment_size).
+  auto expected_segment_size = gsl::narrow(std::ceil(static_cast(original_size) / static_cast(segment_size)));
+  REQUIRE(segments.size() == expected_segment_size);
+  REQUIRE(original.size() == 1);
+
+  size_t segment_size_sum = 0;
+  for (size_t segment_i = 0; segment_i < expected_segment_size; ++segment_i) {
+    auto segment_bytes = controller.plan->getContentAsBytes(*segments[segment_i]);
+    CHECK(ranges::equal(segment_bytes, calcExpectedSegment(input_data, segment_i, segment_size)));
+    segment_size_sum += segment_bytes.size();
+  }
+  CHECK(original_size == segment_size_sum);
+}
+
+// Nine input values with Segment Size "4 B" must give segments of 4, 4 and 1 values; the original is unchanged.
+TEST_CASE("SimpleTest", "[NiFi]") {
+  const auto segment_content = std::make_shared("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  segment_content->setProperty(SegmentContent::SegmentSize, "4 B");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+  std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SegmentContent::Original);
+  auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+  REQUIRE(segments.size() == 3);
+  REQUIRE(original.size() == 1);
+
+  auto expected_segment_1 = createByteVector(1, 2, 3, 4);
+  auto expected_segment_2 = createByteVector(5, 6, 7, 8);
+  auto expected_segment_3 = createByteVector(9);
+
+  CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+  CHECK(controller.plan->getContentAsBytes(*segments[0]) == expected_segment_1);
+  CHECK(controller.plan->getContentAsBytes(*segments[1]) == expected_segment_2);
+  CHECK(controller.plan->getContentAsBytes(*segments[2]) == expected_segment_3);
+}
+
+// Input smaller than the segment size ("4 KB"): exactly one segment is produced, with the same content as the original.
+TEST_CASE("TransferSmall", "[NiFi]") {
+  const auto segment_content = std::make_shared("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  segment_content->setProperty(SegmentContent::SegmentSize, "4 KB");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+  std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+  auto trigger_results = controller.trigger(input);
+
+  auto original = trigger_results.at(processors::SegmentContent::Original);
+  auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+  REQUIRE(segments.size() == 1);
+  REQUIRE(original.size() == 1);
+
+  CHECK(controller.plan->getContentAsBytes(*segments[0]) == input_data);
+  CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+}
+
+// Segment Size is given as "${segmentSize}" and resolved from the FlowFile attribute segmentSize = "4 B";
+// the expected segmentation is the same as in SimpleTest.
+TEST_CASE("ExpressionLanguageSupport", "[NiFi]") {
+  const auto segment_content = std::make_shared("SegmentContent");
+  minifi::test::SingleProcessorTestController controller{segment_content};
+
+  segment_content->setProperty(SegmentContent::SegmentSize, "${segmentSize}");
+
+  const auto input_data = createByteVector(1, 2, 3, 4, 5, 6, 7, 8, 9);
+  std::string_view input(reinterpret_cast(input_data.data()), input_data.size());
+
+  std::unordered_map attributes = {};
+  attributes["segmentSize"] = "4 B";
+  auto trigger_results = controller.trigger(input, std::move(attributes));
+
+  auto original = trigger_results.at(processors::SegmentContent::Original);
+  auto segments = trigger_results.at(processors::SegmentContent::Segments);
+
+  REQUIRE(segments.size() == 3);
+  REQUIRE(original.size() == 1);
+
+  auto expected_segment_1 = createByteVector(1, 2, 3, 4);
+  auto expected_segment_2 = createByteVector(5, 6, 7, 8);
+  auto expected_segment_3 = createByteVector(9);
+
+  CHECK(controller.plan->getContentAsBytes(*original[0]) == input_data);
+  CHECK(controller.plan->getContentAsBytes(*segments[0]) == expected_segment_1);
+  CHECK(controller.plan->getContentAsBytes(*segments[1]) == expected_segment_2);
+  CHECK(controller.plan->getContentAsBytes(*segments[2]) == expected_segment_3);
+}
+
+}  // namespace org::apache::nifi::minifi::processors::test
diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp
index b83c1509c4..c254b38eaf 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -672,11 +672,20 @@ void TestPlan::validateAnnotations() const {
   }
 }
 
+std::vector TestPlan::getContentAsBytes(const core::FlowFile& flow_file) const {
+  const auto content_claim = flow_file.getResourceClaim();
+  const auto content_stream = content_repo_->read(*content_claim);
+  const auto output_stream = std::make_shared();
+  std::ignore = minifi::InputStreamPipe{*output_stream}(content_stream);
+  auto content = output_stream->getBuffer().subspan(flow_file.getOffset(), flow_file.getSize());
+  return ranges::to(content);
+}
+
 std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
-  auto content_claim = file.getResourceClaim();
-  auto content_stream = content_repo_->read(*content_claim);
-  auto output_stream = std::make_shared();
-  minifi::InputStreamPipe{*output_stream}(content_stream);
+  const auto content_claim = file.getResourceClaim();
+  const auto content_stream = content_repo_->read(*content_claim);
+  const auto output_stream = std::make_shared();
+  std::ignore = minifi::InputStreamPipe{*output_stream}(content_stream);
   return utils::span_to(minifi::utils::as_span(output_stream->getBuffer()).subspan(file.getOffset(), file.getSize()));
 }
 
diff --git a/libminifi/test/libtest/unit/TestBase.h
b/libminifi/test/libtest/unit/TestBase.h index 05826b2b65..94c90e9be1 100644 --- a/libminifi/test/libtest/unit/TestBase.h +++ b/libminifi/test/libtest/unit/TestBase.h @@ -285,6 +285,7 @@ class TestPlan { return state_storage_; } + std::vector getContentAsBytes(const core::FlowFile& flow_file) const; std::string getContent(const std::shared_ptr& file) const { return getContent(*file); } std::string getContent(const minifi::core::FlowFile& file) const;