Skip to content

Commit

Permalink
Refactor function parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jul 26, 2023
1 parent 860ffdc commit b0176bf
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 67 deletions.
54 changes: 27 additions & 27 deletions extensions/libarchive/BinFiles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFac
}
}

void BinFiles::preprocessFlowFile(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/, const std::shared_ptr<core::FlowFile>& flow) {
void BinFiles::preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
// handle backward compatibility with old segment attributes
std::string value;
if (!flow->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, value) && flow->getAttribute(BinFiles::SEGMENT_COUNT_ATTRIBUTE, value)) {
Expand Down Expand Up @@ -208,67 +208,67 @@ bool BinManager::offer(const std::string &group, const std::shared_ptr<core::Flo
return true;
}

bool BinFiles::resurrectFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
bool BinFiles::resurrectFlowFiles(core::ProcessSession &session) {
auto flow_files = file_store_.getNewFlowFiles();
// these are already processed FlowFiles, that we own
bool had_failure = false;
for (auto &file : flow_files) {
std::string group_id = getGroupId(context.get(), file);
std::string group_id = getGroupId(file);
if (!binManager_.offer(group_id, file)) {
session->transfer(file, Failure);
session.transfer(file, Failure);
had_failure = true;
}
// no need to route successfully captured such files as we already own them in the Self relationship
}
return had_failure;
}

void BinFiles::assumeOwnerShipOfNextBatch(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
void BinFiles::assumeOwnerShipOfNextBatch(core::ProcessSession &session) {
for (size_t i = 0; i < batchSize_; ++i) {
auto flow = session->get();
auto flow = session.get();

if (flow == nullptr) {
break;
}

preprocessFlowFile(context.get(), session.get(), flow);
std::string group_id = getGroupId(context.get(), flow);
preprocessFlowFile(flow);
std::string group_id = getGroupId(flow);

bool offer = binManager_.offer(group_id, flow);
if (!offer) {
session->transfer(flow, Failure);
session.transfer(flow, Failure);
continue;
}
session->transfer(flow, Self);
session.transfer(flow, Self);
}
session->commit();
session.commit();
}

void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session) {
while (!ready_bins.empty()) {
std::unique_ptr<Bin> bin = std::move(ready_bins.front());
ready_bins.pop_front();

try {
addFlowsToSession(context.get(), session.get(), bin);
addFlowsToSession(session, bin);
logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
if (!this->processBin(context.get(), session.get(), bin))
this->transferFlowsToFail(context.get(), session.get(), bin);
session->commit();
if (!processBin(session, bin))
transferFlowsToFail(session, bin);
session.commit();
} catch(const std::exception& ex) {
logger_->log_error("Caught Exception type: '%s' while merging ready bin: '%s'", typeid(ex).name(), ex.what());
binManager_.addReadyBin(std::move(bin));
session->rollback();
session.rollback();
}
}
}

std::deque<std::unique_ptr<Bin>> BinFiles::gatherReadyBins(const std::shared_ptr<core::ProcessContext> &context) {
std::deque<std::unique_ptr<Bin>> BinFiles::gatherReadyBins(core::ProcessContext &context) {
binManager_.gatherReadyBins();
if (gsl::narrow<uint32_t>(binManager_.getBinCount()) > maxBinCount_) {
// bin count reach max allowed
context->yield();
logger_->log_debug("BinFiles reach max bin count %d", this->binManager_.getBinCount());
context.yield();
logger_->log_debug("BinFiles reach max bin count %d", binManager_.getBinCount());
binManager_.removeOldestBin();
}

Expand All @@ -278,27 +278,27 @@ std::deque<std::unique_ptr<Bin>> BinFiles::gatherReadyBins(const std::shared_ptr
}

void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
if (resurrectFlowFiles(context, session)) {
if (resurrectFlowFiles(*session)) {
context->yield();
return;
}

assumeOwnerShipOfNextBatch(context, session);
processReadyBins(gatherReadyBins(context), context, session);
assumeOwnerShipOfNextBatch(*session);
processReadyBins(gatherReadyBins(*context), *session);
}

void BinFiles::transferFlowsToFail(core::ProcessContext* /*context*/, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
void BinFiles::transferFlowsToFail(core::ProcessSession &session, std::unique_ptr<Bin> &bin) {
std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
for (const auto& flow : flows) {
session->transfer(flow, Failure);
session.transfer(flow, Failure);
}
flows.clear();
}

void BinFiles::addFlowsToSession(core::ProcessContext* /*context*/, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
void BinFiles::addFlowsToSession(core::ProcessSession &session, std::unique_ptr<Bin> &bin) {
std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
for (const auto& flow : flows) {
session->add(flow);
session.add(flow);
}
}

Expand Down
18 changes: 9 additions & 9 deletions extensions/libarchive/BinFiles.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,24 +285,24 @@ class BinFiles : public core::Processor {

protected:
// Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
virtual void preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, const std::shared_ptr<core::FlowFile>& flow);
virtual void preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow);
// Returns a group ID representing a bin. This allows flow files to be binned into like groups
virtual std::string getGroupId(core::ProcessContext* /*context*/, const std::shared_ptr<core::FlowFile>& /*flow*/) {
virtual std::string getGroupId(const std::shared_ptr<core::FlowFile>& /*flow*/) {
return "";
}
// Processes a single bin.
virtual bool processBin(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/, std::unique_ptr<Bin>& /*bin*/) {
virtual bool processBin(core::ProcessSession& /*session*/, std::unique_ptr<Bin>& /*bin*/) {
return false;
}
// transfer flows to failure in bin
static void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
static void transferFlowsToFail(core::ProcessSession &session, std::unique_ptr<Bin> &bin);
// moves owned flows to session
static void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
static void addFlowsToSession(core::ProcessSession &session, std::unique_ptr<Bin> &bin);

bool resurrectFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
void assumeOwnerShipOfNextBatch(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
std::deque<std::unique_ptr<Bin>> gatherReadyBins(const std::shared_ptr<core::ProcessContext> &context);
void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
bool resurrectFlowFiles(core::ProcessSession &session);
void assumeOwnerShipOfNextBatch(core::ProcessSession &session);
std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext &context);
void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session);

BinManager binManager_;

Expand Down
48 changes: 24 additions & 24 deletions extensions/libarchive/MergeContent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void MergeContent::validatePropertyOptions() {
}
}

std::string MergeContent::getGroupId(core::ProcessContext*, const std::shared_ptr<core::FlowFile>& flow) {
std::string MergeContent::getGroupId(const std::shared_ptr<core::FlowFile>& flow) {
std::string groupId;
std::string value;
if (!correlationAttributeName_.empty()) {
Expand Down Expand Up @@ -191,7 +191,7 @@ void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession
BinFiles::onTrigger(context, session);
}

bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
bool MergeContent::processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) {
if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK)
return false;

Expand All @@ -213,19 +213,19 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
});
}

std::shared_ptr<core::FlowFile> merge_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
std::shared_ptr<core::FlowFile> merge_flow = std::static_pointer_cast<FlowFileRecord>(session.create());
if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) {
KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow);
} else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) {
KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow);
} else {
logger_->log_error("Attribute strategy not supported %s", attributeStrategy_);
session->remove(merge_flow);
session.remove(merge_flow);
return false;
}

auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
return session->read(ff, cb);
return session.read(ff, cb);
};

const char* mimeType;
Expand All @@ -247,30 +247,30 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
mimeType = "application/zip";
} else {
logger_->log_error("Merge format not supported %s", mergeFormat_);
session->remove(merge_flow);
session.remove(merge_flow);
return false;
}

std::shared_ptr<core::FlowFile> mergeFlow;
try {
mergeBin->merge(context, session, bin->getFlowFile(), *serializer, merge_flow);
session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
mergeBin->merge(session, bin->getFlowFile(), *serializer, merge_flow);
session.putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
} catch (const std::exception& ex) {
logger_->log_error("Merge Content merge catch exception, type: %s, what: %s", typeid(ex).name(), ex.what());
session->remove(merge_flow);
session.remove(merge_flow);
return false;
} catch (...) {
logger_->log_error("Merge Content merge catch exception, type: %s", getCurrentExceptionTypeName());
session->remove(merge_flow);
session.remove(merge_flow);
return false;
}
session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));
session.putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));

// we successfully merge the flow
session->transfer(merge_flow, Merge);
session.transfer(merge_flow, Merge);
std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
for (const auto& flow : flows) {
session->transfer(flow, Original);
session.transfer(flow, Original);
}
logger_->log_info("Merge FlowFile record UUID %s, payload length %d", merge_flow->getUUIDStr(), merge_flow->getSize());

Expand All @@ -282,22 +282,22 @@ BinaryConcatenationMerge::BinaryConcatenationMerge(std::string header, std::stri
footer_(std::move(footer)),
demarcator_(std::move(demarcator)) {}

void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
void BinaryConcatenationMerge::merge(core::ProcessSession &session,
std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
session->write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer});
session.write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer});
std::string fileName;
if (flows.size() == 1) {
flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
} else {
flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName);
}
if (!fileName.empty())
session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
}

void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
void TarMerge::merge(core::ProcessSession &session,
std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer});
session.write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer});
std::string fileName;
merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
if (flows.size() == 1) {
Expand All @@ -307,13 +307,13 @@ void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se
}
if (!fileName.empty()) {
fileName += ".tar";
session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
}
}

void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
void ZipMerge::merge(core::ProcessSession &session,
std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer});
session.write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer});
std::string fileName;
merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
if (flows.size() == 1) {
Expand All @@ -323,13 +323,13 @@ void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se
}
if (!fileName.empty()) {
fileName += ".zip";
session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
}
}

void AttributeMerger::mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow) {
void AttributeMerger::mergeAttributes(core::ProcessSession &session, const std::shared_ptr<core::FlowFile> &merge_flow) {
for (const auto& pair : getMergedAttributes()) {
session->putAttribute(merge_flow, pair.first, pair.second);
session.putAttribute(merge_flow, pair.first, pair.second);
}
}

Expand Down
14 changes: 7 additions & 7 deletions extensions/libarchive/MergeContent.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ class MergeBin {
public:
virtual ~MergeBin() = default;
// merge the flows in the bin
virtual void merge(core::ProcessContext *context, core::ProcessSession *session,
virtual void merge(core::ProcessSession &session,
std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0;
};

class BinaryConcatenationMerge : public MergeBin {
public:
BinaryConcatenationMerge(std::string header, std::string footer, std::string demarcator);

void merge(core::ProcessContext* context, core::ProcessSession *session,
void merge(core::ProcessSession &session,
std::deque<std::shared_ptr<core::FlowFile>>& flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) override;
// Nest Callback Class for write stream
class WriteCallback {
Expand Down Expand Up @@ -242,21 +242,21 @@ class ArchiveMerge {

class TarMerge: public ArchiveMerge, public MergeBin {
public:
void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
};

class ZipMerge: public ArchiveMerge, public MergeBin {
public:
void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
};

class AttributeMerger {
public:
explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
: flows_(flows) {}
void mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow);
void mergeAttributes(core::ProcessSession &session, const std::shared_ptr<core::FlowFile> &merge_flow);
virtual ~AttributeMerger() = default;

protected:
Expand Down Expand Up @@ -373,11 +373,11 @@ class MergeContent : public processors::BinFiles {
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
void initialize() override;
bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) override;
bool processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) override;

protected:
// Returns a group ID representing a bin. This allows flow files to be binned into like groups
std::string getGroupId(core::ProcessContext *context, const std::shared_ptr<core::FlowFile>& flow) override;
std::string getGroupId(const std::shared_ptr<core::FlowFile>& flow) override;
// check whether the defragment bin is validate
static bool checkDefragment(std::unique_ptr<Bin> &bin);

Expand Down

0 comments on commit b0176bf

Please sign in to comment.