Skip to content

Commit

Permalink
MINIFICPP-2135 Add SSL support for Prometheus reporter
Browse files Browse the repository at this point in the history
Closes #1587
Signed-off-by: Marton Szasz <[email protected]>
  • Loading branch information
lordgamez authored and szaszm committed Jul 20, 2023
1 parent 0cf3c8f commit 93bc729
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 37 deletions.
9 changes: 9 additions & 0 deletions METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ An agent identifier should also be defined to identify which agent the metric is

nifi.metrics.publisher.agent.identifier=Agent1

### Configure Prometheus metrics publisher with SSL

The communication between MiNiFi and Prometheus can be encrypted using SSL. This can be achieved by adding the SSL certificate path (a single file containing both the MiNiFi certificate and the MiNiFi SSL key) and optionally adding the root CA path if Prometheus uses a self-signed certificate, to the minifi.properties file. Here is an example with the SSL properties:

# in minifi.properties

nifi.metrics.publisher.PrometheusMetricsPublisher.certificate=/tmp/certs/prometheus-publisher/minifi-cpp.crt
nifi.metrics.publisher.PrometheusMetricsPublisher.ca.certificate=/tmp/certs/prometheus-publisher/root-ca.pem

## System Metrics

The following section defines the currently available metrics to be published by the MiNiFi C++ agent.
Expand Down
12 changes: 12 additions & 0 deletions docker/test/integration/cluster/ContainerStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c
network=self.network,
image_store=self.image_store,
command=command))
elif engine == "prometheus-ssl":
return self.containers.setdefault(container_name,
PrometheusContainer(feature_context=feature_context,
name=container_name,
vols=self.vols,
network=self.network,
image_store=self.image_store,
command=command,
ssl=True))
elif engine == "minifi-c2-server":
return self.containers.setdefault(container_name,
MinifiC2ServerContainer(feature_context=feature_context,
Expand Down Expand Up @@ -313,6 +322,9 @@ def set_ssl_context_properties_in_minifi(self):
def enable_prometheus_in_minifi(self):
self.minifi_options.enable_prometheus = True

def enable_prometheus_with_ssl_in_minifi(self):
self.minifi_options.enable_prometheus_with_ssl = True

def enable_sql_in_minifi(self):
self.minifi_options.enable_sql = True

Expand Down
3 changes: 3 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ def set_ssl_context_properties_in_minifi(self):
def enable_prometheus_in_minifi(self):
self.container_store.enable_prometheus_in_minifi()

def enable_prometheus_with_ssl_in_minifi(self):
self.container_store.enable_prometheus_with_ssl_in_minifi()

def enable_sql_in_minifi(self):
self.container_store.enable_sql_in_minifi()

Expand Down
29 changes: 17 additions & 12 deletions docker/test/integration/cluster/containers/MinifiContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self):
self.enable_c2_with_ssl = False
self.enable_provenance = False
self.enable_prometheus = False
self.enable_prometheus_with_ssl = False
self.enable_sql = False
self.config_format = "json"
self.use_flow_config_from_url = False
Expand Down Expand Up @@ -118,20 +119,24 @@ def _create_properties(self):
if not self.options.enable_provenance:
f.write("nifi.provenance.repository.class.name=NoOpRepository\n")

if self.options.enable_prometheus or self.options.enable_log_metrics_publisher:
classes = []
if self.options.enable_prometheus:
f.write("nifi.metrics.publisher.agent.identifier=Agent1\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus\n")
classes.append("PrometheusMetricsPublisher")
metrics_publisher_classes = []
if self.options.enable_prometheus or self.options.enable_prometheus_with_ssl:
f.write("nifi.metrics.publisher.agent.identifier=Agent1\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus\n")
metrics_publisher_classes.append("PrometheusMetricsPublisher")

if self.options.enable_log_metrics_publisher:
f.write("nifi.metrics.publisher.LogMetricsPublisher.metrics=RepositoryMetrics\n")
f.write("nifi.metrics.publisher.LogMetricsPublisher.logging.interval=1s\n")
classes.append("LogMetricsPublisher")
if self.options.enable_prometheus_with_ssl:
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.certificate=/tmp/resources/minifi_merged_cert.crt\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.ca.certificate=/tmp/resources/root_ca.crt\n")

f.write("nifi.metrics.publisher.class=" + ",".join(classes) + "\n")
if self.options.enable_log_metrics_publisher:
f.write("nifi.metrics.publisher.LogMetricsPublisher.metrics=RepositoryMetrics\n")
f.write("nifi.metrics.publisher.LogMetricsPublisher.logging.interval=1s\n")
metrics_publisher_classes.append("LogMetricsPublisher")

if metrics_publisher_classes:
f.write("nifi.metrics.publisher.class=" + ",".join(metrics_publisher_classes) + "\n")

if self.options.use_flow_config_from_url:
f.write(f"nifi.c2.flow.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config?class=minifi-test-class\n")
Expand Down
57 changes: 49 additions & 8 deletions docker/test/integration/cluster/containers/PrometheusContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,42 @@
import os
import tempfile
import docker.types

from .Container import Container
from OpenSSL import crypto
from ssl_utils.SSL_cert_utils import make_cert_without_extended_usage


class PrometheusContainer(Container):
def __init__(self, feature_context, name, vols, network, image_store, command=None):
super().__init__(feature_context, name, 'prometheus', vols, network, image_store, command)
def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False):
engine = "prometheus-ssl" if ssl else "prometheus"
super().__init__(feature_context, name, engine, vols, network, image_store, command)
self.ssl = ssl
extra_ssl_settings = ""
if ssl:
prometheus_cert, prometheus_key = make_cert_without_extended_usage(f"prometheus-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key)

self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
self.root_ca_file.write(crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert))
self.root_ca_file.close()
os.chmod(self.root_ca_file.name, 0o644)

self.prometheus_cert_file = tempfile.NamedTemporaryFile(delete=False)
self.prometheus_cert_file.write(crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=prometheus_cert))
self.prometheus_cert_file.close()
os.chmod(self.prometheus_cert_file.name, 0o644)

self.prometheus_key_file = tempfile.NamedTemporaryFile(delete=False)
self.prometheus_key_file.write(crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=prometheus_key))
self.prometheus_key_file.close()
os.chmod(self.prometheus_key_file.name, 0o644)

extra_ssl_settings = """
scheme: https
tls_config:
ca_file: /etc/prometheus/certs/root-ca.pem
"""

prometheus_yml_content = """
global:
scrape_interval: 2s
Expand All @@ -30,7 +60,9 @@ def __init__(self, feature_context, name, vols, network, image_store, command=No
- job_name: "minifi"
static_configs:
- targets: ["minifi-cpp-flow-{feature_id}:9936"]
""".format(feature_id=self.feature_context.id)
{extra_ssl_settings}
""".format(feature_id=self.feature_context.id, extra_ssl_settings=extra_ssl_settings)

self.yaml_file = tempfile.NamedTemporaryFile(delete=False)
self.yaml_file.write(prometheus_yml_content.encode())
self.yaml_file.close()
Expand All @@ -45,15 +77,24 @@ def deploy(self):

logging.info('Creating and running Prometheus docker container...')

mounts = [docker.types.Mount(
type='bind',
source=self.yaml_file.name,
target='/etc/prometheus/prometheus.yml'
)]

if self.ssl:
mounts.append(docker.types.Mount(
type='bind',
source=self.root_ca_file.name,
target='/etc/prometheus/certs/root-ca.pem'
))

self.client.containers.run(
image="prom/prometheus:v2.35.0",
detach=True,
name=self.name,
network=self.network.name,
ports={'9090/tcp': 9090},
mounts=[docker.types.Mount(
type='bind',
source=self.yaml_file.name,
target='/etc/prometheus/prometheus.yml'
)],
mounts=mounts,
entrypoint=self.command)
17 changes: 13 additions & 4 deletions docker/test/integration/features/MiNiFi_integration_test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from pydoc import locate

from ssl_utils.SSL_cert_utils import make_ca, make_client_cert
from ssl_utils.SSL_cert_utils import make_ca, make_cert_without_extended_usage
from minifi.core.InputPort import InputPort

from cluster.DockerTestCluster import DockerTestCluster
Expand Down Expand Up @@ -53,9 +53,9 @@ def __init__(self, context, feature_id: str):
self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.feature_id), self.docker_directory_bindings.get_data_directories(self.feature_id))
self.root_ca_cert, self.root_ca_key = make_ca("root CA")

minifi_client_cert, minifi_client_key = make_client_cert(common_name=f"minifi-cpp-flow-{self.feature_id}",
ca_cert=self.root_ca_cert,
ca_key=self.root_ca_key)
minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=f"minifi-cpp-flow-{self.feature_id}",
ca_cert=self.root_ca_cert,
ca_key=self.root_ca_key)
self.put_test_resource('root_ca.crt',
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
cert=self.root_ca_cert))
Expand All @@ -67,6 +67,12 @@ def __init__(self, context, feature_id: str):
OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
pkey=minifi_client_key))

self.put_test_resource('minifi_merged_cert.crt',
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
cert=minifi_client_cert)
+ OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
pkey=minifi_client_key))

def get_container_name_with_postfix(self, container_name: str):
return self.cluster.container_store.get_container_name_with_postfix(container_name)

Expand Down Expand Up @@ -347,6 +353,9 @@ def set_ssl_context_properties_in_minifi(self):
def enable_prometheus_in_minifi(self):
self.cluster.enable_prometheus_in_minifi()

def enable_prometheus_with_ssl_in_minifi(self):
self.cluster.enable_prometheus_with_ssl_in_minifi()

def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
self.cluster.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem)

Expand Down
16 changes: 16 additions & 0 deletions docker/test/integration/features/prometheus.feature
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ Feature: MiNiFi can publish metrics to Prometheus server
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds

Scenario: Published metrics are scraped by Prometheus server through SSL connection
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile1 processor is connected to the PutFile
And Prometheus with SSL is enabled in MiNiFi
And a Prometheus server is set up with SSL
When all instances start up
Then "RepositoryMetrics" is published to the Prometheus server in less than 60 seconds
And "QueueMetrics" is published to the Prometheus server in less than 60 seconds
And "GetFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "GetFile1" processor
And "PutFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "PutFile" processor
And "FlowInformation" is published to the Prometheus server in less than 60 seconds
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds

Scenario: Multiple GetFile metrics are reported by Prometheus
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
And a GetFile processor with the name "GetFile2" and the "Input Directory" property set to "/tmp/input"
Expand Down
10 changes: 10 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ def step_impl(context):
context.test.enable_log_metrics_publisher_in_minifi()


@given("Prometheus with SSL is enabled in MiNiFi")
def step_impl(context):
context.test.enable_prometheus_with_ssl_in_minifi()


# HTTP proxy setup
@given("the http proxy server is set up")
@given("a http proxy server is set up accordingly")
Expand Down Expand Up @@ -989,6 +994,11 @@ def step_impl(context):
context.test.acquire_container(context=context, name="prometheus", engine="prometheus")


@given("a Prometheus server is set up with SSL")
def step_impl(context):
context.test.acquire_container(context=context, name="prometheus", engine="prometheus-ssl")


@then("\"{metric_class}\" are published to the Prometheus server in less than {timeout_seconds:d} seconds")
@then("\"{metric_class}\" is published to the Prometheus server in less than {timeout_seconds:d} seconds")
def step_impl(context, metric_class, timeout_seconds):
Expand Down
4 changes: 4 additions & 0 deletions docker/test/integration/ssl_utils/SSL_cert_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ def _make_cert(common_name, ca_cert, ca_key, extended_key_usage=None):
if extended_key_usage:
extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, extended_key_usage))

cert.add_extensions([
crypto.X509Extension(b"subjectAltName", False, b"DNS.1:" + common_name.encode())
])

cert.add_extensions(extensions)

cert.set_issuer(ca_cert.get_subject())
Expand Down
24 changes: 21 additions & 3 deletions extensions/prometheus/PrometheusExposerWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,27 @@

namespace org::apache::nifi::minifi::extensions::prometheus {

PrometheusExposerWrapper::PrometheusExposerWrapper(uint32_t port)
: exposer_(std::to_string(port)) {
logger_->log_info("Started Prometheus metrics publisher on port %" PRIu32, port);
PrometheusExposerWrapper::PrometheusExposerWrapper(const PrometheusExposerConfig& config)
: exposer_(parseExposerConfig(config)) {
logger_->log_info("Started Prometheus metrics publisher on port %" PRIu32 "%s", config.port, config.certificate ? " with TLS enabled" : "");
}

std::vector<std::string> PrometheusExposerWrapper::parseExposerConfig(const PrometheusExposerConfig& config) {
std::vector<std::string> result;
result.push_back("listening_ports");
if (config.certificate) {
result.push_back(std::to_string(config.port) + "s");
result.push_back("ssl_certificate");
result.push_back(*config.certificate);
} else {
result.push_back(std::to_string(config.port));
}

if (config.ca_certificate) {
result.push_back("ssl_ca_file");
result.push_back(*config.ca_certificate);
}
return result;
}

void PrometheusExposerWrapper::registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) {
Expand Down
13 changes: 12 additions & 1 deletion extensions/prometheus/PrometheusExposerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@
#pragma once

#include <memory>
#include <string>
#include <vector>

#include "MetricsExposer.h"
#include "prometheus/exposer.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "controllers/SSLContextService.h"

namespace org::apache::nifi::minifi::extensions::prometheus {

struct PrometheusExposerConfig {
uint32_t port;
std::optional<std::string> certificate;
std::optional<std::string> ca_certificate;
};

class PrometheusExposerWrapper : public MetricsExposer {
public:
explicit PrometheusExposerWrapper(uint32_t port);
explicit PrometheusExposerWrapper(const PrometheusExposerConfig& config);
void registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override;
void removeMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) override;

private:
static std::vector<std::string> parseExposerConfig(const PrometheusExposerConfig& config);

::prometheus::Exposer exposer_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<PrometheusExposerWrapper>::getLogger()};
};
Expand Down
Loading

0 comments on commit 93bc729

Please sign in to comment.