Skip to content

Commit

Permalink
adding kafka opts - ssl_key_password & enable_ssl_certificate_verific…
Browse files Browse the repository at this point in the history
…ation ...
  • Loading branch information
scuzzilla committed Jun 4, 2024
1 parent 85612b1 commit 2ca3ee9
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 24 deletions.
20 changes: 16 additions & 4 deletions doc/CONFIG-KEYS
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ makes the whole line to be ignored by the interpreter, making it a comment.

KEY: iface
DESC: Defining the network interface receiving the gRPC data-stream. The logical name of the interface can be retrived using shell commads
like 'ip address'. No default value is set and a coherent value is mandatory.
like "ip address". No default value is set and a coherent value is mandatory.
DEFAULT: none

KEY: ipv4_socket_cisco
Expand Down Expand Up @@ -134,6 +134,13 @@ DESC: When set to true, the producer will ensure that messages are successf
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
DEFAULT: "true"

KEY: enable_ssl_certificate_verification
VALUES: ["true" or "false"]
DESC: This is valid only when security_protocol is set to "ssl" and it is enabling/disabling the OpenSSL's builtin broker (server)
certificate verification.
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
DEFAULT: "true"

KEY: log_level
VALUES: [value >= "0" and value <= "7"]
DESC: Logging level (syslog(3) levels)
Expand All @@ -148,23 +155,28 @@ DESC: Defines if the communication between the collector and the Kafka's br
DEFAULT: none

KEY: ssl_ca_location
DESC: This is valid only when security_protocol is set to "ssl" and it's including the file or
DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the file or
directory path to CA certificate(s) for verifying the broker's key
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
DEFAULT: none

KEY: ssl_certificate_location
DESC: This is valid only when security_protocol is set to "ssl" and it's including the path to
DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the path to
client's public key (PEM) used for authentication
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
DEFAULT: none

KEY: ssl_key_location
DESC: This is valid only when security_protocol is set to "ssl" and it's including the path to
DESC: This is valid and mandatory only when security_protocol is set to "ssl" and it's including the path to
client's private key (PEM) used for authentication.
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
DEFAULT: none

KEY: ssl_key_password
DESC: This is valid only when security_protocol is set to "ssl" and it is including the client's private key passphrase
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
DEFAULT: none

KEY: topic
DESC: Defines the Kafka's topic name where the processed gRPC messages are delivered. This is a mandatory option and
when the selected delivery method is "zmq" it's automatically set to "dummy_topic".
Expand Down
4 changes: 3 additions & 1 deletion doc/Changelog
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ The keys used are:
!: fixed/modified feature, -: deleted feature, +: new feature


current (main branch) -- 29-05-2024
current (main branch) -- 04-06-2024
+ Adding the ability to disable the checks related to socket binding to a particular device
+ Adding the ability to configure the Kafka option "enable.ssl.certificate.verification". Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+ Adding the ability to configure the Kafka option "ssl.key.password". Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

v1.1.4 -- 05-12-2023
+ Adding automatic version number retrieval from the VERSION file
Expand Down
9 changes: 6 additions & 3 deletions src/dataDelivery/kafka_delivery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ KafkaDelivery::KafkaDelivery()
kafka_delivery_cfg_parameters.at("security_protocol");
this->ssl_key_location =
kafka_delivery_cfg_parameters.at("ssl_key_location");
this->ssl_key_password =
kafka_delivery_cfg_parameters.at("ssl_key_password");
this->ssl_certificate_location =
kafka_delivery_cfg_parameters.at("ssl_certificate_location");
this->ssl_ca_location =
kafka_delivery_cfg_parameters.at("ssl_ca_location");
this->log_level =
kafka_delivery_cfg_parameters.at("log_level");
this->enable_ssl_certificate_verification =
kafka_delivery_cfg_parameters.at("enable_ssl_certificate_verification");
this->log_level =
kafka_delivery_cfg_parameters.at("log_level");

set_kafka_properties(this->properties);
}
Expand All @@ -40,10 +42,11 @@ void KafkaDelivery::set_kafka_properties(kafka::Properties &properties)
properties.put("client.id", get_client_id());
properties.put("security.protocol", get_security_protocol());
properties.put("ssl.key.location", get_ssl_key_location());
properties.put("ssl.key.password", get_ssl_key_password());
properties.put("ssl.certificate.location", get_ssl_certificate_location());
properties.put("ssl.ca.location", get_ssl_ca_location());
properties.put("enable.ssl.certificate.verification", get_enable_ssl_certificate_verification());
properties.put("log_level", get_log_level());
properties.put("enable_ssl_certificate_verification", get_enable_ssl_certificate_verification());
}

bool KafkaDelivery::AsyncKafkaProducer(
Expand Down
9 changes: 6 additions & 3 deletions src/dataDelivery/kafka_delivery.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ class KafkaDelivery {
return security_protocol; };
std::string get_ssl_key_location() {
return ssl_key_location; };
std::string get_ssl_key_password() {
return ssl_key_password; };
std::string get_ssl_certificate_location() {
return ssl_certificate_location; };
std::string get_ssl_ca_location() {
return ssl_ca_location; };
std::string get_enable_ssl_certificate_verification() {
return enable_ssl_certificate_verification; };
std::string get_log_level() {
return log_level; };
const std::string get_enable_ssl_certificate_verification() {
return enable_ssl_certificate_verification; };
private:
kafka::Properties properties;
kafka::Topic topic;
Expand All @@ -56,10 +58,11 @@ class KafkaDelivery {
std::string client_id;
std::string security_protocol;
std::string ssl_key_location;
std::string ssl_key_password;
std::string ssl_certificate_location;
std::string ssl_ca_location;
std::string enable_ssl_certificate_verification;
std::string log_level;
const std::string enable_ssl_certificate_verification;
};

#endif
Expand Down
62 changes: 50 additions & 12 deletions src/utils/cfg_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,7 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path,

if (params.at("security_protocol").compare("ssl") == 0) {
bool ssl_key_location = kafka_params.exists("ssl_key_location");
bool ssl_key_password = kafka_params.exists("ssl_key_password");
bool ssl_certificate_location =
kafka_params.exists("ssl_certificate_location");
bool ssl_ca_location = kafka_params.exists("ssl_ca_location");
Expand All @@ -1001,33 +1002,25 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path,

if (ssl_key_location == true &&
ssl_certificate_location == true &&
ssl_ca_location == true &&
enable_ssl_certificate_verification == true) {
ssl_ca_location == true) {
libconfig::Setting &ssl_key_location =
kafka_params.lookup("ssl_key_location");
libconfig::Setting &ssl_certificate_location =
kafka_params.lookup("ssl_certificate_location");
libconfig::Setting &ssl_ca_location =
kafka_params.lookup("ssl_ca_location");
libconfig::Setting &enable_ssl_certificate_verification =
kafka_params.lookup("enable_ssl_certificate_verification");
try {
std::string ssl_key_location_s = ssl_key_location.c_str();
std::string ssl_certificate_location_s =
ssl_certificate_location.c_str();
std::string ssl_ca_location_s = ssl_ca_location.c_str();
std::string enable_ssl_certificate_verification_s =
enable_ssl_certificate_verification.c_str();
if (ssl_key_location_s.empty() == false &&
ssl_certificate_location_s.empty() == false &&
ssl_ca_location_s.empty() == false &&
enable_ssl_certificate_verification_s.empty() == false) {
ssl_ca_location_s.empty() == false) {
params.insert({"ssl_key_location", ssl_key_location_s});
params.insert({"ssl_certificate_location",
ssl_certificate_location_s});
params.insert({"ssl_ca_location", ssl_ca_location_s});
params.insert({"enable_ssl_certificate_verification",
enable_ssl_certificate_verification_s});
} else {
spdlog::get("multi-logger")->
error("[security_protocol] "
Expand All @@ -1042,14 +1035,59 @@ bool KafkaCfgHandler::lookup_kafka_parameters(const std::string &cfg_path,
} else {
spdlog::get("multi-logger")->
error("[security_protocol] configuration issue: "
"a valid security_protocol is mandatory");
"one or more mandatory ssl params are missing");
return false;
}

if (ssl_key_password == true) {
libconfig::Setting &ssl_key_password =
kafka_params.lookup("ssl_key_password");
try {
std::string ssl_key_password_s = ssl_key_password.c_str();
if (ssl_key_password_s.empty() == false) {
params.insert({"ssl_key_password", ssl_key_password_s});
} else {
spdlog::get("multi-logger")->
error("[security_protocol] "
"configuration issue: is invalid");
return false;
}
} catch (const libconfig::SettingTypeException &ste) {
spdlog::get("multi-logger")->error("[security_protocol] "
"configuration issue: {}", ste.what());
return false;
}
} else {
params.insert({"ssl_key_password", "NULL"});
}

if (enable_ssl_certificate_verification == true) {
libconfig::Setting &enable_ssl_certificate_verification =
kafka_params.lookup("enable_ssl_certificate_verification");
try {
std::string enable_ssl_certificate_verification_s =
enable_ssl_certificate_verification.c_str();
if (enable_ssl_certificate_verification_s.empty() == false) {
params.insert({"enable_ssl_certificate_verification",
enable_ssl_certificate_verification_s});
} else {
spdlog::get("multi-logger")->
error("[security_protocol] "
"configuration issue: is invalid");
return false;
}
} catch (const libconfig::SettingTypeException &ste) {
spdlog::get("multi-logger")->error("[security_protocol] "
"configuration issue: {}", ste.what());
return false;
}
} else {
params.insert({"enable_ssl_certificate_verification", "NULL"});
}
} else {
params.insert({"ssl_key_location", "NULL"});
params.insert({"ssl_certificate_location", "NULL"});
params.insert({"ssl_ca_location", "NULL"});
params.insert({"enable_ssl_certificate_verification", "NULL"});
}

return true;
Expand Down
7 changes: 6 additions & 1 deletion src/utils/cfg_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,14 @@ class KafkaCfgHandler {
// return security_protocol; };
//const std::string &get_kafka_ssl_key_location() const {
// return ssl_key_location; };
//const std::string &get_kafka_ssl_key_password() const {
// return ssl_key_password; };
//const std::string &get_kafka_ssl_certificate_location() const {
// return ssl_certificate_location; };
//const std::string &get_kafka_ssl_ca_location() const {
// return ssl_ca_location; };
//const std::string &get_enable_ssl_certificate_verification() const {
// return enable_ssl_certificate_verification; };
//const std::string &get_kafka_log_level() const {
// return log_level; };
private:
Expand All @@ -219,10 +223,11 @@ class KafkaCfgHandler {
const std::string client_id;
const std::string security_protocol;
const std::string ssl_key_location;
const std::string ssl_key_password;
const std::string ssl_certificate_location;
const std::string ssl_ca_location;
const std::string log_level;
const std::string enable_ssl_certificate_verification;
const std::string log_level;
};

// ZMQ configuration parameters
Expand Down

0 comments on commit 2ca3ee9

Please sign in to comment.