Skip to content

Commit

Permalink
MINIFICPP-2135 Add SSL support for Prometheus reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jun 14, 2023
1 parent bbfa1e0 commit 01931c9
Show file tree
Hide file tree
Showing 16 changed files with 234 additions and 29 deletions.
9 changes: 9 additions & 0 deletions METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ An agent identifier should also be defined to identify which agent the metric is

nifi.metrics.publisher.agent.identifier=Agent1

### Configure Prometheus metrics publisher with SSL

The communication between MiNiFi and the Prometheus server can be encrypted using SSL. This can be achieved by adding the SSL certificate path (a single file containing both the certificate and the SSL key) and optionally adding the root CA path when using a self signed certificate to the minifi.properties file. Here is an example with the SSL properties:

# in minifi.properties

nifi.metrics.publisher.PrometheusMetricsPublisher.certificate=/tmp/certs/prometheus-publisher/minifi-cpp.crt
nifi.metrics.publisher.PrometheusMetricsPublisher.ca.certificate=/tmp/certs/prometheus-publisher/root-ca.pem

## System Metrics

The following section defines the currently available metrics to be published by the MiNiFi C++ agent.
Expand Down
12 changes: 12 additions & 0 deletions docker/test/integration/cluster/ContainerStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c
network=self.network,
image_store=self.image_store,
command=command))
elif engine == "prometheus-ssl":
return self.containers.setdefault(container_name,
PrometheusContainer(feature_context=feature_context,
name=container_name,
vols=self.vols,
network=self.network,
image_store=self.image_store,
command=command,
ssl=True))
elif engine == "minifi-c2-server":
return self.containers.setdefault(container_name,
MinifiC2ServerContainer(feature_context=feature_context,
Expand Down Expand Up @@ -313,6 +322,9 @@ def set_ssl_context_properties_in_minifi(self):
def enable_prometheus_in_minifi(self):
self.minifi_options.enable_prometheus = True

def enable_prometheus_with_ssl_in_minifi(self):
self.minifi_options.enable_prometheus_with_ssl = True

def enable_sql_in_minifi(self):
self.minifi_options.enable_sql = True

Expand Down
6 changes: 6 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ def set_ssl_context_properties_in_minifi(self):
def enable_prometheus_in_minifi(self):
self.container_store.enable_prometheus_in_minifi()

def enable_prometheus_with_ssl_in_minifi(self):
self.container_store.enable_prometheus_with_ssl_in_minifi()

def enable_sql_in_minifi(self):
self.container_store.enable_sql_in_minifi()

Expand Down Expand Up @@ -334,3 +337,6 @@ def check_connection_size_through_controller(self, connection: str, size: int, m
def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str) -> bool:
manifest = self.minifi_controller_executor.get_manifest(container_name)
return '"agentManifest": {' in manifest and '"componentManifest": {' in manifest and '"agentType": "cpp"' in manifest

def enable_ssl_in_prometheus_checker(self):
self.prometheus_checker.enable_ssl()
15 changes: 12 additions & 3 deletions docker/test/integration/cluster/checkers/PrometheusChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@

class PrometheusChecker:
def __init__(self):
self.prometheus_client = PrometheusConnect(url="http://localhost:9090", disable_ssl=True)
self.use_ssl = False

def enable_ssl(self):
self.use_ssl = True

def _getClient(self):
if self.use_ssl:
return PrometheusConnect(url="https://localhost:9090", disable_ssl=True)
else:
return PrometheusConnect(url="http://localhost:9090", disable_ssl=True)

def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
return wait_for(lambda: self.verify_metric_class(metric_class), timeout_seconds)
Expand Down Expand Up @@ -96,14 +105,14 @@ def verify_agent_status_metrics(self):
def verify_metric_exists(self, metric_name, metric_class, labels={}):
labels['metric_class'] = metric_class
labels['agent_identifier'] = "Agent1"
return len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)) > 0
return len(self._getClient().get_current_metric_value(metric_name=metric_name, label_config=labels)) > 0

def verify_metrics_exist(self, metric_names, metric_class, labels={}):
return all((self.verify_metric_exists(metric_name, metric_class, labels) for metric_name in metric_names))

def verify_metric_larger_than_zero(self, metric_name, metric_class, labels={}):
labels['metric_class'] = metric_class
result = self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)
result = self._getClient().get_current_metric_value(metric_name=metric_name, label_config=labels)
return len(result) > 0 and int(result[0]['value'][1]) > 0

def verify_metrics_larger_than_zero(self, metric_names, metric_class, labels={}):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self):
self.enable_c2_with_ssl = False
self.enable_provenance = False
self.enable_prometheus = False
self.enable_prometheus_with_ssl = False
self.enable_sql = False
self.config_format = "json"
self.use_flow_config_from_url = False
Expand Down Expand Up @@ -117,12 +118,16 @@ def _create_properties(self):
if not self.options.enable_provenance:
f.write("nifi.provenance.repository.class.name=NoOpRepository\n")

if self.options.enable_prometheus:
if self.options.enable_prometheus or self.options.enable_prometheus_with_ssl:
f.write("nifi.metrics.publisher.agent.identifier=Agent1\n")
f.write("nifi.metrics.publisher.class=PrometheusMetricsPublisher\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936\n")
f.write("nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus\n")

if self.options.enable_prometheus_with_ssl:
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.certificate=/tmp/resources/minifi_merged_cert.crt\n")
f.write("nifi.metrics.publisher.PrometheusMetricsPublisher.ca.certificate=/tmp/resources/root_ca.crt\n")

if self.options.use_flow_config_from_url:
f.write(f"nifi.c2.flow.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config?class=minifi-test-class\n")

Expand Down
92 changes: 84 additions & 8 deletions docker/test/integration/cluster/containers/PrometheusContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,65 @@
import os
import tempfile
import docker.types

from .Container import Container
from OpenSSL import crypto
from ssl_utils.SSL_cert_utils import make_cert_without_extended_usage


class PrometheusContainer(Container):
def __init__(self, feature_context, name, vols, network, image_store, command=None):
super().__init__(feature_context, name, 'prometheus', vols, network, image_store, command)
prometheus_yml_content = """
def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False):
engine = "prometheus-ssl" if ssl else "prometheus"
super().__init__(feature_context, name, engine, vols, network, image_store, command)
self.ssl = ssl
if ssl:
prometheus_cert, prometheus_key = make_cert_without_extended_usage(f"prometheus-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key)

self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
self.root_ca_file.write(crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert))
self.root_ca_file.close()
os.chmod(self.root_ca_file.name, 0o644)

self.prometheus_cert_file = tempfile.NamedTemporaryFile(delete=False)
self.prometheus_cert_file.write(crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=prometheus_cert))
self.prometheus_cert_file.close()
os.chmod(self.prometheus_cert_file.name, 0o644)

self.prometheus_key_file = tempfile.NamedTemporaryFile(delete=False)
self.prometheus_key_file.write(crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=prometheus_key))
self.prometheus_key_file.close()
os.chmod(self.prometheus_key_file.name, 0o644)

prometheus_yml_content = """
global:
scrape_interval: 2s
evaluation_interval: 15s
scrape_configs:
- job_name: "minifi"
static_configs:
- targets: ["minifi-cpp-flow-{feature_id}:9936"]
scheme: https
tls_config:
ca_file: /etc/prometheus/certs/root-ca.pem
cert_file: /etc/prometheus/certs/prometheus.crt
key_file: /etc/prometheus/certs/prometheus.key
""".format(feature_id=self.feature_context.id)
self.yaml_file = tempfile.NamedTemporaryFile(delete=False)
self.yaml_file.write(prometheus_yml_content.encode())
self.yaml_file.close()
os.chmod(self.yaml_file.name, 0o644)

prometheus_web_config_content = """
tls_server_config:
cert_file: /etc/prometheus/certs/prometheus.crt
key_file: /etc/prometheus/certs/prometheus.key
"""
self.web_config_file = tempfile.NamedTemporaryFile(delete=False)
self.web_config_file.write(prometheus_web_config_content.encode())
self.web_config_file.close()
os.chmod(self.web_config_file.name, 0o644)
else:
prometheus_yml_content = """
global:
scrape_interval: 2s
evaluation_interval: 15s
Expand All @@ -45,15 +97,39 @@ def deploy(self):

logging.info('Creating and running Prometheus docker container...')

mounts = [docker.types.Mount(
type='bind',
source=self.yaml_file.name,
target='/etc/prometheus/prometheus.yml'
)]

if self.ssl:
mounts.append(docker.types.Mount(
type='bind',
source=self.web_config_file.name,
target='/etc/prometheus/web-config.yml'
))
mounts.append(docker.types.Mount(
type='bind',
source=self.root_ca_file.name,
target='/etc/prometheus/certs/root-ca.pem'
))
mounts.append(docker.types.Mount(
type='bind',
source=self.prometheus_cert_file.name,
target='/etc/prometheus/certs/prometheus.crt'
))
mounts.append(docker.types.Mount(
type='bind',
source=self.prometheus_key_file.name,
target='/etc/prometheus/certs/prometheus.key'
))

self.client.containers.run(
image="prom/prometheus:v2.35.0",
detach=True,
name=self.name,
network=self.network.name,
ports={'9090/tcp': 9090},
mounts=[docker.types.Mount(
type='bind',
source=self.yaml_file.name,
target='/etc/prometheus/prometheus.yml'
)],
mounts=mounts,
entrypoint=self.command)
20 changes: 16 additions & 4 deletions docker/test/integration/features/MiNiFi_integration_test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from pydoc import locate

from ssl_utils.SSL_cert_utils import make_ca, make_client_cert
from ssl_utils.SSL_cert_utils import make_ca, make_cert_without_extended_usage
from minifi.core.InputPort import InputPort

from cluster.DockerTestCluster import DockerTestCluster
Expand Down Expand Up @@ -53,9 +53,9 @@ def __init__(self, context, feature_id: str):
self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.feature_id), self.docker_directory_bindings.get_data_directories(self.feature_id))
self.root_ca_cert, self.root_ca_key = make_ca("root CA")

minifi_client_cert, minifi_client_key = make_client_cert(common_name=f"minifi-cpp-flow-{self.feature_id}",
ca_cert=self.root_ca_cert,
ca_key=self.root_ca_key)
minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=f"minifi-cpp-flow-{self.feature_id}",
ca_cert=self.root_ca_cert,
ca_key=self.root_ca_key)
self.put_test_resource('root_ca.crt',
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
cert=self.root_ca_cert))
Expand All @@ -67,6 +67,12 @@ def __init__(self, context, feature_id: str):
OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
pkey=minifi_client_key))

self.put_test_resource('minifi_merged_cert.crt',
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
cert=minifi_client_cert)
+ OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
pkey=minifi_client_key))

def get_container_name_with_postfix(self, container_name: str):
return self.cluster.container_store.get_container_name_with_postfix(container_name)

Expand Down Expand Up @@ -347,6 +353,9 @@ def set_ssl_context_properties_in_minifi(self):
def enable_prometheus_in_minifi(self):
self.cluster.enable_prometheus_in_minifi()

def enable_prometheus_with_ssl_in_minifi(self):
self.cluster.enable_prometheus_with_ssl_in_minifi()

def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
self.cluster.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem)

Expand Down Expand Up @@ -388,3 +397,6 @@ def check_connection_size_through_controller(self, connection: str, size: int, m

def manifest_can_be_retrieved_through_minifi_controller(self, container_name: str):
assert self.cluster.manifest_can_be_retrieved_through_minifi_controller(container_name) or self.cluster.log_app_output()

def enable_ssl_in_prometheus_checker(self):
self.cluster.enable_ssl_in_prometheus_checker()
16 changes: 16 additions & 0 deletions docker/test/integration/features/prometheus.feature
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ Feature: MiNiFi can publish metrics to Prometheus server
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds

Scenario: Published metrics are scraped by Prometheus server through SSL connection
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile1 processor is connected to the PutFile
And Prometheus with SSL is enabled in MiNiFi
And a Prometheus server is set up with SSL
When all instances start up
Then "RepositoryMetrics" is published to the Prometheus server in less than 60 seconds
And "QueueMetrics" is published to the Prometheus server in less than 60 seconds
And "GetFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "GetFile1" processor
And "PutFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "PutFile" processor
And "FlowInformation" is published to the Prometheus server in less than 60 seconds
And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds
And "AgentStatus" is published to the Prometheus server in less than 60 seconds

Scenario: Multiple GetFile metrics are reported by Prometheus
Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input"
And a GetFile processor with the name "GetFile2" and the "Input Directory" property set to "/tmp/input"
Expand Down
13 changes: 13 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,11 @@ def step_impl(context):
context.test.enable_prometheus_in_minifi()


@given("Prometheus with SSL is enabled in MiNiFi")
def step_impl(context):
context.test.enable_prometheus_with_ssl_in_minifi()


# HTTP proxy setup
@given("the http proxy server is set up")
@given("a http proxy server is set up accordingly")
Expand Down Expand Up @@ -983,6 +988,14 @@ def step_impl(context):
context.test.acquire_container(context=context, name="prometheus", engine="prometheus")


@given("a Prometheus server is set up with SSL")
def step_impl(context):
context.test.enable_ssl_in_prometheus_checker()
context.test.acquire_container(context=context, name="prometheus", engine="prometheus-ssl",
command=['/bin/prometheus', '--web.config.file=/etc/prometheus/web-config.yml', '--config.file=/etc/prometheus/prometheus.yml', '--storage.tsdb.path=/prometheus',
'--web.console.libraries=/usr/share/prometheus/console_libraries', '--web.console.templates=/usr/share/prometheus/consoles', '--log.level=debug'])


@then("\"{metric_class}\" are published to the Prometheus server in less than {timeout_seconds:d} seconds")
@then("\"{metric_class}\" is published to the Prometheus server in less than {timeout_seconds:d} seconds")
def step_impl(context, metric_class, timeout_seconds):
Expand Down
4 changes: 4 additions & 0 deletions docker/test/integration/ssl_utils/SSL_cert_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ def _make_cert(common_name, ca_cert, ca_key, extended_key_usage=None):
if extended_key_usage:
extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, extended_key_usage))

cert.add_extensions([
crypto.X509Extension(b"subjectAltName", False, b"DNS.1:" + common_name.encode())
])

cert.add_extensions(extensions)

cert.set_issuer(ca_cert.get_subject())
Expand Down
24 changes: 21 additions & 3 deletions extensions/prometheus/PrometheusExposerWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,27 @@

namespace org::apache::nifi::minifi::extensions::prometheus {

PrometheusExposerWrapper::PrometheusExposerWrapper(uint32_t port)
: exposer_(std::to_string(port)) {
logger_->log_info("Started Prometheus metrics publisher on port %" PRIu32, port);
PrometheusExposerWrapper::PrometheusExposerWrapper(const PrometheusExposerConfig& config)
: exposer_(parseExposerConfig(config)) {
logger_->log_info("Started Prometheus metrics publisher on port %" PRIu32, config.port);
}

std::vector<std::string> PrometheusExposerWrapper::parseExposerConfig(const PrometheusExposerConfig& config) {
std::vector<std::string> result;
result.push_back("listening_ports");
if (config.certificate) {
result.push_back(std::to_string(config.port) + "s");
result.push_back("ssl_certificate");
result.push_back(*config.certificate);
} else {
result.push_back(std::to_string(config.port));
}

if (config.ca_certificate) {
result.push_back("ssl_ca_file");
result.push_back(*config.ca_certificate);
}
return result;
}

void PrometheusExposerWrapper::registerMetric(const std::shared_ptr<PublishedMetricGaugeCollection>& metric) {
Expand Down
Loading

0 comments on commit 01931c9

Please sign in to comment.