Skip to content

Commit

Permalink
Take bucket arn as arg, parse whole message object to add to bucket, …
Browse files Browse the repository at this point in the history
…use naming structure as defined on confluent docs
  • Loading branch information
dakota002 committed Aug 30, 2024
1 parent 1673cd6 commit 6dd7d68
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 13 deletions.
7 changes: 4 additions & 3 deletions gcn_monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def host_port(host_port_str):
show_default=True,
help="Log level",
)
def main(prometheus, loglevel):
@click.option("--bucketarn", help="Bucket ARN")
def main(prometheus, loglevel, bucketarn):
"""Monitor connectivity of a Kafka client.
Specify the Kafka client configuration in environment variables using the
Expand All @@ -51,7 +52,7 @@ def main(prometheus, loglevel):
* Convert to upper-case.
* Replace periods (`.`) with single underscores (`_`).
* Replace dashes (`-`) with double underscores (`__`).
* Replace underscores (`-`) with triple underscores (`___`).
* Replace underscores (`_`) with triple underscores (`___`).
"""
logging.basicConfig(level=loglevel)

Expand All @@ -60,4 +61,4 @@ def main(prometheus, loglevel):
)
log.info("Prometheus listening on %s", prometheus.netloc)

kafka.run()
kafka.run(bucketarn)
81 changes: 71 additions & 10 deletions gcn_monitor/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import json
import logging
from datetime import datetime, timezone
import re

import boto3
import gcn_kafka
Expand All @@ -18,12 +18,6 @@

log = logging.getLogger(__name__)
s3_client = boto3.client("s3")
resource_client = boto3.client("resourcegroupstaggingapi")

BUCKET_NAME = bucket = resource_client.get_resources(
TagFilters=[{"Key": "aws:cloudformation:logical-id", "Values": ["NoticesBucket"]}],
ResourceTypeFilters=["s3"],
)["ResourceTagMappingList"][0]["ResourceARN"].replace("arn:aws:s3:::", "")


def stats_cb(data):
Expand All @@ -32,7 +26,70 @@ def stats_cb(data):
metrics.broker_state.labels(broker["name"]).state(broker["state"])


def run():
def sanitize_topic(topic):
    """Return *topic* with characters that are unsafe in file names replaced.

    Each of the characters ``/ : * ? " < > |`` is replaced by an
    underscore (``_``), and leading/trailing whitespace is stripped, so
    the result can safely be embedded in an S3 object key.

    Args:
        topic (str): Kafka message topic.

    Returns:
        str: The sanitized topic string.
    """
    # One C-level pass instead of a regex substitution.
    replacements = str.maketrans({char: "_" for char in '/:*?"<>|'})
    return topic.translate(replacements).strip()


def parse_into_s3_object(message):
    """Parse a Kafka message into an S3 object key and JSON-serializable body.

    The key follows the Confluent S3 sink connector naming scheme,
    ``topics/<topic>/partition=<partition>/<topic>+<partition>+<offset>.<format>``,
    defined at
    https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-names

    Args:
        message (confluent_kafka.Message): Any Kafka message.

    Returns:
        key (str): The Key for S3's put_object method (see scheme above).
        body (dict): The message's public accessor results (e.g. error,
            headers, key, latency, leader_epoch, offset, partition,
            timestamp, topic, value), with ``bytes`` values decoded to
            ``str`` so the dict is JSON serializable.
    """
    topic = sanitize_topic(message.topic())
    partition = message.partition()
    offset = message.offset()
    key = f"topics/{topic}/partition={partition}/{topic}+{partition}+{offset}.json"

    body = {}
    # Collect every public accessor, skipping dunders and set_* mutators.
    for name in dir(message):
        if name.startswith("__") or name.startswith("set_"):
            continue
        attr = getattr(message, name)
        # Accessors on confluent_kafka.Message are zero-argument methods.
        # Call each one exactly once (the previous version called the
        # method a second time for bytes results) and tolerate plain,
        # non-callable attributes instead of raising TypeError.
        value = attr() if callable(attr) else attr
        if isinstance(value, bytes):
            # Decode payload bytes so the body can be JSON serialized.
            value = value.decode()
        body[name] = value
    return key, body


def run(bucketarn):
log.info("Creating consumer")
config = gcn_kafka.config_from_env()
config["stats_cb"] = stats_cb
Expand All @@ -47,11 +104,15 @@ def run():
while True:
for message in consumer.consume(timeout=1):
topic = message.topic()
key, body = parse_into_s3_object(message)
s3_client.put_object(
Bucket=bucketarn.replace("arn:aws:s3:::", ""),
Key=key,
Body=body,
)
if error := message.error():
log.error("topic %s: got error %s", topic, error)
else:
log.info("topic %s: got message", topic)
partition = message.partition()
metrics.received.labels(topic, partition).inc()
key = f"{topic}-{datetime.now(timezone.utc)}"
s3_client.put_object(Bucket=BUCKET_NAME, Key=key, Body=message.value())

0 comments on commit 6dd7d68

Please sign in to comment.