Merge pull request #35 from rh-marketingops/0.3.0
CLI and adjusted table offset tracking
truthordata authored Jan 9, 2023
2 parents aa6c1d3 + 2f8fff1 commit ddc8668
Showing 14 changed files with 391 additions and 25 deletions.
1 change: 1 addition & 0 deletions fluvii/cli/__init__.py
@@ -0,0 +1 @@

4 changes: 4 additions & 0 deletions fluvii/cli/__main__.py
@@ -0,0 +1,4 @@
from fluvii.cli.commands import fluvii_cli

if __name__ == "__main__":
fluvii_cli()
2 changes: 2 additions & 0 deletions fluvii/cli/commands/__init__.py
@@ -0,0 +1,2 @@
from .base import fluvii_cli
from .topics import topics_group
7 changes: 7 additions & 0 deletions fluvii/cli/commands/base.py
@@ -0,0 +1,7 @@
import click


@click.version_option(prog_name="fluvii", package_name="fluvii")
@click.group()
def fluvii_cli():
pass
81 changes: 81 additions & 0 deletions fluvii/cli/commands/topics.py
@@ -0,0 +1,81 @@
import click
from .base import fluvii_cli
from fluvii.kafka_tools import FluviiToolbox
from fluvii.general_utils import parse_headers
import json


@fluvii_cli.group("topics")
def topics_group():
pass


@topics_group.command(name="list")
@click.option("--include-configs", is_flag=True)
def list_topics(include_configs):
click.echo(json.dumps(FluviiToolbox().list_topics(include_configs=include_configs), indent=4))


@topics_group.command(name="create")
@click.option("--topic-config-dict", type=str, required=False)
@click.option("--topic-list", type=str, required=False)
@click.option("--config-dict", type=str, required=False)
def create_topics(topic_config_dict, topic_list, config_dict):
if topic_config_dict:
topic_config_dict = json.loads(topic_config_dict)
else:
if ', ' in topic_list:
topic_list = topic_list.split(', ')
else:
topic_list = topic_list.split(',')

if config_dict:
config_dict = json.loads(config_dict)
else:
click.echo('There were no configs defined; using defaults of {partitions=3, replication_factor=3}')
config_dict = {'partitions': 3, 'replication_factor': 3}
topic_config_dict = {topic: config_dict for topic in topic_list}
click.echo(f'Creating topics {list(topic_config_dict.keys())}')
FluviiToolbox().create_topics(topic_config_dict)


@topics_group.command(name="delete")
@click.option("--topic-config-dict", type=str, required=False)
@click.option("--topic-list", type=str, required=False)
def delete_topics(topic_config_dict, topic_list):
if topic_list:
if ', ' in topic_list:
topic_list = topic_list.split(', ')
else:
topic_list = topic_list.split(',')
else:
topic_list = list(json.loads(topic_config_dict).keys())
click.echo(f'Deleting topics {topic_list}')
FluviiToolbox().delete_topics(topic_list)


@topics_group.command(name="consume")
@click.option("--topic-offset-dict", type=str, required=True)
@click.option("--output-filepath", type=click.File("w"), required=True)
def consume_topics(topic_offset_dict, output_filepath):
def transform(transaction):
msgs = [{k: msg.__getattribute__(k)() for k in ['key', 'value', 'headers', 'topic', 'partition', 'offset', 'timestamp']} for msg in transaction.messages()]
for msg in msgs:
msg['headers'] = parse_headers(msg['headers'])
return msgs
topic_offset_dict = json.loads(topic_offset_dict)
messages = FluviiToolbox().consume_messages(topic_offset_dict, transform)
click.echo('Messages finished consuming, now outputting to file...')
json.dump(messages, output_filepath, indent=4, sort_keys=True)


@topics_group.command(name="produce")
@click.option("--topic-schema-dict", type=str, required=True)
@click.option("--input-filepath", type=click.File("r"), required=True)
@click.option("--topic-override", type=str, required=False)
def produce_message(topic_schema_dict, input_filepath, topic_override):
FluviiToolbox().produce_messages(
json.loads(topic_schema_dict),
json.loads(input_filepath.read()),
topic_override
)
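For context, a minimal usage sketch of the new topics commands (not from this commit): it assumes a reachable cluster configured through FluviiConfig's environment settings, and uses click's built-in test runner; the topic names are placeholders.

from click.testing import CliRunner
from fluvii.cli.commands import fluvii_cli

runner = CliRunner()

# List topics along with their broker-side configs
result = runner.invoke(fluvii_cli, ["topics", "list", "--include-configs"])
print(result.output)

# Create two topics with the default {partitions=3, replication_factor=3}
runner.invoke(fluvii_cli, ["topics", "create", "--topic-list", "topic_a,topic_b"])

The same commands are reachable from a shell via the new entry point, e.g. python -m fluvii.cli topics list --include-configs.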
15 changes: 8 additions & 7 deletions fluvii/fluvii_app/rebalance_manager.py
@@ -49,10 +49,12 @@ def __getattr__(self, attr):

@property
def needs_recovery(self):
# note: >2 is because: +1 due to table tracking "current" offset, and +1 due to last offset being a transaction marker
# note: >2 is because: +1 due to the next-to-last offset being a transaction marker, and +1 due to the last offset being the "next" offset to be read
has_consumable_offets = self.lowwater != self.highwater
table_is_behind = self.recovery_offset_delta > 2
LOGGER.debug(f'table {self.partition}: has consumable offsets? {has_consumable_offets}; table is behind? {table_is_behind}')
LOGGER.debug(f'table p{self.partition}: has consumable offsets? {has_consumable_offets}; table is behind? {table_is_behind}')
if table_is_behind:
LOGGER.debug(f'table p{self.partition}: table offset {self.table_offset}; highwater {self.highwater}')
return has_consumable_offets and table_is_behind

@property
@@ -116,7 +118,7 @@ def _pause_all_active_partitions(self):
def _init_table_dbs(self):
for p in self._changelog_partitions:
if not p.table_assigned:
self.tables[p.partition] = SqliteFluvii(f'p{p.partition}', self._config)
self.tables[p.partition] = SqliteFluvii(f'p{p.partition}', self._config, allow_commits=True)
p.table_assigned = True

def _init_recovery_time_remaining_attrs(self):
@@ -133,8 +135,7 @@ def _get_table_offsets(self):
p.table_offset = self.tables[p.partition].offset
if p.table_offset < p.lowwater:
LOGGER.debug('Adjusting table offset due to it being lower than the changelog lowwater')
p.table_offset = p.lowwater - 2 # -2 since the table marks the latest offset it has (which can never be the "last" offset since it's a marker), not which offset is next (aka how kafka tracks it)
LOGGER.info(f'NOTE: tables are considered "current" if (highwater - table_offset) <= 2')
p.table_offset = p.lowwater
LOGGER.info(f'(table, table offset, highwater) list : {[(p.partition, p.table_offset, p.highwater) for p in self._changelog_partitions]}')

def _set_partition_recovery_statuses(self):
@@ -166,8 +167,8 @@ def _update_recovery_partition_table_offsets(self):
def _set_all_tables_to_latest_offset(self):
for p in self._changelog_partitions:
if p.table_offset < p.highwater:
LOGGER.debug(f'Setting table p{p.partition} to {p.highwater-2} (highwater-2) to mark as fully recovered')
self.tables[p.partition].set_offset(p.highwater - 2)
LOGGER.debug(f'Setting table p{p.partition} to {p.highwater}')
self.tables[p.partition].set_offset(p.highwater)
self.tables[p.partition].commit()

def _close_tables(self, partitions=None):
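To make the adjusted offset tracking concrete, a small illustrative sketch of the needs_recovery arithmetic follows (not part of the diff; per the removed log line, recovery_offset_delta is taken to be highwater minus the stored table offset). With the new scheme a fully recovered table stores the changelog highwater itself, so a delta of 2 or less, covering the transaction marker and the "next offset to be read" slot, still counts as current.

def needs_recovery(lowwater, highwater, table_offset):
    # Assumed definition: gap between the changelog highwater and the offset stored in the table
    recovery_offset_delta = highwater - table_offset
    has_consumable_offsets = lowwater != highwater
    table_is_behind = recovery_offset_delta > 2
    return has_consumable_offsets and table_is_behind

assert needs_recovery(lowwater=0, highwater=10, table_offset=10) is False  # fully recovered
assert needs_recovery(lowwater=0, highwater=10, table_offset=8) is False   # within the 2-offset slack
assert needs_recovery(lowwater=0, highwater=10, table_offset=5) is True    # table must catch up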
1 change: 1 addition & 0 deletions fluvii/kafka_tools/__init__.py
@@ -0,0 +1 @@
from .fluvii_toolbox import FluviiToolbox
127 changes: 127 additions & 0 deletions fluvii/kafka_tools/fluvii_toolbox.py
@@ -0,0 +1,127 @@
from fluvii.general_utils import Admin
from fluvii.producer import Producer
from fluvii.fluvii_app import FluviiConfig
from fluvii.auth import SaslPlainClientConfig
from fluvii.schema_registry import SchemaRegistry
from confluent_kafka.admin import NewTopic, ConfigResource
from fluvii.kafka_tools.topic_dumper import TopicDumperApp
import logging


LOGGER = logging.getLogger(__name__)


class FluviiToolbox:
"""
Helpful functions for interacting with Kafka.
"""

def __init__(self, fluvii_config=None):
if not fluvii_config:
fluvii_config = FluviiConfig()
self._config = fluvii_config
admin_auth = self._config.client_auth_config
if self._config.client_auth_config:
admin_auth = SaslPlainClientConfig(username=admin_auth.username, password=admin_auth.password)
self.admin = Admin(self._config.client_urls, admin_auth)

def list_topics(self, valid_only=True, include_configs=False):
def _valid(topic):
if valid_only:
return not topic.startswith('__') and 'schema' not in topic
return True
topics = sorted([t for t in self.admin.list_topics().topics if _valid(t)])
if include_configs:
futures_dict = self.admin.describe_configs([ConfigResource(2, topic) for topic in topics])
topics = {config_resource.name: {c.name: c.value for c in configs.result().values()} for config_resource, configs in futures_dict.items()}
return topics

def create_topics(self, topic_config_dict, ignore_existing_topics=True):
"""
{'topic_a': {'partitions': 1, 'replication_factor': 1, 'segment.ms': 10000}, 'topic_b': {etc}},
"""
if ignore_existing_topics:
existing = set(self.list_topics())
remove = set(topic_config_dict.keys()) & existing
if remove:
LOGGER.info(f'These topics already exist, ignoring: {remove}')
for i in remove:
topic_config_dict.pop(i)
for topic in topic_config_dict:
topic_config_dict[topic] = NewTopic(
topic=topic,
num_partitions=topic_config_dict[topic].pop('partitions'),
replication_factor=topic_config_dict[topic].pop('replication_factor'),
config=topic_config_dict[topic])
if topic_config_dict:
futures = self.admin.create_topics(list(topic_config_dict.values()), operation_timeout=10)
for topic in topic_config_dict:
futures[topic].result()
LOGGER.info(f'Created topics: {list(topic_config_dict.keys())}')

def alter_topics(self, topic_config_dict, retain_configs=True, ignore_missing_topics=True, protected_configs=[]):
"""
{'topic_a': {'partitions': 1, 'replication_factor': 1, 'segment.ms': 10000}, 'topic_b': {etc}}
"""
current_configs = {}
if retain_configs:
current_configs = self.list_topics(include_configs=True)
for topic in current_configs:
current_configs[topic] = {k: v for k, v in current_configs[topic].items() if k not in protected_configs}
topics = current_configs.keys()
else:
topics = self.list_topics()
if ignore_missing_topics:
existing = set(topics)
missing = set(topic_config_dict.keys()) - existing
if missing:
LOGGER.info(f'These topics dont exist, ignoring: {missing}')
for i in missing:
topic_config_dict.pop(i)
for topic in topic_config_dict:
current_configs[topic].update(topic_config_dict[topic])
topic_config_dict[topic] = current_configs[topic]
if topic_config_dict:
futures = self.admin.alter_configs([ConfigResource(2, topic, set_config=configs) for topic, configs in topic_config_dict.items()])
for topic in topic_config_dict:
futures[topic].result()
LOGGER.info(f'Altered topics: {list(topic_config_dict.keys())}')

def delete_topics(self, topics, ignore_missing=True):
if ignore_missing:
existing = set(self.list_topics())
missing = set(topics) - existing
if missing:
LOGGER.info(f'These topics dont exist, ignoring: {missing}')
topics = [i for i in topics if i not in missing]
if topics:
futures = self.admin.delete_topics(topics)
for topic in topics:
futures[topic].result()
LOGGER.info(f'Deleted topics: {topics}')

def produce_messages(self, topic_schema_dict, message_list, topic_override=None):
producer = Producer(
urls=self._config.client_urls,
client_auth_config=self._config.client_auth_config,
topic_schema_dict=topic_schema_dict,
schema_registry=SchemaRegistry(self._config.schema_registry_url, auth_config=self._config.schema_registry_auth_config).registry
)
LOGGER.info('Producing messages...')
poll = 0
if topic_override:
LOGGER.info(f'A topic override was passed; ignoring the topic provided in each message body and using topic {topic_override} instead')
for msg in message_list:
msg['topic'] = topic_override
for message in message_list:
message = {k: message.get(k) for k in ['key', 'value', 'headers', 'topic']}
producer.produce(message.pop('value'), **message)
poll += 1
if poll >= 1000:
producer.poll(0)
poll = 0
producer.flush(30)
LOGGER.info('Produce finished!')

def consume_messages(self, consume_topics_dict, transform_function=None):
return TopicDumperApp(consume_topics_dict, app_function=transform_function, fluvii_config=self._config).run()
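As a rough illustration of the toolbox API added here (a sketch, not from the commit): broker URLs and auth are assumed to come from FluviiConfig, e.g. via environment variables, and the topic name is a placeholder.

from fluvii.kafka_tools import FluviiToolbox

toolbox = FluviiToolbox()  # falls back to FluviiConfig() when no config is passed

print(toolbox.list_topics(include_configs=True))

# 'partitions' and 'replication_factor' are popped out; any remaining keys are passed as topic configs
toolbox.create_topics({"demo_topic": {"partitions": 3, "replication_factor": 3, "segment.ms": "10000"}})

toolbox.delete_topics(["demo_topic"])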
79 changes: 79 additions & 0 deletions fluvii/kafka_tools/topic_dumper.py
@@ -0,0 +1,79 @@
from fluvii.fluvii_app import FluviiMultiMessageApp
from confluent_kafka import TopicPartition
import logging
from time import sleep


LOGGER = logging.getLogger(__name__)


class TopicDumperApp(FluviiMultiMessageApp):
"""Note: this should just be converted to a regular consumer, but I already had most of the code so whatever. """
def __init__(self, consume_topics_dict, app_function=None, **kwargs):
"""
consume_topics_dict example: {"topic_a": {0: 100, 2: 400, 3: "earliest"}, "topic_b": {1: 220}}
"""
self._consume_topics_dict = consume_topics_dict
super().__init__(app_function, list(consume_topics_dict.keys()), **kwargs)

def _set_config(self):
super()._set_config()
self._config.consumer_config.batch_consume_max_count = None
self._config.consumer_config.batch_consume_max_time_seconds = None
self._config.consumer_config.auto_offset_reset = 'earliest'

def _init_metrics_manager(self):
pass

def _get_partition_assignment(self):
LOGGER.debug('Getting partition assignments...')
self._consumer.poll(5)
partitions = self._consumer.assignment() # Note: this actually can change in-place as partitions get assigned in the background!
assign_count_prev = 0
checks = 2
while checks:
assign_count_current = len(partitions)
if (assign_count_current > assign_count_prev) or assign_count_current == 0:
assign_count_prev = assign_count_current
else:
checks -= 1
sleep(1)
LOGGER.debug(f'All partition assignments retrieved: {partitions}')
return partitions

def _seek_consumer_to_offsets(self):
LOGGER.info('Setting up consumer to pull from the beginning of the topics...')
self._get_partition_assignment() # mostly just to ensure we can seek
for topic, partitions in self._consume_topics_dict.items():
for p, offset in partitions.items():
watermarks = self._consumer.get_watermark_offsets(TopicPartition(topic=topic, partition=int(p)))
if offset == 'earliest':
offset = watermarks[0]
elif offset == 'latest':
offset = watermarks[1]
elif int(offset) < watermarks[0]:
offset = watermarks[0]
partition = TopicPartition(topic=topic, partition=int(p), offset=int(offset))  # build the TopicPartition after resolving the offset so seek() uses it
LOGGER.debug(f'Seeking {topic} p{p} to offset {offset}')
self._consumer.seek(partition)

def _finalize_app_batch(self):
if self._app_function:
LOGGER.info('Transforming all messages to desired format...')
self._consumer._messages = self._app_function(self.transaction, *self._app_function_arglist)
raise Exception('Got all messages!')

def _app_shutdown(self):
LOGGER.info('App is shutting down...')
self._shutdown = True
self.kafka_cleanup()

def _runtime_init(self):
super()._runtime_init()
self._seek_consumer_to_offsets()

def run(self, **kwargs):
try:
super().run(**kwargs)
finally:
return self.transaction.messages()
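For illustration, the same dump can be driven programmatically through FluviiToolbox.consume_messages (a sketch under the same configuration assumptions as above; topic names and offsets are placeholders). Offsets may be absolute integers or 'earliest'/'latest', and integers below the low watermark are clamped to it.

from fluvii.kafka_tools import FluviiToolbox

topic_offsets = {"topic_a": {0: 100, 1: "earliest"}, "topic_b": {0: "latest"}}

def keys_and_values(transaction):
    # the transform function receives the transaction holding the consumed batch
    return [{"key": msg.key(), "value": msg.value()} for msg in transaction.messages()]

messages = FluviiToolbox().consume_messages(topic_offsets, keys_and_values)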
1 change: 1 addition & 0 deletions fluvii/producer/config.py
@@ -8,6 +8,7 @@ class ProducerConfig(KafkaConfigBase):
"""
def __init__(self):
self.transaction_timeout_mins = int(environ.get('FLUVII_TRANSACTION_TIMEOUT_MINUTES', '1'))
self.schema_library_root = environ.get('FLUVII_SCHEMA_LIBRARY_ROOT', '')

def as_client_dict(self):
return {
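A small sketch of how the new setting is picked up (assumptions: the import path follows the file path fluvii/producer/config.py, and the values shown are placeholders). Both settings are read from the environment when the config object is constructed.

import os
from fluvii.producer.config import ProducerConfig

os.environ["FLUVII_SCHEMA_LIBRARY_ROOT"] = "/opt/schemas"      # new in this commit; defaults to ''
os.environ["FLUVII_TRANSACTION_TIMEOUT_MINUTES"] = "2"

config = ProducerConfig()
print(config.schema_library_root, config.transaction_timeout_mins)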