diff --git a/README.md b/README.md index 80841610..be2a1a53 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ This repository contains basic modules of the [NEMEA system](https://github.com/CESNET/Nemea). The modules and their functionality/purposes are: -* [ListDetector](modules/listdetector/): forwards records that match rules list. +* [Clickhouse](modules/clickhouse/): converts unirec into clickhouse DB. +* [Deduplicator](modules/deduplicator/): omit duplicate records. +* [ListDetector](modules/listDetector/): forwards records that match rules list. * [Sampler](modules/sampler/): sample records at the given rate. * [Telemetry](modules/telemetry/): provides unirec telemetry of the input interface. -* [Deduplicator](modules/deduplicator/): omit duplicate records. diff --git a/common/external/CMakeLists.txt b/common/external/CMakeLists.txt index 81c13d76..d9abcf88 100644 --- a/common/external/CMakeLists.txt +++ b/common/external/CMakeLists.txt @@ -8,3 +8,5 @@ include(spdlog.cmake) include(rapidcsv.cmake) include(argparse.cmake) include(xxhash.cmake) +include(clickhouse_cpp.cmake) +include(yaml_cpp.cmake) diff --git a/common/external/clickhouse_cpp.cmake b/common/external/clickhouse_cpp.cmake new file mode 100644 index 00000000..cca3c6ca --- /dev/null +++ b/common/external/clickhouse_cpp.cmake @@ -0,0 +1,18 @@ +# clickhouse-cpp library (C++ client for ClickHouse) +include(FetchContent) + +FetchContent_Declare( + clickhouse_cpp + GIT_REPOSITORY "https://github.com/SiskaPavel/clickhouse-cpp.git" + GIT_TAG "65205a8" + GIT_SHALLOW ON +) + +set(DEBUG_DEPENDENCIES OFF) +set(CLICKHOUSE_INSTALL_TARGETS OFF) + +add_compile_options(-Wno-pedantic -Wno-conversion -Wno-sign-conversion) + +FetchContent_MakeAvailable(clickhouse_cpp) + +add_library(clickhouse_cpp::client ALIAS clickhouse-cpp-lib) diff --git a/common/external/yaml_cpp.cmake b/common/external/yaml_cpp.cmake new file mode 100644 index 00000000..ce7b9f2e --- /dev/null +++ b/common/external/yaml_cpp.cmake @@ -0,0 
+1,30 @@ +# Yaml-cpp library +# +# yaml-cpp is a YAML parser and emitter in C++ matching the YAML 1.2 spec. +# +# "yaml-cpp" is exposed to be used as a dependency in other CMake targets +# example usage: target_link_libraries(my_target PRIVATE yaml-cpp) + +include(FetchContent) + +FetchContent_Declare( + yaml-cpp + GIT_REPOSITORY https://github.com/jbeder/yaml-cpp.git + GIT_TAG f732014 # yaml-cpp-0.8.0 +) + +# Make sure that subproject accepts predefined build options without warnings. +set(CMAKE_POLICY_DEFAULT_CMP0077 NEW) + +# Library does not compile with -Werror that we use in some builds +string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ") +string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ") +string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ") +string(REPLACE "-Wconversion " " " CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ") +string(REPLACE "-Wconversion " " " CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} ") +string(REPLACE "-Wconversion " " " CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ") +set(YAML_CPP_BUILD_TESTS OFF) +set(YAML_CPP_BUILD_TOOLS OFF) +set(YAML_CPP_INSTALL OFF) + +FetchContent_MakeAvailable(yaml-cpp) diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt index df464f25..2d84c1c3 100644 --- a/modules/CMakeLists.txt +++ b/modules/CMakeLists.txt @@ -2,3 +2,4 @@ add_subdirectory(listDetector) add_subdirectory(sampler) add_subdirectory(telemetry) add_subdirectory(deduplicator) +add_subdirectory(clickhouse) diff --git a/modules/clickhouse/CMakeLists.txt b/modules/clickhouse/CMakeLists.txt new file mode 100644 index 00000000..febd4f0a --- /dev/null +++ b/modules/clickhouse/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(src) diff --git a/modules/clickhouse/README.md b/modules/clickhouse/README.md new file mode 100644 index 00000000..cc3f319a --- /dev/null +++ b/modules/clickhouse/README.md @@ -0,0 +1,165 @@ +# clickhouse output module +Converts Unirec records into 
clickhouse format and stores them into database/s. +- When multiple database endpoints are specified, data is sent only to one of them. +By default it is the first one and the others are used if the previous ones fail. + +## Interfaces +- Input: 1 +- Output: 0 + +## Parameters +### Common TRAP parameters +- `-h [trap,1]` Print help message for this module / for libtrap specific parameters. +- `-i IFC_SPEC` Specification of interface types and their parameters. +- `-v` Be verbose. +- `-vv` Be more verbose. +- `-vvv` Be even more verbose. + +### Module specific parameters +- `-c, --config FILE` Path to the YAML config file specifying connection parameters and data columns. + +## Usage +The module expects the ClickHouse database to already contain the table with +appropriate schema corresponding to the configuration entered. The existence +and schema of the table are checked after initiating connection to the database +and an error is displayed if there is a mismatch. The table is not +automatically created. + +### Unirec to clickhouse type conversion +| Unirec | Clickhouse | | Unirec | Clickhouse | +|---------|---------------|-|----------|----------------------| +| int8 | Int8 | | int8* | Array(Int8) | +| int16 | Int16 | | int16* | Array(Int16) | +| int32 | Int32 | | int32* | Array(Int32) | +| int64 | Int64 | | int64* | Array(Int64) | +| uint8 | UInt8 | | uint8* | Array(UInt8) | +| uint16 | UInt16 | | uint16* | Array(UInt16) | +| uint32 | UInt32 | | uint32* | Array(UInt32) | +| uint64 | UInt64 | | uint64* | Array(UInt64) | +| char | UInt8 | | char* | Array(UInt8) | +| float | Float32 | | float* | Array(Float32) | +| double | Float64 | | double* | Array(Float64) | +| ipaddr | IPv6 | | ipaddr* | Array(IPv6) | +| macaddr | Array(UInt8) | | macaddr* | Array(Array(UInt8)) | +| time | DateTime64(9) | | time* | Array(DateTime64(9)) | +| string | String | | | | +| bytes | Array(UInt8) | | | | + +### Clickhouse database and table creation example +```SQL +CREATE DATABASE IF NOT EXISTS clickhouse;
+CREATE TABLE clickhouse.flows( + "DST_IP" IPv6, + "SRC_IP" IPv6, + "BYTES" UInt64, + "BYTES_REV" UInt64, + "LINK_BIT_FIELD" UInt64, + "TIME_FIRST" DateTime64(9), + "TIME_LAST" DateTime64(9), + "PACKETS" UInt32, + "PACKETS_REV" UInt32, + "DST_PORT" UInt16, + "SRC_PORT" UInt16, + "FLOW_END_REASON" UInt8, + "PROTOCOL" UInt8, + "TCP_FLAGS" UInt8, + "TCP_FLAGS_REV" UInt8, + "IDP_CONTENT" Array(UInt8), + "IDP_CONTENT_REV" Array(UInt8), + "PPI_PKT_DIRECTIONS" Array(Int8), + "PPI_PKT_FLAGS" Array(UInt8), + "TLS_JA3_FINGERPRINT" Array(UInt8), + "TLS_SNI" String, + "PPI_PKT_LENGTHS" Array(UInt16), + "DBI_BRST_BYTES" Array(UInt32), + "DBI_BRST_PACKETS" Array(UInt32), + "D_PHISTS_IPT" Array(UInt32), + "D_PHISTS_SIZES" Array(UInt32), + "SBI_BRST_BYTES" Array(UInt32), + "SBI_BRST_PACKETS" Array(UInt32), + "S_PHISTS_IPT" Array(UInt32), + "S_PHISTS_SIZES" Array(UInt32), + "DBI_BRST_TIME_START" Array(DateTime64(9)), + "DBI_BRST_TIME_STOP" Array(DateTime64(9)), + "PPI_PKT_TIMES" Array(DateTime64(9)), + "SBI_BRST_TIME_START" Array(DateTime64(9)), + "SBI_BRST_TIME_STOP" Array(DateTime64(9)) +) +ENGINE = MergeTree +ORDER BY TIME_FIRST +``` + +## Configuration +YAML config + +### Config specification +| Parameter | Description | Default | +|-----------|-------------|---------| +| **connection** | The database connection parameters. | | +| connection.endpoints | The possible endpoints data can be sent to, i.e., all the replicas of a particular shard. In case one endpoint is unreachable, another one is used. | | +| connection.endpoints.endpoint | Connection parameters of one endpoint. | | +| connection.endpoints.endpoint.host | The ClickHouse database host as a domain name or an IP address. | | +| connection.endpoints.endpoint.port | The port of the ClickHouse database. | 9000 | +| connection.username | The database username. | | +| connection.password | The database password. | | +| connection.database | The database name where the specified table is present. 
| | +| connection.table | The name of the table to insert the data into. | | +| **blocks** | Number of data blocks in circulation. Each block is de-facto a memory buffer that the rows are written to before being sent out to the ClickHouse database. | 256 | +| **inserterThreads** | Number of threads used for data insertion to ClickHouse. In other words, the number of ClickHouse connections that are concurrently used. | 32 | +| **blockInsertThreshold** | Number of rows to be buffered into a block before the block is sent out to be inserted into the database. | 100000 | +| **blockInsertMaxDelaySecs** | Maximum number of seconds to wait before a block gets sent out to be inserted into the database even if the threshold has not been reached yet. | 10 | +| **columns** | List of fields which each row consists of. It is in unirec template format. ([TYPE] [NAME]) | | + + +### Example configuration +```YAML +connection: + endpoints: + - host: localhost + port: 9000 + username: clickhouse + password: clickhouse + database: clickhouse + table: flows + +inserterThreads: 32 +blocks: 1024 +blockInsertThreshold: 100000 + +columns: + - ipaddr DST_IP + - ipaddr SRC_IP + - uint64 BYTES + - uint64 BYTES_REV + - uint64 LINK_BIT_FIELD + - time TIME_FIRST + - time TIME_LAST + - uint32 PACKETS + - uint32 PACKETS_REV + - uint16 DST_PORT + - uint16 SRC_PORT + - uint8 FLOW_END_REASON + - uint8 PROTOCOL + - uint8 TCP_FLAGS + - uint8 TCP_FLAGS_REV + - bytes IDP_CONTENT + - bytes IDP_CONTENT_REV + - int8* PPI_PKT_DIRECTIONS + - uint8* PPI_PKT_FLAGS + - bytes TLS_JA3_FINGERPRINT + - string TLS_SNI + - uint16* PPI_PKT_LENGTHS + - uint32* DBI_BRST_BYTES + - uint32* DBI_BRST_PACKETS + - uint32* D_PHISTS_IPT + - uint32* D_PHISTS_SIZES + - uint32* SBI_BRST_BYTES + - uint32* SBI_BRST_PACKETS + - uint32* S_PHISTS_IPT + - uint32* S_PHISTS_SIZES + - time* DBI_BRST_TIME_START + - time* DBI_BRST_TIME_STOP + - time* PPI_PKT_TIMES + - time* SBI_BRST_TIME_START + - time* SBI_BRST_TIME_STOP +``` diff --git
a/modules/clickhouse/src/CMakeLists.txt b/modules/clickhouse/src/CMakeLists.txt new file mode 100644 index 00000000..3534cc5e --- /dev/null +++ b/modules/clickhouse/src/CMakeLists.txt @@ -0,0 +1,19 @@ +add_executable(clickhouse + main.cpp + config.cpp + datatype.cpp + inserter.cpp + manager.cpp +) + +target_link_libraries(clickhouse PRIVATE + clickhouse_cpp::client + common + unirec::unirec++ + unirec::unirec + trap::trap + argparse + yaml-cpp +) + +install(TARGETS clickhouse DESTINATION ${INSTALL_DIR_BIN}) diff --git a/modules/clickhouse/src/clickhouse.hpp b/modules/clickhouse/src/clickhouse.hpp new file mode 100644 index 00000000..26a6f030 --- /dev/null +++ b/modules/clickhouse/src/clickhouse.hpp @@ -0,0 +1,5 @@ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" +#pragma GCC diagnostic ignored "-Wsign-conversion" +#include +#pragma GCC diagnostic pop diff --git a/modules/clickhouse/src/config.cpp b/modules/clickhouse/src/config.cpp new file mode 100644 index 00000000..7ea49d6a --- /dev/null +++ b/modules/clickhouse/src/config.cpp @@ -0,0 +1,206 @@ +/** + * @file config.cpp + * @author Daniel Pelanek + * @brief Parses the YAML config file into the Config structure. Uses yaml-cpp. + * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "config.hpp" +#include "yaml-cpp/yaml.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief Remove leading whitespace (as classified by std::isspace) from the string in place. + * + * @param str String to trim; it is modified in place. + */ +static inline void trimLeft(std::string& str) +{ + str.erase(str.begin(), std::find_if(str.begin(), str.end(), [](unsigned char chr) { + return std::isspace(chr) == 0; + })); +} + +/** + * @brief Remove spaces from string on both sides.
+ * + * @param str + */ +static inline void trim(std::string& str) +{ + str.erase( + std::find_if( + str.rbegin(), + str.rend(), + [](unsigned char chr) { return std::isspace(chr) == 0; }) + .base(), + str.end()); + + str.erase(str.begin(), std::find_if(str.begin(), str.end(), [](unsigned char chr) { + return std::isspace(chr) == 0; + })); +} + +static Config::Endpoint parseEndpoint(const YAML::Node& node) +{ + Config::Endpoint endpoint; + + if (node["host"]) { + endpoint.host = node["host"].as(); + + if (node["port"]) { + endpoint.port = node["port"].as(); + } + } else { + throw std::runtime_error(std::string("Host parameter missing")); + } + + return endpoint; +} + +static void parseEndpoints(const YAML::Node& node, Config& config) +{ + for (const YAML::Node& endpoint : node) { + config.connection.endpoints.push_back(parseEndpoint(endpoint)); + } +} + +/** + * @brief type from unirec template into local enum. + * + */ +static const std::map g_string_to_columntype + = {{"int8", ColumnType::INT8}, {"int8*", ColumnType::INT8_ARR}, + {"int16", ColumnType::INT16}, {"int16*", ColumnType::INT16_ARR}, + {"int32", ColumnType::INT32}, {"int32*", ColumnType::INT32_ARR}, + {"int64", ColumnType::INT64}, {"int64*", ColumnType::INT64_ARR}, + {"uint8", ColumnType::UINT8}, {"uint8*", ColumnType::UINT8_ARR}, + {"uint16", ColumnType::UINT16}, {"uint16*", ColumnType::UINT16_ARR}, + {"uint32", ColumnType::UINT32}, {"uint32*", ColumnType::UINT32_ARR}, + {"uint64", ColumnType::UINT64}, {"uint64*", ColumnType::UINT64_ARR}, + {"char", ColumnType::CHAR}, {"char*", ColumnType::CHAR_ARR}, + {"float", ColumnType::FLOAT}, {"float*", ColumnType::FLOAT_ARR}, + {"double", ColumnType::DOUBLE}, {"double*", ColumnType::DOUBLE_ARR}, + {"ipaddr", ColumnType::IPADDR}, {"ipaddr*", ColumnType::IPADDR_ARR}, + {"macaddr", ColumnType::MACADDR}, {"macaddr*", ColumnType::MACADDR_ARR}, + {"time", ColumnType::TIME}, {"time*", ColumnType::TIME_ARR}, + {"string", ColumnType::STRING}, {"bytes", 
ColumnType::BYTES}}; + +static void parseColumns(const YAML::Node& columnsNode, Config& config) +{ + for (const YAML::Node& col : columnsNode) { + auto colValue = col.as(); + // Type/Name can't have space. Trim leading and trailing spaces. + // Leading spaces for name are trimmed below. + trim(colValue); + + Config::Column column; + size_t const spacePos = colValue.find(' '); + + std::string const type = colValue.substr(0, spacePos); + std::string name = colValue.substr(spacePos + 1); + + try { + column.type = g_string_to_columntype.at(type); + + } catch (std::out_of_range& ex) { + std::stringstream sstream; + sstream << "Incorrect column type: " << colValue.substr(0, spacePos); + throw std::runtime_error(sstream.str()); + } + + trimLeft(name); + column.name = name; + + column.fieldID = 0; + + config.columns.push_back(column); + + // Template stored in input interface format. For ensuring format. + config.templateColumnCsv += type; + config.templateColumnCsv += " "; + config.templateColumnCsv += name; + config.templateColumnCsv += ","; + } + + // Trailing comma + config.templateColumnCsv.pop_back(); +} + +static void parseConnection(const YAML::Node& node, Config& config) +{ + parseEndpoints(node["endpoints"], config); + + if (node["username"] && node["password"] && node["database"] && node["table"]) { + config.connection.user = node["username"].as(); + config.connection.password = node["password"].as(); + config.connection.database = node["database"].as(); + config.connection.table = node["table"].as(); + + } else { + throw std::runtime_error(std::string("Argument in connection missing")); + } +} + +static void parseBlocks(const YAML::Node& node, Config& config) +{ + if (node) { + config.blocks = node.as(); + } +} + +static void parseInserterThreads(const YAML::Node& node, Config& config) +{ + if (node) { + config.inserterThreads = node.as(); + } +} + +static void parseBlockInsertThreshold(const YAML::Node& node, Config& config) +{ + if (node) { + 
config.blockInsertThreshold = node.as(); + } +} + +static void parseBlockInsertMaxDelaySecs(const YAML::Node& node, Config& config) +{ + if (node) { + config.blockInsertMaxDelaySecs = node.as(); + } +} + +static void parseRoot(const YAML::Node& node, Config& config) +{ + parseConnection(node["connection"], config); + parseColumns(node["columns"], config); + + parseBlocks(node["blocks"], config); + parseInserterThreads(node["inserterThreads"], config); + parseBlockInsertThreshold(node["blockInsertThreshold"], config); + parseBlockInsertMaxDelaySecs(node["blockInsertMaxDelaySecs"], config); +} + +Config parseConfig(const std::string& filename) +{ + Config config {}; + + const YAML::Node root = YAML::LoadFile(filename); + + parseRoot(root, config); + + return config; +} diff --git a/modules/clickhouse/src/config.hpp b/modules/clickhouse/src/config.hpp new file mode 100644 index 00000000..02200a47 --- /dev/null +++ b/modules/clickhouse/src/config.hpp @@ -0,0 +1,123 @@ +/** + * @file config.hpp + * @author Daniel Pelanek + * @brief Declares Config struct and function for parsing it. + * Also declares possible types of column. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include + +/** + * @brief Possible unirec column type.
+ * The order matters: isArr() in datatype.cpp treats every value below INT8 as array-like. + */ +enum ColumnType : uint8_t { + INT8_ARR = 0, + INT16_ARR, + INT32_ARR, + INT64_ARR, + + UINT8_ARR, + UINT16_ARR, + UINT32_ARR, + UINT64_ARR, + + CHAR_ARR, + FLOAT_ARR, + DOUBLE_ARR, + IPADDR_ARR, + MACADDR_ARR, + TIME_ARR, + BYTES, + + MACADDR, + + INT8, + INT16, + INT32, + INT64, + + UINT8, + UINT16, + UINT32, + UINT64, + + CHAR, + FLOAT, + DOUBLE, + + IPADDR, + TIME, + STRING, +}; + +/** + * @struct Config + * @brief A struct containing all the configurable parameters + */ +struct Config { + static const uint16_t DEFAULT_PORT = 9000; ///< Default port of clickhouse db + static const uint64_t DEFAULT_INSERTER_THREADS = 32; ///< Default num of inserters + static const uint64_t DEFAULT_BLOCKS = 256; ///< Default num of blocks + static const uint64_t DEFAULT_BLOCK_INSERT_THRESHOLD + = 100000; ///< Default num of rows to trigger insert + static const uint64_t DEFAULT_MAX_BLOCK_INSERT_DELAY = 10; ///< Default max seconds before insert + + /** + * @brief Data from unirec template about column. + * + */ + struct Column { + std::string name; ///< column name + ColumnType type; ///< column type + ur_field_id_t fieldID; ///< column unirec id + }; + + /** + * @brief Endpoint for clickhouse database instance. + * + */ + struct Endpoint { + std::string host; ///< db hostname + uint16_t port = DEFAULT_PORT; ///< db port + }; + + /** + * @brief Contains database endpoints, information for connecting to + * them and into which table in them to insert data.
+ * + */ + struct Connection { + std::vector endpoints; ///< Endpoints of databases to send to + std::string user; ///< username for connection + std::string password; ///< password for connection + std::string database; ///< database in instance + std::string table; ///< table name in database + }; + + Connection connection; ///< Clickhouse database connection info + std::vector columns; ///< Columns found in config + std::string templateColumnCsv; ///< For comparing with unirec template when it changes + uint64_t inserterThreads = DEFAULT_INSERTER_THREADS; ///< num of inserters + uint64_t blocks = DEFAULT_BLOCKS; ///< Number of blocks for storing + uint64_t blockInsertThreshold + = DEFAULT_BLOCK_INSERT_THRESHOLD; ///< Row count that triggers insertion + uint64_t blockInsertMaxDelaySecs + = DEFAULT_MAX_BLOCK_INSERT_DELAY; ///< Delay in seconds that triggers insertion +}; + +/** + * @brief Parse a YAML config file into a structured form + * + * @param filename Path to the YAML config file + * @return The parsed config + */ +Config parseConfig(const std::string& filename); diff --git a/modules/clickhouse/src/datatype.cpp b/modules/clickhouse/src/datatype.cpp new file mode 100644 index 00000000..7d6db5a9 --- /dev/null +++ b/modules/clickhouse/src/datatype.cpp @@ -0,0 +1,659 @@ +/** + * @file datatype.cpp + * @author Daniel Pelanek + * @brief Defines functions for creating column lambdas for creating, writing + * and parsing unirec data into them.
+ * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "datatype.hpp" + +#include + +template +class ColumnDateTime64 : public clickhouse::ColumnDateTime64 { +public: + ColumnDateTime64() + : clickhouse::ColumnDateTime64(Precision) + { + } +}; + +namespace Getters { + +template +static Value getValue(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Value value = record.getFieldAsType(fieldID); + return value; +} + +template +static std::vector getValueArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const arr = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(arr.size()); + std::copy(arr.begin(), arr.end(), std::back_inserter(result)); + return result; +} + +static std::vector getBytes(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const arr = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(arr.size()); + std::transform(arr.begin(), arr.end(), std::back_inserter(result), [](std::byte value) { + return static_cast(value); + }); + return result; +} + +static in6_addr getIp(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::IpAddress addr = record.getFieldAsType(fieldID); + return *((in6_addr*) &addr.ip); +} + +static std::vector getIpArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const addrArr + = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(addrArr.size()); + std::transform( + addrArr.begin(), + addrArr.end(), + std::back_inserter(result), + [](const Nemea::IpAddress& value) -> in6_addr { + return *reinterpret_cast(&value.ip); + }); + return result; +} + +static std::vector getMac(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::MacAddress const mac = record.getFieldAsType(fieldID); + std::vector result; + const int nMacBytes = 6; + result.reserve(nMacBytes); + std::copy(std::begin(mac.mac.bytes), 
std::end(mac.mac.bytes), std::back_inserter(result)); + return result; +} + +static std::vector> +getMacArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const macArr + = record.getFieldAsUnirecArray(fieldID); + std::vector> result; + result.reserve(macArr.size()); + for (const auto& value : macArr) { + result.emplace_back(); + for (const unsigned char byte : value.mac.bytes) { + result.back().push_back(byte); + } + } + return result; +} + +static uint64_t getTime(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UrTime const time = record.getFieldAsType(fieldID); + const uint64_t nsecInSec = 1000000000; + return ( + (static_cast(ur_time_get_sec(time.time)) * nsecInSec) + + static_cast(ur_time_get_nsec(time.time))); +} + +static std::vector getTimeArr(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + Nemea::UnirecArray const timeArr + = record.getFieldAsUnirecArray(fieldID); + std::vector result; + result.reserve(timeArr.size()); + const uint64_t nsecInSec = 1000000000; + std::transform( + timeArr.begin(), + timeArr.end(), + std::back_inserter(result), + [](const Nemea::UrTime& value) -> uint64_t { + return (static_cast(ur_time_get_sec(value.time)) * nsecInSec) + + static_cast(ur_time_get_nsec(value.time)); + }); + return result; +} + +static std::string getString(Nemea::UnirecRecordView& record, ur_field_id_t fieldID) +{ + std::string str = record.getFieldAsType(fieldID); + return str; +} +} // namespace Getters + +template +struct DataTypeTraits {}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt8; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt8"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt16; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt16"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct 
DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt32; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt32"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt64; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt64"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt8; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int8"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt16; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int16"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt32; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int32"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnInt64; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Int64"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt16)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt32)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; 
+ static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt64)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int8)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int16)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int32)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Int64)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnUInt8; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "UInt8"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnFloat32; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Float32"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Float32)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnFloat64; + static 
constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Float64"; + static constexpr auto GETTER = &Getters::getValue; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Float64)"; + static constexpr auto GETTER = &Getters::getValueArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnIPv6; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "IPv6"; + static constexpr auto GETTER = &Getters::getIp; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(IPv6)"; + static constexpr auto GETTER = &Getters::getIpArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getMac; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT>; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(Array(UInt8))"; + static constexpr auto GETTER = &Getters::getMacArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = ColumnDateTime64; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "DateTime64(9)"; + static constexpr auto GETTER = &Getters::getTime; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT>; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "Array(DateTime64(9))"; + static constexpr auto GETTER = &Getters::getTimeArr; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnString; + static constexpr std::string_view CLICKHOUSE_TYPE_NAME = "String"; + static constexpr auto GETTER = &Getters::getString; +}; + +template <> +struct DataTypeTraits { + using ColumnType = clickhouse::ColumnArrayT; + static constexpr std::string_view 
CLICKHOUSE_TYPE_NAME = "Array(UInt8)"; + static constexpr auto GETTER = &Getters::getBytes; +}; + +template +static void visit(ColumnType type, Func func) +{ + switch (type) { + case ColumnType::INT8: + func(DataTypeTraits {}); + break; + case ColumnType::INT16: + func(DataTypeTraits {}); + break; + case ColumnType::INT32: + func(DataTypeTraits {}); + break; + case ColumnType::INT64: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE: + func(DataTypeTraits {}); + break; + case ColumnType::IPADDR: + func(DataTypeTraits {}); + break; + case ColumnType::TIME: + func(DataTypeTraits {}); + break; + case ColumnType::STRING: + func(DataTypeTraits {}); + break; + case ColumnType::INT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::IPADDR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR_ARR: + func(DataTypeTraits {}); + break; + case 
ColumnType::TIME_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::BYTES: + func(DataTypeTraits {}); + break; + default: + throw std::runtime_error("invalid data type"); + } +} + +template +static void visitNonArr(ColumnType type, Func func) +{ + switch (type) { + case ColumnType::INT8: + func(DataTypeTraits {}); + break; + case ColumnType::INT16: + func(DataTypeTraits {}); + break; + case ColumnType::INT32: + func(DataTypeTraits {}); + break; + case ColumnType::INT64: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE: + func(DataTypeTraits {}); + break; + case ColumnType::IPADDR: + func(DataTypeTraits {}); + break; + case ColumnType::TIME: + func(DataTypeTraits {}); + break; + case ColumnType::STRING: + func(DataTypeTraits {}); + break; + default: + throw std::runtime_error("invalid data type"); + } +} + +template +static void visitArr(ColumnType type, Func func) +{ + switch (type) { + case ColumnType::INT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::INT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT8_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT16_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT32_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::UINT64_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::CHAR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::FLOAT_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::DOUBLE_ARR: + 
func(DataTypeTraits {}); + break; + case ColumnType::IPADDR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR: + func(DataTypeTraits {}); + break; + case ColumnType::MACADDR_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::TIME_ARR: + func(DataTypeTraits {}); + break; + case ColumnType::BYTES: + func(DataTypeTraits {}); + break; + default: + throw std::runtime_error("invalid data type"); + } +} + +static bool isArr(ColumnType type) +{ + return type < ColumnType::INT8; +} + +static std::shared_ptr makeArrColumn(ColumnType type) +{ + std::shared_ptr column; + visit(type, [&](auto traits) { + using ColType = typename decltype(traits)::ColumnType; + column = std::make_shared(); + }); + + return column; +} + +static std::shared_ptr makeNonArrColumn(ColumnType type) +{ + std::shared_ptr column; + visit(type, [&](auto traits) { + using ColType = clickhouse::ColumnNullableT; + column = std::make_shared(); + }); + + return column; +} + +std::shared_ptr makeColumn(ColumnType type) +{ + if (isArr(type)) { + return makeArrColumn(type); + } + return makeNonArrColumn(type); +} + +GetterFn makeGetter(ColumnType type) +{ + GetterFn getter; + visit(type, [&](auto traits) { + getter = [](Nemea::UnirecRecordView& record, ur_field_id_t fieldID, ValueVariant& value) { + value = decltype(traits)::GETTER(record, fieldID); + }; + }); + return getter; +} + +static ColumnWriterFn makeArrColumnwriter(ColumnType type) +{ + ColumnWriterFn columnwriter; + + visitArr(type, [&](auto traits) { + columnwriter = [](ValueVariant* value, clickhouse::Column& column) { + using ColumnType = typename decltype(traits)::ColumnType; + using ValueType = std::invoke_result_t< + decltype(decltype(traits)::GETTER), + Nemea::UnirecRecordView&, + ur_field_type_t>; + auto* col = dynamic_cast(&column); + if (value) { + col->Append(std::get(*value)); + } + }; + }); + + return columnwriter; +} + +static ColumnWriterFn makeNonArrColumnwriter(ColumnType type) +{ + ColumnWriterFn 
columnwriter; + + visitNonArr(type, [&](auto traits) { + columnwriter = [](ValueVariant* value, clickhouse::Column& column) { + using ColumnType = clickhouse::ColumnNullableT; + using ValueType = std::invoke_result_t< + decltype(decltype(traits)::GETTER), + Nemea::UnirecRecordView&, + ur_field_type_t>; + auto* col = dynamic_cast(&column); + if (!value) { + col->Append(std::nullopt); + } else { + col->Append(std::get(*value)); + } + }; + }); + + return columnwriter; +} + +ColumnWriterFn makeColumnwriter(ColumnType type) +{ + if (isArr(type)) { + return makeArrColumnwriter(type); + } + return makeNonArrColumnwriter(type); +} + +std::string typeToClickhouse(ColumnType type) +{ + std::string result; + visit(type, [&](auto traits) { result = traits.CLICKHOUSE_TYPE_NAME; }); + return result; +} diff --git a/modules/clickhouse/src/datatype.hpp b/modules/clickhouse/src/datatype.hpp new file mode 100644 index 00000000..7ec39f71 --- /dev/null +++ b/modules/clickhouse/src/datatype.hpp @@ -0,0 +1,150 @@ +/** + * @file datatype.hpp + * @author Daniel Pelanek + * @brief Functions specific to column data + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "clickhouse.hpp" +#include "config.hpp" + +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief All possible types of parsed values sent into clickhouse. + * + */ +using ValueVariant = std::variant< + int8_t, + std::vector, + int16_t, + std::vector, + int32_t, + std::vector, + int64_t, + std::vector, + uint8_t, + std::vector, + uint16_t, + std::vector, + uint32_t, + std::vector, + uint64_t, + std::vector, + float, + std::vector, + double, + std::vector, + std::vector, + std::vector>, + in6_addr, + std::vector, + std::string>; + +/** + * @brief Lambda for converting unirec column data to clickhouse column. 
+ * + */ +using GetterFn = std::function< + void(Nemea::UnirecRecordView& record, ur_field_id_t fieldID, ValueVariant& value)>; + +/** + * @brief Lambda for writing value into clickhouse column. + * + */ +using ColumnWriterFn = std::function; + +/** + * @brief Lambda for creating clickhouse columns. + * + */ +using ColumnFactoryFn = std::function()>; + +/** + * @brief Column specification. + * + * Contains: + * type, name, unirec field id. + * Helper lambdas for creating, loading data, writing to clickhouse + * value when loaded. + * + */ +struct ColumnCtx { + std::string name; ///< Column name + ColumnType type; ///< Column type + ur_field_id_t fieldID; ///< unirec template field id + + ColumnFactoryFn columnFactory = nullptr; ///< lambda for creating columns + GetterFn getter = nullptr; ///< lambda for reading a unirec field value into valueBuffer + ColumnWriterFn columnWriter = nullptr; ///< lambda for writing column value + + bool hasValue = false; ///< If a value was stored to column + ValueVariant valueBuffer; ///< Stored value +}; + +/** + * @brief Block of columns passed to an inserter. + * + */ +struct BlockCtx { + /** + * @brief Vector of column data to be inserted into ClickHouse. + */ + std::vector> columns; + + /** + * @brief ClickHouse block structure used for insertion. + */ + clickhouse::Block block; + + /** + * @brief Number of rows in the block. + */ + unsigned int rows; +}; + +/** + * @brief for clickhouse. 9 is nanoseconds.
+ */ +const int g_TIME_PRECISION = 9; + +/** + * @brief Make a ClickHouse column that is able to store values of the supplied data type + * + * @param type The data type + * @return The ClickHouse column object + */ +std::shared_ptr makeColumn(ColumnType type); + +/** + * @brief Makes a function (lambda) which reads a unirec field value into a ValueVariant + * + * @param type The data type + * @return The Getter function + */ +GetterFn makeGetter(ColumnType type); + +/** + * @brief Makes a function (lambda) which appends a stored value to a clickhouse column + * + * @param type The data type + * @return The column writer function + */ +ColumnWriterFn makeColumnwriter(ColumnType type); + +/** + * @brief Converts Columntype into clickhouse string specification of column + * + * @param type The data type + * @return The ClickHouse type name + */ +std::string typeToClickhouse(ColumnType type); diff --git a/modules/clickhouse/src/inserter.cpp b/modules/clickhouse/src/inserter.cpp new file mode 100644 index 00000000..3247ef82 --- /dev/null +++ b/modules/clickhouse/src/inserter.cpp @@ -0,0 +1,274 @@ +/** + * @file inserter.cpp + * @author Daniel Pelanek + * @brief Defines inserter methods and helper functions.
+ * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "inserter.hpp" + +#include +#include + +static constexpr int g_ERR_TABLE_NOT_FOUND = 60; + +/** + * @brief Clickhouse column description + * + */ +struct ColumnDescription { + std::string name; // clickhouse column name + std::string type; // clickhouse data type as string +}; + +/** + * @brief Extracts all column descriptions from a clickhouse block + * + * @param block + * @return std::vector + */ +static std::vector extractBlockDescription(const clickhouse::Block& block) +{ + std::vector columnDescriptions; + + const std::size_t rowCount = block.GetRowCount(); + + if (block.GetColumnCount() < 2 || rowCount == 0) { + return columnDescriptions; + } + + const uint8_t columnNameIndex = 0; + const uint8_t columnTypeIndex = 1; + + const auto& nameColumns = block[columnNameIndex]->As(); + const auto& typeColumns = block[columnTypeIndex]->As(); + + columnDescriptions.reserve(block.GetRowCount()); + + for (std::size_t rowIndex = 0; rowIndex < rowCount; rowIndex++) { + const ColumnDescription columnDescription { + std::string(nameColumns->At(rowIndex)), + std::string(typeColumns->At(rowIndex))}; + columnDescriptions.emplace_back(columnDescription); + } + + return columnDescriptions; +} + +static std::vector +selectTableDescription(clickhouse::Client& client, const std::string& table) +{ + std::vector columnDescriptions; + const auto selectCallback = [&](const clickhouse::Block& block) { + auto partial = extractBlockDescription(block); + columnDescriptions.insert( + columnDescriptions.end(), + std::make_move_iterator(partial.begin()), + std::make_move_iterator(partial.end())); + }; + + const std::string query = "DESCRIBE TABLE " + table; + + client.Select(query, selectCallback); + + return columnDescriptions; +} + +/** + * @brief Describes table predefined in clickhouse database. 
+ * + * @param client + * @param table name from config + * @return std::vector> + */ +static std::vector +describeTable(clickhouse::Client& client, const std::string& tableName) +{ + std::vector description; + + try { + return selectTableDescription(client, tableName); + + } catch (const clickhouse::ServerException& exc) { + if (exc.GetCode() == g_ERR_TABLE_NOT_FOUND) { + std::stringstream sstream; + sstream << "Table " << tableName << " does not exist."; + throw std::runtime_error(sstream.str()); + } + throw; + } + + return description; +} + +/** + * @brief Compares clickhouse schema to the one defined in config. + * + * @param client + * @param table name from config + * @param columns initialized based on config + */ +static void ensureSchema( + clickhouse::Client& client, + const std::string& table, + const std::vector& columns) +{ + // Load clickhouse columns + auto dbColumns = describeTable(client, table); + + auto schemaHint = [&]() { + std::stringstream sstream; + sstream << "hint:\n"; + sstream << "CREATE TABLE " << table << "(\n"; + size_t columnIndex = 0; + for (const auto& column : columns) { + const auto& clickhouseType = typeToClickhouse(columns[columnIndex].type); + sstream << " \"" << column.name << "\" " << clickhouseType + << (columnIndex < columns.size() - 1 ? 
"," : "") << '\n'; + columnIndex++; + } + sstream << ");"; + return sstream.str(); + }; + + if (columns.size() != dbColumns.size()) { + std::stringstream sstream; + sstream << "Config has " << columns.size() << " columns but table \"" << table << "\" has " + << dbColumns.size() << "\n" + << schemaHint(); + throw std::runtime_error(sstream.str()); + } + + for (size_t i = 0; i < dbColumns.size(); i++) { + const auto& expectedName = columns[i].name; + const auto& expectedType = typeToClickhouse(columns[i].type); + const auto& [actual_name, actual_type] = dbColumns[i]; + + if (expectedName != actual_name) { + std::stringstream sstream; + sstream << "Expected column #" << i << " in table \"" << table << "\" to be named \"" + << expectedName << "\" but it is \"" << actual_name << "\"\n" + << schemaHint(); + throw std::runtime_error(sstream.str()); + } + + if (expectedType != actual_type) { + std::stringstream sstream; + sstream << "Expected column #" << i << " in table \"" << table << "\" to be of type \"" + << expectedType << "\" but it is \"" << actual_type << "\"\n" + << schemaHint(); + throw std::runtime_error(sstream.str()); + } + } +} + +Inserter::Inserter( + int inserterId, + std::shared_ptr logger, + clickhouse::ClientOptions clientOpts, + const std::vector& columns, + const std::string& table, + SyncQueue& filledBlocks, + SyncStack& emptyBlocks) + + : m_id(inserterId) + , m_logger(std::move(logger)) + , m_client_opts(std::move(clientOpts)) + , m_columns(columns) + , m_table(table) + , m_filled_blocks(filledBlocks) + , m_empty_blocks(emptyBlocks) +{ +} + +void Inserter::start() +{ + m_thread = std::thread([this]() { + try { + run(); + } catch (...) 
{ + m_exception = std::current_exception(); + m_errored = true; + } + }); +} + +void Inserter::insert(clickhouse::Block& block) +{ + bool needsReconnect = false; + while (!m_stop_signal) { + try { + if (needsReconnect) { + m_client->ResetConnectionEndpoint(); + ensureSchema(*m_client, m_table, m_columns); + m_logger->warn( + "[Worker {}] Connected to {}:{} due to error with previous endpoint", + m_id, + m_client->GetCurrentEndpoint()->host.c_str(), + m_client->GetCurrentEndpoint()->port); + } + + m_client->Insert(m_table, block); + break; + + } catch (const std::exception& ex) { + m_logger->error( + "[Worker {}] Insert failed: {} - retrying in 1 second", + m_id, + ex.what()); + needsReconnect = true; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +void Inserter::run() +{ + m_client = std::make_unique(m_client_opts); + ensureSchema(*m_client, m_table, m_columns); + auto endpoint = m_client->GetCurrentEndpoint(); + if (endpoint) { + m_logger + ->info("[Worker {}] Connected to {}:{}", m_id, endpoint->host.c_str(), endpoint->port); + } else { + m_logger->warn("[Worker {}] Connected, but endpoint is not available.", m_id); + } + + while (!m_stop_signal) { + BlockCtx* block = m_filled_blocks.get(); + if (block == nullptr) { + // we might get null as a way to get unblocked and process stop signal + continue; + } + + block->block.RefreshRowCount(); + insert(block->block); + + for (auto& column : block->columns) { + column->Clear(); + } + + block->rows = 0; + m_empty_blocks.put(block); + } +} + +void Inserter::stop() +{ + m_stop_signal = true; +} + +void Inserter::join() +{ + m_thread.join(); +} + +void Inserter::checkError() +{ + if (m_errored) { + std::rethrow_exception(m_exception); + } +} diff --git a/modules/clickhouse/src/inserter.hpp b/modules/clickhouse/src/inserter.hpp new file mode 100644 index 00000000..a9911b12 --- /dev/null +++ b/modules/clickhouse/src/inserter.hpp @@ -0,0 +1,112 @@ +/** + * @file inserter.hpp + * @author Daniel Pelanek
+ * @brief Declares Inserter class for clickhouse module, + * blocks and columns and helper structures for them. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "clickhouse.hpp" +#include "datatype.hpp" +#include "logger/logger.hpp" +#include "syncqueue.hpp" +#include "syncstack.hpp" + +#include +#include +#include + +/** + * @brief Base class that deletes copy construction and copy assignment. + * + */ +class Noncopyable { +public: + Noncopyable() = default; // Default constructor is fine + Noncopyable(const Noncopyable&) = delete; // Delete copy constructor + Noncopyable& operator=(const Noncopyable&) = delete; // Delete copy assignment operator +}; + +/** + * @brief Base class that deletes move construction and move assignment. + * + */ +class Nonmoveable { +public: + Nonmoveable() = default; // Default constructor is fine + Nonmoveable(Nonmoveable&&) = delete; // Delete move constructor + Nonmoveable& operator=(Nonmoveable&&) = delete; // Delete move assignment operator +}; + +/** + * @brief A worker class responsible for inserting data into a ClickHouse table.
+ * + */ +class Inserter + : Nonmoveable + , Noncopyable { +public: + /** + * @brief Instantiate an inserter instance + * + * @param inserterId id + * @param logger The logger + * @param clientOpts The Clickhouse client options + * @param columns The column definitions + * @param table The table to insert the data into + * @param filledBlocks A queue of blocks ready to be sent + * @param emptyBlocks A queue of blocks that have been sent and are able to be reused + */ + Inserter( + int inserterId, + std::shared_ptr logger, + clickhouse::ClientOptions clientOpts, + const std::vector& columns, + const std::string& table, + SyncQueue& filledBlocks, + SyncStack& emptyBlocks); + + /** + * @brief Start the inserter thread + */ + void start(); + + /** + * @brief Stop the inserter thread + */ + void stop(); + + /** + * @brief Wait for the inserter thread to stop + */ + void join(); + + /** + * @brief Check if the inserter thread has encountered an error, and if so, rethrow the captured + * exception + */ + void checkError(); + +private: + int m_id; ///< unique thread or task identifier + std::shared_ptr m_logger; ///< logging utility reference + std::thread m_thread; ///< worker thread + std::atomic_bool m_stop_signal = false; ///< signals thread to stop + std::atomic_bool m_errored = false; ///< indicates if an error occurred + std::exception_ptr m_exception = nullptr; ///< stores exception thrown in thread + + clickhouse::ClientOptions m_client_opts; ///< ClickHouse client configuration + const std::vector& m_columns; ///< defines ClickHouse table schema + const std::string& m_table; ///< target ClickHouse table name + SyncQueue& m_filled_blocks; ///< queue of blocks ready to insert + SyncStack& m_empty_blocks; ///< stack of reusable empty blocks + + std::unique_ptr m_client; ///< ClickHouse client instance + + void connect(); ///< establishes connection to ClickHouse + void run(); ///< thread entry point + void insert(clickhouse::Block& block); ///< inserts a block into 
ClickHouse +}; diff --git a/modules/clickhouse/src/main.cpp b/modules/clickhouse/src/main.cpp new file mode 100644 index 00000000..6c9917b6 --- /dev/null +++ b/modules/clickhouse/src/main.cpp @@ -0,0 +1,175 @@ +/** + * @file main.cpp + * @author Daniel Pelanek + * @brief Clickhouse Module: resend flowdata to clickhouse + * + * This file contains the main function and supporting functions for the Unirec Clickhouse Module. + * This module takes Unirec records from a unidirectional interface, converts them to + * Clickhouse format buffers them and then sends them to the specified Clickhouse server in config. + * It utilizes the Unirec++ library for record handling, argparse for command-line argument parsing, + * rapidxml for config parsing and clickhouse cpp library. + * Ported from ipfixcol2 clickhouse plugin. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "clickhouse.hpp" +#include "config.hpp" +#include "logger/logger.hpp" +#include "manager.hpp" + +#include +#include +#include +#include +#include + +using namespace Nemea; + +static std::atomic g_stopFlag(false); + +static void signalHandler(int signum) +{ + auto logger = Nm::loggerGet("main"); + logger->info("Interrupt signal {} received", signum); + g_stopFlag.store(true); +} + +/** + * @brief Handle format change exception by adjusting the template and check template + * against the one defined in config. + * + * This function is called when a `FormatChangeException` is caught in the main loop. + * It adjusts the template in the input interface to handle the format change but in this + * case the program only continues if the template is the same as defined in config. Meaning + * it only continues if the template changes to the same one. + * + * @param interface input interface for Unirec communication. + * @param manager Manager instance which buffers and sends data to clickhouse. 
+ */ +static void handleFormatChange(UnirecInputInterface& interface, Manager& manager) +{ + interface.changeTemplate(); + + ur_template_t* changedTemplate = interface.getTemplate(); + auto res = std::unique_ptr( + ur_template_string_delimiter(changedTemplate, ','), + &free); + + const Config cfg = manager.getConfig(); + + if (cfg.templateColumnCsv != res.get()) { + throw std::runtime_error( + "Template in input interface doesn't match template in configuration."); + } + + manager.updateFieldIDs(); +} + +/** + * @brief Process unirec record in manager and forward to + * + * @param interface input interface for Unirec communication. + * @param manager Manager instance which buffers and sends data to clickhouse. + */ +static void processNextRecord(UnirecInputInterface& interface, Manager& manager) +{ + std::optional unirecRecord = interface.receive(); + if (!unirecRecord) { + return; + } + + manager.processRecord(*unirecRecord); +} + +/** + * @brief Process Unirec records. + * + * The `processUnirecRecords` function continuously receives Unirec records through the provided + * input interface (`interface`). Each received record is processed, buffered and + * then sent to a clickhouse database. + * + * @param interface input interface for Unirec communication. + * @param manager Manager instance which buffers and sends data to clickhouse. + */ +static void processUnirecRecords(UnirecInputInterface& interface, Manager& manager) +{ + while (!g_stopFlag.load()) { + try { + processNextRecord(interface, manager); + } catch (FormatChangeException& ex) { + handleFormatChange(interface, manager); + } catch (EoFException& ex) { + break; + } catch (std::exception& ex) { + throw; + } + } +} + +int main(int argc, char** argv) +{ + argparse::ArgumentParser program("Unirec Clickhouse"); + + program.add_argument("-c", "--config") + .required() + .help("specify the xml config file. 
Format is in readme.") + .metavar("xml_file"); + + Unirec unirec({1, 0, "clickhouse", "Unirec clickhouse module"}); + + Nm::loggerInit(); + auto logger = Nm::loggerGet("main"); + + signal(SIGINT, signalHandler); + + try { + unirec.init(argc, argv); + } catch (HelpException& ex) { + std::cerr << program; + return EXIT_SUCCESS; + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + program.parse_args(argc, argv); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + Config config; + try { + config = parseConfig(program.get("--config")); + + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + std::unique_ptr manager; + try { + manager = std::make_unique(config); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + UnirecInputInterface interface = unirec.buildInputInterface(); + + processUnirecRecords(interface, *manager); + + logger->info("Input processing finished, stopping manager"); + + manager->stop(); + + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} diff --git a/modules/clickhouse/src/manager.cpp b/modules/clickhouse/src/manager.cpp new file mode 100644 index 00000000..fb4037aa --- /dev/null +++ b/modules/clickhouse/src/manager.cpp @@ -0,0 +1,190 @@ +/** + * @file manager.cpp + * @author Daniel Pelanek + * @brief Defines manager methods and helper function. + * + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "manager.hpp" +#include "datatype.hpp" + +#include +#include +#include +#include + +/** + * @brief Initializes columns from config and corresponding lambdas for handling them. + * + * @param columnsCfg Columns from config + * @return std::vector Prepared columns for sending to clickhouse.
+ */ +static std::vector prepareColumns(const std::vector& columnsCfg) +{ + std::vector columns; + + for (const auto& columnCfg : columnsCfg) { + ColumnCtx column {}; + + column.name = columnCfg.name; + column.type = columnCfg.type; + column.fieldID = columnCfg.type; + + column.getter = makeGetter(columnCfg.type); + column.columnWriter = makeColumnwriter(columnCfg.type); + column.columnFactory = [=]() { return makeColumn(columnCfg.type); }; + + columns.emplace_back(std::move(column)); + } + + return columns; +} + +Manager::Manager(Config config) + : M_CONFIG(std::move(config)) +{ + m_columns = prepareColumns(M_CONFIG.columns); + + std::vector endpoints; + endpoints.reserve(M_CONFIG.connection.endpoints.size()); + std::transform( + M_CONFIG.connection.endpoints.begin(), + M_CONFIG.connection.endpoints.end(), + std::back_inserter(endpoints), + [](const Config::Endpoint& epCfg) { + return clickhouse::Endpoint {epCfg.host, epCfg.port}; + }); + + // Prepare blocks + m_logger->info("Preparing {} blocks", M_CONFIG.blocks); + for (unsigned int i = 0; i < M_CONFIG.blocks; i++) { + m_blocks.emplace_back(std::make_unique()); + BlockCtx& block = *m_blocks.back().get(); + for (const auto& column : m_columns) { + block.columns.emplace_back(column.columnFactory()); + block.block.AppendColumn(column.name, block.columns.back()); + } + m_empty_blocks.put(&block); + } + + // Prepare inserters + m_logger->info("Preparing {} inserter threads", M_CONFIG.inserterThreads); + for (unsigned int i = 0; i < M_CONFIG.inserterThreads; i++) { + auto clientOpts = clickhouse::ClientOptions() + .SetEndpoints(endpoints) + .SetUser(M_CONFIG.connection.user) + .SetPassword(M_CONFIG.connection.password) + .SetDefaultDatabase(M_CONFIG.connection.database); + + m_inserters.emplace_back(std::make_unique( + m_inserters.size() + 1, + m_logger, + clientOpts, + m_columns, + M_CONFIG.connection.table, + m_filled_blocks, + m_empty_blocks)); + } + + // Start inserter threads + m_logger->info("Starting inserter 
threads"); + for (auto& inserter : m_inserters) { + inserter->start(); + } + + m_logger->info("Clickhouse plugin is ready"); +} + +void Manager::processRecord(Nemea::UnirecRecordView& record) +{ + // Get new empty block if there is no current block + if (m_current_block == nullptr) { + m_current_block = m_empty_blocks.get(); + } + + for (ColumnCtx& ctx : m_columns) { + ctx.getter(record, ctx.fieldID, ctx.valueBuffer); + ctx.hasValue = true; + } + + for (size_t i = 0; i < m_columns.size(); i++) { + m_columns[i].columnWriter( + m_columns[i].hasValue ? &m_columns[i].valueBuffer : nullptr, + *m_current_block->columns[i].get()); + } + + m_current_block->rows++; + + std::time_t const now = std::time(nullptr); + if (m_start_time == 0) { + m_start_time = now; + m_last_insert_time = now; + m_last_stats_print_time = now; + } + + // Send the block for insertion if it is sufficiently full or a block hasn't been sent in a long + // enough time + if (m_current_block->rows >= M_CONFIG.blockInsertThreshold + || (uint64_t(now - m_last_insert_time) >= M_CONFIG.blockInsertMaxDelaySecs + && m_current_block->rows > 0)) { + m_filled_blocks.put(m_current_block); + m_current_block = nullptr; + m_last_insert_time = now; + } + + // Check for any exceptions was thrown by the inserter threads + for (auto& inserter : m_inserters) { + inserter->checkError(); + } +} + +void Manager::updateFieldIDs() +{ + // Export what's left in the last block + if ((m_current_block != nullptr) && m_current_block->rows > 0) { + m_filled_blocks.put(m_current_block); + m_current_block = nullptr; + } + + for (auto& column : m_columns) { + column.fieldID = static_cast(ur_get_id_by_name(column.name.c_str())); + + if (column.fieldID == UR_E_INVALID_NAME) { + printf("Invalid field name: %s\n", column.name.c_str()); + } + } + + m_logger->info("Updated field ids"); +} + +Config Manager::getConfig() const +{ + return this->M_CONFIG; +} + +void Manager::stop() +{ + // Export what's left in the last block + if 
((m_current_block != nullptr) && m_current_block->rows > 0) { + m_filled_blocks.put(m_current_block); + m_current_block = nullptr; + } + + // Stop all the threads and wait for them to finish + m_logger->info("Sending stop signal to inserter threads..."); + for (auto& inserter : m_inserters) { + inserter->stop(); + } + for (const auto& inserter : m_inserters) { + (void) inserter; + // Wake up the inserter threads in case they are waiting on a .get() + m_filled_blocks.put(nullptr); + } + + m_logger->info("Waiting for inserter threads to finish..."); + for (auto& inserter : m_inserters) { + inserter->join(); + } +} diff --git a/modules/clickhouse/src/manager.hpp b/modules/clickhouse/src/manager.hpp new file mode 100644 index 00000000..1b3f7b6c --- /dev/null +++ b/modules/clickhouse/src/manager.hpp @@ -0,0 +1,80 @@ +/** + * @file manager.hpp + * @author Daniel Pelanek + * @brief Declares Manager class for clickhouse module + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include "clickhouse.hpp" +#include "datatype.hpp" +#include "inserter.hpp" + +#include + +/** + * @brief Converts Unirec records to clickhouse format, buffers them and + * sends them through inserters. + * + * This class should be instantiated only once. It owns blocks of columns and + * and inserters. It also owns synced data structures which keep track of currently + * filled blocks. It fills the blocks inside process_record and inserters take fully + * filled blocks through sync queue by themselves to send. + * + */ +class Manager + : Nonmoveable + , Noncopyable { +public: + /** + * @brief Instantiate the manager instance + * + * @param config Config instance parsed in main. + */ + explicit Manager(Config config); + + /** + * @brief Stop the plugin and wait till it is stopped (blocking). + */ + void stop(); + + /** + * @brief Takes unirec record, converts it to clickhouse format and stores it. 
+ * Adds to filled blocks if a block was sufficiently filled or none were sent + * in a specified time frame (m_config.block_insert_max_delay_secs). + * + * @param record Unirec record view to parse + */ + void processRecord(Nemea::UnirecRecordView& record); + + /** + * @brief changes unirec ids of fields after getting template in main. + * + */ + void updateFieldIDs(); + + /** + * @brief Returns config specified by argument to program. + * + * @return Config + */ + Config getConfig() const; + +private: + const Config M_CONFIG; ///< application configuration + std::shared_ptr m_logger + = Nm::loggerGet("manager"); ///< logging utility reference + std::vector m_columns; ///< ClickHouse table schema definition + + BlockCtx* m_current_block = nullptr; ///< pointer to the currently filling block + std::vector> m_inserters; ///< inserter worker instances + std::vector> m_blocks; ///< owned memory blocks + SyncStack m_empty_blocks; ///< pool of empty blocks for reuse + SyncQueue m_filled_blocks; ///< queue of blocks ready for insertion + + std::time_t m_start_time = 0; ///< application start time + std::time_t m_last_stats_print_time = 0; ///< last stats print timestamp + std::time_t m_last_insert_time = 0; ///< last data insert timestamp +}; diff --git a/modules/clickhouse/src/syncqueue.hpp b/modules/clickhouse/src/syncqueue.hpp new file mode 100644 index 00000000..a9feddab --- /dev/null +++ b/modules/clickhouse/src/syncqueue.hpp @@ -0,0 +1,67 @@ +/** + * @file + * @author Michal Sedlak + * @brief SyncQueue class implementation + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. 
+ * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include + +/** + * @brief A thread-safe queue + */ +template +class SyncQueue { +public: + /** + * @brief Put an item into the queue + * + * @param item The item + */ + void put(Item item) + { + std::lock_guard const lock(m_mutex); + m_items.push(item); + m_size = m_items.size(); + m_avail_cv.notify_all(); + } + + /** + * @brief Get an item from the queue, block and wait if there aren't any + * + * @return The item + */ + Item get() + { + std::unique_lock lock(m_mutex); + while (true) { + if (!m_items.empty()) { + auto item = m_items.front(); + m_items.pop(); + m_size = m_items.size(); + return item; + } + m_avail_cv.wait(lock); + } + } + + /** + * @brief Get the current size of the queue + * + * @return The number of items in the queue + */ + std::size_t size() const { return m_size; } + +private: + std::atomic_size_t m_size = 0; ///< current number of items in the queue + std::queue m_items; ///< underlying container for queued items + std::mutex m_mutex; ///< mutex for synchronizing access + std::condition_variable m_avail_cv; ///< signals availability of items +}; diff --git a/modules/clickhouse/src/syncstack.hpp b/modules/clickhouse/src/syncstack.hpp new file mode 100644 index 00000000..0f1c1552 --- /dev/null +++ b/modules/clickhouse/src/syncstack.hpp @@ -0,0 +1,67 @@ +/** + * @file + * @author Michal Sedlak + * @brief SyncStack class implementation + * @date 2024 + * + * Copyright(c) 2024 CESNET z.s.p.o. 
+ * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include + +/** + * @brief A thread-safe stack + */ +template +class SyncStack { +public: + /** + * @brief Push an item onto the stack and wake up waiting consumers + * + * @param item The item + */ + void put(Item item) + { + std::lock_guard const lock(m_mutex); + m_items.push(item); + m_size = m_items.size(); + m_avail_cv.notify_all(); + } + + /** + * @brief Pop the top item from the stack, block and wait if there aren't any + * + * @return The item + */ + Item get() + { + std::unique_lock lock(m_mutex); + while (true) { + if (!m_items.empty()) { + auto item = m_items.top(); + m_items.pop(); + m_size = m_items.size(); + return item; + } + m_avail_cv.wait(lock); + } + } + + /** + * @brief Get the current size of the stack + * + * @return The number of items in the stack + */ + std::size_t size() const { return m_size; } + +private: + std::atomic_size_t m_size = 0; ///< current number of items in the stack + std::stack m_items; ///< underlying container for stacked items + std::mutex m_mutex; ///< mutex for synchronizing access + std::condition_variable m_avail_cv; ///< signals availability of items +}; diff --git a/pkg/rpm/nemea-modules-ng.spec.in b/pkg/rpm/nemea-modules-ng.spec.in index bb06d98e..eb74a5f5 100644 --- a/pkg/rpm/nemea-modules-ng.spec.in +++ b/pkg/rpm/nemea-modules-ng.spec.in @@ -36,6 +36,7 @@ that make up the main components of the test environment. %files %license LICENSE +%{_bindir}/nemea/clickhouse %{_bindir}/nemea/listDetector %{_bindir}/nemea/sampler %{_bindir}/nemea/telemetry_stats