Skip to content

Commit

Permalink
MINIFICPP-2454 Create SplitRecord processor
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Sep 12, 2024
1 parent 10911ec commit 96dc9e3
Show file tree
Hide file tree
Showing 4 changed files with 340 additions and 0 deletions.
36 changes: 36 additions & 0 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ limitations under the License.
- [RetryFlowFile](#RetryFlowFile)
- [RouteOnAttribute](#RouteOnAttribute)
- [RouteText](#RouteText)
- [SplitRecord](#SplitRecord)
- [SplitText](#SplitText)
- [TailEventLog](#TailEventLog)
- [TailFile](#TailFile)
Expand Down Expand Up @@ -2833,6 +2834,41 @@ In the list below, the names of required properties appear in bold. Any other pr
| success | All files, containing log events, are routed to success |


## SplitRecord

### Description

Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|-----------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------|
| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data |
| **Record Writer** | | | Specifies the Controller Service to use for writing out the records |
| **Records Per Split** | | | Specifies how many records should be written to each 'split' or 'segment' FlowFile<br/>**Supports Expression Language: true** |

### Relationships

| Name | Description |
|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship. |
| original | Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship. |
| splits | The individual 'segments' of the original FlowFile will be routed to this relationship. |

### Output Attributes

| Attribute | Relationship | Description |
|---------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------|
| record.count | | The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship. |
| fragment.identifier | | All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute |
| fragment.index | | A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile |
| fragment.count | | The number of split FlowFiles generated from the parent FlowFile |
| segment.original.filename | | The filename of the parent FlowFile |


## SplitText

### Description
Expand Down
125 changes: 125 additions & 0 deletions extensions/standard-processors/processors/SplitRecord.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SplitRecord.h"

#include "core/Resource.h"
#include "nonstd/expected.hpp"

namespace org::apache::nifi::minifi::processors {
namespace {
template<typename RecordSetIO>
std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property) {
std::string service_name;
if (context.getProperty(property, service_name) && !IsNullOrEmpty(service_name)) {
auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name));
if (!record_set_io)
return nullptr;
return record_set_io;
}
return nullptr;
}
} // namespace

SplitRecord::SplitRecord(std::string_view name, const utils::Identifier& uuid)
: Processor(name, uuid), logger_{core::logging::LoggerFactory<SplitRecord>::getLogger(uuid)} {}

void SplitRecord::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}

void SplitRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
record_set_reader_ = getRecordSetIO<core::RecordSetReader>(context, SplitRecord::RecordReader);
if (!record_set_reader_) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader set is missing or invalid");
}
record_set_writer_ = getRecordSetIO<core::RecordSetWriter>(context, SplitRecord::RecordWriter);
if (!record_set_writer_) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer set is missing or invalid");
}
}

nonstd::expected<std::size_t, std::string> SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& original_flow_file) {
std::string value;
std::size_t records_per_split = 0;
if (context.getProperty(RecordsPerSplit, value, original_flow_file.get())) {
if (!core::Property::StringToInt(value, records_per_split)) {
return nonstd::make_unexpected("Failed to convert Records Per Split property to an integer");
} else if (records_per_split < 1) {
return nonstd::make_unexpected("Records per split should be set to a number larger than 0");
}
} else {
return nonstd::make_unexpected("Records per split should be set to a valid number larger than 0");
}
return records_per_split;
}

void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
const auto original_flow_file = session.get();
if (!original_flow_file) {
yield();
return;
}

auto records_per_split = readRecordsPerSplit(context, original_flow_file);
if (!records_per_split) {
logger_->log_error("Failed to read Records Per Split property: {}", records_per_split.error());
session.transfer(original_flow_file, Failure);
return;
}

auto record_set = record_set_reader_->read(original_flow_file, session);
if (!record_set) {
logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message());
session.transfer(original_flow_file, Failure);
return;
}

std::size_t current_index = 0;
const auto fragment_identifier = utils::IdGenerator::getIdGenerator()->generate().to_string();
std::size_t fragment_index = 0;
std::size_t fragment_count = record_set->size() / records_per_split.value() + (record_set->size() % records_per_split.value() == 0 ? 0 : 1);
while (current_index < record_set->size()) {
auto split_flow_file = session.create(original_flow_file.get());
if (!split_flow_file) {
logger_->log_error("Failed to create a new flow file for record set");
session.transfer(original_flow_file, Failure);
return;
}

core::RecordSet slice_record_set;
for (std::size_t i = 0; i < records_per_split.value() && current_index < record_set->size(); ++i, ++current_index) {
slice_record_set.push_back(std::move(record_set->at(current_index)));
}

split_flow_file->setAttribute("record.count", std::to_string(slice_record_set.size()));
split_flow_file->setAttribute("fragment.identifier", fragment_identifier);
split_flow_file->setAttribute("fragment.index", std::to_string(fragment_index));
split_flow_file->setAttribute("fragment.count", std::to_string(fragment_count));
split_flow_file->setAttribute("segment.original.filename", original_flow_file->getAttribute("filename").value_or(""));

record_set_writer_->write(slice_record_set, split_flow_file, session);
session.transfer(split_flow_file, Splits);
++fragment_index;
}

session.transfer(original_flow_file, Original);
}

REGISTER_RESOURCE(SplitRecord, Processor);

} // namespace org::apache::nifi::minifi::processors
88 changes: 88 additions & 0 deletions extensions/standard-processors/processors/SplitRecord.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "core/Annotation.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessSessionFactory.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/RelationshipDefinition.h"
#include "core/logging/Logger.h"
#include "controllers/RecordSetReader.h"
#include "controllers/RecordSetWriter.h"

namespace org::apache::nifi::minifi::processors {

class SplitRecord : public core::Processor {
public:
EXTENSIONAPI static constexpr const char* Description = "Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles";

EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
.withDescription("Specifies the Controller Service to use for reading incoming data")
.isRequired(true)
.withAllowedTypes<minifi::core::RecordSetReader>()
.build();
EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
.withDescription("Specifies the Controller Service to use for writing out the records")
.isRequired(true)
.withAllowedTypes<minifi::core::RecordSetWriter>()
.build();
EXTENSIONAPI static constexpr auto RecordsPerSplit = core::PropertyDefinitionBuilder<>::createProperty("Records Per Split")
.withDescription("Specifies how many records should be written to each 'split' or 'segment' FlowFile")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
RecordReader,
RecordWriter,
RecordsPerSplit
});

EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
"If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship."};
EXTENSIONAPI static constexpr auto Splits = core::RelationshipDefinition{"splits",
"The individual 'segments' of the original FlowFile will be routed to this relationship."};
EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original",
"Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship."};
EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Splits, Original};

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;

ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

explicit SplitRecord(std::string_view name, const utils::Identifier& uuid = {});
void initialize() override;
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

private:
static nonstd::expected<std::size_t, std::string> readRecordsPerSplit(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& original_flow_file);

std::shared_ptr<core::RecordSetReader> record_set_reader_;
std::shared_ptr<core::RecordSetWriter> record_set_writer_;

std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SplitRecord>::getLogger(uuid_)};
};

} // namespace org::apache::nifi::minifi::processors
91 changes: 91 additions & 0 deletions extensions/standard-processors/tests/unit/SplitRecordTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "processors/SplitRecord.h"
#include "unit/SingleProcessorTestController.h"
#include "utils/StringUtils.h"

namespace org::apache::nifi::minifi::test {

class SplitRecordTestController : public TestController {
public:
SplitRecordTestController() {
controller_.plan->addController("JsonRecordSetReader", "JsonRecordSetReader");
controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
proc_->setProperty(processors::SplitRecord::RecordReader, "JsonRecordSetReader");
proc_->setProperty(processors::SplitRecord::RecordWriter, "JsonRecordSetWriter");
}

void verifyResults(const ProcessorTriggerResult& results, const std::vector<std::string>& expected_contents) const {
REQUIRE(results.at(processors::SplitRecord::Original).size() == 1);
REQUIRE(results.at(processors::SplitRecord::Splits).size() == expected_contents.size());
auto& split_results = results.at(processors::SplitRecord::Splits);
const auto fragment_identifier = split_results[0]->getAttribute("fragment.identifier").value();
const auto original_filename = results.at(processors::SplitRecord::Original)[0]->getAttribute("filename").value();
for (size_t i = 0; i < expected_contents.size(); ++i) {
CHECK(controller_.plan->getContent(split_results[i]) == expected_contents[i]);
CHECK(split_results[i]->getAttribute("record.count").value() == std::to_string(minifi::utils::string::split(expected_contents[i], "},{").size()));
CHECK(split_results[i]->getAttribute("fragment.index").value() == std::to_string(i));
CHECK(split_results[i]->getAttribute("fragment.count").value() == std::to_string(expected_contents.size()));
CHECK(split_results[i]->getAttribute("fragment.identifier").value() == fragment_identifier);
CHECK(split_results[i]->getAttribute("segment.original.filename").value() == original_filename);
}
}

protected:
std::shared_ptr<core::Processor> proc_ = std::make_shared<processors::SplitRecord>("SplitRecord");
SingleProcessorTestController controller_{proc_};
};

TEST_CASE_METHOD(SplitRecordTestController, "Invalid Records Per Split property", "[splitrecord]") {
proc_->setProperty(processors::SplitRecord::RecordsPerSplit, "invalid");
auto results = controller_.trigger({InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}"}});
REQUIRE(results[processors::SplitRecord::Failure].size() == 1);
REQUIRE(LogTestController::getInstance().contains("Failed to convert Records Per Split property to an integer", 1s));
}

TEST_CASE_METHOD(SplitRecordTestController, "Records Per Split property should be greater than zero", "[splitrecord]") {
proc_->setProperty(processors::SplitRecord::RecordsPerSplit, "${id}");
auto results = controller_.trigger({InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"id", "0"}}}});
REQUIRE(results[processors::SplitRecord::Failure].size() == 1);
REQUIRE(LogTestController::getInstance().contains("Records per split should be set to a number larger than 0", 1s));
}

TEST_CASE_METHOD(SplitRecordTestController, "Invalid records in flow file result in zero splits", "[splitrecord]") {
proc_->setProperty(processors::SplitRecord::RecordsPerSplit, "1");
auto results = controller_.trigger({InputFlowFileData{ R"({"name": "John)"}});
CHECK(results[processors::SplitRecord::Splits].empty());
REQUIRE(results[processors::SplitRecord::Original].size() == 1);
CHECK(controller_.plan->getContent(results.at(processors::SplitRecord::Original)[0]) == "{\"name\": \"John");
}

TEST_CASE_METHOD(SplitRecordTestController, "Split records one by one", "[splitrecord]") {
proc_->setProperty(processors::SplitRecord::RecordsPerSplit, "1");
auto results = controller_.trigger({InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}"}});
verifyResults(results, {R"([{"name":"John"}])", R"([{"name":"Jill"}])"});
}

TEST_CASE_METHOD(SplitRecordTestController, "Split records two by two", "[splitrecord]") {
proc_->setProperty(processors::SplitRecord::RecordsPerSplit, "2");
auto results = controller_.trigger({InputFlowFileData{"{\"a\": \"1\", \"b\": \"2\"}\n{\"c\": \"3\"}\n{\"d\": \"4\", \"e\": \"5\"}\n{\"f\": \"6\"}\n{\"g\": \"7\", \"h\": \"8\"}\n"}});
verifyResults(results, {R"([{"b":"2","a":"1"},{"c":"3"}])", R"([{"e":"5","d":"4"},{"f":"6"}])", R"([{"h":"8","g":"7"}])"});
}

} // namespace org::apache::nifi::minifi::test

0 comments on commit 96dc9e3

Please sign in to comment.