diff --git a/src/saluki/__init__.py b/src/saluki/__init__.py index 442b0e6..65a9167 100644 --- a/src/saluki/__init__.py +++ b/src/saluki/__init__.py @@ -1,6 +1,6 @@ import logging from typing import Tuple, List - +import datetime from confluent_kafka import Message from streaming_data_types import DESERIALISERS from streaming_data_types.exceptions import StreamingDataTypesException @@ -38,4 +38,22 @@ def _deserialise_and_print_messages(msgs: List[Message], partition: int | None) if partition is not None and msg.partition() != partition: continue schema, deserialised = try_to_deserialise_message(msg.value()) - logger.info(f"{msg.offset()}:({schema}) {deserialised}") + time = _parse_timestamp(msg) + logger.info(f"{msg.offset()} ({time}):({schema}) {deserialised}") + + +def _parse_timestamp(msg: Message) -> str: + """ + Parse a message timestamp. + + See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp + :param msg: the message to parse. + :return: either the string-formatted timestamp or "Unknown" if not able to parse. + """ + timestamp_type, timestamp_ms_from_epoch = msg.timestamp() + if timestamp_type == 1: # TIMESTAMP_CREATE_TIME + return datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000).strftime( + '%Y-%m-%d %H:%M:%S.%f') + else: + # TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME + return "Unknown"