From d5970967af0fddb0dcf88c4cd7ed3e537cff59d2 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:08:09 +0000 Subject: [PATCH 01/11] implement ListOffsetRequestV1 and change OffsetRequest to ListOffsetRequest --- pykafka/protocol.py | 64 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index d1bc869f5..67fd2f8c4 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -38,7 +38,7 @@   ApiVersion => int16   CorrelationId => int32   ClientId => string -  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest +  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | ListOffsetRequest | OffsetCommitRequest | OffsetFetchRequest Response => CorrelationId ResponseMessage   CorrelationId => int32 @@ -46,7 +46,7 @@ """ __all__ = [ "MetadataRequest", "MetadataResponse", "ProduceRequest", "ProduceResponse", - "OffsetRequest", "OffsetResponse", "OffsetCommitRequest", + "ListOffsetRequest", "OffsetResponse", "OffsetCommitRequest", "FetchRequest", "FetchResponse", "PartitionFetchRequest", "OffsetCommitResponse", "OffsetFetchRequest", "OffsetFetchResponse", "PartitionOffsetRequest", "GroupCoordinatorRequest", @@ -1165,20 +1165,25 @@ class PartitionOffsetRequest(_PartitionOffsetRequest): pass -class OffsetRequest(Request): +class ListOffsetRequest(Request): """An offset request Specification:: - OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]] + ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]   ReplicaId => int32   TopicName => string   Partition => int32   Time => int64   MaxNumberOfOffsets => int32 """ + API_VERSION = 0 API_KEY = 2 + @classmethod + def get_versions(cls): + return {0: ListOffsetRequest, 1: ListOffsetRequestV1} + def __init__(self, partition_requests): """Create a new offset request""" self._reqs = defaultdict(dict) @@ -1220,6 +1225,57 @@ def get_bytes(self): return output +class ListOffsetRequestV1(ListOffsetRequest): + """ + Specification:: + + ListOffsetRequest => ReplicaId [TopicName [Partition Time]] + ReplicaId => int32 + TopicName => string + Partition => int32 + Time => int64 + """ + API_VERSION = 1 + + def __init__(self, partition_requests): + """Create a new offset request""" + self._reqs = defaultdict(dict) + for t in partition_requests: + self._reqs[t.topic_name][t.partition_id] = t.offsets_before + + def __len__(self): + """Length of the serialized message, in bytes""" + # Header + replicaId + len(topics) + size = self.HEADER_LEN + 4 + 4 + for topic, parts in iteritems(self._reqs): + # topic name + len(parts) + size += 2 + len(topic) + 4 + # partition + fetch offset => for each partition + size += (4 + 8) * len(parts) + return size + + def get_bytes(self): + """Serialize the message + + :returns: Serialized message + :rtype: :class:`bytearray` + """ + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + struct.pack_into('!ii', output, offset, -1, len(self._reqs)) + offset += 8 + for topic_name, partitions in iteritems(self._reqs): + fmt = '!h%dsi' % len(topic_name) + struct.pack_into(fmt, output, offset, len(topic_name), + topic_name, len(partitions)) + offset += struct.calcsize(fmt) + for pnum, offsets_before in iteritems(partitions): + struct.pack_into('!iq', output, offset, pnum, offsets_before) + offset += 16 + return output + + OffsetPartitionResponse = namedtuple( 'OffsetPartitionResponse', ['offset', 'err'] From 49806a4697014126ffc01a3543c010bf50300026 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:08:27 +0000 Subject: [PATCH 02/11] change uses of OffsetRequest to ListOffsetRequest --- pykafka/balancedconsumer.py | 4 ++-- pykafka/broker.py | 5 +++-- pykafka/simpleconsumer.py | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index 2bf0f595a..808280230 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -703,8 +703,8 @@ def reset_offsets(self, partition_offsets=None): For each value provided in `partition_offsets`: if the value is an integer, immediately reset the partition's internal offset counter to that value. If - it's a `datetime.datetime` instance or a valid `OffsetType`, issue an - `OffsetRequest` using that timestamp value to discover the latest offset + it's a `datetime.datetime` instance or a valid `OffsetType`, issue a + `ListOffsetRequest` using that timestamp value to discover the latest offset in the latest log segment before that timestamp, then set the partition's internal counter to that value. diff --git a/pykafka/broker.py b/pykafka/broker.py index e1e21f209..d7500b0ff 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -24,7 +24,7 @@ from .exceptions import LeaderNotAvailable, SocketDisconnectedError from .handlers import RequestHandler from .protocol import ( - FetchRequest, FetchResponse, OffsetRequest, OffsetResponse, MetadataRequest, + FetchRequest, FetchResponse, ListOffsetRequest, OffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceResponse, JoinGroupRequest, JoinGroupResponse, SyncGroupRequest, SyncGroupResponse, HeartbeatRequest, HeartbeatResponse, @@ -349,7 +349,8 @@ def request_offset_limits(self, partition_requests): :type partition_requests: Iterable of :class:`pykafka.protocol.PartitionOffsetRequest` """ - future = self._req_handler.request(OffsetRequest(partition_requests)) + request_class = ListOffsetRequest.get_version_impl(self._api_versions) + future = self._req_handler.request(request_class(partition_requests)) return future.get(OffsetResponse) @_check_handler diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 60a7974b7..385342776 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -674,8 +674,8 @@ def reset_offsets(self, partition_offsets=None): For each value provided in `partition_offsets`: if the value is an integer, immediately reset the partition's internal offset counter to that value. If - it's a `datetime.datetime` instance or a valid `OffsetType`, issue an - `OffsetRequest` using that timestamp value to discover the latest offset + it's a `datetime.datetime` instance or a valid `OffsetType`, issue a + `ListOffsetRequest` using that timestamp value to discover the latest offset in the latest log segment before that timestamp, then set the partition's internal counter to that value. @@ -736,7 +736,7 @@ def _handle_success(parts): list(map(owned_partition_timestamps.pop, successful)) if not parts_by_error: continue - log.error("Error in OffsetRequest for topic '%s' (errors: %s)", + log.error("Error in ListOffsetRequest for topic '%s' (errors: %s)", self._topic.name, {ERROR_CODES[err]: [op.partition.id for op, _ in parts] for err, parts in iteritems(parts_by_error)}) From ac8286405348019256f4b7f45dafe4ff3bed6f44 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:17:20 +0000 Subject: [PATCH 03/11] implement ListOffsetResponseV1 --- pykafka/protocol.py | 51 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 67fd2f8c4..1a599b49b 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -42,11 +42,11 @@ Response => CorrelationId ResponseMessage   CorrelationId => int32 -  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse +  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | ListOffsetResponse | OffsetCommitResponse | OffsetFetchResponse """ __all__ = [ "MetadataRequest", "MetadataResponse", "ProduceRequest", "ProduceResponse", - "ListOffsetRequest", "OffsetResponse", "OffsetCommitRequest", + "ListOffsetRequest", "ListOffsetResponse", "OffsetCommitRequest", "FetchRequest", "FetchResponse", "PartitionFetchRequest", "OffsetCommitResponse", "OffsetFetchRequest", "OffsetFetchResponse", "PartitionOffsetRequest", "GroupCoordinatorRequest", @@ -1282,17 +1282,30 @@ def get_bytes(self): ) -class OffsetResponse(Response): +OffsetPartitionResponseV1 = namedtuple( + 'OffsetPartitionResponse', + ['offset', 'timestamp', 'err'] +) + + +class ListOffsetResponse(Response): """An offset response Specification:: - OffsetResponse => [TopicName [PartitionOffsets]] + ListOffsetResponse => [TopicName [PartitionOffsets]]   PartitionOffsets => Partition ErrorCode [Offset]   Partition => int32   ErrorCode => int16   Offset => int64 """ + API_VERSION = 0 + API_KEY = 2 + + @classmethod + def get_versions(cls): + return {0: ListOffsetResponse, 1: ListOffsetResponseV1} + def __init__(self, buff): """Deserialize into a new Response @@ -1310,6 +1323,36 @@ def __init__(self, buff): partition[2], partition[1]) +class ListOffsetResponseV1(ListOffsetResponse): + """ + Specification:: + + ListOffsetResponse => [TopicName [PartitionOffsets]] +   PartitionOffsets => Partition ErrorCode Timestamp [Offset] +   Partition => int32 +   ErrorCode => int16 + Timestamp => int64 +   Offset => int64 + """ + API_VERSION = 1 + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = '[S [ihq [q] ] ]' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.topics = {} + for topic_name, partitions in response: + self.topics[topic_name] = {} + for partition in partitions: + self.topics[topic_name][partition[0]] = OffsetPartitionResponseV1( + partition[3], partition[2], partition[1]) + + class GroupCoordinatorRequest(Request): """A consumer metadata request From ad42135bbc6fe24ba84a570d1717cec35da7ed8d Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:17:52 +0000 Subject: [PATCH 04/11] use ListOffsetResponseV1 where appropriate --- pykafka/broker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pykafka/broker.py b/pykafka/broker.py index d7500b0ff..bfcdfd15c 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -24,7 +24,7 @@ from .exceptions import LeaderNotAvailable, SocketDisconnectedError from .handlers import RequestHandler from .protocol import ( - FetchRequest, FetchResponse, ListOffsetRequest, OffsetResponse, MetadataRequest, + FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceResponse, JoinGroupRequest, JoinGroupResponse, SyncGroupRequest, SyncGroupResponse, HeartbeatRequest, HeartbeatResponse, @@ -350,8 +350,9 @@ def request_offset_limits(self, partition_requests): :class:`pykafka.protocol.PartitionOffsetRequest` """ request_class = ListOffsetRequest.get_version_impl(self._api_versions) + response_class = ListOffsetResponse.get_version_impl(self._api_versions) future = self._req_handler.request(request_class(partition_requests)) - return future.get(OffsetResponse) + return future.get(response_class) @_check_handler def request_metadata(self, topics=None): From 09c11b8fe3763ba1a56a299f122a224578d0e7eb Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:18:11 +0000 Subject: [PATCH 05/11] add tests for new ListOffsetRequestV1 --- tests/pykafka/test_protocol.py | 62 +++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py index 24d2addd2..18af15c3c 100644 --- a/tests/pykafka/test_protocol.py +++ b/tests/pykafka/test_protocol.py @@ -926,12 +926,12 @@ class TestFetchAPIV2(TestFetchAPIV1): RESPONSE_CLASS = protocol.FetchResponseV2 -class TestOffsetAPI(unittest2.TestCase): +class TestListOffsetAPI(unittest2.TestCase): maxDiff = None def test_request(self): preq = protocol.PartitionOffsetRequest(b'test', 0, -1, 1) - req = protocol.OffsetRequest(partition_requests=[preq, ]) + req = protocol.ListOffsetRequest(partition_requests=[preq, ]) msg = req.get_bytes() self.assertEqual( msg, @@ -950,7 +950,7 @@ def test_request(self): def test_partition_error(self): # Response has a UnknownTopicOrPartition error for test/0 - response = protocol.OffsetResponse( + response = protocol.ListOffsetResponse( buffer( b'\x00\x00\x00\x01' # len(topics) b'\x00\x04' # len(topic name) # noqa @@ -965,7 +965,61 @@ def test_partition_error(self): self.assertEqual(response.topics[b'test'][0].err, 3) def test_response(self): - resp = protocol.OffsetResponse( + resp = protocol.ListOffsetResponse( + buffer( + b'\x00\x00\x00\x01' # len(topics) + b'\x00\x04' # len(topic name) # noqa + b'test' # topic name + b'\x00\x00\x00\x01' # len(partitions) + b'\x00\x00\x00\x00' # partitoin + b'\x00\x00' # error code + b'\x00\x00\x00\x01' # len(offsets) + b'\x00\x00\x00\x00\x00\x00\x00\x02' # offset + ) + ) + self.assertEqual(resp.topics[b'test'][0].offset, [2]) + + +class TestListOffsetAPIV1(unittest2.TestCase): + maxDiff = None + + def test_request(self): + preq = protocol.PartitionOffsetRequest(b'test', 0, -1, 1) + req = protocol.ListOffsetRequestV1(partition_requests=[preq, ]) + msg = req.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x00\x003\x00\x02\x00\x00\x00\x00\x00\x00\x00\x07pykafka' # header + b'\xff\xff\xff\xff' # replica id + b'\x00\x00\x00\x01' # len(topics) + b'\x00\x04' # len(topic name) # noqa + b'test' # topic name + b'\x00\x00\x00\x01' # len(partitions) + b'\x00\x00\x00\x00' # partition + b'\xff\xff\xff\xff\xff\xff\xff\xff' # time + b'\x00\x00\x00\x01' # max number of offsets + ) + ) + + def test_partition_error(self): + # Response has a UnknownTopicOrPartition error for test/0 + response = protocol.ListOffsetResponseV1( + buffer( + b'\x00\x00\x00\x01' # len(topics) + b'\x00\x04' # len(topic name) # noqa + b'test' # topic name + b'\x00\x00\x00\x01' # len(partitions) + b'\x00\x00\x00\x00' # partitoin + b'\x00\x03' # error code + b'\x00\x00\x00\x01' # len(offsets) + b'\x00\x00\x00\x00\x00\x00\x00\x02' # offset + ) + ) + self.assertEqual(response.topics[b'test'][0].err, 3) + + def test_response(self): + resp = protocol.ListOffsetResponseV1( buffer( b'\x00\x00\x00\x01' # len(topics) b'\x00\x04' # len(topic name) # noqa From 6e20a6190bccea8b2debf05a4cfed467b4067291 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 19 Jul 2018 22:31:09 +0000 Subject: [PATCH 06/11] fix protocol test bugs --- pykafka/protocol.py | 6 +++--- tests/pykafka/test_protocol.py | 12 ++++++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 1a599b49b..a5352ec07 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -1209,7 +1209,7 @@ def get_bytes(self): :rtype: :class:`bytearray` """ output = bytearray(len(self)) - self._write_header(output) + self._write_header(output, api_version=self.API_VERSION) offset = self.HEADER_LEN struct.pack_into('!ii', output, offset, -1, len(self._reqs)) offset += 8 @@ -1261,7 +1261,7 @@ def get_bytes(self): :rtype: :class:`bytearray` """ output = bytearray(len(self)) - self._write_header(output) + self._write_header(output, api_version=self.API_VERSION) offset = self.HEADER_LEN struct.pack_into('!ii', output, offset, -1, len(self._reqs)) offset += 8 @@ -1272,7 +1272,7 @@ def get_bytes(self): offset += struct.calcsize(fmt) for pnum, offsets_before in iteritems(partitions): struct.pack_into('!iq', output, offset, pnum, offsets_before) - offset += 16 + offset += 12 return output diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py index 18af15c3c..443728642 100644 --- a/tests/pykafka/test_protocol.py +++ b/tests/pykafka/test_protocol.py @@ -990,7 +990,14 @@ def test_request(self): self.assertEqual( msg, bytearray( - b'\x00\x00\x003\x00\x02\x00\x00\x00\x00\x00\x00\x00\x07pykafka' # header + # header + b'\x00\x00\x00/' # len(buffer) + b'\x00\x02' # ApiKey + b'\x00\x01' # api version + b'\x00\x00\x00\x00' # correlation id + b'\x00\x07' # len(client id) + b'pykafka' # client id # noqa + # end header b'\xff\xff\xff\xff' # replica id b'\x00\x00\x00\x01' # len(topics) b'\x00\x04' # len(topic name) # noqa @@ -998,7 +1005,6 @@ def test_request(self): b'\x00\x00\x00\x01' # len(partitions) b'\x00\x00\x00\x00' # partition b'\xff\xff\xff\xff\xff\xff\xff\xff' # time - b'\x00\x00\x00\x01' # max number of offsets ) ) @@ -1012,6 +1018,7 @@ def test_partition_error(self): b'\x00\x00\x00\x01' # len(partitions) b'\x00\x00\x00\x00' # partitoin b'\x00\x03' # error code + b'\x00\x00\x00\x00\x00\x00\x00\x02' # timestamp b'\x00\x00\x00\x01' # len(offsets) b'\x00\x00\x00\x00\x00\x00\x00\x02' # offset ) @@ -1027,6 +1034,7 @@ def test_response(self): b'\x00\x00\x00\x01' # len(partitions) b'\x00\x00\x00\x00' # partitoin b'\x00\x00' # error code + b'\x00\x00\x00\x00\x00\x00\x00\x02' # timestamp b'\x00\x00\x00\x01' # len(offsets) b'\x00\x00\x00\x00\x00\x00\x00\x02' # offset ) From 4605ae5bf652bcb577b980431de49225c5765e4f Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 21:47:51 +0000 Subject: [PATCH 07/11] turn off new version as a test --- pykafka/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index a5352ec07..a8336dd86 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -1182,7 +1182,7 @@ class ListOffsetRequest(Request): @classmethod def get_versions(cls): - return {0: ListOffsetRequest, 1: ListOffsetRequestV1} + return {0: ListOffsetRequest, 1: ListOffsetRequest} def __init__(self, partition_requests): """Create a new offset request""" From 73f366ae253c2010316136fb5dc841d1e880ba0b Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 22:30:33 +0000 Subject: [PATCH 08/11] turn off new response for testing --- pykafka/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index a8336dd86..9a5d28d06 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -1304,7 +1304,7 @@ class ListOffsetResponse(Response): @classmethod def get_versions(cls): - return {0: ListOffsetResponse, 1: ListOffsetResponseV1} + return {0: ListOffsetResponse, 1: ListOffsetResponse} def __init__(self, buff): """Deserialize into a new Response From 097e89637a3d68f5aca186c3c0de2725c81aa94c Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 23:06:41 +0000 Subject: [PATCH 09/11] test for valid timestamp value --- tests/pykafka/test_protocol.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py index 443728642..fc749bd4a 100644 --- a/tests/pykafka/test_protocol.py +++ b/tests/pykafka/test_protocol.py @@ -1040,6 +1040,7 @@ def test_response(self): ) ) self.assertEqual(resp.topics[b'test'][0].offset, [2]) + self.assertEqual(resp.topics[b'test'][0].timestamp, 2) class TestOffsetCommitFetchAPI(unittest2.TestCase): From 05525854c79587fe73fd14495b34ffb4494eb9b4 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Mon, 23 Jul 2018 23:13:49 +0000 Subject: [PATCH 10/11] reset versions, use correct namedtuple typename --- pykafka/protocol.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 9a5d28d06..cc5a608f2 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -1182,7 +1182,7 @@ class ListOffsetRequest(Request): @classmethod def get_versions(cls): - return {0: ListOffsetRequest, 1: ListOffsetRequest} + return {0: ListOffsetRequest, 1: ListOffsetRequestV1} def __init__(self, partition_requests): """Create a new offset request""" @@ -1283,7 +1283,7 @@ def get_bytes(self): OffsetPartitionResponseV1 = namedtuple( - 'OffsetPartitionResponse', + 'OffsetPartitionResponseV1', ['offset', 'timestamp', 'err'] ) @@ -1304,7 +1304,7 @@ class ListOffsetResponse(Response): @classmethod def get_versions(cls): - return {0: ListOffsetResponse, 1: ListOffsetResponse} + return {0: ListOffsetResponse, 1: ListOffsetResponseV1} def __init__(self, buff): """Deserialize into a new Response From 79cec434acd184b1926306a40fd57ab7bc9e8200 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 24 Jul 2018 19:15:45 +0000 Subject: [PATCH 11/11] disable new listoffsetrequest for now, since it requires new message format --- pykafka/protocol.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pykafka/protocol.py b/pykafka/protocol.py index cc5a608f2..5901590b8 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -1182,7 +1182,8 @@ class ListOffsetRequest(Request): @classmethod def get_versions(cls): - return {0: ListOffsetRequest, 1: ListOffsetRequestV1} + # XXX use ListOffsetRequestV1 after 0.10 message format is supported + return {0: ListOffsetRequest, 1: ListOffsetRequest} def __init__(self, partition_requests): """Create a new offset request""" @@ -1304,7 +1305,8 @@ class ListOffsetResponse(Response): @classmethod def get_versions(cls): - return {0: ListOffsetResponse, 1: ListOffsetResponseV1} + # XXX use ListOffsetResponseV1 after 0.10 message format is supported + return {0: ListOffsetResponse, 1: ListOffsetResponse} def __init__(self, buff): """Deserialize into a new Response