diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py
index 2bf0f595a..808280230 100644
--- a/pykafka/balancedconsumer.py
+++ b/pykafka/balancedconsumer.py
@@ -703,8 +703,8 @@ def reset_offsets(self, partition_offsets=None):
 
         For each value provided in `partition_offsets`: if the value is an integer,
         immediately reset the partition's internal offset counter to that value. If
-        it's a `datetime.datetime` instance or a valid `OffsetType`, issue an
-        `OffsetRequest` using that timestamp value to discover the latest offset
+        it's a `datetime.datetime` instance or a valid `OffsetType`, issue a
+        `ListOffsetRequest` using that timestamp value to discover the latest offset
         in the latest log segment before that timestamp, then set the partition's
         internal counter to that value.
 
diff --git a/pykafka/broker.py b/pykafka/broker.py
index e1e21f209..bfcdfd15c 100644
--- a/pykafka/broker.py
+++ b/pykafka/broker.py
@@ -24,7 +24,7 @@
 from .exceptions import LeaderNotAvailable, SocketDisconnectedError
 from .handlers import RequestHandler
 from .protocol import (
-    FetchRequest, FetchResponse, OffsetRequest, OffsetResponse, MetadataRequest,
+    FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest,
     MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest,
     OffsetFetchResponse, ProduceResponse, JoinGroupRequest, JoinGroupResponse,
     SyncGroupRequest, SyncGroupResponse, HeartbeatRequest, HeartbeatResponse,
@@ -349,8 +349,10 @@ def request_offset_limits(self, partition_requests):
         :type partition_requests: Iterable of
             :class:`pykafka.protocol.PartitionOffsetRequest`
         """
-        future = self._req_handler.request(OffsetRequest(partition_requests))
-        return future.get(OffsetResponse)
+        request_class = ListOffsetRequest.get_version_impl(self._api_versions)
+        response_class = ListOffsetResponse.get_version_impl(self._api_versions)
+        future = self._req_handler.request(request_class(partition_requests))
+        return future.get(response_class)
 
     @_check_handler
     def request_metadata(self, topics=None):
diff --git a/pykafka/protocol.py b/pykafka/protocol.py
index d1bc869f5..5901590b8 100644
--- a/pykafka/protocol.py
+++ b/pykafka/protocol.py
@@ -38,15 +38,15 @@
   ApiVersion => int16
   CorrelationId => int32
   ClientId => string
-  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
+  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | ListOffsetRequest | OffsetCommitRequest | OffsetFetchRequest
 
 Response => CorrelationId ResponseMessage
   CorrelationId => int32
-  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
+  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | ListOffsetResponse | OffsetCommitResponse | OffsetFetchResponse
 """
 __all__ = [
     "MetadataRequest", "MetadataResponse", "ProduceRequest", "ProduceResponse",
-    "OffsetRequest", "OffsetResponse", "OffsetCommitRequest",
+    "ListOffsetRequest", "ListOffsetResponse", "OffsetCommitRequest",
     "FetchRequest", "FetchResponse", "PartitionFetchRequest",
     "OffsetCommitResponse", "OffsetFetchRequest", "OffsetFetchResponse",
     "PartitionOffsetRequest", "GroupCoordinatorRequest",
@@ -1165,20 +1165,26 @@ class PartitionOffsetRequest(_PartitionOffsetRequest):
     pass
 
 
-class OffsetRequest(Request):
+class ListOffsetRequest(Request):
     """An offset request
 
     Specification::
 
-        OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
+        ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
          ReplicaId => int32
          TopicName => string
          Partition => int32
          Time => int64
          MaxNumberOfOffsets => int32
     """
+    API_VERSION = 0
     API_KEY = 2
 
+    @classmethod
+    def get_versions(cls):
+        # XXX use ListOffsetRequestV1 after 0.10 message format is supported
+        return {0: ListOffsetRequest, 1: ListOffsetRequest}
+
     def __init__(self, partition_requests):
         """Create a new offset request"""
         self._reqs = defaultdict(dict)
@@ -1204,7 +1210,7 @@ def get_bytes(self):
         :rtype: :class:`bytearray`
         """
         output = bytearray(len(self))
-        self._write_header(output)
+        self._write_header(output, api_version=self.API_VERSION)
         offset = self.HEADER_LEN
         struct.pack_into('!ii', output, offset, -1, len(self._reqs))
         offset += 8
@@ -1220,23 +1226,88 @@ def get_bytes(self):
         return output
 
 
+class ListOffsetRequestV1(ListOffsetRequest):
+    """
+    Specification::
+
+        ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
+          ReplicaId => int32
+          TopicName => string
+          Partition => int32
+          Time => int64
+    """
+    API_VERSION = 1
+
+    def __init__(self, partition_requests):
+        """Create a new offset request"""
+        self._reqs = defaultdict(dict)
+        for t in partition_requests:
+            self._reqs[t.topic_name][t.partition_id] = t.offsets_before
+
+    def __len__(self):
+        """Length of the serialized message, in bytes"""
+        # Header + replicaId + len(topics)
+        size = self.HEADER_LEN + 4 + 4
+        for topic, parts in iteritems(self._reqs):
+            # topic name + len(parts)
+            size += 2 + len(topic) + 4
+            # partition + fetch offset => for each partition
+            size += (4 + 8) * len(parts)
+        return size
+
+    def get_bytes(self):
+        """Serialize the message
+
+        :returns: Serialized message
+        :rtype: :class:`bytearray`
+        """
+        output = bytearray(len(self))
+        self._write_header(output, api_version=self.API_VERSION)
+        offset = self.HEADER_LEN
+        struct.pack_into('!ii', output, offset, -1, len(self._reqs))
+        offset += 8
+        for topic_name, partitions in iteritems(self._reqs):
+            fmt = '!h%dsi' % len(topic_name)
+            struct.pack_into(fmt, output, offset, len(topic_name),
+                             topic_name, len(partitions))
+            offset += struct.calcsize(fmt)
+            for pnum, offsets_before in iteritems(partitions):
+                struct.pack_into('!iq', output, offset, pnum, offsets_before)
+                offset += 12
+        return output
+
+
 OffsetPartitionResponse = namedtuple(
     'OffsetPartitionResponse',
     ['offset', 'err']
 )
 
 
-class OffsetResponse(Response):
+OffsetPartitionResponseV1 = namedtuple(
+    'OffsetPartitionResponseV1',
+    ['offset', 'timestamp', 'err']
+)
+
+
+class ListOffsetResponse(Response):
     """An offset response
 
     Specification::
 
-        OffsetResponse => [TopicName [PartitionOffsets]]
+        ListOffsetResponse => [TopicName [PartitionOffsets]]
          PartitionOffsets => Partition ErrorCode [Offset]
          Partition => int32
          ErrorCode => int16
          Offset => int64
     """
+    API_VERSION = 0
+    API_KEY = 2
+
+    @classmethod
+    def get_versions(cls):
+        # XXX use ListOffsetResponseV1 after 0.10 message format is supported
+        return {0: ListOffsetResponse, 1: ListOffsetResponse}
+
     def __init__(self, buff):
         """Deserialize into a new Response
 
@@ -1254,6 +1325,36 @@ def __init__(self, buff):
                     partition[2], partition[1])
 
 
+class ListOffsetResponseV1(ListOffsetResponse):
+    """
+    Specification::
+
+        ListOffsetResponse => [TopicName [PartitionOffsets]]
+          PartitionOffsets => Partition ErrorCode Timestamp [Offset]
+          Partition => int32
+          ErrorCode => int16
+          Timestamp => int64
+          Offset => int64
+    """
+    API_VERSION = 1
+
+    def __init__(self, buff):
+        """Deserialize into a new Response
+
+        :param buff: Serialized message
+        :type buff: :class:`bytearray`
+        """
+        fmt = '[S [ihq [q] ] ]'
+        response = struct_helpers.unpack_from(fmt, buff, 0)
+
+        self.topics = {}
+        for topic_name, partitions in response:
+            self.topics[topic_name] = {}
+            for partition in partitions:
+                self.topics[topic_name][partition[0]] = OffsetPartitionResponseV1(
+                    partition[3], partition[2], partition[1])
+
+
 class GroupCoordinatorRequest(Request):
     """A consumer metadata request
 
diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py
index 60a7974b7..385342776 100644
--- a/pykafka/simpleconsumer.py
+++ b/pykafka/simpleconsumer.py
@@ -674,8 +674,8 @@ def reset_offsets(self, partition_offsets=None):
 
         For each value provided in `partition_offsets`: if the value is an integer,
         immediately reset the partition's internal offset counter to that value. If
-        it's a `datetime.datetime` instance or a valid `OffsetType`, issue an
-        `OffsetRequest` using that timestamp value to discover the latest offset
+        it's a `datetime.datetime` instance or a valid `OffsetType`, issue a
+        `ListOffsetRequest` using that timestamp value to discover the latest offset
         in the latest log segment before that timestamp, then set the partition's
         internal counter to that value.
 
@@ -736,7 +736,7 @@ def _handle_success(parts):
                 list(map(owned_partition_timestamps.pop, successful))
                 if not parts_by_error:
                     continue
-                log.error("Error in OffsetRequest for topic '%s' (errors: %s)",
+                log.error("Error in ListOffsetRequest for topic '%s' (errors: %s)",
                           self._topic.name,
                           {ERROR_CODES[err]: [op.partition.id for op, _ in parts]
                            for err, parts in iteritems(parts_by_error)})
diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py
index 24d2addd2..fc749bd4a 100644
--- a/tests/pykafka/test_protocol.py
+++ b/tests/pykafka/test_protocol.py
@@ -926,12 +926,12 @@ class TestFetchAPIV2(TestFetchAPIV1):
     RESPONSE_CLASS = protocol.FetchResponseV2
 
 
-class TestOffsetAPI(unittest2.TestCase):
+class TestListOffsetAPI(unittest2.TestCase):
     maxDiff = None
 
     def test_request(self):
         preq = protocol.PartitionOffsetRequest(b'test', 0, -1, 1)
-        req = protocol.OffsetRequest(partition_requests=[preq, ])
+        req = protocol.ListOffsetRequest(partition_requests=[preq, ])
         msg = req.get_bytes()
         self.assertEqual(
             msg,
@@ -950,7 +950,7 @@ def test_request(self):
 
     def test_partition_error(self):
         # Response has a UnknownTopicOrPartition error for test/0
-        response = protocol.OffsetResponse(
+        response = protocol.ListOffsetResponse(
             buffer(
                 b'\x00\x00\x00\x01'  # len(topics)
                 b'\x00\x04'  # len(topic name)  # noqa
@@ -965,7 +965,7 @@ def test_partition_error(self):
         self.assertEqual(response.topics[b'test'][0].err, 3)
 
     def test_response(self):
-        resp = protocol.OffsetResponse(
+        resp = protocol.ListOffsetResponse(
             buffer(
                 b'\x00\x00\x00\x01'  # len(topics)
                 b'\x00\x04'  # len(topic name)  # noqa
@@ -980,6 +980,69 @@ def test_response(self):
         self.assertEqual(resp.topics[b'test'][0].offset, [2])
 
 
+class TestListOffsetAPIV1(unittest2.TestCase):
+    maxDiff = None
+
+    def test_request(self):
+        preq = protocol.PartitionOffsetRequest(b'test', 0, -1, 1)
+        req = protocol.ListOffsetRequestV1(partition_requests=[preq, ])
+        msg = req.get_bytes()
+        self.assertEqual(
+            msg,
+            bytearray(
+                # header
+                b'\x00\x00\x00/'  # len(buffer)
+                b'\x00\x02'  # ApiKey
+                b'\x00\x01'  # api version
+                b'\x00\x00\x00\x00'  # correlation id
+                b'\x00\x07'  # len(client id)
+                b'pykafka'  # client id  # noqa
+                # end header
+                b'\xff\xff\xff\xff'  # replica id
+                b'\x00\x00\x00\x01'  # len(topics)
+                b'\x00\x04'  # len(topic name)  # noqa
+                b'test'  # topic name
+                b'\x00\x00\x00\x01'  # len(partitions)
+                b'\x00\x00\x00\x00'  # partition
+                b'\xff\xff\xff\xff\xff\xff\xff\xff'  # time
+            )
+        )
+
+    def test_partition_error(self):
+        # Response has a UnknownTopicOrPartition error for test/0
+        response = protocol.ListOffsetResponseV1(
+            buffer(
+                b'\x00\x00\x00\x01'  # len(topics)
+                b'\x00\x04'  # len(topic name)  # noqa
+                b'test'  # topic name
+                b'\x00\x00\x00\x01'  # len(partitions)
+                b'\x00\x00\x00\x00'  # partition
+                b'\x00\x03'  # error code
+                b'\x00\x00\x00\x00\x00\x00\x00\x02'  # timestamp
+                b'\x00\x00\x00\x01'  # len(offsets)
+                b'\x00\x00\x00\x00\x00\x00\x00\x02'  # offset
+            )
+        )
+        self.assertEqual(response.topics[b'test'][0].err, 3)
+
+    def test_response(self):
+        resp = protocol.ListOffsetResponseV1(
+            buffer(
+                b'\x00\x00\x00\x01'  # len(topics)
+                b'\x00\x04'  # len(topic name)  # noqa
+                b'test'  # topic name
+                b'\x00\x00\x00\x01'  # len(partitions)
+                b'\x00\x00\x00\x00'  # partition
+                b'\x00\x00'  # error code
+                b'\x00\x00\x00\x00\x00\x00\x00\x02'  # timestamp
+                b'\x00\x00\x00\x01'  # len(offsets)
+                b'\x00\x00\x00\x00\x00\x00\x00\x02'  # offset
+            )
+        )
+        self.assertEqual(resp.topics[b'test'][0].offset, [2])
+        self.assertEqual(resp.topics[b'test'][0].timestamp, 2)
+
+
 class TestOffsetCommitFetchAPI(unittest2.TestCase):
     maxDiff = None
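
Not part of the patch, but a quick sketch of how the pieces fit together: `Broker.request_offset_limits()` now selects the request/response implementation via `ListOffsetRequest.get_version_impl(self._api_versions)`, so callers keep passing `PartitionOffsetRequest` tuples as before. The snippet below builds a v1 request by hand, mirroring `TestListOffsetAPIV1.test_request` above; the class names and fields come from this patch, everything else is illustrative.

    # Illustrative only -- mirrors TestListOffsetAPIV1.test_request in this patch.
    from pykafka.protocol import PartitionOffsetRequest
    from pykafka.protocol import ListOffsetRequestV1  # added by this patch

    # topic b'test', partition 0, timestamp -1 (latest), max 1 offset; note that
    # the max_offsets field is not serialized by v1, whose wire format drops
    # MaxNumberOfOffsets (see the ListOffsetRequestV1 docstring above).
    preq = PartitionOffsetRequest(b'test', 0, -1, 1)
    req = ListOffsetRequestV1(partition_requests=[preq])

    serialized = req.get_bytes()
    # The first four bytes hold the size of the rest of the message
    # (0x2f == 47 for this single-partition request, matching the expected
    # bytes in the test above).
    print(len(serialized), serialized[:4])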