Skip to content

Commit

Permalink
MINIFICPP-2463 TailFile doesn't pick up the data after the last delimit…
Browse files Browse the repository at this point in the history
…er in rotated files
  • Loading branch information
martinzink committed Oct 10, 2024
1 parent 0359e1b commit f56438f
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 311 deletions.
49 changes: 24 additions & 25 deletions extensions/standard-processors/processors/TailFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class FileReaderCallback {
return begin_ != end_ || input_stream_.good();
}

bool useLatestFlowFile() const {
bool endedWithDelimiter() const {
return latest_flow_file_ends_with_delimiter_;
}

Expand Down Expand Up @@ -309,7 +309,7 @@ void TailFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFac
}
}

void TailFile::parseAttributeProviderServiceProperty(core::ProcessContext& context) {
void TailFile::parseAttributeProviderServiceProperty(const core::ProcessContext& context) {
const auto attribute_provider_service_name = context.getProperty(AttributeProviderService);
if (!attribute_provider_service_name || attribute_provider_service_name->empty()) {
return;
Expand Down Expand Up @@ -396,7 +396,7 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::filesystem::path, Tai
}
}

bool TailFile::recoverState(core::ProcessContext& context) {
bool TailFile::recoverState(const core::ProcessContext& context) {
std::map<std::filesystem::path, TailState> new_tail_states;
bool state_load_success = getStateFromStateManager(new_tail_states) ||
getStateFromLegacyStateFile(context, new_tail_states);
Expand Down Expand Up @@ -429,7 +429,7 @@ bool TailFile::getStateFromStateManager(std::map<std::filesystem::path, TailStat
std::unordered_map<std::string, std::string> state_map;
if (state_manager_->get(state_map)) {
for (size_t i = 0U;; ++i) {
if (state_map.find("file." + std::to_string(i) + ".name") == state_map.end()) {
if (!state_map.contains("file." + std::to_string(i) + ".name")) {
break;
}
try {
Expand Down Expand Up @@ -462,7 +462,7 @@ bool TailFile::getStateFromStateManager(std::map<std::filesystem::path, TailStat
return false;
}

bool TailFile::getStateFromLegacyStateFile(core::ProcessContext& context,
bool TailFile::getStateFromLegacyStateFile(const core::ProcessContext& context,
std::map<std::filesystem::path, TailState> &new_tail_states) const {
std::string state_file_name_property;
context.getProperty(StateFile, state_file_name_property);
Expand Down Expand Up @@ -499,7 +499,7 @@ std::ostream& operator<<(std::ostream &os, const TailState &tail_state) {
return os;
}

bool TailFile::storeState() {
bool TailFile::storeState() const {
std::unordered_map<std::string, std::string> state;
size_t i = 0;
for (const auto& tail_state : tail_states_) {
Expand Down Expand Up @@ -554,12 +554,12 @@ std::vector<TailState> TailFile::findRotatedFilesAfterLastReadTime(const TailSta
auto collect_matching_files = [&](const std::filesystem::path& path, const std::filesystem::path& file_name) -> bool {
utils::Regex pattern_regex(pattern);
if (file_name != state.file_name_ && utils::regexMatch(file_name.string(), pattern_regex)) {
auto full_file_name = path / file_name;
const auto full_file_name = path / file_name;
TailStateWithMtime::TimePoint mtime{utils::file::last_write_time_point(full_file_name)};
logger_->log_debug("File {} with mtime {} matches rolling filename pattern {}", file_name, int64_t{mtime.time_since_epoch().count()}, pattern);
if (mtime >= std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) {
logger_->log_debug("File {} has mtime >= last read time, so we are going to read it", file_name);
matched_files_with_mtime.emplace_back(TailState{path, file_name}, mtime);
matched_files_with_mtime.emplace_back(TailState{path, file_name, true}, mtime);
}
}
return true;
Expand All @@ -581,8 +581,7 @@ std::vector<TailState> TailFile::sortAndSkipMainFilePrefix(const TailState &stat
TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_;
auto full_file_name = first_rotated_file.fileNameWithPath();
if (utils::file::file_size(full_file_name) >= state.position_) {
uint64_t checksum = utils::file::computeChecksum(full_file_name, state.position_);
if (checksum == state.checksum_) {
if (utils::file::computeChecksum(full_file_name, state.position_) == state.checksum_) {
first_rotated_file.position_ = state.position_;
first_rotated_file.checksum_ = state.checksum_;
}
Expand All @@ -591,7 +590,7 @@ std::vector<TailState> TailFile::sortAndSkipMainFilePrefix(const TailState &stat

std::vector<TailState> matched_files;
matched_files.reserve(matched_files_with_mtime.size());
std::transform(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), std::back_inserter(matched_files),
std::ranges::transform(matched_files_with_mtime, std::back_inserter(matched_files),
[](TailStateWithMtime &tail_state_with_mtime) { return std::move(tail_state_with_mtime.tail_state_); });
return matched_files;
}
Expand All @@ -618,7 +617,7 @@ void TailFile::onTrigger(core::ProcessContext& context, core::ProcessSession& se
first_trigger_ = false;
}

bool TailFile::isOldFileInitiallyRead(TailState &state) const {
bool TailFile::isOldFileInitiallyRead(const TailState& state) const {
// This is our initial processing and no stored state was found
return first_trigger_ && state.last_read_time_ == std::chrono::file_clock::time_point{};
}
Expand Down Expand Up @@ -695,8 +694,8 @@ void TailFile::processSingleFile(core::ProcessSession& session,
auto flow_file = session.create();
session.write(flow_file, std::ref(file_reader));

if (file_reader.useLatestFlowFile()) {
updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, flow_file);
if (file_reader.endedWithDelimiter() || (state.is_rotated_ && flow_file->getSize() > 0)) {
updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, *flow_file);
session.transfer(flow_file, Success);
updateStateAttributes(state_copy, flow_file->getSize(), file_reader.checksum());

Expand All @@ -715,7 +714,7 @@ void TailFile::processSingleFile(core::ProcessSession& session,
auto flow_file = session.create();
session.write(flow_file, std::ref(file_reader));

updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, flow_file);
updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, *flow_file);
session.transfer(flow_file, Success);
updateStateAttributes(state, flow_file->getSize(), file_reader.checksum());
}
Expand All @@ -724,24 +723,24 @@ void TailFile::processSingleFile(core::ProcessSession& session,
void TailFile::updateFlowFileAttributes(const std::filesystem::path& full_file_name, const TailState& state,
const std::filesystem::path& fileName, const std::string& baseName,
const std::string& extension,
std::shared_ptr<core::FlowFile> &flow_file) const {
logger_->log_info("TailFile {} for {} bytes", fileName, flow_file->getSize());
std::string logName = textfragmentutils::createFileName(baseName, extension, state.position_, flow_file->getSize());
flow_file->setAttribute(core::SpecialFlowAttribute::PATH, state.path_.string());
flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name.string());
flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, logName);
core::FlowFile& flow_file) const {
logger_->log_info("TailFile {} for {} bytes", fileName, flow_file.getSize());
std::string logName = textfragmentutils::createFileName(baseName, extension, state.position_, flow_file.getSize());
flow_file.setAttribute(core::SpecialFlowAttribute::PATH, state.path_.string());
flow_file.addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name.string());
flow_file.setAttribute(core::SpecialFlowAttribute::FILENAME, logName);

flow_file->setAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, baseName);
flow_file->setAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, extension);
flow_file->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(state.position_));
flow_file.setAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, baseName);
flow_file.setAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, extension);
flow_file.setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(state.position_));

if (extra_attributes_.contains(state.path_.string())) {
std::string prefix;
if (attribute_provider_service_) {
prefix = std::string(attribute_provider_service_->name()) + ".";
}
for (const auto& [key, value] : extra_attributes_.at(state.path_.string())) {
flow_file->setAttribute(prefix + key, value);
flow_file.setAttribute(prefix + key, value);
}
}
}
Expand Down
26 changes: 13 additions & 13 deletions extensions/standard-processors/processors/TailFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <unordered_map>
#include <vector>
#include <set>
#include <optional>

#include "controllers/AttributeProviderService.h"
Expand Down Expand Up @@ -73,12 +72,12 @@ namespace org::apache::nifi::minifi::processors {

struct TailState {
TailState(std::filesystem::path path, std::filesystem::path file_name, uint64_t position,
std::chrono::file_clock::time_point last_read_time,
uint64_t checksum)
: path_(std::move(path)), file_name_(std::move(file_name)), position_(position), last_read_time_(last_read_time), checksum_(checksum) {}
const std::chrono::file_clock::time_point last_read_time,
const uint64_t checksum, const bool is_rotated = false)
: path_(std::move(path)), file_name_(std::move(file_name)), position_(position), last_read_time_(last_read_time), checksum_(checksum), is_rotated_(is_rotated) {}

TailState(std::filesystem::path path, std::filesystem::path file_name)
: TailState{std::move(path), std::move(file_name), 0, std::chrono::file_clock::time_point{}, 0} {}
TailState(std::filesystem::path path, std::filesystem::path file_name, const bool is_rotated = false)
: TailState{std::move(path), std::move(file_name), 0, std::chrono::file_clock::time_point{}, 0, is_rotated} {}

TailState() = default;

Expand All @@ -95,6 +94,7 @@ struct TailState {
uint64_t position_ = 0;
std::chrono::file_clock::time_point last_read_time_;
uint64_t checksum_ = 0;
bool is_rotated_ = false;
};

std::ostream& operator<<(std::ostream &os, const TailState &tail_state);
Expand Down Expand Up @@ -221,9 +221,9 @@ class TailFile : public core::Processor {
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

void initialize() override;
bool recoverState(core::ProcessContext& context);
bool recoverState(const core::ProcessContext& context);
void logState();
bool storeState();
bool storeState() const;
std::chrono::milliseconds getLookupFrequency() const;

private:
Expand All @@ -237,7 +237,7 @@ class TailFile : public core::Processor {
TimePoint mtime_;
};

void parseAttributeProviderServiceProperty(core::ProcessContext& context);
void parseAttributeProviderServiceProperty(const core::ProcessContext& context);
void parseStateFileLine(char *buf, std::map<std::filesystem::path, TailState> &state) const;
void processAllRotatedFiles(core::ProcessSession& session, TailState &state);
void processRotatedFiles(core::ProcessSession& session, TailState &state, std::vector<TailState> &rotated_file_states);
Expand All @@ -253,21 +253,21 @@ class TailFile : public core::Processor {
const std::filesystem::path& full_file_name,
TailState &state);
bool getStateFromStateManager(std::map<std::filesystem::path, TailState> &new_tail_states) const;
bool getStateFromLegacyStateFile(core::ProcessContext& context,
bool getStateFromLegacyStateFile(const core::ProcessContext& context,
std::map<std::filesystem::path, TailState> &new_tail_states) const;
void doMultifileLookup(core::ProcessContext& context);
void checkForRemovedFiles();
void checkForNewFiles(core::ProcessContext& context);
static std::string baseDirectoryFromAttributes(const controllers::AttributeProviderService::AttributeMap& attribute_map, core::ProcessContext& context);
void updateFlowFileAttributes(const std::filesystem::path& full_file_name, const TailState &state, const std::filesystem::path& fileName,
const std::string &baseName, const std::string &extension,
std::shared_ptr<core::FlowFile> &flow_file) const;
core::FlowFile& flow_file) const;
static void updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum);
bool isOldFileInitiallyRead(TailState &state) const;
bool isOldFileInitiallyRead(const TailState &state) const;

static const char *CURRENT_STR;
static const char *POSITION_STR;
static const int BUFFER_SIZE = 512;
static constexpr int BUFFER_SIZE = 512;

std::optional<char> delimiter_; // Delimiter for the data incoming from the tailed file.
core::StateManager* state_manager_ = nullptr;
Expand Down
Loading

0 comments on commit f56438f

Please sign in to comment.