Skip to content

Commit

Permalink
Refactor component configs (#63)
Browse files Browse the repository at this point in the history
Closes #62
  • Loading branch information
raminqaf authored Jan 19, 2023
1 parent 2a095ab commit e198ec7
Show file tree
Hide file tree
Showing 29 changed files with 1,066 additions and 964 deletions.
6 changes: 4 additions & 2 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from kpops.cli.pipeline_config import ENV_PREFIX, PipelineConfig
from kpops.cli.registry import Registry
from kpops.component_handlers import ComponentHandlers
from kpops.component_handlers.kafka_connect.connector_handler import ConnectorHandler
from kpops.component_handlers.kafka_connect.kafka_connect_handler import (
KafkaConnectHandler,
)
from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler
from kpops.component_handlers.topic.handler import TopicHandler
from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper
Expand Down Expand Up @@ -110,7 +112,7 @@ def setup_handlers(
components_module: str | None, config: PipelineConfig
) -> ComponentHandlers:
schema_handler = SchemaHandler.load_schema_handler(components_module, config)
connector_handler = ConnectorHandler.from_pipeline_config(config)
connector_handler = KafkaConnectHandler.from_pipeline_config(config)
proxy_wrapper = ProxyWrapper(config)
topic_handler = TopicHandler(proxy_wrapper)

Expand Down
34 changes: 1 addition & 33 deletions kpops/cli/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
from pydantic import BaseConfig, BaseSettings, Field
from pydantic.env_settings import SettingsSourceCallable

from kpops.component_handlers.helm_wrapper.model import (
HelmConfig,
HelmDiffConfig,
HelmRepoConfig,
)
from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.yaml_loading import load_yaml_file

ENV_PREFIX = "KPOPS_"
Expand All @@ -24,22 +20,6 @@ class TopicNameConfig(BaseSettings):
)


class KafkaConnectResetterHelmConfig(BaseSettings):
helm_config: HelmRepoConfig = Field(
default=HelmRepoConfig(
repository_name="bakdata-kafka-connect-resetter",
url="https://bakdata.github.io/kafka-connect-resetter/",
),
description="Configuration of Kafka connect resetter Helm Chart",
)
version: str = "1.0.4"
helm_values: dict = Field(
default={},
description="Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.",
)
namespace: str = Field(default="")


class PipelineConfig(BaseSettings):
defaults_path: Path = Field(
default=...,
Expand Down Expand Up @@ -91,18 +71,6 @@ class PipelineConfig(BaseSettings):
helm_config: HelmConfig = Field(default=HelmConfig())
helm_diff_config: HelmDiffConfig = Field(default=HelmDiffConfig())

streams_bootstrap_helm_config: HelmRepoConfig = Field(
default=HelmRepoConfig(
repository_name="bakdata-streams-bootstrap",
url="https://bakdata.github.io/streams-bootstrap/",
),
description="Configuration for Streams Bootstrap Helm Charts",
)
kafka_connect_resetter_config: KafkaConnectResetterHelmConfig = Field(
default=KafkaConnectResetterHelmConfig(),
description="Configuration of kafka connect resetter helm chart and values. "
"This is used for cleaning/resetting Kafka connectors, see https://github.com/bakdata/kafka-connect-resetter",
)
retain_clean_jobs: bool = Field(
default=False,
env=f"{ENV_PREFIX}RETAIN_CLEAN_JOBS",
Expand Down
6 changes: 4 additions & 2 deletions kpops/component_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from typing import TYPE_CHECKING

from kpops.component_handlers.kafka_connect.connector_handler import ConnectorHandler
from kpops.component_handlers.kafka_connect.kafka_connect_handler import (
KafkaConnectHandler,
)

if TYPE_CHECKING:
from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler
Expand All @@ -13,7 +15,7 @@ class ComponentHandlers:
def __init__(
self,
schema_handler: SchemaHandler | None,
connector_handler: ConnectorHandler,
connector_handler: KafkaConnectHandler,
topic_handler: TopicHandler,
) -> None:
self.schema_handler = schema_handler
Expand Down
24 changes: 14 additions & 10 deletions kpops/component_handlers/helm_wrapper/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,33 @@
from typing import Iterator

import yaml
from pydantic import BaseModel, Field

from kpops.utils.pydantic import CamelCaseConfig

@dataclass
class HelmDiffConfig:

class HelmDiffConfig(BaseModel):
enable: bool = False
ignore: set[str] = field(
default_factory=set,
)
ignore: set[str] = Field(default_factory=set)


@dataclass
class RepoAuthFlags:
class RepoAuthFlags(BaseModel):
username: str | None = None
password: str | None = None
ca_file: Path | None = None
insecure_skip_tls_verify: bool = False

class Config(CamelCaseConfig):
pass

@dataclass
class HelmRepoConfig:

class HelmRepoConfig(BaseModel):
repository_name: str
url: str
repo_auth_flags: RepoAuthFlags = field(default_factory=RepoAuthFlags)
repo_auth_flags: RepoAuthFlags = Field(default=RepoAuthFlags())

class Config(CamelCaseConfig):
pass


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,29 @@

import logging
import sys
from functools import cached_property
from typing import TYPE_CHECKING

from kpops.component_handlers.helm_wrapper.helm import Helm
from kpops.component_handlers.helm_wrapper.helm_diff import HelmDiff
from kpops.component_handlers.helm_wrapper.model import (
HelmConfig,
HelmUpgradeInstallFlags,
)
from kpops.component_handlers.helm_wrapper.utils import trim_release_name
from kpops.component_handlers.kafka_connect.connect_wrapper import ConnectWrapper
from kpops.component_handlers.kafka_connect.exception import ConnectorNotFoundException
from kpops.component_handlers.kafka_connect.model import (
KafkaConnectConfig,
KafkaConnectorType,
KafkaConnectResetterConfig,
KafkaConnectResetterValues,
)
from kpops.component_handlers.kafka_connect.model import KafkaConnectConfig
from kpops.component_handlers.kafka_connect.timeout import timeout
from kpops.utils.colorify import greenify, magentaify, yellowify
from kpops.utils.dict_differ import render_diff

if TYPE_CHECKING:
from kpops.cli.pipeline_config import KafkaConnectResetterHelmConfig, PipelineConfig
from kpops.cli.pipeline_config import PipelineConfig

log = logging.getLogger("KafkaConnect")
log = logging.getLogger("KafkaConnectHandler")


class ConnectorHandler:
class KafkaConnectHandler:
def __init__(
self,
connect_wrapper: ConnectWrapper,
timeout: int,
kafka_connect_resetter_helm_config: KafkaConnectResetterHelmConfig,
broker: str,
helm_diff: HelmDiff,
helm_config: HelmConfig = HelmConfig(),
):
self._connect_wrapper = connect_wrapper
self._timeout = timeout
self._helm_config = helm_config
self._helm_repo_config = kafka_connect_resetter_helm_config.helm_config
self.helm_diff = helm_diff

helm_repo_config = kafka_connect_resetter_helm_config.helm_config
self.kafka_connect_resseter_chart = (
f"{helm_repo_config.repository_name}/kafka-connect-resetter"
)
self.chart_version = kafka_connect_resetter_helm_config.version
self.namespace = (
kafka_connect_resetter_helm_config.namespace # namespace where the re-setter jobs should be deployed to
)
self.broker = broker
self.values = kafka_connect_resetter_helm_config.helm_values

@cached_property
def helm(self):
helm = Helm(self._helm_config)
helm.add_repo(
self._helm_repo_config.repository_name,
self._helm_repo_config.url,
self._helm_repo_config.repo_auth_flags,
)
return helm

def create_connector(
self,
Expand Down Expand Up @@ -127,55 +86,6 @@ def destroy_connector(self, connector_name: str, dry_run: bool) -> None:
f"Connector Destruction: the connector {connector_name} does not exist. Skipping."
)

def clean_connector(
self,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
retain_clean_jobs: bool,
**kwargs,
) -> None:
"""
Cleans the connector from the cluster. At first, it deletes the previous cleanup job (connector resetter)
to make sure that there is no running clean job in the cluster. Then it releases a cleanup job.
If the retain_clean_jobs flag is set to false the cleanup job will be deleted.
:param connector_name: Name of the connector
:param connector_type: Type of the connector (SINK or SOURCE)
:param dry_run: If the cleanup should be run in dry run mode or not
:param retain_clean_jobs: If the cleanup job should be kept
:param kwargs: Other values for the KafkaConnectResetter
"""
suffix = "-clean"
clean_up_release_name = connector_name + suffix
trimmed_name = trim_release_name(clean_up_release_name, suffix)

log.info(
magentaify(
f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}"
)
)
self.__uninstall_kafka_resetter(trimmed_name, dry_run)

log.info(
magentaify(
f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}"
)
)

stdout = self.__install_kafka_resetter(
trimmed_name, connector_name, connector_type, dry_run, kwargs
)

if dry_run and self.helm_diff.config.enable:
current_release = self.helm.get_manifest(trimmed_name, self.namespace)
new_release = Helm.load_helm_manifest(stdout)
helm_diff = HelmDiff.get_diff(current_release, new_release)
self.helm_diff.log_helm_diff(helm_diff, log)

if not retain_clean_jobs:
log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter."))
self.__uninstall_kafka_resetter(trimmed_name, dry_run)

def __dry_run_connector_creation(
self, connector_name: str, kafka_connect_config: KafkaConnectConfig
) -> None:
Expand Down Expand Up @@ -233,52 +143,11 @@ def __dry_run_connector_deletion(self, connector_name: str) -> None:
f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping."
)

def __install_kafka_resetter(
self,
release_name: str,
connector_name: str,
connector_type: KafkaConnectorType,
dry_run: bool,
kwargs,
) -> str:
return self.helm.upgrade_install(
release_name=release_name,
namespace=self.namespace,
chart=self.kafka_connect_resseter_chart,
dry_run=dry_run,
flags=HelmUpgradeInstallFlags(
version=self.chart_version, wait_for_jobs=True, wait=True
),
values={
**KafkaConnectResetterValues(
config=KafkaConnectResetterConfig(
connector=connector_name,
brokers=self.broker,
**kwargs,
),
connector_type=connector_type.value,
name_override=connector_name,
).dict(),
**self.values,
},
)

def __uninstall_kafka_resetter(self, release_name: str, dry_run: bool) -> None:
self.helm.uninstall(
namespace=self.namespace,
release_name=release_name,
dry_run=dry_run,
)

@classmethod
def from_pipeline_config(
cls, pipeline_config: PipelineConfig
) -> ConnectorHandler: # TODO: annotate as typing.Self once mypy supports it
) -> KafkaConnectHandler: # TODO: annotate as typing.Self once mypy supports it
return cls(
connect_wrapper=ConnectWrapper(host=pipeline_config.kafka_connect_host),
timeout=pipeline_config.timeout,
kafka_connect_resetter_helm_config=pipeline_config.kafka_connect_resetter_config,
helm_config=pipeline_config.helm_config,
broker=pipeline_config.broker,
helm_diff=HelmDiff(pipeline_config.helm_diff_config),
)
10 changes: 9 additions & 1 deletion kpops/components/base_components/kafka_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.helm import Helm
from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags
from kpops.component_handlers.helm_wrapper.model import (
HelmRepoConfig,
HelmUpgradeInstallFlags,
)
from kpops.component_handlers.helm_wrapper.utils import trim_release_name
from kpops.components.base_components.kubernetes_app import (
KubernetesApp,
Expand Down Expand Up @@ -39,6 +42,11 @@ class KafkaApp(KubernetesApp):

_type = "kafka-app"
app: KafkaAppConfig
repo_config: HelmRepoConfig = HelmRepoConfig(
repository_name="bakdata-streams-bootstrap",
url="https://bakdata.github.io/streams-bootstrap/",
)
version = "2.7.0"

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
Expand Down
Loading

0 comments on commit e198ec7

Please sign in to comment.