From b0176bf6ada52e07a9bcf853947aba9c1e0e45b9 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 26 Jul 2023 16:49:45 +0200 Subject: [PATCH] Refactor function parameters --- extensions/libarchive/BinFiles.cpp | 54 +++++++++++++------------- extensions/libarchive/BinFiles.h | 18 ++++----- extensions/libarchive/MergeContent.cpp | 48 +++++++++++------------ extensions/libarchive/MergeContent.h | 14 +++---- 4 files changed, 67 insertions(+), 67 deletions(-) diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp index 8a8bb7e3abd..01dc155533c 100644 --- a/extensions/libarchive/BinFiles.cpp +++ b/extensions/libarchive/BinFiles.cpp @@ -80,7 +80,7 @@ void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFac } } -void BinFiles::preprocessFlowFile(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/, const std::shared_ptr& flow) { +void BinFiles::preprocessFlowFile(const std::shared_ptr& flow) { // handle backward compatibility with old segment attributes std::string value; if (!flow->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, value) && flow->getAttribute(BinFiles::SEGMENT_COUNT_ATTRIBUTE, value)) { @@ -208,14 +208,14 @@ bool BinManager::offer(const std::string &group, const std::shared_ptr &context, const std::shared_ptr &session) { +bool BinFiles::resurrectFlowFiles(core::ProcessSession &session) { auto flow_files = file_store_.getNewFlowFiles(); // these are already processed FlowFiles, that we own bool had_failure = false; for (auto &file : flow_files) { - std::string group_id = getGroupId(context.get(), file); + std::string group_id = getGroupId(file); if (!binManager_.offer(group_id, file)) { - session->transfer(file, Failure); + session.transfer(file, Failure); had_failure = true; } // no need to route successfully captured such files as we already own them in the Self relationship @@ -223,52 +223,52 @@ bool BinFiles::resurrectFlowFiles(const std::shared_ptr &c return had_failure; } -void BinFiles::assumeOwnerShipOfNextBatch(const std::shared_ptr &context, const std::shared_ptr &session) { +void BinFiles::assumeOwnerShipOfNextBatch(core::ProcessSession &session) { for (size_t i = 0; i < batchSize_; ++i) { - auto flow = session->get(); + auto flow = session.get(); if (flow == nullptr) { break; } - preprocessFlowFile(context.get(), session.get(), flow); - std::string group_id = getGroupId(context.get(), flow); + preprocessFlowFile(flow); + std::string group_id = getGroupId(flow); bool offer = binManager_.offer(group_id, flow); if (!offer) { - session->transfer(flow, Failure); + session.transfer(flow, Failure); continue; } - session->transfer(flow, Self); + session.transfer(flow, Self); } - session->commit(); + session.commit(); } -void BinFiles::processReadyBins(std::deque> ready_bins, const std::shared_ptr &context, const std::shared_ptr &session) { +void BinFiles::processReadyBins(std::deque> ready_bins, core::ProcessSession &session) { while (!ready_bins.empty()) { std::unique_ptr bin = std::move(ready_bins.front()); ready_bins.pop_front(); try { - addFlowsToSession(context.get(), session.get(), bin); + addFlowsToSession(session, bin); logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId()); - if (!this->processBin(context.get(), session.get(), bin)) - this->transferFlowsToFail(context.get(), session.get(), bin); - session->commit(); + if (!processBin(session, bin)) + transferFlowsToFail(session, bin); + session.commit(); } catch(const std::exception& ex) { logger_->log_error("Caught Exception type: '%s' while merging ready bin: '%s'", typeid(ex).name(), ex.what()); binManager_.addReadyBin(std::move(bin)); - session->rollback(); + session.rollback(); } } } -std::deque> BinFiles::gatherReadyBins(const std::shared_ptr &context) { +std::deque> BinFiles::gatherReadyBins(core::ProcessContext &context) { binManager_.gatherReadyBins(); if (gsl::narrow(binManager_.getBinCount()) > maxBinCount_) { // bin count reach max allowed - context->yield(); - logger_->log_debug("BinFiles reach max bin count %d", this->binManager_.getBinCount()); + context.yield(); + logger_->log_debug("BinFiles reach max bin count %d", binManager_.getBinCount()); binManager_.removeOldestBin(); } @@ -278,27 +278,27 @@ std::deque> BinFiles::gatherReadyBins(const std::shared_ptr } void BinFiles::onTrigger(const std::shared_ptr &context, const std::shared_ptr &session) { - if (resurrectFlowFiles(context, session)) { + if (resurrectFlowFiles(*session)) { context->yield(); return; } - assumeOwnerShipOfNextBatch(context, session); - processReadyBins(gatherReadyBins(context), context, session); + assumeOwnerShipOfNextBatch(*session); + processReadyBins(gatherReadyBins(*context), *session); } -void BinFiles::transferFlowsToFail(core::ProcessContext* /*context*/, core::ProcessSession *session, std::unique_ptr &bin) { +void BinFiles::transferFlowsToFail(core::ProcessSession &session, std::unique_ptr &bin) { std::deque> &flows = bin->getFlowFile(); for (const auto& flow : flows) { - session->transfer(flow, Failure); + session.transfer(flow, Failure); } flows.clear(); } -void BinFiles::addFlowsToSession(core::ProcessContext* /*context*/, core::ProcessSession *session, std::unique_ptr &bin) { +void BinFiles::addFlowsToSession(core::ProcessSession &session, std::unique_ptr &bin) { std::deque> &flows = bin->getFlowFile(); for (const auto& flow : flows) { - session->add(flow); + session.add(flow); } } diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h index 49f744f5924..e21fe1148c2 100644 --- a/extensions/libarchive/BinFiles.h +++ b/extensions/libarchive/BinFiles.h @@ -285,24 +285,24 @@ class BinFiles : public core::Processor { protected: // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). - virtual void preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, const std::shared_ptr& flow); + virtual void preprocessFlowFile(const std::shared_ptr& flow); // Returns a group ID representing a bin. This allows flow files to be binned into like groups - virtual std::string getGroupId(core::ProcessContext* /*context*/, const std::shared_ptr& /*flow*/) { + virtual std::string getGroupId(const std::shared_ptr& /*flow*/) { return ""; } // Processes a single bin. - virtual bool processBin(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/, std::unique_ptr& /*bin*/) { + virtual bool processBin(core::ProcessSession& /*session*/, std::unique_ptr& /*bin*/) { return false; } // transfer flows to failure in bin - static void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin); + static void transferFlowsToFail(core::ProcessSession &session, std::unique_ptr &bin); // moves owned flows to session - static void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin); + static void addFlowsToSession(core::ProcessSession &session, std::unique_ptr &bin); - bool resurrectFlowFiles(const std::shared_ptr &context, const std::shared_ptr &session); - void assumeOwnerShipOfNextBatch(const std::shared_ptr &context, const std::shared_ptr &session); - std::deque> gatherReadyBins(const std::shared_ptr &context); - void processReadyBins(std::deque> ready_bins, const std::shared_ptr &context, const std::shared_ptr &session); + bool resurrectFlowFiles(core::ProcessSession &session); + void assumeOwnerShipOfNextBatch(core::ProcessSession &session); + std::deque> gatherReadyBins(core::ProcessContext &context); + void processReadyBins(std::deque> ready_bins, core::ProcessSession &session); BinManager binManager_; diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp index c2f51b5f7e3..46f1b89d3f3 100644 --- a/extensions/libarchive/MergeContent.cpp +++ b/extensions/libarchive/MergeContent.cpp @@ -128,7 +128,7 @@ void MergeContent::validatePropertyOptions() { } } -std::string MergeContent::getGroupId(core::ProcessContext*, const std::shared_ptr& flow) { +std::string MergeContent::getGroupId(const std::shared_ptr& flow) { std::string groupId; std::string value; if (!correlationAttributeName_.empty()) { @@ -191,7 +191,7 @@ void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession BinFiles::onTrigger(context, session); } -bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin) { +bool MergeContent::processBin(core::ProcessSession &session, std::unique_ptr &bin) { if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK) return false; @@ -213,19 +213,19 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio }); } - std::shared_ptr merge_flow = std::static_pointer_cast(session->create()); + std::shared_ptr merge_flow = std::static_pointer_cast(session.create()); if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) { KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow); } else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) { KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, merge_flow); } else { logger_->log_error("Attribute strategy not supported %s", attributeStrategy_); - session->remove(merge_flow); + session.remove(merge_flow); return false; } auto flowFileReader = [&] (const std::shared_ptr& ff, const io::InputStreamCallback& cb) { - return session->read(ff, cb); + return session.read(ff, cb); }; const char* mimeType; @@ -247,30 +247,30 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio mimeType = "application/zip"; } else { logger_->log_error("Merge format not supported %s", mergeFormat_); - session->remove(merge_flow); + session.remove(merge_flow); return false; } std::shared_ptr mergeFlow; try { - mergeBin->merge(context, session, bin->getFlowFile(), *serializer, merge_flow); - session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType); + mergeBin->merge(session, bin->getFlowFile(), *serializer, merge_flow); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType); } catch (const std::exception& ex) { logger_->log_error("Merge Content merge catch exception, type: %s, what: %s", typeid(ex).name(), ex.what()); - session->remove(merge_flow); + session.remove(merge_flow); return false; } catch (...) { logger_->log_error("Merge Content merge catch exception, type: %s", getCurrentExceptionTypeName()); - session->remove(merge_flow); + session.remove(merge_flow); return false; } - session->putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize())); + session.putAttribute(merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize())); // we successfully merge the flow - session->transfer(merge_flow, Merge); + session.transfer(merge_flow, Merge); std::deque> &flows = bin->getFlowFile(); for (const auto& flow : flows) { - session->transfer(flow, Original); + session.transfer(flow, Original); } logger_->log_info("Merge FlowFile record UUID %s, payload length %d", merge_flow->getUUIDStr(), merge_flow->getSize()); @@ -282,9 +282,9 @@ BinaryConcatenationMerge::BinaryConcatenationMerge(std::string header, std::stri footer_(std::move(footer)), demarcator_(std::move(demarcator)) {} -void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session, +void BinaryConcatenationMerge::merge(core::ProcessSession &session, std::deque> &flows, FlowFileSerializer& serializer, const std::shared_ptr& merge_flow) { - session->write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer}); + session.write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer}); std::string fileName; if (flows.size() == 1) { flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); @@ -292,12 +292,12 @@ void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::Pr flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); } if (!fileName.empty()) - session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); } -void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session, +void TarMerge::merge(core::ProcessSession &session, std::deque> &flows, FlowFileSerializer& serializer, const std::shared_ptr& merge_flow) { - session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer}); + session.write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer}); std::string fileName; merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); if (flows.size() == 1) { @@ -307,13 +307,13 @@ void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se } if (!fileName.empty()) { fileName += ".tar"; - session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); } } -void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session, +void ZipMerge::merge(core::ProcessSession &session, std::deque> &flows, FlowFileSerializer& serializer, const std::shared_ptr& merge_flow) { - session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer}); + session.write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer}); std::string fileName; merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); if (flows.size() == 1) { @@ -323,13 +323,13 @@ void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se } if (!fileName.empty()) { fileName += ".zip"; - session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); + session.putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName); } } -void AttributeMerger::mergeAttributes(core::ProcessSession *session, const std::shared_ptr &merge_flow) { +void AttributeMerger::mergeAttributes(core::ProcessSession &session, const std::shared_ptr &merge_flow) { for (const auto& pair : getMergedAttributes()) { - session->putAttribute(merge_flow, pair.first, pair.second); + session.putAttribute(merge_flow, pair.first, pair.second); } } diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index 87c253b3df1..474d437c505 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -56,7 +56,7 @@ class MergeBin { public: virtual ~MergeBin() = default; // merge the flows in the bin - virtual void merge(core::ProcessContext *context, core::ProcessSession *session, + virtual void merge(core::ProcessSession &session, std::deque> &flows, FlowFileSerializer& serializer, const std::shared_ptr &flowFile) = 0; }; @@ -64,7 +64,7 @@ class BinaryConcatenationMerge : public MergeBin { public: BinaryConcatenationMerge(std::string header, std::string footer, std::string demarcator); - void merge(core::ProcessContext* context, core::ProcessSession *session, + void merge(core::ProcessSession &session, std::deque>& flows, FlowFileSerializer& serializer, const std::shared_ptr& merge_flow) override; // Nest Callback Class for write stream class WriteCallback { @@ -242,13 +242,13 @@ class ArchiveMerge { class TarMerge: public ArchiveMerge, public MergeBin { public: - void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, + void merge(core::ProcessSession &session, std::deque> &flows, FlowFileSerializer& serializer, const std::shared_ptr &merge_flow) override; }; class ZipMerge: public ArchiveMerge, public MergeBin { public: - void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque> &flows, + void merge(core::ProcessSession &session, std::deque> &flows, FlowFileSerializer& serializer, const std::shared_ptr &merge_flow) override; }; @@ -256,7 +256,7 @@ class AttributeMerger { public: explicit AttributeMerger(std::deque> &flows) : flows_(flows) {} - void mergeAttributes(core::ProcessSession *session, const std::shared_ptr &merge_flow); + void mergeAttributes(core::ProcessSession &session, const std::shared_ptr &merge_flow); virtual ~AttributeMerger() = default; protected: @@ -373,11 +373,11 @@ class MergeContent : public processors::BinFiles { void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; void initialize() override; - bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr &bin) override; + bool processBin(core::ProcessSession &session, std::unique_ptr &bin) override; protected: // Returns a group ID representing a bin. This allows flow files to be binned into like groups - std::string getGroupId(core::ProcessContext *context, const std::shared_ptr& flow) override; + std::string getGroupId(const std::shared_ptr& flow) override; // check whether the defragment bin is validate static bool checkDefragment(std::unique_ptr &bin);