Skip to content

Commit

Permalink
Merge pull request #27 from real-digital/22-kafka-topic-terraform-style
Browse files Browse the repository at this point in the history
Refactoring of topic handling in preparation of #22
  • Loading branch information
MrTrustworthy authored Aug 18, 2019
2 parents bc2b44a + 2de02e6 commit 4e44be2
Show file tree
Hide file tree
Showing 16 changed files with 506 additions and 243 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ wheels/
MANIFEST
*.pyc
*.pyo
pip-wheel-metadata

# Esque specific
esque.cfg
Expand Down
34 changes: 29 additions & 5 deletions esque/broker.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,37 @@
from typing import List
import operator

from confluent_kafka.admin import ConfigResource

from esque.helpers import unpack_confluent_config
from esque.resource import KafkaResource


class Broker:
def __init__(self, cluster, broker_id):
class Broker(KafkaResource):
def __init__(self, cluster, *, broker_id: int = None, host: str = None, port: int = None):
self.cluster = cluster
self.broker_id = broker_id
self.host = host
self.port = port

@classmethod
def from_id(cls, cluster, broker_id) -> "Broker":
return cls(cluster=cluster, broker_id=broker_id)

@classmethod
def from_attributes(cls, cluster, broker_id: int, host: str, port: int) -> "Broker":
return cls(cluster, broker_id=broker_id, host=host, port=port)

@classmethod
def get_all(cls, cluster) -> List["Broker"]:
metadata = cluster.get_metadata().brokers.values()
brokers = [
cls.from_attributes(cluster, broker_id=broker.id, host=broker.host, port=broker.port)
for broker in metadata
]
return sorted(brokers, key=operator.attrgetter("broker_id"))

def describe(self):
conf = self.cluster.retrieve_config(ConfigResource.Type.BROKER, self.broker_id)
return unpack_confluent_config(conf)
return self.cluster.retrieve_config(ConfigResource.Type.BROKER, self.broker_id)

def as_dict(self):
return {"cluster": self.cluster, "broker_id": self.broker_id, "host": self.host, "port": self.port}
166 changes: 102 additions & 64 deletions esque/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,22 @@
from esque.broker import Broker
from esque.cli.helpers import ensure_approval, HandleFileOnFinished
from esque.cli.options import State, no_verify_option, pass_state
from esque.cli.output import bold, pretty, pretty_topic_diffs, get_output_new_topics, blue_bold, green_bold
from esque.cli.output import (
bold,
pretty,
pretty_topic_diffs,
pretty_new_topic_configs,
blue_bold,
green_bold,
pretty_unchanged_topic_configs,
)
from esque.clients import FileConsumer, FileProducer, AvroFileProducer, AvroFileConsumer, PingConsumer, PingProducer
from esque.cluster import Cluster
from esque.config import PING_TOPIC, Config, PING_GROUP_ID
from esque.consumergroup import ConsumerGroupController
from esque.topic import Topic
from esque.errors import ConsumerGroupDoesNotExistException, ContextNotDefinedException, TopicAlreadyExistsException
from esque.topic import TopicController
from esque.topic_controller import TopicController


@click.group(help="(Kafka-)esque.")
Expand Down Expand Up @@ -62,20 +71,6 @@ def list_contexts(ctx, args, incomplete):
return [context for context in config.available_contexts if context.startswith(incomplete)]


@edit.command("topic")
@click.argument("topic-name", required=True)
@pass_state
def edit_topic(state: State, topic_name: str):
controller = TopicController(state.cluster)
topic = TopicController(state.cluster).get_topic(topic_name)
new_conf = click.edit(topic.to_yaml())
topic.from_yaml(new_conf)
diff = pretty_topic_diffs({topic_name: topic.config_diff()})
click.echo(diff)
if ensure_approval("Are you sure?"):
controller.alter_configs([topic])


@esque.command("ctx", help="Switch clusters.")
@click.argument("context", required=False, default=None, autocompletion=list_contexts)
@pass_state
Expand All @@ -98,54 +93,97 @@ def ctx(state, context):
@no_verify_option
@pass_state
def create_topic(state: State, topic_name: str):
if ensure_approval("Are you sure?", no_verify=state.no_verify):
topic_controller = TopicController(state.cluster)
TopicController(state.cluster).create_topics([(topic_controller.get_topic(topic_name))])
if not ensure_approval("Are you sure?", no_verify=state.no_verify):
click.echo("Aborted")
return

topic_controller = TopicController(state.cluster)
topic_controller.create_topics([Topic(topic_name)])


@edit.command("topic")
@click.argument("topic-name", required=True)
@pass_state
def edit_topic(state: State, topic_name: str):
controller = TopicController(state.cluster)
topic = TopicController(state.cluster).get_cluster_topic(topic_name)
new_conf = click.edit(topic.to_yaml(only_editable=True), extension=".yml")

# edit process can be aborted, ex. in vim via :q!
if new_conf is None:
click.echo("Change aborted")
return

topic.from_yaml(new_conf)
diff = pretty_topic_diffs({topic_name: controller.diff_with_cluster(topic)})
click.echo(diff)
if ensure_approval("Are you sure?"):
controller.alter_configs([topic])


@esque.command("apply", help="Apply a configuration")
@click.option("-f", "--file", help="Config file path", required=True)
@no_verify_option
@pass_state
def apply(state: State, file: str):
# Get topic data based on the YAML
yaml_topic_configs = yaml.safe_load(open(file)).get("topics")
yaml_topics = [Topic.from_dict(conf) for conf in yaml_topic_configs]
yaml_topic_names = [t.name for t in yaml_topics]
if not len(yaml_topic_names) == len(set(yaml_topic_names)):
raise ValueError("Duplicate topic names in the YAML!")

# Get topic data based on the cluster state
topic_controller = TopicController(state.cluster)
yaml_data = yaml.safe_load(open(file))
topic_configs = yaml_data.get("topics")
topics = []
for topic_config in topic_configs:
topics.append(
topic_controller.get_topic(
topic_config.get("name"),
topic_config.get("num_partitions"),
topic_config.get("replication_factor"),
topic_config.get("config"),
)
cluster_topics = topic_controller.list_topics(search_string="|".join(yaml_topic_names))
cluster_topic_names = [t.name for t in cluster_topics]

# Calculate changes
to_create = [yaml_topic for yaml_topic in yaml_topics if yaml_topic.name not in cluster_topic_names]
to_edit = [
yaml_topic
for yaml_topic in yaml_topics
if yaml_topic not in to_create and topic_controller.diff_with_cluster(yaml_topic) != {}
]
to_edit_diffs = {t.name: topic_controller.diff_with_cluster(t) for t in to_edit}
to_ignore = [yaml_topic for yaml_topic in yaml_topics if yaml_topic not in to_create and yaml_topic not in to_edit]

# Sanity check - the 3 groups of topics should be complete and have no overlap
assert (
set(to_create).isdisjoint(set(to_edit))
and set(to_create).isdisjoint(set(to_ignore))
and set(to_edit).isdisjoint(set(to_ignore))
and len(to_create) + len(to_edit) + len(to_ignore) == len(yaml_topics)
)

# Print diffs so the user can check
click.echo(pretty_unchanged_topic_configs(to_ignore))
click.echo(pretty_new_topic_configs(to_create))
click.echo(pretty_topic_diffs(to_edit_diffs))

# Check for actionable changes
if len(to_edit) + len(to_create) == 0:
click.echo("No changes detected, aborting")
return

# Warn users when replication & num_partition changes are attempted
if len(to_edit) > 0:
click.echo(
"Notice: changes to `replication_factor` and `num_partitions` can not be applied on already existing topics"
)
editable_topics = topic_controller.filter_existing_topics(topics)
topics_to_be_changed = [topic for topic in editable_topics if topic.config_diff() != {}]
topic_config_diffs = {topic.name: topic.config_diff() for topic in topics_to_be_changed}

if len(topic_config_diffs) > 0:
click.echo(pretty_topic_diffs(topic_config_diffs))
if ensure_approval("Are you sure to change configs?", no_verify=state.no_verify):
topic_controller.alter_configs(topics_to_be_changed)
click.echo(
click.style(
pretty({"Successfully changed topics": [topic.name for topic in topics_to_be_changed]}), fg="green"
)
)
else:
click.echo("No topics to edit.")

new_topics = [topic for topic in topics if topic not in editable_topics]
if len(new_topics) > 0:
click.echo(get_output_new_topics(new_topics))
if ensure_approval("Are you sure to create the new topics?", no_verify=state.no_verify):
topic_controller.create_topics(new_topics)
click.echo(
click.style(pretty({"Successfully created topics": [topic.name for topic in new_topics]}), fg="green")
)
else:
click.echo("No new topics to create.")

# Get approval
if not ensure_approval("Apply changes?", no_verify=state.no_verify):
click.echo("Cancelling changes")
return

# apply changes
topic_controller.create_topics(to_create)
topic_controller.alter_configs(to_edit)

# output confirmation
changes = {"unchanged": len(to_ignore), "created": len(to_create), "changed": len(to_edit)}
click.echo(click.style(pretty({"Successfully applied changes": changes}), fg="green"))


@delete.command("topic")
Expand All @@ -155,7 +193,7 @@ def apply(state: State, file: str):
def delete_topic(state: State, topic_name: str):
topic_controller = TopicController(state.cluster)
if ensure_approval("Are you sure?", no_verify=state.no_verify):
topic_controller.delete_topic(topic_controller.get_topic(topic_name))
topic_controller.delete_topic(topic_controller.get_cluster_topic(topic_name))

assert topic_name not in topic_controller.list_topics()

Expand All @@ -164,7 +202,7 @@ def delete_topic(state: State, topic_name: str):
@click.argument("topic-name", required=True, type=click.STRING, autocompletion=list_topics)
@pass_state
def describe_topic(state, topic_name):
partitions, config = TopicController(state.cluster).get_topic(topic_name).describe()
partitions, config = TopicController(state.cluster).get_cluster_topic(topic_name).describe()

click.echo(bold(f"Topic: {topic_name}"))

Expand All @@ -190,7 +228,7 @@ def get_offsets(state, topic_name):
@click.argument("broker-id", required=True)
@pass_state
def describe_broker(state, broker_id):
broker = Broker(state.cluster, broker_id).describe()
broker = Broker.from_id(state.cluster, broker_id).describe()
click.echo(pretty(broker, break_lists=True))


Expand All @@ -211,9 +249,9 @@ def describe_consumergroup(state, consumer_id, verbose):
@get.command("brokers")
@pass_state
def get_brokers(state):
brokers = state.cluster.brokers
brokers = Broker.get_all(state.cluster)
for broker in brokers:
click.echo(f"{broker['id']}: {broker['host']}:{broker['port']}")
click.echo(f"{broker.broker_id}: {broker.host}:{broker.port}")


@get.command("consumergroups")
Expand Down Expand Up @@ -316,7 +354,7 @@ def ping(state, times, wait):
deltas = []
try:
try:
topic_controller.create_topics([topic_controller.get_topic(PING_TOPIC)])
topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

Expand All @@ -334,7 +372,7 @@ def ping(state, times, wait):
except KeyboardInterrupt:
pass
finally:
topic_controller.delete_topic(topic_controller.get_topic(PING_TOPIC))
topic_controller.delete_topic(topic_controller.get_cluster_topic(PING_TOPIC))
click.echo("--- statistics ---")
click.echo(f"{len(deltas)} messages sent/received")
click.echo(f"min/avg/max = {min(deltas):.2f}/{(sum(deltas)/len(deltas)):.2f}/{max(deltas):.2f} ms")
click.echo(f"min/avg/max = {min(deltas):.2f}/{(sum(deltas) / len(deltas)):.2f}/{max(deltas):.2f} ms")
20 changes: 16 additions & 4 deletions esque/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ def pretty_topic_diffs(topics_config_diff: Dict[str, Dict[str, Tuple[str, str]]]
config_diff_attributes[attribute] = value[0] + " -> " + value[1]
output.append({click.style(name, bold=True, fg="yellow"): {"Config Diff": config_diff_attributes}})

return pretty({"Topics to change": output})
return pretty({"Configuration changes": output})


def get_output_new_topics(new_topics: List[Topic]) -> str:
def pretty_new_topic_configs(new_topics: List[Topic]) -> str:
new_topic_configs = []
for topic in new_topics:
new_topic_config = {
Expand All @@ -146,6 +146,19 @@ def get_output_new_topics(new_topics: List[Topic]) -> str:
return pretty({"New topics to create": new_topic_configs})


def pretty_unchanged_topic_configs(new_topics: List[Topic]) -> str:
new_topic_configs = []
for topic in new_topics:
new_topic_config = {
"num_partitions: ": topic.num_partitions,
"replication_factor: ": topic.replication_factor,
"config": topic.config,
}
new_topic_configs.append({click.style(topic.name, bold=True, fg="blue"): new_topic_config})

return pretty({"No changes": new_topic_configs})


def pretty_size(value: Any) -> str:
if type(value) != int:
value = int(value)
Expand All @@ -160,7 +173,7 @@ def pretty_size(value: Any) -> str:
]
for sign, size in units:
if value >= size:
return f"{pretty_float(value/size)} {sign}"
return f"{pretty_float(value / size)} {sign}"


def bold(s: str) -> str:
Expand All @@ -185,7 +198,6 @@ def green_bold(s: str) -> str:
"member_id": bold,
}


CONVERSION_MAPPING = {
"ms": pretty_duration,
"seconds": partial(pretty_duration, multiplier=1000),
Expand Down
12 changes: 6 additions & 6 deletions esque/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from confluent_kafka.admin import AdminClient, ConfigResource

from esque.config import Config
from esque.helpers import ensure_kafka_futures_done
from esque.helpers import ensure_kafka_futures_done, unpack_confluent_config


class Cluster:
Expand All @@ -20,6 +20,9 @@ def __init__(self):
def bootstrap_servers(self):
return self._config.bootstrap_servers

def get_metadata(self):
return self.confluent_client.list_topics(timeout=1)

@property
def brokers(self):
metadata = self.confluent_client.list_topics(timeout=1)
Expand All @@ -30,11 +33,8 @@ def brokers(self):

def retrieve_config(self, config_type: ConfigResource.Type, id):
requested_resources = [ConfigResource(config_type, str(id))]

futures = self.confluent_client.describe_configs(requested_resources)

(old_resource, future), = futures.items()

future = ensure_kafka_futures_done([future])

return future.result()
result = future.result()
return unpack_confluent_config(result)
2 changes: 1 addition & 1 deletion esque/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def bootstrap_servers(self):
return [f"{host_name}:{self.bootstrap_port}" for host_name in self.bootstrap_hosts]

def context_switch(self, context: str):
click.echo((f"Switched to context: {context}"))
click.echo(f"Switched to context: {context}")
if context not in self.available_contexts:
raise ContextNotDefinedException(f"{context} not defined in {config_path()}")
self._update_config("Context", "current", context)
Expand Down
Loading

0 comments on commit 4e44be2

Please sign in to comment.