Skip to content

Commit

Permalink
ready for travis
Browse files Browse the repository at this point in the history
  • Loading branch information
MrTrustworthy committed Aug 18, 2019
1 parent f59d532 commit 2de02e6
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 118 deletions.
15 changes: 3 additions & 12 deletions esque/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@


class Broker(KafkaResource):
def __init__(
self, cluster, *, broker_id: int = None, host: str = None, port: int = None
):
def __init__(self, cluster, *, broker_id: int = None, host: str = None, port: int = None):
self.cluster = cluster
self.broker_id = broker_id
self.host = host
Expand All @@ -27,9 +25,7 @@ def from_attributes(cls, cluster, broker_id: int, host: str, port: int) -> "Brok
def get_all(cls, cluster) -> List["Broker"]:
metadata = cluster.get_metadata().brokers.values()
brokers = [
cls.from_attributes(
cluster, broker_id=broker.id, host=broker.host, port=broker.port
)
cls.from_attributes(cluster, broker_id=broker.id, host=broker.host, port=broker.port)
for broker in metadata
]
return sorted(brokers, key=operator.attrgetter("broker_id"))
Expand All @@ -38,9 +34,4 @@ def describe(self):
return self.cluster.retrieve_config(ConfigResource.Type.BROKER, self.broker_id)

def as_dict(self):
return {
"cluster": self.cluster,
"broker_id": self.broker_id,
"host": self.host,
"port": self.port,
}
return {"cluster": self.cluster, "broker_id": self.broker_id, "host": self.host, "port": self.port}
55 changes: 32 additions & 23 deletions esque/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@
from esque.broker import Broker
from esque.cli.helpers import ensure_approval, HandleFileOnFinished
from esque.cli.options import State, no_verify_option, pass_state
from esque.cli.output import bold, pretty, pretty_topic_diffs, pretty_new_topic_configs, blue_bold, green_bold, \
pretty_unchanged_topic_configs
from esque.cli.output import (
bold,
pretty,
pretty_topic_diffs,
pretty_new_topic_configs,
blue_bold,
green_bold,
pretty_unchanged_topic_configs,
)
from esque.clients import FileConsumer, FileProducer, AvroFileProducer, AvroFileConsumer, PingConsumer, PingProducer
from esque.cluster import Cluster
from esque.config import PING_TOPIC, Config, PING_GROUP_ID
from esque.consumergroup import ConsumerGroupController
from esque.topic import Topic
from esque.errors import (
ConsumerGroupDoesNotExistException,
ContextNotDefinedException,
TopicAlreadyExistsException,
)
from esque.errors import ConsumerGroupDoesNotExistException, ContextNotDefinedException, TopicAlreadyExistsException
from esque.topic_controller import TopicController


Expand Down Expand Up @@ -104,7 +107,7 @@ def create_topic(state: State, topic_name: str):
def edit_topic(state: State, topic_name: str):
controller = TopicController(state.cluster)
topic = TopicController(state.cluster).get_cluster_topic(topic_name)
new_conf = click.edit(topic.to_yaml(only_editable=True), extension='.yml')
new_conf = click.edit(topic.to_yaml(only_editable=True), extension=".yml")

# edit process can be aborted, ex. in vim via :q!
if new_conf is None:
Expand Down Expand Up @@ -137,16 +140,21 @@ def apply(state: State, file: str):

# Calculate changes
to_create = [yaml_topic for yaml_topic in yaml_topics if yaml_topic.name not in cluster_topic_names]
to_edit = [yaml_topic for yaml_topic in yaml_topics if
yaml_topic not in to_create and topic_controller.diff_with_cluster(yaml_topic) != {}]
to_edit = [
yaml_topic
for yaml_topic in yaml_topics
if yaml_topic not in to_create and topic_controller.diff_with_cluster(yaml_topic) != {}
]
to_edit_diffs = {t.name: topic_controller.diff_with_cluster(t) for t in to_edit}
to_ignore = [yaml_topic for yaml_topic in yaml_topics if yaml_topic not in to_create and yaml_topic not in to_edit]

# Sanity check - the 3 groups of topics should be complete and have no overlap
assert set(to_create).isdisjoint(set(to_edit)) \
and set(to_create).isdisjoint(set(to_ignore)) \
and set(to_edit).isdisjoint(set(to_ignore)) \
and len(to_create) + len(to_edit) + len(to_ignore) == len(yaml_topics)
assert (
set(to_create).isdisjoint(set(to_edit))
and set(to_create).isdisjoint(set(to_ignore))
and set(to_edit).isdisjoint(set(to_ignore))
and len(to_create) + len(to_edit) + len(to_ignore) == len(yaml_topics)
)

# Print diffs so the user can check
click.echo(pretty_unchanged_topic_configs(to_ignore))
Expand All @@ -158,6 +166,12 @@ def apply(state: State, file: str):
click.echo("No changes detected, aborting")
return

# Warn users when replication & num_partition changes are attempted
if len(to_edit) > 0:
click.echo(
"Notice: changes to `replication_factor` and `num_partitions` can not be applied on already existing topics"
)

# Get approval
if not ensure_approval("Apply changes?", no_verify=state.no_verify):
click.echo("Cancelling changes")
Expand Down Expand Up @@ -188,9 +202,7 @@ def delete_topic(state: State, topic_name: str):
@click.argument("topic-name", required=True, type=click.STRING, autocompletion=list_topics)
@pass_state
def describe_topic(state, topic_name):
partitions, config = (
TopicController(state.cluster).get_cluster_topic(topic_name).describe()
)
partitions, config = TopicController(state.cluster).get_cluster_topic(topic_name).describe()

click.echo(bold(f"Topic: {topic_name}"))

Expand Down Expand Up @@ -276,8 +288,7 @@ def get_topics(state, topic):
)
@pass_state
def transfer(
state: State, topic: str, from_context: str, to_context: str, numbers: int, last: bool, avro: bool,
keep_file: bool
state: State, topic: str, from_context: str, to_context: str, numbers: int, last: bool, avro: bool, keep_file: bool
):
current_timestamp_milliseconds = int(round(time.time() * 1000))
unique_name = topic + "_" + str(current_timestamp_milliseconds)
Expand Down Expand Up @@ -321,7 +332,7 @@ def _produce_from_file(topic: str, to_context: str, working_dir: pathlib.Path, a


def _consume_to_file(
working_dir: pathlib.Path, topic: str, group_id: str, from_context: str, numbers: int, avro: bool, last: bool
working_dir: pathlib.Path, topic: str, group_id: str, from_context: str, numbers: int, avro: bool, last: bool
) -> int:
if avro:
consumer = AvroFileConsumer(group_id, topic, working_dir, last)
Expand All @@ -343,9 +354,7 @@ def ping(state, times, wait):
deltas = []
try:
try:
topic_controller.create_topics(
[topic_controller.get_cluster_topic(PING_TOPIC)]
)
topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

Expand Down
6 changes: 3 additions & 3 deletions esque/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def get_value(unit: str, value: Any) -> str:
if isinstance(value, str) and " -> " in value:
values = value.split(" -> ")
return (
click.style(get_value(unit, values[0]), fg="red")
+ " -> "
+ click.style(get_value(unit, values[1]), fg="green")
click.style(get_value(unit, values[0]), fg="red")
+ " -> "
+ click.style(get_value(unit, values[1]), fg="green")
)

if unit in CONVERSION_MAPPING:
Expand Down
2 changes: 1 addition & 1 deletion esque/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def bootstrap_servers(self):

def get_metadata(self):
return self.confluent_client.list_topics(timeout=1)

@property
def brokers(self):
metadata = self.confluent_client.list_topics(timeout=1)
Expand Down
4 changes: 1 addition & 3 deletions esque/consumergroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ def get_consumer_offsets(self, group_coordinator, consumer_id, topic_assignment,

if verbose:
for topic in consumer_offsets.keys():
topic_offsets = self.topic_controller.get_cluster_topic(
topic
).get_offsets()
topic_offsets = self.topic_controller.get_cluster_topic(topic).get_offsets()
for partition_id, consumer_offset in consumer_offsets[topic].items():
consumer_offsets[topic][partition_id] = {
"consumer_offset": consumer_offset,
Expand Down
12 changes: 6 additions & 6 deletions esque/topic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Tuple, Union, Any
from typing import Dict, List, Tuple, Union

import yaml

Expand All @@ -10,11 +10,11 @@

class Topic(KafkaResource):
def __init__(
self,
name: Union[str, bytes],
num_partitions: int = None,
replication_factor: int = None,
config: Dict[str, str] = None,
self,
name: Union[str, bytes],
num_partitions: int = None,
replication_factor: int = None,
config: Dict[str, str] = None,
):
# Should we warn in those cases to force clients to migrate to string-only?
if isinstance(name, bytes):
Expand Down
33 changes: 8 additions & 25 deletions esque/topic_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,12 @@ def __init__(self, cluster: Cluster):
self.cluster: Cluster = cluster

@raise_for_kafka_exception
def list_topics(
self, *, search_string: str = None, sort: bool = True, hide_internal: bool = True
) -> List[Topic]:
def list_topics(self, *, search_string: str = None, sort: bool = True, hide_internal: bool = True) -> List[Topic]:
self.cluster.confluent_client.poll(timeout=1)
topic_results = self.cluster.confluent_client.list_topics().topics.values()
topic_names = [t.topic for t in topic_results]
if search_string:
topic_names = [
topic for topic in topic_names if re.match(search_string, topic)
]
topic_names = [topic for topic in topic_names if re.match(search_string, topic)]
if hide_internal:
topic_names = [topic for topic in topic_names if not topic.startswith("__")]
if sort:
Expand All @@ -54,10 +50,7 @@ def create_topics(self, topics: List[Topic]):
partitions = topic.num_partitions if topic.num_partitions is not None else DEFAULT_PARTITIONS
replicas = topic.replication_factor if topic.replication_factor is not None else DEFAULT_REPLICATION
new_topic = NewTopic(
topic.name,
num_partitions=partitions,
replication_factor=replicas,
config=topic.config
topic.name, num_partitions=partitions, replication_factor=replicas, config=topic.config
)
future_list = self.cluster.confluent_client.create_topics([new_topic])
ensure_kafka_futures_done(list(future_list.values()))
Expand All @@ -66,9 +59,7 @@ def create_topics(self, topics: List[Topic]):
@invalidate_cache_after
def alter_configs(self, topics: List[Topic]):
for topic in topics:
config_resource = ConfigResource(
ConfigResource.Type.TOPIC, topic.name, topic.config
)
config_resource = ConfigResource(ConfigResource.Type.TOPIC, topic.name, topic.config)
future_list = self.cluster.confluent_client.alter_configs([config_resource])
ensure_kafka_futures_done(list(future_list.values()))

Expand All @@ -86,28 +77,20 @@ def get_cluster_topic(self, topic_name: str) -> Topic:
def update_from_cluster(self, topic: Topic):
"""Takes a topic and, based on its name, updates all attributes from the cluster"""
if topic.is_only_local: # only have to instantiate those once
topic._pykafka_topic = self.cluster.pykafka_client.cluster.topics[
topic.name
]
topic._confluent_topic = self.cluster.confluent_client.list_topics(
topic=topic.name, timeout=10
).topics
topic._pykafka_topic = self.cluster.pykafka_client.cluster.topics[topic.name]
topic._confluent_topic = self.cluster.confluent_client.list_topics(topic=topic.name, timeout=10).topics

# TODO put the topic instances into a cache of this class
topic.low_watermark = topic._pykafka_topic.earliest_available_offsets()
topic.high_watermark = topic._pykafka_topic.latest_available_offsets()
topic.partitions = list(topic._pykafka_topic.partitions.keys())
topic.config = self.cluster.retrieve_config(
ConfigResource.Type.TOPIC, topic.name
)
topic.config = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, topic.name)
topic.is_only_local = False
return topic

@raise_for_kafka_exception
def diff_with_cluster(self, topic: Topic) -> Dict[str, AttributeDiff]:
cluster_state = self.cluster.retrieve_config(
ConfigResource.Type.TOPIC, topic.name
)
cluster_state = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, topic.name)
out = {}
for name, old_value in cluster_state.items():
new_val = topic.config.get(name)
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def changed_topic_object(cluster, topic):
yield Topic(topic, 1, 3, {"cleanup.policy": "compact"})



@pytest.fixture()
def topic(topic_factory: Callable[[int, str], Tuple[str, int]]) -> Iterable[str]:
topic_id = "".join(random.choices(ascii_letters, k=5))
Expand Down Expand Up @@ -137,6 +136,7 @@ def producer(test_config: Config):
producer_config = test_config.create_confluent_config()
yield Producer(producer_config)


@pytest.fixture()
def avro_producer(test_config: Config):
producer_config = test_config.create_confluent_config()
Expand Down
Loading

0 comments on commit 2de02e6

Please sign in to comment.