
Commit aaeca9a

Authored by mahajanadhitya, emasab and milindl

[KIP-396] List offsets implementation (confluentinc#1576)

Co-authored-by: Emanuele Sabellico <[email protected]>
Co-authored-by: Milind L <[email protected]>

Parent: 9bcb404

16 files changed: +742 -28 lines

docs/index.rst (+29)

@@ -25,6 +25,7 @@ Supporting classes
 - :ref:`Message <pythonclient_message>`
 - :ref:`TopicPartition <pythonclient_topicpartition>`
 - :ref:`ThrottleEvent <pythonclient_throttleevent>`
+- :ref:`IsolationLevel <pythonclient_isolation_level>`
 - :ref:`TopicCollection <pythonclient_topic_collection>`
 - :ref:`TopicPartitionInfo <pythonclient_topic_partition_info>`
 - :ref:`Node <pythonclient_node>`
@@ -61,6 +62,8 @@ Supporting classes
 - :ref:`UserScramCredentialAlteration <pythonclient_user_scram_credential_alteration>`
 - :ref:`UserScramCredentialUpsertion <pythonclient_user_scram_credential_upsertion>`
 - :ref:`UserScramCredentialDeletion <pythonclient_user_scram_credential_deletion>`
+- :ref:`OffsetSpec <pythonclient_offset_spec>`
+- :ref:`ListOffsetsResultInfo <pythonclient_list_offsets_result_info>`
 - :ref:`TopicDescription <pythonclient_topic_description>`
 - :ref:`DescribeClusterResult <pythonclient_describe_cluster_result>`
 - :ref:`BrokerMetadata <pythonclient_broker_metadata>`
@@ -267,6 +270,24 @@ UserScramCredentialDeletion
 .. autoclass:: confluent_kafka.admin.UserScramCredentialDeletion
    :members:

+.. _pythonclient_offset_spec:
+
+**********
+OffsetSpec
+**********
+
+.. autoclass:: confluent_kafka.admin.OffsetSpec
+   :members:
+
+.. _pythonclient_list_offsets_result_info:
+
+*********************
+ListOffsetsResultInfo
+*********************
+
+.. autoclass:: confluent_kafka.admin.ListOffsetsResultInfo
+   :members:
+
 .. _pythonclient_topic_description:

 ****************
@@ -816,6 +837,14 @@ ThrottleEvent
 .. autoclass:: confluent_kafka.ThrottleEvent
    :members:

+.. _pythonclient_isolation_level:
+
+**************
+IsolationLevel
+**************
+
+.. autoclass:: confluent_kafka.IsolationLevel
+   :members:

 .. _avro_producer:

examples/README.md (+1 -1)

@@ -12,7 +12,7 @@ The scripts in this directory provide various examples of using Confluent's Pyth
 * [protobuf_producer.py](protobuf_producer.py): Produce Protobuf serialized data using ProtobufSerializer.
 * [protobuf_consumer.py](protobuf_consumer.py): Read Protobuf serialized data using ProtobufDeserializer.
 * [sasl_producer.py](sasl_producer.py): Demonstrates SASL Authentication.
-* [list_offsets.py](list_offsets.py): List committed offsets and consumer lag for group and topics.
+* [get_watermark_offsets.py](get_watermark_offsets.py): Consumer method for listing committed offsets and consumer lag for group and topics.
 * [oauth_producer.py](oauth_producer.py): Demonstrates OAuth Authentication (client credentials).

 Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):

examples/adminapi.py (+60 -3)

@@ -18,13 +18,15 @@
 # Example use of AdminClient operations.

 from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
-                             TopicPartition, ConsumerGroupState, TopicCollection)
+                             TopicPartition, ConsumerGroupState, TopicCollection,
+                             IsolationLevel)
 from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
                                    ConfigEntry, ConfigSource, AclBinding,
                                    AclBindingFilter, ResourceType, ResourcePatternType,
                                    AclOperation, AclPermissionType, AlterConfigOpType,
                                    ScramMechanism, ScramCredentialInfo,
-                                   UserScramCredentialUpsertion, UserScramCredentialDeletion)
+                                   UserScramCredentialUpsertion, UserScramCredentialDeletion,
+                                   OffsetSpec)
 import sys
 import threading
 import logging
@@ -765,6 +767,57 @@ def example_alter_user_scram_credentials(a, args):
             print("{}: Error: {}".format(username, e))


+def example_list_offsets(a, args):
+    topic_partition_offsets = {}
+    if len(args) == 0:
+        raise ValueError(
+            "Invalid number of arguments for list offsets, expected at least 1, got 0")
+    i = 1
+    partition_i = 1
+    isolation_level = IsolationLevel[args[0]]
+    while i < len(args):
+        if i + 3 > len(args):
+            raise ValueError(
+                f"Invalid number of arguments for list offsets, partition {partition_i}, expected 3," +
+                f" got {len(args) - i}")
+        topic = args[i]
+        partition = int(args[i+1])
+        topic_partition = TopicPartition(topic, partition)
+
+        if "EARLIEST" == args[i+2]:
+            offset_spec = OffsetSpec.earliest()
+
+        elif "LATEST" == args[i+2]:
+            offset_spec = OffsetSpec.latest()
+
+        elif "MAX_TIMESTAMP" == args[i+2]:
+            offset_spec = OffsetSpec.max_timestamp()
+
+        elif "TIMESTAMP" == args[i+2]:
+            if i + 4 > len(args):
+                raise ValueError(
+                    f"Invalid number of arguments for list offsets, partition {partition_i}, expected 4" +
+                    f", got {len(args) - i}")
+            offset_spec = OffsetSpec.for_timestamp(int(args[i+3]))
+            i += 1
+        else:
+            raise ValueError("Invalid OffsetSpec, must be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP")
+        topic_partition_offsets[topic_partition] = offset_spec
+        i = i + 3
+        partition_i += 1
+
+    futmap = a.list_offsets(topic_partition_offsets, isolation_level=isolation_level, request_timeout=30)
+    for partition, fut in futmap.items():
+        try:
+            result = fut.result()
+            print("Topicname : {} Partition_Index : {} Offset : {} Timestamp : {}"
+                  .format(partition.topic, partition.partition, result.offset,
+                          result.timestamp))
+        except KafkaException as e:
+            print("Topicname : {} Partition_Index : {} Error : {}"
+                  .format(partition.topic, partition.partition, e))
+
+
 if __name__ == '__main__':
     if len(sys.argv) < 3:
         sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
@@ -801,6 +854,9 @@ def example_alter_user_scram_credentials(a, args):
                          '<iterations1> <password1> <salt1> ' +
                          '[UPSERT <user2> <mechanism2> <iterations2> ' +
                          ' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
+        sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
+                         '[<topic2> <partition2> <offset_spec2> ..]\n')
+
        sys.exit(1)

     broker = sys.argv[1]
@@ -829,7 +885,8 @@ def example_alter_user_scram_credentials(a, args):
               'list_consumer_group_offsets': example_list_consumer_group_offsets,
               'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
               'describe_user_scram_credentials': example_describe_user_scram_credentials,
-              'alter_user_scram_credentials': example_alter_user_scram_credentials}
+              'alter_user_scram_credentials': example_alter_user_scram_credentials,
+              'list_offsets': example_list_offsets}

     if operation not in opsmap:
         sys.stderr.write('Unknown operation: %s\n' % operation)
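
The example above drives everything from positional command-line arguments, but the same request can be assembled directly in Python. Below is a minimal sketch of the mapping that example_list_offsets ends up building; the topic name, partition indexes and timestamp are made up for illustration, while the OffsetSpec constructors are the ones used in the diff.

    from confluent_kafka import TopicPartition
    from confluent_kafka.admin import OffsetSpec

    # Hypothetical topic and partitions; mirrors what example_list_offsets builds from argv.
    topic_partition_offsets = {
        TopicPartition("example_topic", 0): OffsetSpec.earliest(),       # first available offset
        TopicPartition("example_topic", 1): OffsetSpec.latest(),         # end offset of the partition
        TopicPartition("example_topic", 2): OffsetSpec.max_timestamp(),  # offset of the record with the highest timestamp
        # Offset at or after the given timestamp (the example passes an integer taken from argv).
        TopicPartition("example_topic", 3): OffsetSpec.for_timestamp(1672531200000),
    }
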
examples/list_offsets.py renamed to examples/get_watermark_offsets.py (file renamed without changes).

src/confluent_kafka/__init__.py (+4 -2)

@@ -23,7 +23,8 @@
                      ConsumerGroupTopicPartitions,
                      ConsumerGroupState,
                      TopicCollection,
-                     TopicPartitionInfo)
+                     TopicPartitionInfo,
+                     IsolationLevel)

 from .cimpl import (Producer,
                     Consumer,
@@ -47,7 +48,8 @@
            'Producer', 'DeserializingConsumer',
            'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
            'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
-           'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid']
+           'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
+           'IsolationLevel']

 __version__ = version()[0]
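
With the export added above, the new enum is importable from the package root. A tiny sanity check, assuming nothing beyond what this diff exports:

    from confluent_kafka import IsolationLevel

    # Prints the two members defined in _model/__init__.py: READ_UNCOMMITTED and READ_COMMITTED.
    print(list(IsolationLevel))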

src/confluent_kafka/_model/__init__.py (+17)

@@ -132,3 +132,20 @@ def __init__(self, id, leader, replicas, isr):
         self.leader = leader
         self.replicas = replicas
         self.isr = isr
+
+
+class IsolationLevel(Enum):
+    """
+    Enum for Kafka isolation levels.
+
+    Values:
+    -------
+    """
+
+    READ_UNCOMMITTED = cimpl.ISOLATION_LEVEL_READ_UNCOMMITTED  #: Receive all the offsets.
+    READ_COMMITTED = cimpl.ISOLATION_LEVEL_READ_COMMITTED  #: Skip offsets belonging to an aborted transaction.
+
+    def __lt__(self, other):
+        if self.__class__ != other.__class__:
+            return NotImplemented
+        return self.value < other.value
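
IsolationLevel is the type AdminClient.list_offsets expects for its isolation_level argument. A short sketch of typical use, assuming nothing beyond what the class above defines (the name lookup mirrors how examples/adminapi.py turns its first CLI argument into a member):

    from confluent_kafka import IsolationLevel

    # Select a level by name, as the adminapi.py example does with IsolationLevel[args[0]].
    level = IsolationLevel["READ_COMMITTED"]
    assert level is IsolationLevel.READ_COMMITTED

    # READ_COMMITTED skips offsets belonging to aborted transactions; READ_UNCOMMITTED returns every offset.
    # __lt__ above makes members orderable; comparing with other types returns NotImplemented.
    ordered = sorted(IsolationLevel)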

src/confluent_kafka/admin/__init__.py (+98 -6)

@@ -46,10 +46,14 @@
                      ScramCredentialInfo,
                      ScramMechanism,
                      UserScramCredentialsDescription)
+
 from ._topic import (TopicDescription)  # noqa: F401

 from ._cluster import (DescribeClusterResult)  # noqa: F401

+from ._listoffsets import (OffsetSpec,  # noqa: F401
+                           ListOffsetsResultInfo)
+
 from .._model import TopicCollection as _TopicCollection

 from ..cimpl import (KafkaException,  # noqa: F401
@@ -71,11 +75,10 @@
                      RESOURCE_BROKER,
                      OFFSET_INVALID)

-from confluent_kafka import ConsumerGroupTopicPartitions \
-    as _ConsumerGroupTopicPartitions
-
-from confluent_kafka import ConsumerGroupState \
-    as _ConsumerGroupState
+from confluent_kafka import \
+    ConsumerGroupTopicPartitions as _ConsumerGroupTopicPartitions, \
+    ConsumerGroupState as _ConsumerGroupState, \
+    IsolationLevel as _IsolationLevel


 try:
@@ -295,6 +298,28 @@ def _make_user_scram_credentials_result(f, futmap):
             for _, fut in futmap.items():
                 fut.set_exception(e)

+    @staticmethod
+    def _make_futmap_result(f, futmap):
+        try:
+            results = f.result()
+            len_results = len(results)
+            len_futures = len(futmap)
+            if len(results) != len_futures:
+                raise RuntimeError(
+                    f"Results length {len_results} is different from future-map length {len_futures}")
+            for key, value in results.items():
+                fut = futmap.get(key, None)
+                if fut is None:
+                    raise RuntimeError(
+                        f"Key {key} not found in future-map: {futmap}")
+                if isinstance(value, KafkaError):
+                    fut.set_exception(KafkaException(value))
+                else:
+                    fut.set_result(value)
+        except Exception as e:
+            for _, fut in futmap.items():
+                fut.set_exception(e)
+
     @staticmethod
     def _create_future():
         f = concurrent.futures.Future()
@@ -384,7 +409,6 @@ def _check_list_consumer_group_offsets_request(request):
             if topic_partition.partition < 0:
                 raise ValueError("Element of 'topic_partitions' must not have negative 'partition' value")
             if topic_partition.offset != OFFSET_INVALID:
-                print(topic_partition.offset)
                 raise ValueError("Element of 'topic_partitions' must not have 'offset' value")

     @staticmethod
@@ -479,6 +503,34 @@ def _check_alter_user_scram_credentials_request(alterations):
                                 "to be either a UserScramCredentialUpsertion or a " +
                                 "UserScramCredentialDeletion")

+    @staticmethod
+    def _check_list_offsets_request(topic_partition_offsets, kwargs):
+        if not isinstance(topic_partition_offsets, dict):
+            raise TypeError("Expected topic_partition_offsets to be " +
+                            "dict of [TopicPartitions,OffsetSpec] for list offsets request")
+
+        for topic_partition, offset_spec in topic_partition_offsets.items():
+            if topic_partition is None:
+                raise TypeError("partition cannot be None")
+            if not isinstance(topic_partition, _TopicPartition):
+                raise TypeError("partition must be a TopicPartition")
+            if topic_partition.topic is None:
+                raise TypeError("partition topic name cannot be None")
+            if not isinstance(topic_partition.topic, string_type):
+                raise TypeError("partition topic name must be string")
+            if not topic_partition.topic:
+                raise ValueError("partition topic name cannot be empty")
+            if topic_partition.partition < 0:
+                raise ValueError("partition index must be non-negative")
+            if offset_spec is None:
+                raise TypeError("OffsetSpec cannot be None")
+            if not isinstance(offset_spec, OffsetSpec):
+                raise TypeError("Value must be a OffsetSpec")
+
+        if 'isolation_level' in kwargs:
+            if not isinstance(kwargs['isolation_level'], _IsolationLevel):
+                raise TypeError("isolation_level argument should be an IsolationLevel")
+
     def create_topics(self, new_topics, **kwargs):
         """
         Create one or more new topics.
@@ -1103,5 +1155,45 @@ def alter_user_scram_credentials(self, alterations, **kwargs):
                                                  AdminClient._make_user_scram_credentials_result)

         super(AdminClient, self).alter_user_scram_credentials(alterations, f, **kwargs)
+        return futmap
+
+    def list_offsets(self, topic_partition_offsets, **kwargs):
+        """
+        Enables to find the beginning offset,
+        end offset as well as the offset matching a timestamp
+        or the offset with max timestamp in partitions.
+
+        :param dict([TopicPartition, OffsetSpec]) topic_partition_offsets: Dictionary of
+                      TopicPartition objects associated with the corresponding OffsetSpec to query for.
+        :param IsolationLevel isolation_level: The isolation level to use when
+                      querying.
+        :param float request_timeout: The overall request timeout in seconds,
+                      including broker lookup, request transmission, operation time
+                      on broker, and response. Default: `socket.timeout.ms*1000.0`
+
+        :returns: A dict of futures keyed by TopicPartition.
+                  The future result() method returns ListOffsetsResultInfo
+                  raises KafkaException
+
+        :rtype: dict[TopicPartition, future]
+
+        :raises TypeError: Invalid input type.
+        :raises ValueError: Invalid input value.
+        """
+        AdminClient._check_list_offsets_request(topic_partition_offsets, kwargs)
+
+        if 'isolation_level' in kwargs:
+            kwargs['isolation_level_value'] = kwargs['isolation_level'].value
+            del kwargs['isolation_level']
+
+        topic_partition_offsets_list = [
+            _TopicPartition(topic_partition.topic, int(topic_partition.partition),
+                            int(offset_spec._value))
+            for topic_partition, offset_spec in topic_partition_offsets.items()]
+
+        f, futmap = AdminClient._make_futures_v2(topic_partition_offsets_list,
+                                                 _TopicPartition,
+                                                 AdminClient._make_futmap_result)

+        super(AdminClient, self).list_offsets(topic_partition_offsets_list, f, **kwargs)
         return futmap
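
Putting the new API together end to end, here is a minimal usage sketch. The broker address, topic name and partition index are placeholders; the call signature, the isolation_level/request_timeout keywords and the .offset/.timestamp fields on ListOffsetsResultInfo are the ones introduced in this diff.

    from confluent_kafka import IsolationLevel, KafkaException, TopicPartition
    from confluent_kafka.admin import AdminClient, OffsetSpec

    # Hypothetical broker and topic.
    admin = AdminClient({"bootstrap.servers": "localhost:9092"})
    requests = {TopicPartition("example_topic", 0): OffsetSpec.latest()}

    # One future per requested TopicPartition.
    futmap = admin.list_offsets(requests,
                                isolation_level=IsolationLevel.READ_COMMITTED,
                                request_timeout=30)

    for tp, fut in futmap.items():
        try:
            info = fut.result()  # ListOffsetsResultInfo
            print(f"{tp.topic}[{tp.partition}] offset={info.offset} timestamp={info.timestamp}")
        except KafkaException as e:
            print(f"{tp.topic}[{tp.partition}] failed: {e}")

As with the other futmap-based admin operations, per-partition failures surface as exceptions on the individual futures (set by _make_futmap_result) rather than failing the whole call.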
