From 2ca3ee925d8777e52f986b1d77bb43c020cddec4 Mon Sep 17 00:00:00 2001 From: Salvatore Cuzzilla Date: Tue, 4 Jun 2024 16:41:00 +0200 Subject: [PATCH] adding kafka opts - ssl_key_password & enable_ssl_certificate_verification ... --- doc/CONFIG-KEYS | 20 ++++++++-- doc/Changelog | 4 +- src/dataDelivery/kafka_delivery.cc | 9 +++-- src/dataDelivery/kafka_delivery.h | 9 +++-- src/utils/cfg_handler.cc | 62 ++++++++++++++++++++++++------ src/utils/cfg_handler.h | 7 +++- 6 files changed, 87 insertions(+), 24 deletions(-) diff --git a/doc/CONFIG-KEYS b/doc/CONFIG-KEYS index 790c56a..e3caf2e 100644 --- a/doc/CONFIG-KEYS +++ b/doc/CONFIG-KEYS @@ -8,7 +8,7 @@ makes the whole line to be ignored by the interpreter, making it a comment. KEY: iface DESC: Defining the network interface receiving the gRPC data-stream. The logical name of the interface can be retrived using shell commads - like 'ip address'. No default value is set and a coherent value is mandatory. + like "ip address". No default value is set and a coherent value is mandatory. DEFAULT: none KEY: ipv4_socket_cisco @@ -134,6 +134,13 @@ DESC: When set to true, the producer will ensure that messages are successf Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. DEFAULT: "true" +KEY: enable_ssl_certificate_verification +VALUES: ["true" or "false"] +DESC: This is valid only when security_protocol is set to "ssl" and it is enabling/disabling the OpenSSL's builtin broker (server) + certificate verification. + Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. +DEFAULT: "true" + KEY: log_level VALUES: [value >= "0" and value <= "7"] DESC: Logging level (syslog(3) levels) @@ -148,23 +155,28 @@ DESC: Defines if the communication between the collector and the Kafka's br DEFAULT: none KEY: ssl_ca_location -DESC: This is valid only when security_protocol is set to "ssl" and it's including the file or +DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the file or directory path to CA certificate(s) for verifying the broker's key Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. DEFAULT: none KEY: ssl_certificate_location -DESC: This is valid only when security_protocol is set to "ssl" and it's including the path to +DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the path to client's public key (PEM) used for authentication Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. DEFAULT: none KEY: ssl_key_location -DESC: This is valid only when security_protocol is set to "ssl" and it's including the path to +DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the path to client's private key (PEM) used for authentication. Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. DEFAULT: none +KEY: ssl_key_password +DESC: This is valid only when security_protocol is set to "ssl" and it is including the client's private key passphrase + Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. +DEFAULT: none + KEY: topic DESC: Defines the Kafka's topic name where the processed gRPC messages are delivered. This is a mandatory option and when the selected delivery method is "zmq" it's automatically set to "dummy_topic". diff --git a/doc/Changelog b/doc/Changelog index b67def2..02ddb50 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -7,8 +7,10 @@ The keys used are: !: fixed/modified feature, -: deleted feature, +: new feature -current (main branch) -- 29-05-2024 +current (main branch) -- 04-06-2024 + Adding the ability to disable the checks related to socket binding to a particular device + + Adding the ability to configure the Kafka option "enable.ssl.certificate.verification". Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + + Adding the ability to configure the Kafka option "ssl.key.password". Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md v1.1.4 -- 05-12-2023 + Adding automatic version number retrieval from the VERSION file diff --git a/src/dataDelivery/kafka_delivery.cc b/src/dataDelivery/kafka_delivery.cc index 9377635..140baff 100644 --- a/src/dataDelivery/kafka_delivery.cc +++ b/src/dataDelivery/kafka_delivery.cc @@ -21,14 +21,16 @@ KafkaDelivery::KafkaDelivery() kafka_delivery_cfg_parameters.at("security_protocol"); this->ssl_key_location = kafka_delivery_cfg_parameters.at("ssl_key_location"); + this->ssl_key_password = + kafka_delivery_cfg_parameters.at("ssl_key_password"); this->ssl_certificate_location = kafka_delivery_cfg_parameters.at("ssl_certificate_location"); this->ssl_ca_location = kafka_delivery_cfg_parameters.at("ssl_ca_location"); - this->log_level = - kafka_delivery_cfg_parameters.at("log_level"); this->enable_ssl_certificate_verification = kafka_delivery_cfg_parameters.at("enable_ssl_certificate_verification"); + this->log_level = + kafka_delivery_cfg_parameters.at("log_level"); set_kafka_properties(this->properties); } @@ -40,10 +42,11 @@ void KafkaDelivery::set_kafka_properties(kafka::Properties &properties) properties.put("client.id", get_client_id()); properties.put("security.protocol", get_security_protocol()); properties.put("ssl.key.location", get_ssl_key_location()); + properties.put("ssl.key.password", get_ssl_key_password()); properties.put("ssl.certificate.location", get_ssl_certificate_location()); properties.put("ssl.ca.location", get_ssl_ca_location()); + properties.put("enable.ssl.certificate.verification", get_enable_ssl_certificate_verification()); properties.put("log_level", get_log_level()); - properties.put("enable_ssl_certificate_verification", get_enable_ssl_certificate_verification()); } bool KafkaDelivery::AsyncKafkaProducer( diff --git a/src/dataDelivery/kafka_delivery.h b/src/dataDelivery/kafka_delivery.h index 512294f..2b3aed0 100644 --- a/src/dataDelivery/kafka_delivery.h +++ b/src/dataDelivery/kafka_delivery.h @@ -40,14 +40,16 @@ class KafkaDelivery { return security_protocol; }; std::string get_ssl_key_location() { return ssl_key_location; }; + std::string get_ssl_key_password() { + return ssl_key_password; }; std::string get_ssl_certificate_location() { return ssl_certificate_location; }; std::string get_ssl_ca_location() { return ssl_ca_location; }; + std::string get_enable_ssl_certificate_verification() { + return enable_ssl_certificate_verification; }; std::string get_log_level() { return log_level; }; - const std::string get_enable_ssl_certificate_verification() { - return enable_ssl_certificate_verification; }; private: kafka::Properties properties; kafka::Topic topic; @@ -56,10 +58,11 @@ class KafkaDelivery { std::string client_id; std::string security_protocol; std::string ssl_key_location; + std::string ssl_key_password; std::string ssl_certificate_location; std::string ssl_ca_location; + std::string enable_ssl_certificate_verification; std::string log_level; - const std::string enable_ssl_certificate_verification; }; #endif diff --git a/src/utils/cfg_handler.cc b/src/utils/cfg_handler.cc index 9d35e4e..79dc0b5 100644 --- a/src/utils/cfg_handler.cc +++ b/src/utils/cfg_handler.cc @@ -993,6 +993,7 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path, if (params.at("security_protocol").compare("ssl") == 0) { bool ssl_key_location = kafka_params.exists("ssl_key_location"); + bool ssl_key_password = kafka_params.exists("ssl_key_password"); bool ssl_certificate_location = kafka_params.exists("ssl_certificate_location"); bool ssl_ca_location = kafka_params.exists("ssl_ca_location"); @@ -1001,33 +1002,25 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path, if (ssl_key_location == true && ssl_certificate_location == true && - ssl_ca_location == true && - enable_ssl_certificate_verification == true) { + ssl_ca_location == true) { libconfig::Setting &ssl_key_location = kafka_params.lookup("ssl_key_location"); libconfig::Setting &ssl_certificate_location = kafka_params.lookup("ssl_certificate_location"); libconfig::Setting &ssl_ca_location = kafka_params.lookup("ssl_ca_location"); - libconfig::Setting &enable_ssl_certificate_verification = - kafka_params.lookup("enable_ssl_certificate_verification"); try { std::string ssl_key_location_s = ssl_key_location.c_str(); std::string ssl_certificate_location_s = ssl_certificate_location.c_str(); std::string ssl_ca_location_s = ssl_ca_location.c_str(); - std::string enable_ssl_certificate_verification_s = - enable_ssl_certificate_verification.c_str(); if (ssl_key_location_s.empty() == false && ssl_certificate_location_s.empty() == false && - ssl_ca_location_s.empty() == false && - enable_ssl_certificate_verification_s.empty() == false) { + ssl_ca_location_s.empty() == false) { params.insert({"ssl_key_location", ssl_key_location_s}); params.insert({"ssl_certificate_location", ssl_certificate_location_s}); params.insert({"ssl_ca_location", ssl_ca_location_s}); - params.insert({"enable_ssl_certificate_verification", - enable_ssl_certificate_verification_s}); } else { spdlog::get("multi-logger")-> error("[security_protocol] " @@ -1042,14 +1035,59 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path, } else { spdlog::get("multi-logger")-> error("[security_protocol] configuration issue: " - "a valid security_protocol is mandatory"); + "one or more mandatory ssl params are missing"); return false; } + + if (ssl_key_password == true) { + libconfig::Setting &ssl_key_password = + kafka_params.lookup("ssl_key_password"); + try { + std::string ssl_key_password_s = ssl_key_password.c_str(); + if (ssl_key_password_s.empty() == false) { + params.insert({"ssl_key_password", ssl_key_password_s}); + } else { + spdlog::get("multi-logger")-> + error("[security_protocol] " + "configuration issue: is invalid"); + return false; + } + } catch (const libconfig::SettingTypeException &ste) { + spdlog::get("multi-logger")->error("[security_protocol] " + "configuration issue: {}", ste.what()); + return false; + } + } else { + params.insert({"ssl_key_password", "NULL"}); + } + + if (enable_ssl_certificate_verification == true) { + libconfig::Setting &enable_ssl_certificate_verification = + kafka_params.lookup("enable_ssl_certificate_verification"); + try { + std::string enable_ssl_certificate_verification_s = + enable_ssl_certificate_verification.c_str(); + if (enable_ssl_certificate_verification_s.empty() == false) { + params.insert({"enable_ssl_certificate_verification", + enable_ssl_certificate_verification_s}); + } else { + spdlog::get("multi-logger")-> + error("[security_protocol] " + "configuration issue: is invalid"); + return false; + } + } catch (const libconfig::SettingTypeException &ste) { + spdlog::get("multi-logger")->error("[security_protocol] " + "configuration issue: {}", ste.what()); + return false; + } + } else { + params.insert({"enable_ssl_certificate_verification", "NULL"}); + } } else { params.insert({"ssl_key_location", "NULL"}); params.insert({"ssl_certificate_location", "NULL"}); params.insert({"ssl_ca_location", "NULL"}); - params.insert({"enable_ssl_certificate_verification", "NULL"}); } return true; diff --git a/src/utils/cfg_handler.h b/src/utils/cfg_handler.h index d3104bb..a69d893 100644 --- a/src/utils/cfg_handler.h +++ b/src/utils/cfg_handler.h @@ -206,10 +206,14 @@ class KafkaCfgHandler { // return security_protocol; }; //const std::string &get_kafka_ssl_key_location() const { // return ssl_key_location; }; + //const std::string &get_kafka_ssl_key_password() const { + // return ssl_key_password; }; //const std::string &get_kafka_ssl_certificate_location() const { // return ssl_certificate_location; }; //const std::string &get_kafka_ssl_ca_location() const { // return ssl_ca_location; }; + //const std::string &get_enable_ssl_certificate_verification() const { + // return enable_ssl_certificate_verification; }; //const std::string &get_kafka_log_level() const { // return log_level; }; private: @@ -219,10 +223,11 @@ class KafkaCfgHandler { const std::string client_id; const std::string security_protocol; const std::string ssl_key_location; + const std::string ssl_key_password; const std::string ssl_certificate_location; const std::string ssl_ca_location; - const std::string log_level; const std::string enable_ssl_certificate_verification; + const std::string log_level; }; // ZMQ configuration parameters