From 6d6c5a11dfcfafa28b6ef86643804ca53abc7e9c Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 23 Sep 2024 15:20:10 +0200 Subject: [PATCH] MINIFICPP-2460 Add tests for S2S tests using SSL --- .../integration/cluster/ContainerStore.py | 10 +- .../integration/cluster/DockerTestCluster.py | 3 + .../cluster/DockerTestDirectoryBindings.py | 121 +++++++++++++----- .../cluster/containers/MinifiContainer.py | 5 +- .../cluster/containers/NifiContainer.py | 61 ++++++--- .../test/integration/convert_cert_to_jks.sh | 45 +++++++ .../MiNiFi_integration_test_driver.py | 62 ++------- .../test/integration/features/environment.py | 7 +- docker/test/integration/features/s2s.feature | 78 ++++++++++- .../test/integration/features/steps/steps.py | 44 +++++-- .../test/integration/minifi/core/InputPort.py | 3 + .../minifi/core/RemoteProcessGroup.py | 4 + .../Minifi_flow_json_serializer.py | 3 +- .../Minifi_flow_yaml_serializer.py | 2 +- libminifi/src/RemoteProcessorGroupPort.cpp | 1 + libminifi/src/core/flow/FlowSchema.cpp | 2 +- 16 files changed, 325 insertions(+), 126 deletions(-) create mode 100755 docker/test/integration/convert_cert_to_jks.sh diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index f48646158e..f1e13a8948 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -17,6 +17,7 @@ from .containers.MinifiContainer import MinifiOptions from .containers.MinifiContainer import MinifiContainer from .containers.NifiContainer import NifiContainer +from .containers.NifiContainer import NiFiOptions from .containers.ZookeeperContainer import ZookeeperContainer from .containers.KafkaBrokerContainer import KafkaBrokerContainer from .containers.S3ServerContainer import S3ServerContainer @@ -52,6 +53,7 @@ def __init__(self, network, image_store, kubernetes_proxy, feature_id): self.network = network self.image_store = image_store self.kubernetes_proxy = kubernetes_proxy + self.nifi_options = NiFiOptions() def get_container_name_with_postfix(self, container_name: str): if not container_name.endswith(self.feature_id): @@ -83,13 +85,14 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c logging.info('Container name was not provided; using generated name \'%s\'', container_name) feature_context = FeatureContext(feature_id=context.feature_id, - root_ca_cert=context.test.root_ca_cert, - root_ca_key=context.test.root_ca_key) + root_ca_cert=context.root_ca_cert, + root_ca_key=context.root_ca_key) if engine == 'nifi': return self.containers.setdefault(container_name, NifiContainer(feature_context=feature_context, config_dir=self.data_directories["nifi_config_dir"], + options=self.nifi_options, name=container_name, vols=self.vols, network=self.network, @@ -405,3 +408,6 @@ def enable_ssl_in_grafana_loki(self): def enable_multi_tenancy_in_grafana_loki(self): self.grafana_loki_options.enable_multi_tenancy = True + + def enable_ssl_in_nifi(self): + self.nifi_options.use_ssl = True diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 430ec52f9b..153ffce265 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -418,3 +418,6 @@ def wait_for_lines_on_grafana_loki(self, lines: List[str], timeout_seconds: int, def set_value_on_plc_with_modbus(self, container_name, modbus_cmd): return self.modbus_checker.set_value_on_plc_with_modbus(container_name, modbus_cmd) + + def enable_ssl_in_nifi(self): + self.container_store.enable_ssl_in_nifi() diff --git a/docker/test/integration/cluster/DockerTestDirectoryBindings.py b/docker/test/integration/cluster/DockerTestDirectoryBindings.py index fec53dfb19..87cf2b4ec6 100644 --- a/docker/test/integration/cluster/DockerTestDirectoryBindings.py +++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py @@ -12,12 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - import logging import os import shutil import hashlib +import subprocess +import OpenSSL.crypto +from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert class DockerTestDirectoryBindings: @@ -59,39 +60,39 @@ def create_new_data_directories(self): shutil.copytree(test_dir + "/resources/minifi", self.data_directories[self.feature_id]["minifi_config_dir"], dirs_exist_ok=True) shutil.copytree(test_dir + "/resources/minifi-controller", self.data_directories[self.feature_id]["resources_dir"] + "/minifi-controller") - def get_data_directories(self, feature_id): - return self.data_directories[feature_id] + def get_data_directories(self): + return self.data_directories[self.feature_id] - def docker_path_to_local_path(self, feature_id, docker_path): + def docker_path_to_local_path(self, docker_path): # Docker paths are currently hard-coded if docker_path == "/tmp/input": - return self.data_directories[feature_id]["input_dir"] + return self.data_directories[self.feature_id]["input_dir"] if docker_path == "/tmp/output": - return self.data_directories[feature_id]["output_dir"] + return self.data_directories[self.feature_id]["output_dir"] if docker_path == "/tmp/resources": - return self.data_directories[feature_id]["resources_dir"] + return self.data_directories[self.feature_id]["resources_dir"] # Might be worth reworking these if docker_path == "/tmp/output/success": - self.create_directory(self.data_directories[feature_id]["output_dir"] + "/success") - return self.data_directories[feature_id]["output_dir"] + "/success" + self.create_directory(self.data_directories[self.feature_id]["output_dir"] + "/success") + return self.data_directories[self.feature_id]["output_dir"] + "/success" if docker_path == "/tmp/output/failure": - self.create_directory(self.data_directories[feature_id]["output_dir"] + "/failure") - return self.data_directories[feature_id]["output_dir"] + "/failure" + self.create_directory(self.data_directories[self.feature_id]["output_dir"] + "/failure") + return self.data_directories[self.feature_id]["output_dir"] + "/failure" raise Exception("Docker directory \"%s\" has no preset bindings." % docker_path) - def get_directory_bindings(self, feature_id): + def get_directory_bindings(self): """ Performs a standard container flow deployment with the addition of volumes supporting test input/output directories. """ vols = {} - vols[self.data_directories[feature_id]["input_dir"]] = {"bind": "/tmp/input", "mode": "rw"} - vols[self.data_directories[feature_id]["output_dir"]] = {"bind": "/tmp/output", "mode": "rw"} - vols[self.data_directories[feature_id]["resources_dir"]] = {"bind": "/tmp/resources", "mode": "rw"} - vols[self.data_directories[feature_id]["system_certs_dir"]] = {"bind": "/usr/local/share/certs", "mode": "rw"} - vols[self.data_directories[feature_id]["minifi_config_dir"]] = {"bind": "/tmp/minifi_config", "mode": "rw"} - vols[self.data_directories[feature_id]["nifi_config_dir"]] = {"bind": "/tmp/nifi_config", "mode": "rw"} - vols[self.data_directories[feature_id]["kubernetes_config_dir"]] = {"bind": "/tmp/kubernetes_config", "mode": "rw"} + vols[self.data_directories[self.feature_id]["input_dir"]] = {"bind": "/tmp/input", "mode": "rw"} + vols[self.data_directories[self.feature_id]["output_dir"]] = {"bind": "/tmp/output", "mode": "rw"} + vols[self.data_directories[self.feature_id]["resources_dir"]] = {"bind": "/tmp/resources", "mode": "rw"} + vols[self.data_directories[self.feature_id]["system_certs_dir"]] = {"bind": "/usr/local/share/certs", "mode": "rw"} + vols[self.data_directories[self.feature_id]["minifi_config_dir"]] = {"bind": "/tmp/minifi_config", "mode": "rw"} + vols[self.data_directories[self.feature_id]["nifi_config_dir"]] = {"bind": "/tmp/nifi_config", "mode": "rw"} + vols[self.data_directories[self.feature_id]["kubernetes_config_dir"]] = {"bind": "/tmp/kubernetes_config", "mode": "rw"} return vols @staticmethod @@ -120,21 +121,24 @@ def put_file_contents(file_abs_path, contents): test_input_file.write(contents) os.chmod(file_abs_path, 0o0777) - def put_test_resource(self, feature_id, file_name, contents): + def put_test_resource(self, file_name, contents): """ Creates a resource file in the test resource dir and writes the given content to it. """ - file_abs_path = os.path.join(self.data_directories[feature_id]["resources_dir"], file_name) + file_abs_path = os.path.join(self.data_directories[self.feature_id]["resources_dir"], file_name) self.put_file_contents(file_abs_path, contents) - def put_test_input(self, feature_id, file_name, contents): - file_abs_path = os.path.join(self.data_directories[feature_id]["input_dir"], file_name) + def get_test_resource_path(self, file_name): + return os.path.join(self.data_directories[self.feature_id]["resources_dir"], file_name) + + def put_test_input(self, file_name, contents): + file_abs_path = os.path.join(self.data_directories[self.feature_id]["input_dir"], file_name) self.put_file_contents(file_abs_path, contents) - def put_file_to_docker_path(self, feature_id, path, file_name, contents): - file_abs_path = os.path.join(self.docker_path_to_local_path(feature_id, path), file_name) + def put_file_to_docker_path(self, path, file_name, contents): + file_abs_path = os.path.join(self.docker_path_to_local_path(path), file_name) self.put_file_contents(file_abs_path, contents) @staticmethod @@ -146,14 +150,67 @@ def generate_md5_hash(file_path): return md5_hash.hexdigest() - def put_random_file_to_docker_path(self, test_id: str, path: str, file_name: str, file_size: int): - file_abs_path = os.path.join(self.docker_path_to_local_path(test_id, path), file_name) + def put_random_file_to_docker_path(self, path: str, file_name: str, file_size: int): + file_abs_path = os.path.join(self.docker_path_to_local_path(path), file_name) with open(file_abs_path, 'wb') as test_input_file: test_input_file.write(os.urandom(file_size)) os.chmod(file_abs_path, 0o0777) return self.generate_md5_hash(file_abs_path) - def rm_out_child(self, feature_id, dir): - child = os.path.join(self.data_directories[feature_id]["output_dir"], dir) - logging.info('Removing %s from output folder', child) - shutil.rmtree(child) + def create_cert_files(self): + self.root_ca_cert, self.root_ca_key = make_self_signed_cert("root CA") + + minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=f"minifi-cpp-flow-{self.feature_id}", + ca_cert=self.root_ca_cert, + ca_key=self.root_ca_key) + minifi_server_cert, minifi_server_key = make_server_cert(common_name=f"server-{self.feature_id}", + ca_cert=self.root_ca_cert, + ca_key=self.root_ca_key) + self_signed_server_cert, self_signed_server_key = make_self_signed_cert(f"server-{self.feature_id}") + + self.put_test_resource('root_ca.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=self.root_ca_cert)) + self.put_test_resource("system_certs_dir/ca-root-nss.crt", + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=self.root_ca_cert)) + self.put_test_resource('minifi_client.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=minifi_client_cert)) + self.put_test_resource('minifi_client.key', + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=minifi_client_key)) + self.put_test_resource('minifi_server.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=minifi_server_cert) + + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=minifi_server_key)) + self.put_test_resource('self_signed_server.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=self_signed_server_cert) + + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=self_signed_server_key)) + self.put_test_resource('minifi_merged_cert.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=minifi_client_cert) + + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=minifi_client_key)) + nifi_client_cert, nifi_client_key = make_server_cert(common_name=f"nifi-{self.feature_id}", + ca_cert=self.root_ca_cert, + ca_key=self.root_ca_key) + self.put_test_resource('nifi_client.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=nifi_client_cert)) + self.put_test_resource('nifi_client.key', + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=nifi_client_key)) + base = os.path.dirname(self.get_test_resource_path('nifi_client.key')) + test_dir = os.environ['TEST_DIRECTORY'] # Based on DockerVerify.sh + cmd = [ + os.path.join(test_dir, "convert_cert_to_jks.sh"), + base, + os.path.join(base, "nifi_client.key"), + os.path.join(base, "nifi_client.crt"), + os.path.join(base, "root_ca.crt"), + ] + subprocess.run(cmd, check=True) diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index fee054f06a..770f42df02 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -116,9 +116,8 @@ def _create_properties(self): if self.options.set_ssl_context_properties: f.write("nifi.remote.input.secure=true\n") - f.write("nifi.security.client.certificate=/tmp/resources/minifi-cpp-flow.crt\n") - f.write("nifi.security.client.private.key=/tmp/resources/minifi-cpp-flow.key\n") - f.write("nifi.security.client.pass.phrase=abcdefgh\n") + f.write("nifi.security.client.certificate=/tmp/resources/minifi_client.crt\n") + f.write("nifi.security.client.private.key=/tmp/resources/minifi_client.key\n") f.write("nifi.security.client.ca.certificate=/tmp/resources/root_ca.crt\n") if not self.options.enable_provenance: diff --git a/docker/test/integration/cluster/containers/NifiContainer.py b/docker/test/integration/cluster/containers/NifiContainer.py index 20cbd72824..c3ae965cbd 100644 --- a/docker/test/integration/cluster/containers/NifiContainer.py +++ b/docker/test/integration/cluster/containers/NifiContainer.py @@ -22,30 +22,53 @@ import os +class NiFiOptions: + def __init__(self): + self.use_ssl = False + + class NifiContainer(FlowContainer): NIFI_VERSION = '2.0.0-M2' NIFI_ROOT = '/opt/nifi/nifi-' + NIFI_VERSION - def __init__(self, feature_context, config_dir, name, vols, network, image_store, command=None): + def __init__(self, feature_context, config_dir, options, name, vols, network, image_store, command=None): if not command: - entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.remote.input.secure\)=.*/\1=false/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.web.http.port\)=.*/\1=8080/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.web.https.port\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.web.https.host\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.web.http.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.keystore\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.keystoreType\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.keystorePasswd\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.keyPasswd\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.truststore\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.truststoreType\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.security.truststorePasswd\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " - r"sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10000/' {nifi_root}/conf/nifi.properties && " - r"cp /tmp/nifi_config/flow.json.gz {nifi_root}/conf && {nifi_root}/bin/nifi.sh run & " - r"nifi_pid=$! &&" - r"tail -F --pid=${{nifi_pid}} {nifi_root}/logs/nifi-app.log").format(name=name, nifi_root=NifiContainer.NIFI_ROOT) + if options.use_ssl: + entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.web.https.port\)=.*/\1=8443/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.web.https.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keystore\)=.*/\1=\/tmp\/resources\/keystore.jks/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keystoreType\)=.*/\1=jks/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keystorePasswd\)=.*/\1=passw0rd1!/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keyPasswd\)=.*/#\1=passw0rd1!/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.truststore\)=.*/\1=\/tmp\/resources\/truststore.jks/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.truststoreType\)=.*/\1=jks/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.truststorePasswd\)=.*/\1=passw0rd1!/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10443/' {nifi_root}/conf/nifi.properties && " + r"cp /tmp/nifi_config/flow.json.gz {nifi_root}/conf && {nifi_root}/bin/nifi.sh run & " + r"nifi_pid=$! &&" + r"tail -F --pid=${{nifi_pid}} {nifi_root}/logs/nifi-app.log").format(name=name, nifi_root=NifiContainer.NIFI_ROOT) + else: + entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.remote.input.secure\)=.*/\1=false/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.web.http.port\)=.*/\1=8080/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.web.https.port\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.web.https.host\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.web.http.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keystore\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keystoreType\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keystorePasswd\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.keyPasswd\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.truststore\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.truststoreType\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.security.truststorePasswd\)=.*/#\1=/' {nifi_root}/conf/nifi.properties && " + r"sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10000/' {nifi_root}/conf/nifi.properties && " + r"cp /tmp/nifi_config/flow.json.gz {nifi_root}/conf && {nifi_root}/bin/nifi.sh run & " + r"nifi_pid=$! &&" + r"tail -F --pid=${{nifi_pid}} {nifi_root}/logs/nifi-app.log").format(name=name, nifi_root=NifiContainer.NIFI_ROOT) command = ["/bin/sh", "-c", entry_command] super().__init__(feature_context, config_dir, name, 'nifi', vols, network, image_store, command) diff --git a/docker/test/integration/convert_cert_to_jks.sh b/docker/test/integration/convert_cert_to_jks.sh new file mode 100755 index 0000000000..33c37b677f --- /dev/null +++ b/docker/test/integration/convert_cert_to_jks.sh @@ -0,0 +1,45 @@ +#!/bin/bash +set -euo pipefail + +# Usage: ./create_jks.sh + +DIR=$1 +SSL_KEY_PATH=$2 +SSL_CERT_PATH=$3 +CA_CERT_PATH=$4 + +KEYSTORE="$DIR/keystore.jks" +TRUSTSTORE="$DIR/truststore.jks" +PKCS12_FILE="$DIR/keystore.p12" +PASSWORD="passw0rd1!" + +cat "${CA_CERT_PATH}" >> "${SSL_CERT_PATH}" + +if [ ! -d "$DIR" ]; then + mkdir -p "$DIR" +fi + +openssl pkcs12 -export \ + -inkey "$SSL_KEY_PATH" \ + -in "$SSL_CERT_PATH" \ + -name "nifi-key" \ + -out "$PKCS12_FILE" \ + -password pass:$PASSWORD + +keytool -importkeystore \ + -destkeystore "$KEYSTORE" \ + -deststoretype jks \ + -destalias nifi-key \ + -srckeystore "$PKCS12_FILE" \ + -srcstoretype pkcs12 \ + -srcalias "nifi-key" \ + -storepass "$PASSWORD" \ + -srcstorepass "$PASSWORD" \ + -noprompt + +keytool -importcert \ + -alias "nifi-cert" \ + -file "$CA_CERT_PATH" \ + -keystore "$TRUSTSTORE" \ + -storepass "$PASSWORD" \ + -noprompt diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 6ad052294b..2eef2f22db 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -12,22 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - import logging import time import uuid -from typing import List - -import OpenSSL.crypto +from typing import List from pydoc import locate - -from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert from minifi.core.InputPort import InputPort - from cluster.DockerTestCluster import DockerTestCluster - from minifi.validators.OutputValidator import OutputValidator from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator @@ -54,44 +46,7 @@ def __init__(self, context, feature_id: str): self.test_file_hash = None self.docker_directory_bindings = context.directory_bindings - self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.feature_id), self.docker_directory_bindings.get_data_directories(self.feature_id)) - self.root_ca_cert, self.root_ca_key = make_self_signed_cert("root CA") - - minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=f"minifi-cpp-flow-{self.feature_id}", - ca_cert=self.root_ca_cert, - ca_key=self.root_ca_key) - minifi_server_cert, minifi_server_key = make_server_cert(common_name=f"server-{self.feature_id}", - ca_cert=self.root_ca_cert, - ca_key=self.root_ca_key) - self_signed_server_cert, self_signed_server_key = make_self_signed_cert(f"server-{self.feature_id}") - - self.put_test_resource('root_ca.crt', - OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, - cert=self.root_ca_cert)) - self.put_test_resource("system_certs_dir/ca-root-nss.crt", - OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, - cert=self.root_ca_cert)) - self.put_test_resource('minifi_client.crt', - OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, - cert=minifi_client_cert)) - self.put_test_resource('minifi_client.key', - OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, - pkey=minifi_client_key)) - self.put_test_resource('minifi_server.crt', - OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, - cert=minifi_server_cert) - + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, - pkey=minifi_server_key)) - self.put_test_resource('self_signed_server.crt', - OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, - cert=self_signed_server_cert) - + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, - pkey=self_signed_server_key)) - self.put_test_resource('minifi_merged_cert.crt', - OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, - cert=minifi_client_cert) - + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, - pkey=minifi_client_key)) + self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(), self.docker_directory_bindings.get_data_directories()) def get_container_name_with_postfix(self, container_name: str): return self.cluster.container_store.get_container_name_with_postfix(container_name) @@ -196,18 +151,18 @@ def add_test_data(self, path, test_data, file_name=None): if file_name is None: file_name = str(uuid.uuid4()) test_data = decode_escaped_str(test_data) - self.docker_directory_bindings.put_file_to_docker_path(self.feature_id, path, file_name, test_data.encode('utf-8')) + self.docker_directory_bindings.put_file_to_docker_path(path, file_name, test_data.encode('utf-8')) def add_random_test_data(self, path: str, size: int, file_name: str = None): if file_name is None: file_name = str(uuid.uuid4()) - self.test_file_hash = self.docker_directory_bindings.put_random_file_to_docker_path(self.feature_id, path, file_name, size) + self.test_file_hash = self.docker_directory_bindings.put_random_file_to_docker_path(path, file_name, size) def put_test_resource(self, file_name, contents): - self.docker_directory_bindings.put_test_resource(self.feature_id, file_name, contents) + self.docker_directory_bindings.put_test_resource(file_name, contents) - def rm_out_child(self): - self.docker_directory_bindings.rm_out_child(self.feature_id) + def get_test_resource_path(self, file_name): + return self.docker_directory_bindings.get_test_resource_path(file_name) def add_file_system_observer(self, file_system_observer): self.file_system_observer = file_system_observer @@ -482,3 +437,6 @@ def check_lines_on_grafana_loki(self, lines: List[str], timeout_seconds: int, ss def set_value_on_plc_with_modbus(self, container_name, modbus_cmd): assert self.cluster.set_value_on_plc_with_modbus(container_name, modbus_cmd) + + def enable_ssl_in_nifi(self): + self.cluster.enable_ssl_in_nifi() diff --git a/docker/test/integration/features/environment.py b/docker/test/integration/features/environment.py index 5483b1138d..d98d2f3f5c 100644 --- a/docker/test/integration/features/environment.py +++ b/docker/test/integration/features/environment.py @@ -85,11 +85,14 @@ def before_feature(context, feature): context.feature_id = feature_id context.directory_bindings = DockerTestDirectoryBindings(feature_id) context.directory_bindings.create_new_data_directories() + context.directory_bindings.create_cert_files() + context.root_ca_cert = context.directory_bindings.root_ca_cert + context.root_ca_key = context.directory_bindings.root_ca_key if "requires.kubernetes.cluster" in feature.tags: context.kubernetes_proxy = KubernetesProxy( - context.directory_bindings.get_data_directories(context.feature_id)["kubernetes_temp_dir"], + context.directory_bindings.get_data_directories()["kubernetes_temp_dir"], os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc')) - context.kubernetes_proxy.create_config(context.directory_bindings.get_directory_bindings(context.feature_id)) + context.kubernetes_proxy.create_config(context.directory_bindings.get_directory_bindings()) context.kubernetes_proxy.start_cluster() diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature index c236f2ac7b..80a8eb648c 100644 --- a/docker/test/integration/features/s2s.feature +++ b/docker/test/integration/features/s2s.feature @@ -28,7 +28,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi" And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup - And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080 + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow And the "success" relationship of the from-minifi is connected to the PutFile @@ -42,7 +42,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi" And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup - And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080 + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow And the "success" relationship of the from-minifi is connected to the PutFile @@ -56,7 +56,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol And a RemoteProcessGroup node opened on "http://nifi-${feature_id}:8080/nifi" And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup - And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080 + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow And the "success" relationship of the from-minifi is connected to the PutFile @@ -71,9 +71,79 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup And the connection going to the RemoteProcessGroup has "drop empty" set - And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" on port 8080 + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow And the "success" relationship of the from-minifi is connected to the PutFile When both instances start up Then no files are placed in the monitored directory in 50 seconds of running time + + Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And the "Keep Source File" property of the GetFile processor is set to "true" + And a file with the content "test" is present in "/tmp/input" + And a RemoteProcessGroup node opened on "https://nifi-${feature_id}:8443/nifi" + And a SSL context service is set up for the following remote process group: "RemoteProcessGroup" + And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup + + And SSL is enabled in NiFi flow + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow + And the "success" relationship of the from-minifi is connected to the PutFile + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + + Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL with YAML config + Given a MiNiFi CPP server with yaml config + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And the "Keep Source File" property of the GetFile processor is set to "true" + And a file with the content "test" is present in "/tmp/input" + And a RemoteProcessGroup node opened on "https://nifi-${feature_id}:8443/nifi" + And a SSL context service is set up for the following remote process group: "RemoteProcessGroup" + And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup + + And SSL is enabled in NiFi flow + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow + And the "success" relationship of the from-minifi is connected to the PutFile + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + + Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using SSL config defined in minifi.properties + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And the "Keep Source File" property of the GetFile processor is set to "true" + And a file with the content "test" is present in "/tmp/input" + And a RemoteProcessGroup node opened on "https://nifi-${feature_id}:8443/nifi" + And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup + And SSL properties are set in MiNiFi + + And SSL is enabled in NiFi flow + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow + And the "success" relationship of the from-minifi is connected to the PutFile + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds + + Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s using YAML config and SSL config defined in minifi.properties + Given a MiNiFi CPP server with yaml config + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And the "Keep Source File" property of the GetFile processor is set to "true" + And a file with the content "test" is present in "/tmp/input" + And a RemoteProcessGroup node opened on "https://nifi-${feature_id}:8443/nifi" + And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup + And SSL properties are set in MiNiFi + + And SSL is enabled in NiFi flow + And a NiFi flow receiving data from a RemoteProcessGroup "from-minifi" + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow + And the "success" relationship of the from-minifi is connected to the PutFile + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds + And the Minifi logs do not contain the following message: "ProcessSession rollback" after 1 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 75cf19a34b..92dd64548b 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -47,7 +47,7 @@ # Background @given("the content of \"{directory}\" is monitored") def step_impl(context, directory): - context.test.add_file_system_observer(FileSystemObserver(context.directory_bindings.docker_path_to_local_path(context.feature_id, directory))) + context.test.add_file_system_observer(FileSystemObserver(context.directory_bindings.docker_path_to_local_path(directory))) def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'): @@ -318,7 +318,7 @@ def step_impl(context, processor_name): # NiFi setups -@given("a NiFi flow receiving data from a RemoteProcessGroup \"{source_name}\" on port 8080") +@given("a NiFi flow receiving data from a RemoteProcessGroup \"{source_name}\"") def step_impl(context, source_name): remote_process_group = context.test.get_remote_process_group_by_name("RemoteProcessGroup") source = context.test.generate_input_port_for_remote_process_group(remote_process_group, source_name) @@ -335,6 +335,11 @@ def step_impl(context, flow_name): context.test.acquire_container(context=context, name=flow_name, engine='nifi') +@given("SSL is enabled in NiFi flow") +def step_impl(context): + context.test.enable_ssl_in_nifi() + + @given("a transient MiNiFi flow with the name \"{flow_name}\" is set up") def step_impl(context, flow_name): context.test.acquire_container(context=context, name=flow_name, command=["/bin/sh", "-c", "timeout 10s ./bin/minifi.sh run && sleep 100"]) @@ -496,9 +501,20 @@ def setUpSslContextServiceForProcessor(context, processor_name: str): minifi_key_file = '/tmp/resources/minifi_client.key' root_ca_crt_file = '/tmp/resources/root_ca.crt' ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file) - post_elasticsearch_json = context.test.get_node_by_name(processor_name) - post_elasticsearch_json.controller_services.append(ssl_context_service) - post_elasticsearch_json.set_property("SSL Context Service", ssl_context_service.name) + processor = context.test.get_node_by_name(processor_name) + processor.controller_services.append(ssl_context_service) + processor.set_property("SSL Context Service", ssl_context_service.name) + + +def setUpSslContextServiceForRPG(context, rpg_name: str): + minifi_crt_file = '/tmp/resources/minifi_client.crt' + minifi_key_file = '/tmp/resources/minifi_client.key' + root_ca_crt_file = '/tmp/resources/root_ca.crt' + ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file) + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(ssl_context_service) + rpg = context.test.get_remote_process_group_by_name(rpg_name) + rpg.add_property("SSL Context Service", ssl_context_service.name) @given(u'a SSL context service is set up for PostElasticsearch and Elasticsearch') @@ -551,14 +567,14 @@ def step_impl(context): root_ca_crt_file = '/tmp/resources/root_ca.crt' ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file) - splunk_cert, splunk_key = make_server_cert(context.test.get_container_name_with_postfix("splunk"), context.test.root_ca_cert, context.test.root_ca_key) + splunk_cert, splunk_key = make_server_cert(context.test.get_container_name_with_postfix("splunk"), context.root_ca_cert, context.root_ca_key) put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP") put_splunk_http.controller_services.append(ssl_context_service) put_splunk_http.set_property("SSL Context Service", ssl_context_service.name) query_splunk_indexing_status = context.test.get_node_by_name("QuerySplunkIndexingStatus") query_splunk_indexing_status.controller_services.append(ssl_context_service) query_splunk_indexing_status.set_property("SSL Context Service", ssl_context_service.name) - context.test.enable_splunk_hec_ssl('splunk', OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, splunk_cert), OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, splunk_key), OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, context.test.root_ca_cert)) + context.test.enable_splunk_hec_ssl('splunk', OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, splunk_cert), OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, splunk_key), OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, context.root_ca_cert)) @given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server') @@ -690,11 +706,11 @@ def step_impl(context, content, topic_name): @when("a message with content \"{content}\" is published to the \"{topic_name}\" topic using an ssl connection") def step_impl(context, content, topic_name): ca_cert_file = tempfile.NamedTemporaryFile(delete=False) - ca_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=context.test.root_ca_cert)) + ca_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=context.root_ca_cert)) ca_cert_file.close() os.chmod(ca_cert_file.name, 0o644) - client_cert, client_key = make_client_cert(socket.gethostname(), context.test.root_ca_cert, context.test.root_ca_key) + client_cert, client_key = make_client_cert(socket.gethostname(), context.root_ca_cert, context.root_ca_key) client_cert_file = tempfile.NamedTemporaryFile(delete=False) client_cert_file.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, client_cert)) client_cert_file.close() @@ -1065,6 +1081,11 @@ def step_impl(context): context.test.set_ssl_context_properties_in_minifi() +@given("SSL properties are set in MiNiFi") +def step_impl(context): + context.test.set_ssl_context_properties_in_minifi() + + @given(u'a MiNiFi C2 server is set up') def step_impl(context): context.test.acquire_container(context=context, name="minifi-c2-server", engine="minifi-c2-server") @@ -1280,6 +1301,11 @@ def step_impl(context, processor_name: str): setUpSslContextServiceForProcessor(context, processor_name) +@given(u'a SSL context service is set up for the following remote process group: \"{remote_process_group}\"') +def step_impl(context, remote_process_group: str): + setUpSslContextServiceForRPG(context, remote_process_group) + + # Nginx reverse proxy @given(u'a reverse proxy is set up to forward requests to the Grafana Loki server') def step_impl(context): diff --git a/docker/test/integration/minifi/core/InputPort.py b/docker/test/integration/minifi/core/InputPort.py index 817f5791fc..cb9d60dc62 100644 --- a/docker/test/integration/minifi/core/InputPort.py +++ b/docker/test/integration/minifi/core/InputPort.py @@ -22,3 +22,6 @@ def __init__(self, name=None, remote_process_group=None): super(InputPort, self).__init__(name=name) self.remote_process_group = remote_process_group + self.properties = {} + if self.remote_process_group: + self.properties = self.remote_process_group.properties diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py b/docker/test/integration/minifi/core/RemoteProcessGroup.py index c57b2d1a7c..e3f54fa439 100644 --- a/docker/test/integration/minifi/core/RemoteProcessGroup.py +++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py @@ -27,9 +27,13 @@ def __init__(self, url, name=None): self.name = name self.url = url + self.properties = {} def get_name(self): return self.name def get_uuid(self): return self.uuid + + def add_property(self, name, value): + self.properties[name] = value diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py index e1769f632c..c1043d5067 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -94,7 +94,8 @@ def serialize_node(self, connectable, root, visited): res_group['inputPorts'].append({ 'identifier': str(connectable.instance_id), - 'name': connectable.name + 'name': connectable.name, + 'properties': connectable.properties }) if isinstance(connectable, Processor): diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py index ff8b37de80..dc912b1224 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py @@ -96,7 +96,7 @@ def serialize_node(self, connectable, res=None, visited=None): 'id': str(connectable.instance_id), 'name': connectable.name, 'max concurrent tasks': 1, - 'Properties': {} + 'Properties': connectable.properties }) if isinstance(connectable, Processor): diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index aee1f7c390..73e6d41233 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -292,6 +292,7 @@ std::pair RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo if (!token.empty()) client->setRequestHeader("Authorization", token); + client->setVerbose(false); if (client->submit() && client->getResponseCode() == 200) { const std::vector &response_body = client->getResponseBody(); diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index b09b4f5395..4ab3aba0b3 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -141,7 +141,7 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .rpg_proxy_port = {"proxyPort"}, .rpg_input_ports = {"inputPorts"}, .rpg_output_ports = {"outputPorts"}, - .rpg_port_properties = {}, + .rpg_port_properties = {"properties"}, .rpg_port_target_id = {"targetId"}, .parameter_contexts = {"parameterContexts"},