Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/saluki/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from typing import Tuple, List

import datetime
from confluent_kafka import Message
from streaming_data_types import DESERIALISERS
from streaming_data_types.exceptions import StreamingDataTypesException
Expand Down Expand Up @@ -38,4 +38,22 @@ def _deserialise_and_print_messages(msgs: List[Message], partition: int | None)
if partition is not None and msg.partition() != partition:
continue
schema, deserialised = try_to_deserialise_message(msg.value())
logger.info(f"{msg.offset()}:({schema}) {deserialised}")
time = _parse_timestamp(msg)
logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}")


def _parse_timestamp(msg: Message) -> str:
"""
Parse a message timestamp.

See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp
:param msg: the message to parse.
:return: either the string-formatted timestamp or "Unknown" if not able to parse.
"""
timestamp_type, timestamp_ms_from_epoch = msg.timestamp()
if timestamp_type == 1: # TIMESTAMP_CREATE_TIME
return datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000).strftime(
'%Y-%m-%d %H:%M:%S.%f')
else:
# TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME
return "Unknown"
Loading