From e198ec710ab3dcd2d1b103fe3a528668c91aaab5 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Thu, 19 Jan 2023 09:04:40 +0100 Subject: [PATCH] Refactor component configs (#63) Closes #62 --- kpops/cli/main.py | 6 +- kpops/cli/pipeline_config.py | 34 +- kpops/component_handlers/__init__.py | 6 +- .../component_handlers/helm_wrapper/model.py | 24 +- ...or_handler.py => kafka_connect_handler.py} | 141 +----- kpops/components/base_components/kafka_app.py | 10 +- .../base_components/kafka_connect.py | 158 +++++- .../base_components/kubernetes_app.py | 29 +- .../base_components/pipeline_component.py | 2 + .../producer/producer_app.py | 14 +- .../streams_bootstrap/streams/streams_app.py | 10 +- kpops/pipeline_generator/pipeline.py | 9 +- tests/cli/test_handlers.py | 16 +- .../kafka_connect/test_connect_handler.py | 387 +-------------- .../kafka_connect/test_connect_wrapper.py | 31 +- tests/components/test_kafka_app.py | 21 +- ..._connect.py => test_kafka_connect_sink.py} | 458 ++++++++---------- tests/components/test_kafka_connect_source.py | 387 +++++++++++++++ tests/components/test_kubernetes_app.py | 62 ++- tests/components/test_producer_app.py | 5 +- tests/components/test_streams_app.py | 34 +- tests/pipeline/resources/defaults.yaml | 4 +- .../kafka-connect-sink/pipeline.yaml | 2 + .../defaults_development.yaml | 3 +- .../pipeline-with-env-defaults/defaults.yaml | 3 +- .../defaults_development.yaml | 3 +- .../pipeline/snapshots/snap_test_pipeline.py | 169 ++++++- tests/pipeline/test_components/components.py | 1 + .../components.py | 1 + 29 files changed, 1066 insertions(+), 964 deletions(-) rename kpops/component_handlers/kafka_connect/{connector_handler.py => kafka_connect_handler.py} (50%) rename tests/components/{test_kafka_connect.py => test_kafka_connect_sink.py} (51%) create mode 100644 tests/components/test_kafka_connect_source.py diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 859872703..1670f35ea 100644 --- a/kpops/cli/main.py +++ 
b/kpops/cli/main.py @@ -10,7 +10,9 @@ from kpops.cli.pipeline_config import ENV_PREFIX, PipelineConfig from kpops.cli.registry import Registry from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.connector_handler import ConnectorHandler +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper @@ -110,7 +112,7 @@ def setup_handlers( components_module: str | None, config: PipelineConfig ) -> ComponentHandlers: schema_handler = SchemaHandler.load_schema_handler(components_module, config) - connector_handler = ConnectorHandler.from_pipeline_config(config) + connector_handler = KafkaConnectHandler.from_pipeline_config(config) proxy_wrapper = ProxyWrapper(config) topic_handler = TopicHandler(proxy_wrapper) diff --git a/kpops/cli/pipeline_config.py b/kpops/cli/pipeline_config.py index 9942db3d1..71b0bfa6f 100644 --- a/kpops/cli/pipeline_config.py +++ b/kpops/cli/pipeline_config.py @@ -3,11 +3,7 @@ from pydantic import BaseConfig, BaseSettings, Field from pydantic.env_settings import SettingsSourceCallable -from kpops.component_handlers.helm_wrapper.model import ( - HelmConfig, - HelmDiffConfig, - HelmRepoConfig, -) +from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig from kpops.utils.yaml_loading import load_yaml_file ENV_PREFIX = "KPOPS_" @@ -24,22 +20,6 @@ class TopicNameConfig(BaseSettings): ) -class KafkaConnectResetterHelmConfig(BaseSettings): - helm_config: HelmRepoConfig = Field( - default=HelmRepoConfig( - repository_name="bakdata-kafka-connect-resetter", - url="https://bakdata.github.io/kafka-connect-resetter/", - ), - description="Configuration of Kafka connect resetter Helm Chart", - ) - version: str = "1.0.4" - helm_values: 
dict = Field( - default={}, - description="Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", - ) - namespace: str = Field(default="") - - class PipelineConfig(BaseSettings): defaults_path: Path = Field( default=..., @@ -91,18 +71,6 @@ class PipelineConfig(BaseSettings): helm_config: HelmConfig = Field(default=HelmConfig()) helm_diff_config: HelmDiffConfig = Field(default=HelmDiffConfig()) - streams_bootstrap_helm_config: HelmRepoConfig = Field( - default=HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", - ), - description="Configuration for Streams Bootstrap Helm Charts", - ) - kafka_connect_resetter_config: KafkaConnectResetterHelmConfig = Field( - default=KafkaConnectResetterHelmConfig(), - description="Configuration of kafka connect resetter helm chart and values. " - "This is used for cleaning/resettting Kafka connectors, see https://github.com/bakdata/kafka-connect-resetter", - ) retain_clean_jobs: bool = Field( default=False, env=f"{ENV_PREFIX}RETAIN_CLEAN_JOBS", diff --git a/kpops/component_handlers/__init__.py b/kpops/component_handlers/__init__.py index 3da70d66a..988ca7ee7 100644 --- a/kpops/component_handlers/__init__.py +++ b/kpops/component_handlers/__init__.py @@ -2,7 +2,9 @@ from typing import TYPE_CHECKING -from kpops.component_handlers.kafka_connect.connector_handler import ConnectorHandler +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) if TYPE_CHECKING: from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler @@ -13,7 +15,7 @@ class ComponentHandlers: def __init__( self, schema_handler: SchemaHandler | None, - connector_handler: ConnectorHandler, + connector_handler: KafkaConnectHandler, topic_handler: TopicHandler, ) -> None: self.schema_handler = schema_handler diff --git a/kpops/component_handlers/helm_wrapper/model.py b/kpops/component_handlers/helm_wrapper/model.py 
index 04f40690d..d0197a66a 100644 --- a/kpops/component_handlers/helm_wrapper/model.py +++ b/kpops/component_handlers/helm_wrapper/model.py @@ -3,29 +3,33 @@ from typing import Iterator import yaml +from pydantic import BaseModel, Field +from kpops.utils.pydantic import CamelCaseConfig -@dataclass -class HelmDiffConfig: + +class HelmDiffConfig(BaseModel): enable: bool = False - ignore: set[str] = field( - default_factory=set, - ) + ignore: set[str] = Field(default_factory=set) -@dataclass -class RepoAuthFlags: +class RepoAuthFlags(BaseModel): username: str | None = None password: str | None = None ca_file: Path | None = None insecure_skip_tls_verify: bool = False + class Config(CamelCaseConfig): + pass -@dataclass -class HelmRepoConfig: + +class HelmRepoConfig(BaseModel): repository_name: str url: str - repo_auth_flags: RepoAuthFlags = field(default_factory=RepoAuthFlags) + repo_auth_flags: RepoAuthFlags = Field(default=RepoAuthFlags()) + + class Config(CamelCaseConfig): + pass @dataclass diff --git a/kpops/component_handlers/kafka_connect/connector_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py similarity index 50% rename from kpops/component_handlers/kafka_connect/connector_handler.py rename to kpops/component_handlers/kafka_connect/kafka_connect_handler.py index 50e4e9d0c..5a00241a5 100644 --- a/kpops/component_handlers/kafka_connect/connector_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -2,70 +2,29 @@ import logging import sys -from functools import cached_property from typing import TYPE_CHECKING -from kpops.component_handlers.helm_wrapper.helm import Helm -from kpops.component_handlers.helm_wrapper.helm_diff import HelmDiff -from kpops.component_handlers.helm_wrapper.model import ( - HelmConfig, - HelmUpgradeInstallFlags, -) -from kpops.component_handlers.helm_wrapper.utils import trim_release_name from kpops.component_handlers.kafka_connect.connect_wrapper import ConnectWrapper from 
kpops.component_handlers.kafka_connect.exception import ConnectorNotFoundException -from kpops.component_handlers.kafka_connect.model import ( - KafkaConnectConfig, - KafkaConnectorType, - KafkaConnectResetterConfig, - KafkaConnectResetterValues, -) +from kpops.component_handlers.kafka_connect.model import KafkaConnectConfig from kpops.component_handlers.kafka_connect.timeout import timeout from kpops.utils.colorify import greenify, magentaify, yellowify from kpops.utils.dict_differ import render_diff if TYPE_CHECKING: - from kpops.cli.pipeline_config import KafkaConnectResetterHelmConfig, PipelineConfig + from kpops.cli.pipeline_config import PipelineConfig -log = logging.getLogger("KafkaConnect") +log = logging.getLogger("KafkaConnectHandler") -class ConnectorHandler: +class KafkaConnectHandler: def __init__( self, connect_wrapper: ConnectWrapper, timeout: int, - kafka_connect_resetter_helm_config: KafkaConnectResetterHelmConfig, - broker: str, - helm_diff: HelmDiff, - helm_config: HelmConfig = HelmConfig(), ): self._connect_wrapper = connect_wrapper self._timeout = timeout - self._helm_config = helm_config - self._helm_repo_config = kafka_connect_resetter_helm_config.helm_config - self.helm_diff = helm_diff - - helm_repo_config = kafka_connect_resetter_helm_config.helm_config - self.kafka_connect_resseter_chart = ( - f"{helm_repo_config.repository_name}/kafka-connect-resetter" - ) - self.chart_version = kafka_connect_resetter_helm_config.version - self.namespace = ( - kafka_connect_resetter_helm_config.namespace # namespace where the re-setter jobs should be deployed to - ) - self.broker = broker - self.values = kafka_connect_resetter_helm_config.helm_values - - @cached_property - def helm(self): - helm = Helm(self._helm_config) - helm.add_repo( - self._helm_repo_config.repository_name, - self._helm_repo_config.url, - self._helm_repo_config.repo_auth_flags, - ) - return helm def create_connector( self, @@ -127,55 +86,6 @@ def destroy_connector(self, 
connector_name: str, dry_run: bool) -> None: f"Connector Destruction: the connector {connector_name} does not exist. Skipping." ) - def clean_connector( - self, - connector_name: str, - connector_type: KafkaConnectorType, - dry_run: bool, - retain_clean_jobs: bool, - **kwargs, - ) -> None: - """ - Cleans the connector from the cluster. At first, it deletes the previous cleanup job (connector resetter) - to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. - If the retain_clean_jobs flag is set to false the cleanup job will be deleted. - :param connector_name: Name of the connector - :param connector_type: Type of the connector (SINK or SOURCE) - :param dry_run: If the cleanup should be run in dry run mode or not - :param retain_clean_jobs: If the cleanup job should be kept - :param kwargs: Other values for the KafkaConnectResetter - """ - suffix = "-clean" - clean_up_release_name = connector_name + suffix - trimmed_name = trim_release_name(clean_up_release_name, suffix) - - log.info( - magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}" - ) - ) - self.__uninstall_kafka_resetter(trimmed_name, dry_run) - - log.info( - magentaify( - f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}" - ) - ) - - stdout = self.__install_kafka_resetter( - trimmed_name, connector_name, connector_type, dry_run, kwargs - ) - - if dry_run and self.helm_diff.config.enable: - current_release = self.helm.get_manifest(trimmed_name, self.namespace) - new_release = Helm.load_helm_manifest(stdout) - helm_diff = HelmDiff.get_diff(current_release, new_release) - self.helm_diff.log_helm_diff(helm_diff, log) - - if not retain_clean_jobs: - log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) - self.__uninstall_kafka_resetter(trimmed_name, dry_run) - def __dry_run_connector_creation( self, connector_name: str, kafka_connect_config: 
KafkaConnectConfig ) -> None: @@ -233,52 +143,11 @@ def __dry_run_connector_deletion(self, connector_name: str) -> None: f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping." ) - def __install_kafka_resetter( - self, - release_name: str, - connector_name: str, - connector_type: KafkaConnectorType, - dry_run: bool, - kwargs, - ) -> str: - return self.helm.upgrade_install( - release_name=release_name, - namespace=self.namespace, - chart=self.kafka_connect_resseter_chart, - dry_run=dry_run, - flags=HelmUpgradeInstallFlags( - version=self.chart_version, wait_for_jobs=True, wait=True - ), - values={ - **KafkaConnectResetterValues( - config=KafkaConnectResetterConfig( - connector=connector_name, - brokers=self.broker, - **kwargs, - ), - connector_type=connector_type.value, - name_override=connector_name, - ).dict(), - **self.values, - }, - ) - - def __uninstall_kafka_resetter(self, release_name: str, dry_run: bool) -> None: - self.helm.uninstall( - namespace=self.namespace, - release_name=release_name, - dry_run=dry_run, - ) - @classmethod def from_pipeline_config( cls, pipeline_config: PipelineConfig - ) -> ConnectorHandler: # TODO: annotate as typing.Self once mypy supports it + ) -> KafkaConnectHandler: # TODO: annotate as typing.Self once mypy supports it return cls( connect_wrapper=ConnectWrapper(host=pipeline_config.kafka_connect_host), timeout=pipeline_config.timeout, - kafka_connect_resetter_helm_config=pipeline_config.kafka_connect_resetter_config, - helm_config=pipeline_config.helm_config, - broker=pipeline_config.broker, - helm_diff=HelmDiff(pipeline_config.helm_diff_config), ) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index c2fb849c5..f9e0d54d3 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -6,7 +6,10 @@ from typing_extensions import override from 
kpops.component_handlers.helm_wrapper.helm import Helm -from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags +from kpops.component_handlers.helm_wrapper.model import ( + HelmRepoConfig, + HelmUpgradeInstallFlags, +) from kpops.component_handlers.helm_wrapper.utils import trim_release_name from kpops.components.base_components.kubernetes_app import ( KubernetesApp, @@ -39,6 +42,11 @@ class KafkaApp(KubernetesApp): _type = "kafka-app" app: KafkaAppConfig + repo_config: HelmRepoConfig = HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", + ) + version = "2.7.0" def __init__(self, **kwargs) -> None: super().__init__(**kwargs) diff --git a/kpops/components/base_components/kafka_connect.py b/kpops/components/base_components/kafka_connect.py index f126dce65..258932afb 100644 --- a/kpops/components/base_components/kafka_connect.py +++ b/kpops/components/base_components/kafka_connect.py @@ -1,26 +1,77 @@ +from __future__ import annotations + import json -import os +import logging from abc import ABC +from functools import cached_property from typing import NoReturn +from pydantic import Field from typing_extensions import override -from kpops.cli.pipeline_config import ENV_PREFIX -from kpops.component_handlers.kafka_connect.connector_handler import KafkaConnectorType -from kpops.component_handlers.kafka_connect.model import KafkaConnectConfig +from kpops.component_handlers.helm_wrapper.helm import Helm +from kpops.component_handlers.helm_wrapper.helm_diff import HelmDiff +from kpops.component_handlers.helm_wrapper.model import ( + HelmRepoConfig, + HelmUpgradeInstallFlags, +) +from kpops.component_handlers.helm_wrapper.utils import trim_release_name +from kpops.component_handlers.kafka_connect.model import ( + KafkaConnectConfig, + KafkaConnectorType, + KafkaConnectResetterConfig, + KafkaConnectResetterValues, +) from kpops.components.base_components.base_defaults_component import 
deduplicate from kpops.components.base_components.models.from_section import FromTopic from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.utils.colorify import magentaify +from kpops.utils.pydantic import CamelCaseConfig + +log = logging.getLogger("KafkaConnector") class KafkaConnector(PipelineComponent, ABC): _type = "kafka-connect" app: KafkaConnectConfig + repo_config: HelmRepoConfig = HelmRepoConfig( + repository_name="bakdata-kafka-connect-resetter", + url="https://bakdata.github.io/kafka-connect-resetter/", + ) + namespace: str + version: str = "1.0.4" + resetter_values: dict = Field( + default_factory=dict, + description="Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", + ) + + class Config(CamelCaseConfig): + pass + def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.prepare_connector_config() + @cached_property + def helm(self) -> Helm: + helm_repo_config = self.repo_config + helm = Helm(self.config.helm_config) + helm.add_repo( + helm_repo_config.repository_name, + helm_repo_config.url, + helm_repo_config.repo_auth_flags, + ) + return helm + + @cached_property + def helm_diff(self) -> HelmDiff: + return HelmDiff(self.config.helm_diff_config) + + @property + def kafka_connect_resetter_chart(self) -> str: + return f"{self.repo_config.repository_name}/kafka-connect-resetter" + def prepare_connector_config(self) -> None: """ Substitute component related variables in config @@ -62,9 +113,96 @@ def clean(self, dry_run: bool) -> None: ) self.handlers.topic_handler.delete_topics(self.to, dry_run=dry_run) + def _run_connect_resetter( + self, + connector_name: str, + connector_type: KafkaConnectorType, + dry_run: bool, + retain_clean_jobs: bool, + **kwargs, + ) -> None: + """ + Cleans the connector from the cluster. At first, it deletes the previous cleanup job (connector resetter) + to make sure that there is no running clean job in the cluster. 
Then it releases a cleanup job. + If the retain_clean_jobs flag is set to false the cleanup job will be deleted. + :param connector_name: Name of the connector + :param connector_type: Type of the connector (SINK or SOURCE) + :param dry_run: If the cleanup should be run in dry run mode or not + :param retain_clean_jobs: If the cleanup job should be kept + :param kwargs: Other values for the KafkaConnectResetter + """ + suffix = "-clean" + clean_up_release_name = connector_name + suffix + trimmed_name = trim_release_name(clean_up_release_name, suffix) + + log.info( + magentaify( + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}" + ) + ) + self.__uninstall_connect_resetter(trimmed_name, dry_run) + + log.info( + magentaify( + f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}" + ) + ) + + stdout = self.__install_connect_resetter( + trimmed_name, connector_name, connector_type, dry_run, kwargs + ) + + if dry_run and self.helm_diff.config.enable: + current_release = self.helm.get_manifest(trimmed_name, self.namespace) + new_release = Helm.load_helm_manifest(stdout) + helm_diff = HelmDiff.get_diff(current_release, new_release) + self.helm_diff.log_helm_diff(helm_diff, log) + + if not retain_clean_jobs: + log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) + self.__uninstall_connect_resetter(trimmed_name, dry_run) + + def __install_connect_resetter( + self, + release_name: str, + connector_name: str, + connector_type: KafkaConnectorType, + dry_run: bool, + kwargs, + ) -> str: + return self.helm.upgrade_install( + release_name=release_name, + namespace=self.namespace, + chart=self.kafka_connect_resetter_chart, + dry_run=dry_run, + flags=HelmUpgradeInstallFlags( + version=self.version, wait_for_jobs=True, wait=True + ), + values={ + **KafkaConnectResetterValues( + config=KafkaConnectResetterConfig( + connector=connector_name, + brokers=self.config.broker, + **kwargs, + 
), + connector_type=connector_type.value, + name_override=connector_name, + ).dict(), + **self.resetter_values, + }, + ) + + def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None: + self.helm.uninstall( + namespace=self.namespace, + release_name=release_name, + dry_run=dry_run, + ) + class KafkaSourceConnector(KafkaConnector): _type = "kafka-source-connector" + offset_topic: str | None = None @override def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @@ -80,12 +218,12 @@ def clean(self, dry_run: bool) -> None: self.__run_kafka_connect_resetter(dry_run) def __run_kafka_connect_resetter(self, dry_run: bool) -> None: - self.handlers.connector_handler.clean_connector( + self._run_connect_resetter( connector_name=self.name, connector_type=KafkaConnectorType.SOURCE, dry_run=dry_run, retain_clean_jobs=self.config.retain_clean_jobs, - offset_topic=os.environ[f"{ENV_PREFIX}KAFKA_CONNECT_RESETTER_OFFSET_TOPIC"], + offset_topic=self.offset_topic, ) @@ -109,17 +247,17 @@ def set_error_topic(self, topic_name: str) -> None: @override def reset(self, dry_run: bool) -> None: - self.__clean_sink_connector(dry_run, delete_consumer_group=False) + self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=False) @override def clean(self, dry_run: bool) -> None: super().clean(dry_run) - self.__clean_sink_connector(dry_run, delete_consumer_group=True) + self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True) - def __clean_sink_connector( + def __run_kafka_connect_resetter( self, dry_run: bool, delete_consumer_group: bool ) -> None: - self.handlers.connector_handler.clean_connector( + self._run_connect_resetter( connector_name=self.name, connector_type=KafkaConnectorType.SINK, dry_run=dry_run, diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py index 451ea6b58..671830e48 100644 --- a/kpops/components/base_components/kubernetes_app.py +++ 
b/kpops/components/base_components/kubernetes_app.py @@ -4,7 +4,7 @@ import re from functools import cached_property -from pydantic import BaseModel +from pydantic import BaseModel, Extra from typing_extensions import override from kpops.component_handlers.helm_wrapper.helm import Helm @@ -25,10 +25,8 @@ class KubernetesAppConfig(BaseModel): - namespace: str - class Config(CamelCaseConfig): - pass + extra = Extra.allow # TODO: label and annotations @@ -37,11 +35,12 @@ class KubernetesApp(PipelineComponent): _type = "kubernetes-app" app: KubernetesAppConfig - + repo_config: HelmRepoConfig | None = None + namespace: str version: str | None = None - class Config: - keep_untouched = (cached_property,) + class Config(CamelCaseConfig): + pass def __init__(self, **kwargs): super().__init__(**kwargs) @@ -50,11 +49,11 @@ def __init__(self, **kwargs): @cached_property def helm(self) -> Helm: helm = Helm(self.config.helm_config) - if self.helm_repo_config is not None: + if self.repo_config is not None: helm.add_repo( - self.helm_repo_config.repository_name, - self.helm_repo_config.url, - self.helm_repo_config.repo_auth_flags, + self.repo_config.repository_name, + self.repo_config.url, + self.repo_config.repo_auth_flags, ) return helm @@ -67,14 +66,6 @@ def helm_release_name(self) -> str: """The name for the Helm release. 
Can be overridden.""" return self.name - @property - def namespace(self) -> str: - return self.app.namespace - - @property - def helm_repo_config(self) -> HelmRepoConfig | None: - return None - @override def deploy(self, dry_run: bool) -> None: stdout = self.helm.upgrade_install( diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 4003c6621..3ab04395c 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +from functools import cached_property from pydantic import BaseConfig, Extra, Field @@ -28,6 +29,7 @@ class PipelineComponent(BaseDefaultsComponent): class Config(BaseConfig): extra = Extra.allow + keep_untouched = (cached_property,) def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 9eaeaa427..d202ba0c4 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -1,7 +1,8 @@ +from __future__ import annotations + from pydantic import BaseConfig, Extra from typing_extensions import override -from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig from kpops.components.base_components.kafka_app import KafkaApp from kpops.components.base_components.models.to_section import ( OutputTopicTypes, @@ -42,17 +43,14 @@ def add_extra_output_topic(self, topic_name: str, role: str) -> None: @override def get_helm_chart(self) -> str: - return f"{self.config.streams_bootstrap_helm_config.repository_name}/{AppType.PRODUCER_APP.value}" + return f"{self.repo_config.repository_name}/{AppType.PRODUCER_APP.value}" @property @override def clean_up_helm_chart(self) -> str: - return 
f"{self.config.streams_bootstrap_helm_config.repository_name}/{AppType.CLEANUP_PRODUCER_APP.value}" - - @property - @override - def helm_repo_config(self) -> HelmRepoConfig | None: - return self.config.streams_bootstrap_helm_config + return ( + f"{self.repo_config.repository_name}/{AppType.CLEANUP_PRODUCER_APP.value}" + ) @override def clean(self, dry_run: bool) -> None: diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index b46957a20..9317fa712 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -3,7 +3,6 @@ from pydantic import BaseConfig, Extra from typing_extensions import override -from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig from kpops.components.base_components.kafka_app import KafkaApp from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import StreamsAppConfig @@ -54,17 +53,12 @@ def add_extra_output_topic(self, topic_name: str, role: str) -> None: @override def get_helm_chart(self) -> str: - return f"{self.config.streams_bootstrap_helm_config.repository_name}/{AppType.STREAMS_APP.value}" - - @property - @override - def helm_repo_config(self) -> HelmRepoConfig | None: - return self.config.streams_bootstrap_helm_config + return f"{self.repo_config.repository_name}/{AppType.STREAMS_APP.value}" @property @override def clean_up_helm_chart(self) -> str: - return f"{self.config.streams_bootstrap_helm_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" + return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" @override def reset(self, dry_run: bool) -> None: diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index daa8be3f1..081e6d5e1 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -160,9 
+160,12 @@ def parse_components( self.components.extend(inflated_components) previous_component = inflated_components.pop() except Exception as ex: - raise ParsingException( - f"Error enriching {component['type']} component {component['name']}" - ) from ex + if "name" in component: + raise ParsingException( + f"Error enriching {component['type']} component {component['name']}" + ) from ex + else: + raise ParsingException() from ex def populate_pipeline_component_names( self, inflated_components: list[PipelineComponent] diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index 4ab862614..509c5e0cc 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -5,7 +5,9 @@ from kpops.cli.main import setup_handlers from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.connector_handler import ConnectorHandler +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from tests.cli.resources.module import CustomSchemaProvider @@ -20,8 +22,8 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): kafka_rest_host="https://testhost:8082", schema_registry_url=None, ) - connector_handler_mock = mocker.patch("kpops.cli.main.ConnectorHandler") - connector_handler = ConnectorHandler.from_pipeline_config(pipeline_config=config) + connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler = KafkaConnectHandler.from_pipeline_config(pipeline_config=config) connector_handler_mock.from_pipeline_config.return_value = connector_handler topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") @@ -44,7 +46,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): assert 
actual_handlers.topic_handler == expected.topic_handler assert actual_handlers.schema_handler is None - assert isinstance(actual_handlers.connector_handler, ConnectorHandler) + assert isinstance(actual_handlers.connector_handler, KafkaConnectHandler) assert isinstance(actual_handlers.topic_handler, TopicHandler) @@ -59,8 +61,8 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_handler = SchemaHandler.load_schema_handler(MODULE, config) schema_handler_mock.load_schema_handler.return_value = schema_handler - connector_handler_mock = mocker.patch("kpops.cli.main.ConnectorHandler") - connector_handler = ConnectorHandler.from_pipeline_config(pipeline_config=config) + connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler = KafkaConnectHandler.from_pipeline_config(pipeline_config=config) connector_handler_mock.from_pipeline_config.return_value = connector_handler topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") @@ -85,5 +87,5 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): assert actual_handlers.topic_handler == expected.topic_handler assert isinstance(actual_handlers.schema_handler, SchemaHandler) - assert isinstance(actual_handlers.connector_handler, ConnectorHandler) + assert isinstance(actual_handlers.connector_handler, KafkaConnectHandler) assert isinstance(actual_handlers.topic_handler, TopicHandler) diff --git a/tests/component_handlers/kafka_connect/test_connect_handler.py b/tests/component_handlers/kafka_connect/test_connect_handler.py index 7edad81cf..e12df7d3c 100644 --- a/tests/component_handlers/kafka_connect/test_connect_handler.py +++ b/tests/component_handlers/kafka_connect/test_connect_handler.py @@ -1,25 +1,14 @@ -from pathlib import Path from unittest import mock from unittest.mock import MagicMock import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import KafkaConnectResetterHelmConfig, PipelineConfig -from 
kpops.component_handlers.helm_wrapper.helm_diff import HelmDiff -from kpops.component_handlers.helm_wrapper.model import ( - HelmDiffConfig, - HelmRepoConfig, - HelmUpgradeInstallFlags, - RepoAuthFlags, -) -from kpops.component_handlers.kafka_connect.connect_wrapper import ConnectWrapper -from kpops.component_handlers.kafka_connect.connector_handler import ConnectorHandler from kpops.component_handlers.kafka_connect.exception import ConnectorNotFoundException -from kpops.component_handlers.kafka_connect.model import ( - KafkaConnectConfig, - KafkaConnectorType, +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, ) +from kpops.component_handlers.kafka_connect.model import KafkaConnectConfig from kpops.utils.colorify import greenify, magentaify, yellowify CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop" @@ -27,54 +16,46 @@ class TestConnectorHandler: - @pytest.fixture(autouse=True) + @pytest.fixture def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.connector_handler.log.info" + "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.info" ) - @pytest.fixture(autouse=True) + @pytest.fixture def log_warning_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.connector_handler.log.warning" + "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.warning" ) - @pytest.fixture(autouse=True) + @pytest.fixture def log_error_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.connector_handler.log.error" + "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.error" ) - @pytest.fixture(autouse=True) + @pytest.fixture def renderer_diff_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.kafka_connect.connector_handler.render_diff" + 
"kpops.component_handlers.kafka_connect.kafka_connect_handler.render_diff" ) - @pytest.fixture(autouse=True) - def helm_wrapper_mock(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch( - "kpops.component_handlers.kafka_connect.connector_handler.Helm" - ).return_value - - @pytest.fixture(autouse=True) - def helm_repo_config(self) -> HelmRepoConfig: - return HelmRepoConfig( - repository_name="bakdata-kafka-connect-resetter", - url="https://bakdata.github.io/kafka-connect-resetter/", + @staticmethod + def connector_handler(connect_wrapper: MagicMock) -> KafkaConnectHandler: + return KafkaConnectHandler( + connect_wrapper=connect_wrapper, + timeout=0, ) def test_should_create_connector_in_dry_run( self, renderer_diff_mock: MagicMock, log_info_mock: MagicMock, - helm_repo_config: HelmRepoConfig, ): connector_wrapper = MagicMock() + handler = self.connector_handler(connector_wrapper) renderer_diff_mock.return_value = None - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) - config = KafkaConnectConfig() handler.create_connector(CONNECTOR_NAME, config, True) connector_wrapper.get_connector.assert_called_once_with(CONNECTOR_NAME) @@ -98,11 +79,9 @@ def test_should_create_connector_in_dry_run( def test_should_log_correct_message_when_create_connector_and_connector_not_exists_in_dry_run( self, log_info_mock: MagicMock, - helm_repo_config, ): connector_wrapper = MagicMock() - - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) connector_wrapper.get_connector.side_effect = ConnectorNotFoundException() @@ -130,7 +109,6 @@ def test_should_log_invalid_config_when_create_connector_in_dry_run( self, renderer_diff_mock: MagicMock, log_error_mock: MagicMock, - helm_repo_config, ): connector_wrapper = MagicMock() @@ -140,7 +118,7 @@ def test_should_log_invalid_config_when_create_connector_in_dry_run( ] connector_wrapper.validate_connector_config.return_value = 
errors - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) config = KafkaConnectConfig() @@ -160,11 +138,9 @@ def test_should_log_invalid_config_when_create_connector_in_dry_run( def test_should_call_update_connector_config_when_connector_exists_not_dry_run( self, - helm_repo_config, ): connector_wrapper = MagicMock() - - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) config = KafkaConnectConfig() handler.create_connector(CONNECTOR_NAME, config, False) @@ -178,11 +154,10 @@ def test_should_call_update_connector_config_when_connector_exists_not_dry_run( def test_should_call_create_connector_when_connector_does_not_exists_not_dry_run( self, - helm_repo_config, ): connector_wrapper = MagicMock() - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) config = KafkaConnectConfig() connector_wrapper.get_connector.side_effect = ConnectorNotFoundException() @@ -195,11 +170,10 @@ def test_should_call_create_connector_when_connector_does_not_exists_not_dry_run def test_should_print_correct_log_when_destroying_connector_in_dry_run( self, log_info_mock: MagicMock, - helm_repo_config: HelmRepoConfig, ): connector_wrapper = MagicMock() - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) handler.destroy_connector(CONNECTOR_NAME, True) @@ -212,12 +186,11 @@ def test_should_print_correct_log_when_destroying_connector_in_dry_run( def test_should_print_correct_warning_log_when_destroying_connector_and_connector_exists_in_dry_run( self, log_warning_mock: MagicMock, - helm_repo_config, ): connector_wrapper = MagicMock() connector_wrapper.get_connector.side_effect = ConnectorNotFoundException() - handler = self.create_connector_handler(connector_wrapper, 
helm_repo_config) + handler = self.connector_handler(connector_wrapper) handler.destroy_connector(CONNECTOR_NAME, True) @@ -227,10 +200,9 @@ def test_should_print_correct_warning_log_when_destroying_connector_and_connecto def test_should_call_delete_connector_when_destroying_existing_connector_not_dry_run( self, - helm_repo_config, ): connector_wrapper = MagicMock() - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) handler.destroy_connector(CONNECTOR_NAME, False) connector_wrapper.assert_has_calls( @@ -243,322 +215,13 @@ def test_should_call_delete_connector_when_destroying_existing_connector_not_dry def test_should_print_correct_warning_log_when_destroying_connector_and_connector_exists_not_dry_run( self, log_warning_mock: MagicMock, - helm_repo_config, ): connector_wrapper = MagicMock() connector_wrapper.get_connector.side_effect = ConnectorNotFoundException() - - handler = self.create_connector_handler(connector_wrapper, helm_repo_config) + handler = self.connector_handler(connector_wrapper) handler.destroy_connector(CONNECTOR_NAME, False) log_warning_mock.assert_called_once_with( f"Connector Destruction: the connector {CONNECTOR_NAME} does not exist. Skipping." 
) - - def test_should_call_helm_upgrade_install_with_default_config( - self, helm_wrapper_mock: MagicMock - ): - pipeline_config = PipelineConfig( - defaults_path=Path("fake"), - environment="development", - broker="broker:9092", - kafka_connect_resetter_config=KafkaConnectResetterHelmConfig( - namespace="test-namespace" - ), - ) - - helm_repo_config = pipeline_config.kafka_connect_resetter_config.helm_config - - handler = ConnectorHandler.from_pipeline_config(pipeline_config) - - handler.clean_connector( - connector_name=CONNECTOR_NAME, - connector_type=KafkaConnectorType.SOURCE, - dry_run=True, - retain_clean_jobs=True, - offset_topic="kafka-connect-offsets", - ) - - helm_wrapper_mock.assert_has_calls( - [ - mock.call.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags( - username=None, - password=None, - ca_file=None, - insecure_skip_tls_verify=False, - ), - ), - mock.call.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, - dry_run=True, - ), - mock.call.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, - namespace="test-namespace", - chart=f"{helm_repo_config.repository_name}/kafka-connect-resetter", - dry_run=True, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - values={ - "connectorType": "source", - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_NAME, - "offsetTopic": "kafka-connect-offsets", - }, - "nameOverride": CONNECTOR_NAME, - }, - ), - ] - ) - - def test_should_call_helm_upgrade_install_and_uninstall_when_clean_connector_with_retain_clean_jobs_true( - self, - log_info_mock: MagicMock, - helm_repo_config: HelmRepoConfig, - helm_wrapper_mock: MagicMock, - ): - values = { - "config": { - "brokers": "127.0.0.1", - "connector": CONNECTOR_NAME, - "offsetTopic": "kafka-connect-offsets", - }, - "connectorType": "source", - "nameOverride": CONNECTOR_NAME, - } - handler = self.create_connector_handler( 
- ConnectWrapper("test"), helm_repo_config, values - ) - handler.clean_connector( - connector_name=CONNECTOR_NAME, - connector_type=KafkaConnectorType.SOURCE, - dry_run=True, - retain_clean_jobs=True, - offset_topic="kafka-connect-offsets", - ) - - log_info_mock.assert_has_calls( - [ - mock.call.log_info( - magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_NAME}" - ) - ), - mock.call.log_info( - magentaify( - f"Connector Cleanup: deploy Connect {KafkaConnectorType.SOURCE.value} resetter for {CONNECTOR_NAME}" - ) - ), - ] - ) - - helm_wrapper_mock.assert_has_calls( - [ - mock.call.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, - dry_run=True, - ), - mock.call.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, - namespace="test-namespace", - chart=f"{helm_repo_config.repository_name}/kafka-connect-resetter", - dry_run=True, - flags=HelmUpgradeInstallFlags( - version="1.2.3", - wait=True, - wait_for_jobs=True, - ), - values=values, - ), - ] - ) - - def test_should_call_helm_upgrade_install_and_uninstall_when_clean_connector_with_retain_clean_jobs_false( - self, - log_info_mock: MagicMock, - helm_repo_config: HelmRepoConfig, - helm_wrapper_mock: MagicMock, - ): - values = { - "config": { - "brokers": "127.0.0.1", - "connector": CONNECTOR_NAME, - "offsetTopic": "kafka-connect-offsets", - }, - "connectorType": "source", - "nameOverride": CONNECTOR_NAME, - } - handler = self.create_connector_handler( - ConnectWrapper("test"), helm_repo_config, values - ) - handler.clean_connector( - connector_name=CONNECTOR_NAME, - connector_type=KafkaConnectorType.SOURCE, - dry_run=True, - retain_clean_jobs=False, - offset_topic="kafka-connect-offsets", - ) - - log_info_mock.assert_has_calls( - [ - mock.call.log_info( - magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_NAME}" - ) - ), - mock.call.log_info( - magentaify( - f"Connector Cleanup: 
deploy Connect {KafkaConnectorType.SOURCE.value} resetter for {CONNECTOR_NAME}" - ) - ), - mock.call.log_info( - magentaify("Connector Cleanup: uninstall Kafka Resetter.") - ), - ] - ) - - helm_wrapper_mock.assert_has_calls( - [ - mock.call.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, - dry_run=True, - ), - mock.call.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, - namespace="test-namespace", - chart=f"{helm_repo_config.repository_name}/kafka-connect-resetter", - dry_run=True, - flags=HelmUpgradeInstallFlags( - version="1.2.3", - wait=True, - wait_for_jobs=True, - ), - values=values, - ), - mock.call.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, - dry_run=True, - ), - ] - ) - - def test_should_trim_long_name_when_cleaning_connector( - self, - helm_repo_config: HelmRepoConfig, - helm_wrapper_mock: MagicMock, - ): - long_name = "long-name-which-indicates-trimming-this-trim-is-dirty-and-this-suffix-should-be-gone-after" - values = { - "config": { - "brokers": "127.0.0.1", - "connector": long_name, - "offsetTopic": "kafka-connect-offsets", - }, - "connectorType": "source", - "nameOverride": long_name, - } - connector_wrapper = MagicMock() - handler = self.create_connector_handler( - connector_wrapper, helm_repo_config, values - ) - - handler.clean_connector( - connector_name=long_name, - connector_type=KafkaConnectorType.SOURCE, - dry_run=False, - retain_clean_jobs=False, - ) - helm_wrapper_mock.assert_has_calls( - [ - mock.call.uninstall( - namespace="test-namespace", - release_name="long-name-which-indicates-trimming-this-trim-i-clean", - dry_run=False, - ), - mock.call.upgrade_install( - release_name="long-name-which-indicates-trimming-this-trim-i-clean", - namespace="test-namespace", - chart=f"{helm_repo_config.repository_name}/kafka-connect-resetter", - dry_run=False, - flags=HelmUpgradeInstallFlags( - version="1.2.3", - wait=True, - wait_for_jobs=True, - ), - values=values, - ), - 
mock.call.uninstall( - namespace="test-namespace", - release_name="long-name-which-indicates-trimming-this-trim-i-clean", - dry_run=False, - ), - ] - ) - - def test_should_log_helm_diff_when_dry_run_and_enabled( - self, - helm_repo_config: HelmRepoConfig, - helm_wrapper_mock: MagicMock, - ): - values = { - "config": { - "brokers": "127.0.0.1", - "connector": CONNECTOR_NAME, - "offsetTopic": "kafka-connect-offsets", - }, - "connectorType": "source", - "nameOverride": CONNECTOR_NAME, - } - - connector_wrapper = MagicMock() - helm_diff = MagicMock() - handler = self.create_connector_handler( - connector_wrapper, helm_repo_config, values, helm_diff - ) - - handler.clean_connector( - connector_name=CONNECTOR_NAME, - connector_type=KafkaConnectorType.SOURCE, - dry_run=True, - retain_clean_jobs=False, - ) - - helm_wrapper_mock.get_manifest.assert_called_once_with( - CONNECTOR_CLEAN_NAME, "test-namespace" - ) - helm_diff.log_helm_diff.assert_called_once() - - @staticmethod - def create_connector_handler( - connector_wrapper: MagicMock | ConnectWrapper, - helm_repo_config: HelmRepoConfig, - values=None, - helm_diff: HelmDiff | MagicMock = HelmDiff(HelmDiffConfig()), - ) -> ConnectorHandler: - if values is None: - values = {} - resetter_helm_config = KafkaConnectResetterHelmConfig( - helm_config=helm_repo_config, - version="1.2.3", - helm_values=values, - namespace="test-namespace", - ) - return ConnectorHandler( - connector_wrapper, - 0, - resetter_helm_config, - "broker:9092", - helm_diff, - ) diff --git a/tests/component_handlers/kafka_connect/test_connect_wrapper.py b/tests/component_handlers/kafka_connect/test_connect_wrapper.py index 832429220..4cbb03792 100644 --- a/tests/component_handlers/kafka_connect/test_connect_wrapper.py +++ b/tests/component_handlers/kafka_connect/test_connect_wrapper.py @@ -26,7 +26,6 @@ class TestConnectorApiWrapper(unittest.TestCase): @pytest.fixture(autouse=True) - @responses.activate def setup(self): config = PipelineConfig( 
defaults_path=DEFAULTS_PATH, @@ -111,7 +110,7 @@ def test_should_return_correct_response_when_connector_created(self): @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_rais_connector_exists_exception_when_connector_exists( - self, log_warning + self, log_warning: MagicMock ): responses.add( responses.POST, @@ -144,7 +143,9 @@ def test_should_create_correct_get_connector_request(self, mock_get: MagicMock): @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") - def test_should_return_correct_response_when_getting_connector(self, log_info): + def test_should_return_correct_response_when_getting_connector( + self, log_info: MagicMock + ): connector_name = "test-connector" actual_response = { @@ -178,7 +179,9 @@ def test_should_return_correct_response_when_getting_connector(self, log_info): @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") - def test_should_raise_connector_not_found_when_getting_connector(self, log_info): + def test_should_raise_connector_not_found_when_getting_connector( + self, log_info: MagicMock + ): connector_name = "test-connector" responses.add( @@ -198,7 +201,7 @@ def test_should_raise_connector_not_found_when_getting_connector(self, log_info) @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_rebalance_in_progress_when_getting_connector( - self, log_warning + self, log_warning: MagicMock ): connector_name = "test-connector" @@ -245,7 +248,9 @@ def test_should_create_correct_update_connector_request(self, mock_put: MagicMoc @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") - def test_should_return_correct_response_when_update_connector(self, log_info): + def test_should_return_correct_response_when_update_connector( + self, log_info: MagicMock + ): connector_name = "test-connector" 
actual_response = { @@ -284,7 +289,7 @@ def test_should_return_correct_response_when_update_connector(self, log_info): @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_return_correct_response_when_update_connector_created( - self, log_info + self, log_info: MagicMock ): connector_name = "test-connector" @@ -322,7 +327,7 @@ def test_should_return_correct_response_when_update_connector_created( @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_connector_exists_exception_when_update_connector( - self, log_warning + self, log_warning: MagicMock ): connector_name = "test-connector" @@ -360,7 +365,9 @@ def test_should_create_correct_delete_connector_request( @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") - def test_should_return_correct_response_when_deleting_connector(self, log_info): + def test_should_return_correct_response_when_deleting_connector( + self, log_info: MagicMock + ): connector_name = "test-connector" actual_response = { @@ -394,7 +401,9 @@ def test_should_return_correct_response_when_deleting_connector(self, log_info): @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") - def test_should_raise_connector_not_found_when_deleting_connector(self, log_info): + def test_should_raise_connector_not_found_when_deleting_connector( + self, log_info: MagicMock + ): connector_name = "test-connector" responses.add( @@ -414,7 +423,7 @@ def test_should_raise_connector_not_found_when_deleting_connector(self, log_info @responses.activate @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_rebalance_in_progress_when_deleting_connector( - self, log_warning + self, log_warning: MagicMock ): connector_name = "test-connector" diff --git a/tests/components/test_kafka_app.py b/tests/components/test_kafka_app.py index 
2d2935c16..939c8c120 100644 --- a/tests/components/test_kafka_app.py +++ b/tests/components/test_kafka_app.py @@ -6,7 +6,10 @@ from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags +from kpops.component_handlers.helm_wrapper.model import ( + HelmRepoConfig, + HelmUpgradeInstallFlags, +) from kpops.components.base_components import KafkaApp DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -28,25 +31,30 @@ def handlers(self) -> ComponentHandlers: topic_handler=MagicMock(), ) - def test_default_brokers(self, config: PipelineConfig, handlers: ComponentHandlers): + def test_default_configs(self, config: PipelineConfig, handlers: ComponentHandlers): kafka_app = KafkaApp( handlers=handlers, config=config, **{ "type": "streams-app", "name": "example-name", + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": { "outputTopic": "test", "brokers": "fake-broker:9092", }, }, - "version": "1.2.3", }, ) assert kafka_app.app.streams.brokers == "fake-broker:9092" - assert kafka_app.version == "1.2.3" + + assert kafka_app.repo_config == HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", + ) + assert kafka_app.version == "2.7.0" + assert kafka_app.namespace == "test-namespace" def test_should_deploy_kafka_app( self, config: PipelineConfig, handlers: ComponentHandlers, mocker: MockerFixture @@ -57,8 +65,8 @@ def test_should_deploy_kafka_app( **{ "type": "streams-app", "name": "example-name", + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": { "outputTopic": "test", "brokers": "fake-broker:9092", @@ -78,7 +86,6 @@ def test_should_deploy_kafka_app( True, "test-namespace", { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092", "outputTopic": "test"}, }, HelmUpgradeInstallFlags(version="1.2.3"), 
diff --git a/tests/components/test_kafka_connect.py b/tests/components/test_kafka_connect_sink.py similarity index 51% rename from tests/components/test_kafka_connect.py rename to tests/components/test_kafka_connect_sink.py index 9e192da7e..af4e7e51f 100644 --- a/tests/components/test_kafka_connect.py +++ b/tests/components/test_kafka_connect_sink.py @@ -1,18 +1,20 @@ -import os from pathlib import Path -from unittest.mock import MagicMock +from unittest.mock import MagicMock, call import pytest from pytest_mock import MockerFixture from kpops.cli.pipeline_config import PipelineConfig, TopicNameConfig from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.connector_handler import KafkaConnectorType -from kpops.component_handlers.kafka_connect.model import KafkaConnectConfig -from kpops.components.base_components.kafka_connect import ( - KafkaSinkConnector, - KafkaSourceConnector, +from kpops.component_handlers.helm_wrapper.model import ( + HelmUpgradeInstallFlags, + RepoAuthFlags, ) +from kpops.component_handlers.kafka_connect.model import ( + KafkaConnectConfig, + KafkaConnectorType, +) +from kpops.components import KafkaSinkConnector from kpops.components.base_components.models.from_section import ( FromSection, FromTopic, @@ -23,33 +25,44 @@ TopicConfig, ToSection, ) +from kpops.utils.colorify import magentaify DEFAULTS_PATH = Path(__file__).parent / "resources" +CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop" +CONNECTOR_CLEAN_NAME = "test-connector-with-long-name-0123456789abcdef-clean" -@pytest.fixture -def config(): - return PipelineConfig( - defaults_path=DEFAULTS_PATH, - environment="development", - topic_name_config=TopicNameConfig( - default_error_topic_name="${component_type}-error-topic", - default_output_topic_name="${component_type}-output-topic", - ), - pipeline_prefix="", - ) - +class TestKafkaConnectorSink: + @pytest.fixture + def log_info_mock(self, mocker: MockerFixture) -> 
MagicMock: + return mocker.patch("kpops.components.base_components.kafka_connect.log.info") + + @pytest.fixture + def config(self) -> PipelineConfig: + return PipelineConfig( + defaults_path=DEFAULTS_PATH, + environment="development", + topic_name_config=TopicNameConfig( + default_error_topic_name="${component_type}-error-topic", + default_output_topic_name="${component_type}-output-topic", + ), + broker="broker:9092", + ) -@pytest.fixture -def handlers() -> ComponentHandlers: - return ComponentHandlers( - schema_handler=MagicMock(), - connector_handler=MagicMock(), - topic_handler=MagicMock(), - ) + @pytest.fixture + def handlers(self) -> ComponentHandlers: + return ComponentHandlers( + schema_handler=MagicMock(), + connector_handler=MagicMock(), + topic_handler=MagicMock(), + ) + @pytest.fixture + def helm_mock(self, mocker: MockerFixture) -> MagicMock: + return mocker.patch( + "kpops.components.base_components.kafka_connect.Helm" + ).return_value -class TestKafkaConnectorSink: def test_connector_config_parsing( self, config: PipelineConfig, handlers: ComponentHandlers ): @@ -59,6 +72,7 @@ def test_connector_config_parsing( handlers=handlers, config=config, app=KafkaConnectConfig(**{"topics": topic_name}), + namespace="test-namespace", ) assert getattr(connector.app, "topics") == topic_name @@ -68,6 +82,7 @@ def test_connector_config_parsing( handlers=handlers, config=config, app=KafkaConnectConfig(**{"topics.regex": topic_pattern}), + namespace="test-namespace", ) assert getattr(connector.app, "topics.regex") == topic_pattern @@ -76,6 +91,7 @@ def test_connector_config_parsing( handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", to=ToSection( topics={ "${error_topic_name}": TopicConfig(type=OutputTopicTypes.ERROR), @@ -97,6 +113,7 @@ def test_from_section_parsing_input_topic( handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", from_=FromSection( topics={ topic1: 
FromTopic(type=InputTopicTypes.INPUT), @@ -119,6 +136,7 @@ def test_from_section_parsing_input_pattern( handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", from_=FromSection( topics={topic_pattern: FromTopic(type=InputTopicTypes.INPUT_PATTERN)} ), @@ -136,6 +154,7 @@ def test_deploy_order( handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", to=ToSection( topics={ "${output_topic_name}": TopicConfig( @@ -178,6 +197,7 @@ def test_destroy( handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", to=ToSection( topics={ "${output_topic_name}": TopicConfig( @@ -198,17 +218,19 @@ def test_destroy( dry_run=True, ) - def test_clean( + def test_reset( self, config: PipelineConfig, handlers: ComponentHandlers, + helm_mock: MagicMock, mocker: MockerFixture, ): connector = KafkaSinkConnector( - name="test-connector", + name=CONNECTOR_NAME, handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", to=ToSection( topics={ "${output_topic_name}": TopicConfig( @@ -224,36 +246,68 @@ def test_clean( mock_clean_connector = mocker.patch.object( connector.handlers.connector_handler, "clean_connector" ) - mock = mocker.MagicMock() - mock.attach_mock(mock_delete_topics, "mock_delete_topics") mock.attach_mock(mock_clean_connector, "mock_clean_connector") + mock.attach_mock(helm_mock, "helm") + + connector.reset(dry_run=True) - connector.clean(dry_run=True) mock.assert_has_calls( [ - mocker.call.mock_delete_topics(connector.to, dry_run=True), - mocker.call.mock_clean_connector( - connector_name="test-connector", - connector_type=KafkaConnectorType.SINK, + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags(), + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + mocker.call.helm.upgrade_install( + 
release_name=CONNECTOR_CLEAN_NAME, + namespace="test-namespace", + chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run=True, + flags=HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), + values={ + "connectorType": "sink", + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_NAME, + "deleteConsumerGroup": False, + }, + "nameOverride": CONNECTOR_NAME, + }, + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, dry_run=True, - retain_clean_jobs=False, - delete_consumer_group=True, ), ] ) - def test_reset( + mock_delete_topics.assert_not_called() + + def test_clean( self, config: PipelineConfig, handlers: ComponentHandlers, + helm_mock: MagicMock, + log_info_mock, mocker: MockerFixture, ): connector = KafkaSinkConnector( - name="test-connector", + name=CONNECTOR_NAME, handlers=handlers, config=config, app=KafkaConnectConfig(), + namespace="test-namespace", to=ToSection( topics={ "${output_topic_name}": TopicConfig( @@ -270,270 +324,142 @@ def test_reset( connector.handlers.connector_handler, "clean_connector" ) - connector.reset(dry_run=True) - - mock_clean_connector.assert_called_once_with( - connector_name="test-connector", - connector_type=KafkaConnectorType.SINK, - dry_run=True, - retain_clean_jobs=False, - delete_consumer_group=False, - ) - mock_delete_topics.assert_not_called() - - def test_clean_without_to( - self, - config: PipelineConfig, - handlers: ComponentHandlers, - mocker: MockerFixture, - ): - connector = KafkaSinkConnector( - name="test-connector", - handlers=handlers, - config=config, - app=KafkaConnectConfig(), - ) - - mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" - ) - mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, "clean_connector" - ) + mock = mocker.MagicMock() + mock.attach_mock(mock_delete_topics, "mock_delete_topics") + 
mock.attach_mock(mock_clean_connector, "mock_clean_connector") + mock.attach_mock(helm_mock, "helm") connector.clean(dry_run=True) - mock_clean_connector.assert_called_once_with( - connector_name="test-connector", - connector_type=KafkaConnectorType.SINK, - dry_run=True, - retain_clean_jobs=False, - delete_consumer_group=True, - ) - mock_delete_topics.assert_not_called() - - -class TestKafkaConnectorSource: - def test_from_section_raises_exception( - self, config: PipelineConfig, handlers: ComponentHandlers - ): - with pytest.raises(NotImplementedError): - KafkaSourceConnector( - name="test-connector", - handlers=handlers, - config=config, - app=KafkaConnectConfig(), - from_=FromSection( - topics={ - "connector-topic": FromTopic(type=InputTopicTypes.INPUT), - } + log_info_mock.assert_has_calls( + [ + call.log_info( + magentaify( + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_NAME}" + ) ), - ) - - def test_deploy_order( - self, - config: PipelineConfig, - handlers: ComponentHandlers, - mocker: MockerFixture, - ): - connector = KafkaSourceConnector( - name="test-connector", - handlers=handlers, - config=config, - app=KafkaConnectConfig(), - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - - mock_create_topics = mocker.patch.object( - connector.handlers.topic_handler, "create_topics" - ) - - mock_create_connector = mocker.patch.object( - connector.handlers.connector_handler, "create_connector" + call.log_info( + magentaify( + f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_NAME}" + ) + ), + call.log_info( + magentaify("Connector Cleanup: uninstall Kafka Resetter.") + ), + ] ) - mock = mocker.MagicMock() - mock.attach_mock(mock_create_topics, "mock_create_topics") - mock.attach_mock(mock_create_connector, "mock_create_connector") - connector.deploy(dry_run=True) mock.assert_has_calls( [ - 
mocker.call.mock_create_topics(to_section=connector.to, dry_run=True), - mocker.call.mock_create_connector( - connector_name="test-connector", - kafka_connect_config=connector.app, + mocker.call.mock_delete_topics(connector.to, dry_run=True), + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags(), + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, dry_run=True, ), - ], - ) - - def test_destroy( - self, - config: PipelineConfig, - handlers: ComponentHandlers, - mocker: MockerFixture, - ): - os.environ[ - "KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC" - ] = "kafka-connect-offsets" - connector = KafkaSourceConnector( - name="test-connector", - handlers=handlers, - config=config, - app=KafkaConnectConfig(), - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + mocker.call.helm.upgrade_install( + release_name=CONNECTOR_CLEAN_NAME, + namespace="test-namespace", + chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run=True, + flags=HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, ), - } - ), - ) - assert connector.handlers.connector_handler - - mock_destroy_connector = mocker.patch.object( - connector.handlers.connector_handler, "destroy_connector" - ) - - connector.destroy(dry_run=True) - - mock_destroy_connector.assert_called_once_with( - connector_name="test-connector", - dry_run=True, + values={ + "connectorType": "sink", + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_NAME, + "deleteConsumerGroup": True, + }, + "nameOverride": CONNECTOR_NAME, + }, + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + ] ) - def test_clean( + def test_clean_without_to( self, config: PipelineConfig, handlers: ComponentHandlers, + helm_mock: MagicMock, 
mocker: MockerFixture, ): - os.environ[ - "KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC" - ] = "kafka-connect-offsets" - connector = KafkaSourceConnector( - name="test-connector", + connector = KafkaSinkConnector( + name=CONNECTOR_NAME, handlers=handlers, config=config, app=KafkaConnectConfig(), - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), + namespace="test-namespace", ) - assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( connector.handlers.topic_handler, "delete_topics" ) - mock_clean_connector = mocker.spy( + mock_clean_connector = mocker.patch.object( connector.handlers.connector_handler, "clean_connector" ) - mock = mocker.MagicMock() mock.attach_mock(mock_delete_topics, "mock_delete_topics") mock.attach_mock(mock_clean_connector, "mock_clean_connector") + mock.attach_mock(helm_mock, "helm") connector.clean(dry_run=True) mock.assert_has_calls( [ - mocker.call.mock_delete_topics(connector.to, dry_run=True), - mocker.call.mock_clean_connector( - connector_name="test-connector", - connector_type=KafkaConnectorType.SOURCE, + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags( + username=None, + password=None, + ca_file=None, + insecure_skip_tls_verify=False, + ), + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, dry_run=True, - retain_clean_jobs=False, - offset_topic="kafka-connect-offsets", ), - ] - ) - - def test_clean_without_to( - self, - config: PipelineConfig, - handlers: ComponentHandlers, - mocker: MockerFixture, - ): - os.environ[ - "KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC" - ] = "kafka-connect-offsets" - connector = KafkaSourceConnector( - name="test-connector", - handlers=handlers, - config=config, - app=KafkaConnectConfig(), - ) - assert connector.to is None - - assert 
connector.handlers.connector_handler - - mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" - ) - mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, "clean_connector" - ) - - connector.clean(dry_run=True) - mock_clean_connector.assert_called_once_with( - connector_name="test-connector", - connector_type=KafkaConnectorType.SOURCE, - dry_run=True, - retain_clean_jobs=False, - offset_topic="kafka-connect-offsets", - ) - - mock_delete_topics.assert_not_called() - - def test_reset( - self, - config: PipelineConfig, - handlers: ComponentHandlers, - mocker: MockerFixture, - ): - os.environ[ - "KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC" - ] = "kafka-connect-offsets" - connector = KafkaSourceConnector( - name="test-connector", - handlers=handlers, - config=config, - app=KafkaConnectConfig(), - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + mocker.call.helm.upgrade_install( + release_name=CONNECTOR_CLEAN_NAME, + namespace="test-namespace", + chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run=True, + flags=HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, ), - } - ), - ) - - assert connector.handlers.connector_handler - - mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" - ) - mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, "clean_connector" - ) - - connector.reset(dry_run=True) - - mock_clean_connector.assert_called_once_with( - connector_name="test-connector", - connector_type=KafkaConnectorType.SOURCE, - dry_run=True, - retain_clean_jobs=False, - offset_topic="kafka-connect-offsets", + values={ + "connectorType": "sink", + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_NAME, + "deleteConsumerGroup": True, + }, + "nameOverride": CONNECTOR_NAME, + }, + ), + mocker.call.helm.uninstall( + 
namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + ] ) mock_delete_topics.assert_not_called() diff --git a/tests/components/test_kafka_connect_source.py b/tests/components/test_kafka_connect_source.py new file mode 100644 index 000000000..cd4e14320 --- /dev/null +++ b/tests/components/test_kafka_connect_source.py @@ -0,0 +1,387 @@ +import os +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from kpops.cli.pipeline_config import PipelineConfig, TopicNameConfig +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.helm_wrapper.model import ( + HelmUpgradeInstallFlags, + RepoAuthFlags, +) +from kpops.component_handlers.kafka_connect.model import KafkaConnectConfig +from kpops.components.base_components.kafka_connect import KafkaSourceConnector +from kpops.components.base_components.models.from_section import ( + FromSection, + FromTopic, + InputTopicTypes, +) +from kpops.components.base_components.models.to_section import ( + OutputTopicTypes, + TopicConfig, + ToSection, +) + +DEFAULTS_PATH = Path(__file__).parent / "resources" +CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop" +CONNECTOR_CLEAN_NAME = "test-connector-with-long-name-0123456789abcdef-clean" + + +class TestKafkaConnectorSource: + @pytest.fixture + def config(self) -> PipelineConfig: + return PipelineConfig( + defaults_path=DEFAULTS_PATH, + environment="development", + topic_name_config=TopicNameConfig( + default_error_topic_name="${component_type}-error-topic", + default_output_topic_name="${component_type}-output-topic", + ), + pipeline_prefix="", + broker="broker:9092", + ) + + @pytest.fixture + def handlers(self) -> ComponentHandlers: + return ComponentHandlers( + schema_handler=MagicMock(), + connector_handler=MagicMock(), + topic_handler=MagicMock(), + ) + + @pytest.fixture + def helm_mock(self, mocker: MockerFixture) -> MagicMock: + 
return mocker.patch( + "kpops.components.base_components.kafka_connect.Helm" + ).return_value + + def test_from_section_raises_exception( + self, config: PipelineConfig, handlers: ComponentHandlers + ): + with pytest.raises(NotImplementedError): + KafkaSourceConnector( + name="test-connector", + handlers=handlers, + config=config, + app=KafkaConnectConfig(), + namespace="test-namespace", + from_=FromSection( + topics={ + "connector-topic": FromTopic(type=InputTopicTypes.INPUT), + } + ), + ) + + def test_deploy_order( + self, + config: PipelineConfig, + handlers: ComponentHandlers, + mocker: MockerFixture, + ): + connector = KafkaSourceConnector( + name="test-connector", + handlers=handlers, + config=config, + app=KafkaConnectConfig(), + namespace="test-namespace", + to=ToSection( + topics={ + "${output_topic_name}": TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + ), + ) + + mock_create_topics = mocker.patch.object( + connector.handlers.topic_handler, "create_topics" + ) + + mock_create_connector = mocker.patch.object( + connector.handlers.connector_handler, "create_connector" + ) + + mock = mocker.MagicMock() + mock.attach_mock(mock_create_topics, "mock_create_topics") + mock.attach_mock(mock_create_connector, "mock_create_connector") + connector.deploy(dry_run=True) + mock.assert_has_calls( + [ + mocker.call.mock_create_topics(to_section=connector.to, dry_run=True), + mocker.call.mock_create_connector( + connector_name="test-connector", + kafka_connect_config=connector.app, + dry_run=True, + ), + ], + ) + + def test_destroy( + self, + config: PipelineConfig, + handlers: ComponentHandlers, + mocker: MockerFixture, + ): + os.environ[ + "KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC" + ] = "kafka-connect-offsets" + connector = KafkaSourceConnector( + name="test-connector", + handlers=handlers, + config=config, + app=KafkaConnectConfig(), + namespace="test-namespace", + to=ToSection( + topics={ + "${output_topic_name}": TopicConfig( + 
type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + ), + ) + assert connector.handlers.connector_handler + + mock_destroy_connector = mocker.patch.object( + connector.handlers.connector_handler, "destroy_connector" + ) + + connector.destroy(dry_run=True) + + mock_destroy_connector.assert_called_once_with( + connector_name="test-connector", + dry_run=True, + ) + + def test_reset( + self, + config: PipelineConfig, + handlers: ComponentHandlers, + helm_mock: MagicMock, + mocker: MockerFixture, + ): + connector = KafkaSourceConnector( + name=CONNECTOR_NAME, + handlers=handlers, + config=config, + app=KafkaConnectConfig(), + namespace="test-namespace", + offset_topic="kafka-connect-offsets", + to=ToSection( + topics={ + "${output_topic_name}": TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + ), + ) + + assert connector.handlers.connector_handler + + mock_delete_topics = mocker.patch.object( + connector.handlers.topic_handler, "delete_topics" + ) + mock_clean_connector = mocker.spy( + connector.handlers.connector_handler, "clean_connector" + ) + + mock = mocker.MagicMock() + mock.attach_mock(mock_clean_connector, "mock_clean_connector") + mock.attach_mock(helm_mock, "helm") + + connector.reset(dry_run=True) + + mock.assert_has_calls( + [ + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags(), + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + mocker.call.helm.upgrade_install( + release_name=CONNECTOR_CLEAN_NAME, + namespace="test-namespace", + chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run=True, + flags=HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), + values={ + "connectorType": "source", + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_NAME, + "offsetTopic": "kafka-connect-offsets", + }, + 
"nameOverride": CONNECTOR_NAME, + }, + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + ] + ) + mock_delete_topics.assert_not_called() + + def test_clean( + self, + config: PipelineConfig, + handlers: ComponentHandlers, + helm_mock: MagicMock, + mocker: MockerFixture, + ): + connector = KafkaSourceConnector( + name=CONNECTOR_NAME, + handlers=handlers, + config=config, + app=KafkaConnectConfig(), + namespace="test-namespace", + offset_topic="kafka-connect-offsets", + to=ToSection( + topics={ + "${output_topic_name}": TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + ), + ) + assert connector.handlers.connector_handler + + mock_delete_topics = mocker.patch.object( + connector.handlers.topic_handler, "delete_topics" + ) + mock_clean_connector = mocker.spy( + connector.handlers.connector_handler, "clean_connector" + ) + + mock = mocker.MagicMock() + mock.attach_mock(mock_delete_topics, "mock_delete_topics") + mock.attach_mock(mock_clean_connector, "mock_clean_connector") + mock.attach_mock(helm_mock, "helm") + + connector.clean(dry_run=True) + + mock.assert_has_calls( + [ + mocker.call.mock_delete_topics(connector.to, dry_run=True), + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags(), + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + mocker.call.helm.upgrade_install( + release_name=CONNECTOR_CLEAN_NAME, + namespace="test-namespace", + chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run=True, + flags=HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), + values={ + "connectorType": "source", + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_NAME, + "offsetTopic": "kafka-connect-offsets", + }, + "nameOverride": CONNECTOR_NAME, + }, + ), + 
mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + ] + ) + + def test_clean_without_to( + self, + config: PipelineConfig, + handlers: ComponentHandlers, + helm_mock: MagicMock, + mocker: MockerFixture, + ): + connector = KafkaSourceConnector( + name=CONNECTOR_NAME, + handlers=handlers, + config=config, + app=KafkaConnectConfig(), + namespace="test-namespace", + offset_topic="kafka-connect-offsets", + ) + assert connector.to is None + + assert connector.handlers.connector_handler + + mock_delete_topics = mocker.patch.object( + connector.handlers.topic_handler, "delete_topics" + ) + mock_clean_connector = mocker.spy( + connector.handlers.connector_handler, "clean_connector" + ) + + mock = mocker.MagicMock() + mock.attach_mock(mock_delete_topics, "mock_delete_topics") + mock.attach_mock(mock_clean_connector, "mock_clean_connector") + mock.attach_mock(helm_mock, "helm") + + connector.clean(dry_run=True) + + mock.assert_has_calls( + [ + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags(), + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + mocker.call.helm.upgrade_install( + release_name=CONNECTOR_CLEAN_NAME, + namespace="test-namespace", + chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run=True, + flags=HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), + values={ + "connectorType": "source", + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_NAME, + "offsetTopic": "kafka-connect-offsets", + }, + "nameOverride": CONNECTOR_NAME, + }, + ), + mocker.call.helm.uninstall( + namespace="test-namespace", + release_name=CONNECTOR_CLEAN_NAME, + dry_run=True, + ), + ] + ) + mock_delete_topics.assert_not_called() diff --git a/tests/components/test_kubernetes_app.py 
b/tests/components/test_kubernetes_app.py index dc2ad2bcf..30733b83f 100644 --- a/tests/components/test_kubernetes_app.py +++ b/tests/components/test_kubernetes_app.py @@ -21,6 +21,10 @@ DEFAULTS_PATH = Path(__file__).parent / "resources" +class KubernetesTestValue(KubernetesAppConfig): + name_override: str + + class TestKubernetesApp: @pytest.fixture def config(self) -> PipelineConfig: @@ -48,21 +52,26 @@ def helm_mock(self, mocker: MockerFixture) -> MagicMock: def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch("kpops.components.base_components.kubernetes_app.log.info") + @pytest.fixture + def app_value(self) -> KubernetesTestValue: + return KubernetesTestValue(**{"name_override": "test-value"}) + def test_should_lazy_load_helm_wrapper_and_not_repo_add( self, config: PipelineConfig, handlers: ComponentHandlers, mocker: MockerFixture, helm_mock: MagicMock, + app_value: KubernetesTestValue, ): - app_config = KubernetesAppConfig(namespace="test-namespace") kubernetes_app = KubernetesApp( _type="test", + app=app_value, handlers=handlers, - app=app_config, config=config, name="test-kubernetes-apps", + namespace="test-namespace", ) mocker.patch.object( @@ -78,7 +87,7 @@ def test_should_lazy_load_helm_wrapper_and_not_repo_add( "test/test-chart", True, "test-namespace", - {"namespace": "test-namespace"}, + {"nameOverride": "test-value"}, HelmUpgradeInstallFlags(), ) @@ -88,25 +97,20 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( handlers: ComponentHandlers, helm_mock: MagicMock, mocker: MockerFixture, + app_value: KubernetesTestValue, ): - app_config = KubernetesAppConfig(namespace="test-namespace") - + repo_config = HelmRepoConfig(repository_name="test-repo", url="mock://test") kubernetes_app = KubernetesApp( _type="test", + app=app_value, handlers=handlers, - app=app_config, config=config, name="test-kubernetes-apps", + namespace="test-namespace", + repo_config=repo_config, version="3.4.5", ) - repo_config = 
HelmRepoConfig(repository_name="test-repo", url="mock://test") - mocker.patch( - "kpops.components.base_components.kubernetes_app.KubernetesApp.helm_repo_config", - return_value=repo_config, - new_callable=mocker.PropertyMock, - ) - mocker.patch.object( kubernetes_app, "get_helm_chart", @@ -128,7 +132,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( "test/test-chart", True, "test-namespace", - {"namespace": "test-namespace"}, + {"nameOverride": "test-value"}, HelmUpgradeInstallFlags(version="3.4.5"), ), ] @@ -140,15 +144,16 @@ def test_should_print_helm_diff_after_install_when_dry_run_and_helm_diff_enabled handlers: ComponentHandlers, helm_mock: MagicMock, mocker: MagicMock, + app_value: KubernetesTestValue, ): - app_config = KubernetesAppConfig(namespace="test-namespace") kubernetes_app = KubernetesApp( _type="test", + app=app_value, handlers=handlers, - app=app_config, config=config, name="test-kubernetes-apps", + namespace="test-namespace", ) mocker.patch.object( kubernetes_app, "get_helm_chart", return_value="test/test-chart" @@ -160,16 +165,19 @@ def test_should_print_helm_diff_after_install_when_dry_run_and_helm_diff_enabled ) def test_should_raise_not_implemented_error_when_helm_chart_is_not_set( - self, config: PipelineConfig, handlers: ComponentHandlers + self, + config: PipelineConfig, + handlers: ComponentHandlers, + app_value: KubernetesTestValue, ): - app_config = KubernetesAppConfig(namespace="test-namespace") kubernetes_app = KubernetesApp( _type="test", + app=app_value, handlers=handlers, - app=app_config, config=config, name="test-kubernetes-apps", + namespace="test-namespace", ) with pytest.raises(NotImplementedError) as error: @@ -185,15 +193,16 @@ def test_should_call_helm_uninstall_when_destroying_kubernetes_app( handlers: ComponentHandlers, helm_mock: MagicMock, log_info_mock: MagicMock, + app_value: KubernetesTestValue, ): - app_config = KubernetesAppConfig(namespace="test-namespace") kubernetes_app = 
KubernetesApp( _type="test", + app=app_value, handlers=handlers, - app=app_config, config=config, name="test-kubernetes-apps", + namespace="test-namespace", ) stdout = 'KubernetesAppComponent - release "test-kubernetes-apps" uninstalled' @@ -208,23 +217,25 @@ def test_should_call_helm_uninstall_when_destroying_kubernetes_app( log_info_mock.assert_called_once_with(magentaify(stdout)) def test_should_raise_value_error_when_name_is_not_valid( - self, config: PipelineConfig, handlers: ComponentHandlers + self, + config: PipelineConfig, + handlers: ComponentHandlers, + app_value: KubernetesTestValue, ): - app_config = KubernetesAppConfig(namespace="test") assert KubernetesApp( _type="test", + app=app_value, handlers=handlers, - app=app_config, config=config, name="example-component-with-very-long-name-longer-than-most-of-our-kubernetes-apps", + namespace="test-namespace", ) with pytest.raises(ValueError): assert KubernetesApp( _type="test", handlers=handlers, - app=app_config, config=config, name="Not-Compatible*", ) @@ -233,7 +244,6 @@ def test_should_raise_value_error_when_name_is_not_valid( assert KubernetesApp( _type="test", handlers=handlers, - app=app_config, config=config, name="snake_case", ) diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index f79cbc4a0..eb31944b8 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -54,8 +54,8 @@ def producer_app( "type": "producer-app", "name": self.PRODUCER_APP_NAME, "version": "2.4.2", + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "clean_schemas": True, @@ -76,6 +76,7 @@ def test_output_topics(self, config: PipelineConfig, handlers: ComponentHandlers **{ "type": "producer-app", "name": self.PRODUCER_APP_NAME, + "namespace": "test-namespace", "app": { "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, @@ -129,7 +130,6 @@ def 
test_deploy_order( True, "test-namespace", { - "namespace": "test-namespace", "streams": { "brokers": "fake-broker:9092", "outputTopic": "producer-output-topic", @@ -234,7 +234,6 @@ def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up( True, "test-namespace", { - "namespace": "test-namespace", "streams": { "brokers": "fake-broker:9092", "outputTopic": "producer-output-topic", diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 740e051df..c7f1f5db7 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -54,9 +54,8 @@ def streams_app( **{ "type": "streams-app", "name": self.STREAMS_APP_NAME, - "version": "2.4.2", + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "to": { @@ -70,17 +69,14 @@ def streams_app( ) def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers): - class AnotherType(StreamsApp): - _type = "test" - - streams_app = AnotherType( + streams_app = StreamsApp( handlers=handlers, config=config, **{ - "type": "test", + "type": "streams-app", "name": self.STREAMS_APP_NAME, + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "from": { @@ -119,8 +115,8 @@ def test_no_empty_input_topic( **{ "type": "test", "name": self.STREAMS_APP_NAME, + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "from": { @@ -151,8 +147,8 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle **{ "type": "streams-app", "name": self.STREAMS_APP_NAME, + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "from": { @@ -172,8 +168,8 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle **{ "type": "streams-app", "name": 
self.STREAMS_APP_NAME, + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "from": {"topics": {"example.*": {"type": "extra-pattern"}}}, @@ -189,8 +185,8 @@ def test_set_streams_output_from_to( **{ "type": "streams-app", "name": self.STREAMS_APP_NAME, + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "to": { @@ -231,8 +227,8 @@ def test_weave_inputs_from_prev_component( **{ "type": "streams-app", "name": self.STREAMS_APP_NAME, + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, }, @@ -267,9 +263,8 @@ def test_deploy_order( **{ "type": "streams-app", "name": self.STREAMS_APP_NAME, - "version": "2.4.2", + "namespace": "test-namespace", "app": { - "namespace": "test-namespace", "streams": {"brokers": "fake-broker:9092"}, }, "to": { @@ -316,7 +311,6 @@ def test_deploy_order( True, "test-namespace", { - "namespace": "test-namespace", "streams": { "brokers": "fake-broker:9092", "outputTopic": "streams-app-output-topic", @@ -332,7 +326,7 @@ def test_deploy_order( insecure_skip_tls_verify=False, ), timeout="5m0s", - version="2.4.2", + version="2.7.0", wait=True, wait_for_jobs=False, ), @@ -373,7 +367,6 @@ def test_reset(self, streams_app: StreamsApp, mocker: MockerFixture): True, "test-namespace", { - "namespace": "test-namespace", "streams": { "brokers": "fake-broker:9092", "outputTopic": "streams-app-output-topic", @@ -381,7 +374,7 @@ def test_reset(self, streams_app: StreamsApp, mocker: MockerFixture): }, }, HelmUpgradeInstallFlags( - version="2.4.2", wait=True, wait_for_jobs=True + version="2.7.0", wait=True, wait_for_jobs=True ), ), mocker.call.helm_uninstall( @@ -417,7 +410,6 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( True, "test-namespace", { - "namespace": "test-namespace", "streams": { "brokers": "fake-broker:9092", 
"outputTopic": "streams-app-output-topic", @@ -425,7 +417,7 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( }, }, HelmUpgradeInstallFlags( - version="2.4.2", wait=True, wait_for_jobs=True + version="2.7.0", wait=True, wait_for_jobs=True ), ), mocker.call.helm_uninstall( diff --git a/tests/pipeline/resources/defaults.yaml b/tests/pipeline/resources/defaults.yaml index 7ecde6f47..4a2f39152 100644 --- a/tests/pipeline/resources/defaults.yaml +++ b/tests/pipeline/resources/defaults.yaml @@ -1,7 +1,6 @@ kubernetes-app: name: "${component_type}" - app: - namespace: example-namespace + namespace: example-namespace kafka-app: app: @@ -106,6 +105,7 @@ should-inflate: kafka-connect: name: "sink-connector" + namespace: "example-namespace" app: batch.size: "2000" behavior.on.malformed.documents: "warn" diff --git a/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml b/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml index 3283ccb1c..02a28015a 100644 --- a/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml +++ b/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml @@ -10,7 +10,9 @@ topics: example-output: type: output + - type: kafka-sink-connector + namespace: example-namespace name: es-sink-connector app: connector.class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector diff --git a/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml b/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml index d14fd0308..035691c2e 100644 --- a/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml +++ b/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml @@ -1,4 +1,3 @@ kubernetes-app: name: "${component_type}-development" - app: - namespace: development-namespace + namespace: development-namespace diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml index 
01bf088bf..f718360d0 100644 --- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml +++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml @@ -1,7 +1,6 @@ kubernetes-app: name: "${component_type}" - app: - namespace: example-namespace + namespace: example-namespace kafka-app: app: streams: diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml index d14fd0308..035691c2e 100644 --- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml +++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml @@ -1,4 +1,3 @@ kubernetes-app: name: "${component_type}-development" - app: - namespace: development-namespace + namespace: development-namespace diff --git a/tests/pipeline/snapshots/snap_test_pipeline.py b/tests/pipeline/snapshots/snap_test_pipeline.py index a7754e9a0..5ba85729b 100644 --- a/tests/pipeline/snapshots/snap_test_pipeline.py +++ b/tests/pipeline/snapshots/snap_test_pipeline.py @@ -11,7 +11,6 @@ { "app": { "nameOverride": "resources-custom-config-app1", - "namespace": "development-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -21,6 +20,12 @@ }, }, "name": "resources-custom-config-app1", + "namespace": "development-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -32,13 +37,13 @@ }, }, "type": "producer", + "version": "2.7.0", }, { "app": { "image": "some-image", "labels": {"pipeline": "resources-custom-config"}, "nameOverride": "resources-custom-config-app2", - "namespace": "development-namespace", "streams": { "brokers": 
"http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", "errorTopic": "resources-custom-config-app2-error", @@ -48,6 +53,12 @@ }, }, "name": "resources-custom-config-app2", + "namespace": "development-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -64,6 +75,7 @@ }, }, "type": "streams-app", + "version": "2.7.0", }, ] } @@ -76,7 +88,6 @@ "image": "example-registry/fake-image", "imageTag": "0.0.1", "nameOverride": "resources-pipeline-with-inflate-scheduled-producer", - "namespace": "example-namespace", "schedule": "30 3/8 * * *", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -86,6 +97,12 @@ }, }, "name": "resources-pipeline-with-inflate-scheduled-producer", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {"com/bakdata/kafka/fake": "1.0.0"}, "topics": { @@ -111,7 +128,6 @@ }, "commandLine": {"CONVERT_XML": True}, "nameOverride": "resources-pipeline-with-inflate-converter", - "namespace": "example-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -127,6 +143,12 @@ }, }, "name": "resources-pipeline-with-inflate-converter", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -163,7 +185,6 @@ "image": "fake-registry/filter", "imageTag": "2.4.1", "nameOverride": "resources-pipeline-with-inflate-should-inflate", - "namespace": 
"example-namespace", "replicaCount": 4, "resources": {"requests": {"memory": "3G"}}, "streams": { @@ -178,6 +199,12 @@ }, }, "name": "resources-pipeline-with-inflate-should-inflate", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -214,6 +241,7 @@ "transforms.changeTopic.replacement": "resources-pipeline-with-inflate-should-inflate-index-v1", }, "name": "resources-pipeline-with-inflate-sink-connector", + "namespace": "example-namespace", }, ] } @@ -224,7 +252,6 @@ "app": { "image": "fake-image", "nameOverride": "resources-kafka-connect-sink-streams-app", - "namespace": "example-namespace", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", "config": { @@ -238,6 +265,12 @@ }, "from": {"topics": {"example-topic": {"type": "input"}}}, "name": "resources-kafka-connect-sink-streams-app", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -269,7 +302,15 @@ "topics": "example-output", }, "name": "resources-kafka-connect-sink-es-sink-connector", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-kafka-connect-resetter", + "url": "https://bakdata.github.io/kafka-connect-resetter/", + }, + "resetterValues": {}, "type": "kafka-sink-connector", + "version": "1.0.4", }, ] } @@ -282,7 +323,6 @@ "image": "example-registry/fake-image", "imageTag": "0.0.1", "nameOverride": "resources-first-pipeline-scheduled-producer", - "namespace": "example-namespace", "schedule": "30 3/8 * * *", "streams": { "brokers": 
"http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -292,6 +332,12 @@ }, }, "name": "resources-first-pipeline-scheduled-producer", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {"com/bakdata/kafka/fake": "1.0.0"}, "topics": { @@ -317,7 +363,6 @@ }, "commandLine": {"CONVERT_XML": True}, "nameOverride": "resources-first-pipeline-converter", - "namespace": "example-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -331,6 +376,12 @@ }, }, "name": "resources-first-pipeline-converter", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -369,7 +420,6 @@ "image": "fake-registry/filter", "imageTag": "2.4.1", "nameOverride": "resources-first-pipeline-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name", - "namespace": "example-namespace", "replicaCount": 4, "resources": {"requests": {"memory": "3G"}}, "streams": { @@ -384,6 +434,12 @@ }, }, "name": "resources-first-pipeline-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -412,7 +468,6 @@ "app": { "commandLine": {"CONVERT_XML": True}, "nameOverride": 
"resources-no-input-topic-pipeline-streams-app", - "namespace": "example-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -427,6 +482,12 @@ }, "from": {"topics": {".*": {"type": "input-pattern"}}}, "name": "resources-no-input-topic-pipeline-streams-app", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -445,7 +506,6 @@ { "app": { "nameOverride": "resources-no-input-topic-pipeline-streams-app", - "namespace": "example-namespace", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", "config": { @@ -461,6 +521,12 @@ }, }, "name": "resources-no-input-topic-pipeline-streams-app", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -494,7 +560,6 @@ "app": { "image": "fake-image", "nameOverride": "resources-no-user-defined-components-streams-app", - "namespace": "example-namespace", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", "config": { @@ -508,6 +573,12 @@ }, "from": {"topics": {"example-topic": {"type": "input"}}}, "name": "resources-no-user-defined-components-streams-app", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -534,7 +605,6 @@ "image": "example-registry/fake-image", "imageTag": "0.0.1", "nameOverride": 
"resources-pipeline-with-envs-scheduled-producer", - "namespace": "example-namespace", "schedule": "30 3/8 * * *", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -544,6 +614,12 @@ }, }, "name": "resources-pipeline-with-envs-scheduled-producer", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {"com/bakdata/kafka/fake": "1.0.0"}, "topics": { @@ -569,7 +645,6 @@ }, "commandLine": {"CONVERT_XML": True}, "nameOverride": "resources-pipeline-with-envs-converter", - "namespace": "example-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -583,6 +658,12 @@ }, }, "name": "resources-pipeline-with-envs-converter", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -619,7 +700,6 @@ "image": "fake-registry/filter", "imageTag": "2.4.1", "nameOverride": "resources-pipeline-with-envs-filter", - "namespace": "example-namespace", "replicaCount": 4, "resources": {"requests": {"memory": "3G"}}, "streams": { @@ -634,6 +714,12 @@ }, }, "name": "resources-pipeline-with-envs-filter", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -668,7 +754,6 @@ "app_type": "scheduled-producer", }, "nameOverride": "resources-component-type-substitution-scheduled-producer", - "namespace": "example-namespace", "schedule": "30 3/8 * * *", "streams": { 
"brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -678,6 +763,12 @@ }, }, "name": "resources-component-type-substitution-scheduled-producer", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {"com/bakdata/kafka/fake": "1.0.0"}, "topics": { @@ -703,7 +794,6 @@ }, "commandLine": {"CONVERT_XML": True}, "nameOverride": "resources-component-type-substitution-converter", - "namespace": "example-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -719,6 +809,12 @@ }, }, "name": "resources-component-type-substitution-converter", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -756,7 +852,6 @@ "imageTag": "2.4.1", "label": {"app_name": "filter-app", "app_type": "filter"}, "nameOverride": "resources-component-type-substitution-filter-app", - "namespace": "example-namespace", "replicaCount": 4, "resources": {"requests": {"memory": "3G"}}, "streams": { @@ -771,6 +866,12 @@ }, }, "name": "resources-component-type-substitution-filter-app", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -798,7 +899,6 @@ { "app": { "nameOverride": "resources-custom-config-app1", - "namespace": "development-namespace", "resources": {"limits": {"memory": "2G"}, "requests": {"memory": "2G"}}, "streams": { "brokers": 
"http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", @@ -808,6 +908,12 @@ }, }, "name": "resources-custom-config-app1", + "namespace": "development-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -819,13 +925,13 @@ }, }, "type": "producer", + "version": "2.7.0", }, { "app": { "image": "some-image", "labels": {"pipeline": "resources-custom-config"}, "nameOverride": "resources-custom-config-app2", - "namespace": "development-namespace", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", "errorTopic": "app2-dead-letter-topic", @@ -835,6 +941,12 @@ }, }, "name": "resources-custom-config-app2", + "namespace": "development-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -851,6 +963,7 @@ }, }, "type": "streams-app", + "version": "2.7.0", }, ] } @@ -861,7 +974,6 @@ "app": { "image": "fake-image", "nameOverride": "resources-kafka-connect-sink-streams-app-development", - "namespace": "development-namespace", "streams": { "brokers": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", "config": { @@ -875,6 +987,12 @@ }, "from": {"topics": {"example-topic": {"type": "input"}}}, "name": "resources-kafka-connect-sink-streams-app-development", + "namespace": "development-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/", + }, "to": { "models": {}, "topics": { @@ -888,6 +1006,7 @@ }, }, "type": "streams-app", + "version": "2.7.0", }, { "app": { @@ -905,7 +1024,15 @@ "topics": "example-output", }, "name": 
"resources-kafka-connect-sink-es-sink-connector", + "namespace": "example-namespace", + "repoConfig": { + "repoAuthFlags": {"insecureSkipTlsVerify": False}, + "repositoryName": "bakdata-kafka-connect-resetter", + "url": "https://bakdata.github.io/kafka-connect-resetter/", + }, + "resetterValues": {}, "type": "kafka-sink-connector", + "version": "1.0.4", }, ] } diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 9f233cd9b..05060ef11 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -42,6 +42,7 @@ def inflate(self) -> list[PipelineComponent]: if topic_config.type == OutputTopicTypes.OUTPUT: kafka_connector = KafkaSinkConnector( name="sink-connector", + namespace="example-namespace", handlers=self.handlers, config=self.config, app={ diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index 9b128bf1c..4e23782ad 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -26,6 +26,7 @@ def inflate(self) -> list[PipelineComponent]: if topic_config.type == OutputTopicTypes.OUTPUT: kafka_connector = KafkaSinkConnector( name="sink-connector", + namespace="example-namespace", handlers=self.handlers, config=self.config, app=KafkaConnectConfig(