diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt index 4f92177c..e1397f83 100644 --- a/modules/CMakeLists.txt +++ b/modules/CMakeLists.txt @@ -2,5 +2,6 @@ add_subdirectory(listDetector) add_subdirectory(sampler) add_subdirectory(telemetry) add_subdirectory(deduplicator) +add_subdirectory(adsnormalizer) add_subdirectory(flowScatter) add_subdirectory(clickhouse) diff --git a/modules/adsnormalizer/CMakeLists.txt b/modules/adsnormalizer/CMakeLists.txt new file mode 100644 index 00000000..febd4f0a --- /dev/null +++ b/modules/adsnormalizer/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(src) diff --git a/modules/adsnormalizer/README.md b/modules/adsnormalizer/README.md new file mode 100644 index 00000000..e69de29b diff --git a/modules/adsnormalizer/src/CMakeLists.txt b/modules/adsnormalizer/src/CMakeLists.txt new file mode 100644 index 00000000..b2ec3977 --- /dev/null +++ b/modules/adsnormalizer/src/CMakeLists.txt @@ -0,0 +1,16 @@ +add_executable(adsnormalizer + main.cpp + adsnormalizer.cpp +) + +target_link_libraries(adsnormalizer PRIVATE + telemetry::telemetry + telemetry::appFs + common + unirec::unirec++ + unirec::unirec + trap::trap + argparse +) + +install(TARGETS adsnormalizer DESTINATION ${INSTALL_DIR_BIN}) diff --git a/modules/adsnormalizer/src/adsnormalizer.cpp b/modules/adsnormalizer/src/adsnormalizer.cpp new file mode 100644 index 00000000..7d3e70de --- /dev/null +++ b/modules/adsnormalizer/src/adsnormalizer.cpp @@ -0,0 +1,147 @@ +/** + * @file + * @author Jaroslav Pesek + * @brief Implementation of the AdsNormalizer class. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "adsnormalizer.hpp" +#include + +namespace { + +using namespace Nemea; +using SwapFn = void(*)(UnirecRecord&, ur_field_id_t, ur_field_id_t); + +static ur_field_id_t getUnirecIdByName(const char* str) +{ + auto unirecId = ur_get_id_by_name(str); + if (unirecId == UR_E_INVALID_NAME) { + throw std::runtime_error(std::string("Invalid Unirec name:") + str); + } + return static_cast(unirecId); +} + +template +void swapValues(UnirecRecord& record, ur_field_id_t field1, ur_field_id_t field2) +{ + auto value1 = record.getFieldAsType(field1); + auto value2 = record.getFieldAsType(field2); + record.setFieldFromType(value2, field1); + record.setFieldFromType(value1, field2); +} + + +template +void swap_fn(UnirecRecord& rec, ur_field_id_t f1, ur_field_id_t f2) +{ + swapValues(rec, f1, f2); +} + +static const std::unordered_map swap_dispatch = { + {UR_TYPE_IP, swap_fn}, + {UR_TYPE_MAC, swap_fn}, + {UR_TYPE_UINT8, swap_fn}, + {UR_TYPE_UINT16, swap_fn}, + {UR_TYPE_UINT32, swap_fn}, + {UR_TYPE_UINT64, swap_fn}, + {UR_TYPE_INT8, swap_fn}, + {UR_TYPE_INT16, swap_fn}, + {UR_TYPE_INT32, swap_fn}, + {UR_TYPE_INT64, swap_fn} +}; + +} // namespace + +namespace AdsNorm { + +AdsNormalizer::AdsNormalizer(const std::string& templateStr) +{ + std::vector names; + + std::istringstream outer(templateStr); + std::string token; + + while (std::getline(outer, token, ',')) { + std::istringstream inner(token); + std::string droppedType, name; + + if (!(inner >> droppedType >> name) || name.empty()) { + throw std::invalid_argument("AdsNormalizer: malformed segment: \"" + token + '"'); + } + names.push_back(std::move(name)); + } + if (names.size() % 2 != 0) { + throw std::invalid_argument( + "AdsNormalizer: odd number of column names in template string"); + } + + m_str_fields.reserve(names.size() / 2); + for (size_t i = 0; i < names.size(); i += 2) { + m_str_fields.emplace_back(std::move(names[i]), std::move(names[i + 1])); + } +} + +UnirecRecord AdsNormalizer::swapColumnPairs(const UnirecRecordView& record) +{ + to_send_record.copyFieldsFrom(record); + for (const auto& pair_id : m_id_fields) { + auto field_type = ur_get_type(pair_id.first); + auto it = swap_dispatch.find(field_type); + if (it != swap_dispatch.end()) { + it->second(to_send_record, pair_id.first, pair_id.second); + } else { + throw std::runtime_error("AdsNormalizer: unsupported field type for rotation"); + } + } + + return to_send_record; +} + +std::optional AdsNormalizer::sendOriginal(UnirecRecordView& record) +{ + m_stats.totalRecords++; + + auto prefix_tag = record.getFieldAsType(m_prefix_tag_id); + if (prefix_tag == 0) { + return std::nullopt; // Do not send the record if PREFIX_TAG is 0. + } + m_stats.nonRotatedRecords++; + return record; +} + +std::optional AdsNormalizer::sendRotated(UnirecRecordView& record) +{ + auto prefix_tag_dst = record.getFieldAsType(m_prefix_tag_dst_id); + if (prefix_tag_dst == 0) { + return std::nullopt; // Do not send the record if PREFIX_TAG_DST is 0. + } + + auto rotated_record = swapColumnPairs(record); + m_stats.rotatedRecords++; + return rotated_record; +} + +void AdsNormalizer::updateUnirec(UnirecRecord& new_record) +{ + for(const auto& str_fields : m_str_fields) { + ur_field_id_t field = getUnirecIdByName(str_fields.first.c_str()); + ur_field_id_t field_rev = getUnirecIdByName(str_fields.second.c_str()); + m_id_fields.emplace_back(std::make_pair(field, field_rev)); + } + m_prefix_tag_id = getUnirecIdByName("PREFIX_TAG"); + m_prefix_tag_dst_id = getUnirecIdByName("PREFIX_TAG_DST"); + to_send_record = UnirecRecord(new_record); +} + + + + + + + + + + +} // namespace AdsNorm diff --git a/modules/adsnormalizer/src/adsnormalizer.hpp b/modules/adsnormalizer/src/adsnormalizer.hpp new file mode 100644 index 00000000..002cfe3a --- /dev/null +++ b/modules/adsnormalizer/src/adsnormalizer.hpp @@ -0,0 +1,79 @@ +/** + * @file normalizer.hpp + * @author Jaroslav Pesek + * @brief Declaration of the ADS Normalizer class. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + + +namespace AdsNorm { + +using namespace Nemea; + +struct AdsNormalizerStats { + uint64_t totalRecords = 0; ///< Total number of records processed. + uint64_t rotatedRecords = 0; ///< Number of records that were rotated. + uint64_t nonRotatedRecords = 0; ///< Number of records that were not rotated. +}; + + +/** + * @brief A class for normalizing ADS records. + * + * This class provides functionality to normalize ADS records by rotating them + * based on PREFIX_TAG or PREFIX_TAG_DST. + * If PREFIX_TAG == 0 and PREFIX_TAG_DST == 0, the unirec is dropped and info is logged as unexpected situation. + * If PREFIX_TAG != 0 and PREFIX_TAG_DST == 0, the rotatable columns are NOT rotated and the unirec is send as is. + * If PREFIX_TAG == 0 and PREFIX_TAG_DST != 0, the rotatable columns ARE rotated and the unirec is send with the rotated columns. + * If PREFIX_TAG != 0 and PREFIX_TAG_DST != 0, the unirec is send 2 times: rotated and non-rotated. + */ +class AdsNormalizer { +public: + /** + * @brief Constructs an AdsNormalizer object with the given template string. + * @param templateStr The template string for the normalization - strings containing the column names to be rotated. + * The template string should contain pairs of column names separated by a comma. + * For example: "ipaddr SRC_IP,ipaddr DST_IP,uint16 SRC_PORT,uint16 DST_PORT,uint32 BYTES,uint32 BYTES_REV". + */ + explicit AdsNormalizer(const std::string& templateStr); + + /** + * @brief Rotates the columns of the given record based on the template string. + * @param record The UnirecRecordView object representing the record to be normalized. + * + * This function modifies the record in place by rotating the specified columns + * according to the template string provided during construction. + */ + UnirecRecord swapColumnPairs(const UnirecRecordView& record); + + std::optional sendOriginal(UnirecRecordView& record); + + std::optional sendRotated(UnirecRecordView& record); + + void updateUnirec(UnirecRecord& new_record); + +private: + std::vector> m_str_fields; ///< Template string for the normalization. + std::vector> m_id_fields; ///< Vector of pairs of column names to be rotated. + AdsNormalizerStats m_stats; ///< Statistics of the normalization process. + ur_field_id_t m_prefix_tag_id; ///< Unirec ID for PREFIX_TAG field. + ur_field_id_t m_prefix_tag_dst_id; ///< Unirec ID for PREFIX_TAG_DST field. + UnirecRecord to_send_record; +}; + + + + + + +} // namespace AdsNorm diff --git a/modules/adsnormalizer/src/main.cpp b/modules/adsnormalizer/src/main.cpp new file mode 100644 index 00000000..2c40c5e9 --- /dev/null +++ b/modules/adsnormalizer/src/main.cpp @@ -0,0 +1,205 @@ +/** + * @file + * @author Jaroslav Pesek + * @author Karel Hynek + * @author Pavel Siska + * @brief Sampling Module: Sample flowdata + * + * This file contains the main function and supporting functions for the Unirec Sampling Module. + * This module process Unirec records thourgh a bidirectional interface and samples them accoring + * to user specified sampling rate. It utilizes the Unirec++ library for + * record handling, argparse for command-line argument parsing. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "logger/logger.hpp" +#include "unirec/unirec-telemetry.hpp" +#include "adsnormalizer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace Nemea; + +std::atomic g_stopFlag(false); + +void signalHandler(int signum) +{ + Nm::loggerGet("signalHandler")->info("Interrupt signal {} received", signum); + g_stopFlag.store(true); +} + +/** + * @brief Handle a format change exception by adjusting the template. + * + * This function is called when a `FormatChangeException` is caught in the main loop. + * It adjusts the template in the bidirectional interface to handle the format change. + * + * @param biInterface Bidirectional interface for Unirec communication. + */ +void handleFormatChange(UnirecBidirectionalInterface& biInterface, AdsNorm::AdsNormalizer& normalizer) +{ + biInterface.changeTemplate(); + auto newRecord = biInterface.createUnirecRecord(); + normalizer.updateUnirec(newRecord); +} + +/** + * @brief Process the next Unirec record and sample them. + * + * This function receives the next Unirec record through the bidirectional interface + * and performs sampling. + * + * @param biInterface Bidirectional interface for Unirec communication. + */ + +void processNextRecord(UnirecBidirectionalInterface& biInterface, AdsNorm::AdsNormalizer& normalizer) +{ + std::optional unirecRecord = biInterface.receive(); + if (!unirecRecord) { + return; + } + auto rec1 = normalizer.sendRotated(*unirecRecord); + if (rec1) { + biInterface.send(*rec1); + } + auto rec = normalizer.sendOriginal(*unirecRecord); + if (rec) { + biInterface.send(*rec); + } +} + +/** + * @brief Process Unirec records. + * + * The `processUnirecRecords` function continuously receives Unirec records through the provided + * bidirectional interface (`biInterface`) and performs sampling. The loop runs indefinitely until + * an end-of-file condition is encountered. + * + * @param biInterface Bidirectional interface for Unirec communication. + * @param sampler Sampler class for sampling. + */ +void processUnirecRecords(UnirecBidirectionalInterface& biInterface, AdsNorm::AdsNormalizer& normalizer) +{ + while (!g_stopFlag.load()) { + try { + processNextRecord(biInterface, normalizer); + } catch (FormatChangeException& ex) { + handleFormatChange(biInterface, normalizer); + } catch (EoFException& ex) { + break; + } catch (std::exception& ex) { + throw; + } + } +} + +// telemetry::Content getSamplerTelemetry(const Sampler::Sampler& sampler) +//{ +// auto stats = sampler.getStats(); + +// telemetry::Dict dict; +// dict["totalRecords"] = stats.totalRecords; +// dict["sampledRecords"] = stats.sampledRecords; +// return dict; +//} + +int main(int argc, char** argv) +{ + argparse::ArgumentParser program("ADS Normalizer"); + + Unirec unirec({1, 1, "adsnormalizer", "ADS Normalizer module"}); + + Nm::loggerInit(); + auto logger = Nm::loggerGet("main"); + + signal(SIGINT, signalHandler); + + try { + program.add_argument("-t", "--template") + .required() + .help( + "Specify the tempate of flipable fields in form ,,,, ... .") + .default_value(std::string("")); + program.add_argument("-m", "--appfs-mountpoint") + .required() + .help("path where the appFs directory will be mounted") + .default_value(std::string("")); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + unirec.init(argc, argv); + } catch (HelpException& ex) { + std::cerr << program; + return EXIT_SUCCESS; + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + program.parse_args(argc, argv); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + std::shared_ptr telemetryRootDirectory; + telemetryRootDirectory = telemetry::Directory::create(); + + std::unique_ptr appFs; + + try { + auto mountPoint = program.get("--appfs-mountpoint"); + if (!mountPoint.empty()) { + const bool tryToUnmountOnStart = true; + const bool createMountPoint = true; + appFs = std::make_unique( + telemetryRootDirectory, + mountPoint, + tryToUnmountOnStart, + createMountPoint); + appFs->start(); + } + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + + try { + const std::string unirecTemplate = program.get("--template"); + + UnirecBidirectionalInterface biInterface = unirec.buildBidirectionalInterface(); +/* + auto telemetryInputDirectory = telemetryRootDirectory->addDir("input"); + const telemetry::FileOps inputFileOps + = {[&biInterface]() { return Nm::getInterfaceTelemetry(biInterface); }, nullptr}; + const auto inputFile = telemetryInputDirectory->addFile("stats", inputFileOps); + + auto telemetrySamplerDirectory = telemetryRootDirectory->addDir("sampler"); + const telemetry::FileOps samplerFileOps + = {[&sampler]() { return getSamplerTelemetry(sampler); }, nullptr}; + const auto samplerFile = telemetrySamplerDirectory->addFile("stats", samplerFileOps); +*/ + auto normalizer = AdsNorm::AdsNormalizer(unirecTemplate); + processUnirecRecords(biInterface, normalizer); + + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + + return EXIT_SUCCESS; +}