Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Implement ListOffsetRequestV1 #841

Merged
merged 12 commits into from
Jul 24, 2018
4 changes: 2 additions & 2 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,8 @@ def reset_offsets(self, partition_offsets=None):
For each value provided in `partition_offsets`: if the value is an integer,
immediately reset the partition's internal offset counter to that value. If
it's a `datetime.datetime` instance or a valid `OffsetType`, issue an
`OffsetRequest` using that timestamp value to discover the latest offset
it's a `datetime.datetime` instance or a valid `OffsetType`, issue a
`ListOffsetRequest` using that timestamp value to discover the latest offset
in the latest log segment before that timestamp, then set the partition's
internal counter to that value.
Expand Down
8 changes: 5 additions & 3 deletions pykafka/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .exceptions import LeaderNotAvailable, SocketDisconnectedError
from .handlers import RequestHandler
from .protocol import (
FetchRequest, FetchResponse, OffsetRequest, OffsetResponse, MetadataRequest,
FetchRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest,
MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest,
OffsetFetchResponse, ProduceResponse, JoinGroupRequest, JoinGroupResponse,
SyncGroupRequest, SyncGroupResponse, HeartbeatRequest, HeartbeatResponse,
Expand Down Expand Up @@ -349,8 +349,10 @@ def request_offset_limits(self, partition_requests):
:type partition_requests: Iterable of
:class:`pykafka.protocol.PartitionOffsetRequest`
"""
future = self._req_handler.request(OffsetRequest(partition_requests))
return future.get(OffsetResponse)
request_class = ListOffsetRequest.get_version_impl(self._api_versions)
response_class = ListOffsetResponse.get_version_impl(self._api_versions)
future = self._req_handler.request(request_class(partition_requests))
return future.get(response_class)

@_check_handler
def request_metadata(self, topics=None):
Expand Down
117 changes: 109 additions & 8 deletions pykafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | ListOffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Response => CorrelationId ResponseMessage
  CorrelationId => int32
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | ListOffsetResponse | OffsetCommitResponse | OffsetFetchResponse
"""
__all__ = [
"MetadataRequest", "MetadataResponse", "ProduceRequest", "ProduceResponse",
"OffsetRequest", "OffsetResponse", "OffsetCommitRequest",
"ListOffsetRequest", "ListOffsetResponse", "OffsetCommitRequest",
"FetchRequest", "FetchResponse", "PartitionFetchRequest",
"OffsetCommitResponse", "OffsetFetchRequest", "OffsetFetchResponse",
"PartitionOffsetRequest", "GroupCoordinatorRequest",
Expand Down Expand Up @@ -1165,20 +1165,26 @@ class PartitionOffsetRequest(_PartitionOffsetRequest):
pass


class OffsetRequest(Request):
class ListOffsetRequest(Request):
"""An offset request
Specification::
OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32
"""
API_VERSION = 0
API_KEY = 2

@classmethod
def get_versions(cls):
# XXX use ListOffsetRequestV1 after 0.10 message format is supported
return {0: ListOffsetRequest, 1: ListOffsetRequest}

def __init__(self, partition_requests):
"""Create a new offset request"""
self._reqs = defaultdict(dict)
Expand All @@ -1204,7 +1210,7 @@ def get_bytes(self):
:rtype: :class:`bytearray`
"""
output = bytearray(len(self))
self._write_header(output)
self._write_header(output, api_version=self.API_VERSION)
offset = self.HEADER_LEN
struct.pack_into('!ii', output, offset, -1, len(self._reqs))
offset += 8
Expand All @@ -1220,23 +1226,88 @@ def get_bytes(self):
return output


class ListOffsetRequestV1(ListOffsetRequest):
"""
Specification::
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
"""
API_VERSION = 1

def __init__(self, partition_requests):
"""Create a new offset request"""
self._reqs = defaultdict(dict)
for t in partition_requests:
self._reqs[t.topic_name][t.partition_id] = t.offsets_before

def __len__(self):
"""Length of the serialized message, in bytes"""
# Header + replicaId + len(topics)
size = self.HEADER_LEN + 4 + 4
for topic, parts in iteritems(self._reqs):
# topic name + len(parts)
size += 2 + len(topic) + 4
# partition + fetch offset => for each partition
size += (4 + 8) * len(parts)
return size

def get_bytes(self):
"""Serialize the message
:returns: Serialized message
:rtype: :class:`bytearray`
"""
output = bytearray(len(self))
self._write_header(output, api_version=self.API_VERSION)
offset = self.HEADER_LEN
struct.pack_into('!ii', output, offset, -1, len(self._reqs))
offset += 8
for topic_name, partitions in iteritems(self._reqs):
fmt = '!h%dsi' % len(topic_name)
struct.pack_into(fmt, output, offset, len(topic_name),
topic_name, len(partitions))
offset += struct.calcsize(fmt)
for pnum, offsets_before in iteritems(partitions):
struct.pack_into('!iq', output, offset, pnum, offsets_before)
offset += 12
return output


OffsetPartitionResponse = namedtuple(
'OffsetPartitionResponse',
['offset', 'err']
)


class OffsetResponse(Response):
OffsetPartitionResponseV1 = namedtuple(
'OffsetPartitionResponseV1',
['offset', 'timestamp', 'err']
)


class ListOffsetResponse(Response):
"""An offset response
Specification::
OffsetResponse => [TopicName [PartitionOffsets]]
ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
"""
API_VERSION = 0
API_KEY = 2

@classmethod
def get_versions(cls):
# XXX use ListOffsetResponseV1 after 0.10 message format is supported
return {0: ListOffsetResponse, 1: ListOffsetResponse}

def __init__(self, buff):
"""Deserialize into a new Response
Expand All @@ -1254,6 +1325,36 @@ def __init__(self, buff):
partition[2], partition[1])


class ListOffsetResponseV1(ListOffsetResponse):
"""
Specification::
ListOffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32
  ErrorCode => int16
Timestamp => int64
  Offset => int64
"""
API_VERSION = 1

def __init__(self, buff):
"""Deserialize into a new Response
:param buff: Serialized message
:type buff: :class:`bytearray`
"""
fmt = '[S [ihq [q] ] ]'
response = struct_helpers.unpack_from(fmt, buff, 0)

self.topics = {}
for topic_name, partitions in response:
self.topics[topic_name] = {}
for partition in partitions:
self.topics[topic_name][partition[0]] = OffsetPartitionResponseV1(
partition[3], partition[2], partition[1])


class GroupCoordinatorRequest(Request):
"""A consumer metadata request
Expand Down
6 changes: 3 additions & 3 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,8 @@ def reset_offsets(self, partition_offsets=None):
For each value provided in `partition_offsets`: if the value is an integer,
immediately reset the partition's internal offset counter to that value. If
it's a `datetime.datetime` instance or a valid `OffsetType`, issue an
`OffsetRequest` using that timestamp value to discover the latest offset
it's a `datetime.datetime` instance or a valid `OffsetType`, issue a
`ListOffsetRequest` using that timestamp value to discover the latest offset
in the latest log segment before that timestamp, then set the partition's
internal counter to that value.
Expand Down Expand Up @@ -736,7 +736,7 @@ def _handle_success(parts):
list(map(owned_partition_timestamps.pop, successful))
if not parts_by_error:
continue
log.error("Error in OffsetRequest for topic '%s' (errors: %s)",
log.error("Error in ListOffsetRequest for topic '%s' (errors: %s)",
self._topic.name,
{ERROR_CODES[err]: [op.partition.id for op, _ in parts]
for err, parts in iteritems(parts_by_error)})
Expand Down
71 changes: 67 additions & 4 deletions tests/pykafka/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,12 +926,12 @@ class TestFetchAPIV2(TestFetchAPIV1):
RESPONSE_CLASS = protocol.FetchResponseV2


class TestOffsetAPI(unittest2.TestCase):
class TestListOffsetAPI(unittest2.TestCase):
maxDiff = None

def test_request(self):
preq = protocol.PartitionOffsetRequest(b'test', 0, -1, 1)
req = protocol.OffsetRequest(partition_requests=[preq, ])
req = protocol.ListOffsetRequest(partition_requests=[preq, ])
msg = req.get_bytes()
self.assertEqual(
msg,
Expand All @@ -950,7 +950,7 @@ def test_request(self):

def test_partition_error(self):
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.OffsetResponse(
response = protocol.ListOffsetResponse(
buffer(
b'\x00\x00\x00\x01' # len(topics)
b'\x00\x04' # len(topic name) # noqa
Expand All @@ -965,7 +965,7 @@ def test_partition_error(self):
self.assertEqual(response.topics[b'test'][0].err, 3)

def test_response(self):
resp = protocol.OffsetResponse(
resp = protocol.ListOffsetResponse(
buffer(
b'\x00\x00\x00\x01' # len(topics)
b'\x00\x04' # len(topic name) # noqa
Expand All @@ -980,6 +980,69 @@ def test_response(self):
self.assertEqual(resp.topics[b'test'][0].offset, [2])


class TestListOffsetAPIV1(unittest2.TestCase):
maxDiff = None

def test_request(self):
preq = protocol.PartitionOffsetRequest(b'test', 0, -1, 1)
req = protocol.ListOffsetRequestV1(partition_requests=[preq, ])
msg = req.get_bytes()
self.assertEqual(
msg,
bytearray(
# header
b'\x00\x00\x00/' # len(buffer)
b'\x00\x02' # ApiKey
b'\x00\x01' # api version
b'\x00\x00\x00\x00' # correlation id
b'\x00\x07' # len(client id)
b'pykafka' # client id # noqa
# end header
b'\xff\xff\xff\xff' # replica id
b'\x00\x00\x00\x01' # len(topics)
b'\x00\x04' # len(topic name) # noqa
b'test' # topic name
b'\x00\x00\x00\x01' # len(partitions)
b'\x00\x00\x00\x00' # partition
b'\xff\xff\xff\xff\xff\xff\xff\xff' # time
)
)

def test_partition_error(self):
# Response has a UnknownTopicOrPartition error for test/0
response = protocol.ListOffsetResponseV1(
buffer(
b'\x00\x00\x00\x01' # len(topics)
b'\x00\x04' # len(topic name) # noqa
b'test' # topic name
b'\x00\x00\x00\x01' # len(partitions)
b'\x00\x00\x00\x00' # partitoin
b'\x00\x03' # error code
b'\x00\x00\x00\x00\x00\x00\x00\x02' # timestamp
b'\x00\x00\x00\x01' # len(offsets)
b'\x00\x00\x00\x00\x00\x00\x00\x02' # offset
)
)
self.assertEqual(response.topics[b'test'][0].err, 3)

def test_response(self):
resp = protocol.ListOffsetResponseV1(
buffer(
b'\x00\x00\x00\x01' # len(topics)
b'\x00\x04' # len(topic name) # noqa
b'test' # topic name
b'\x00\x00\x00\x01' # len(partitions)
b'\x00\x00\x00\x00' # partitoin
b'\x00\x00' # error code
b'\x00\x00\x00\x00\x00\x00\x00\x02' # timestamp
b'\x00\x00\x00\x01' # len(offsets)
b'\x00\x00\x00\x00\x00\x00\x00\x02' # offset
)
)
self.assertEqual(resp.topics[b'test'][0].offset, [2])
self.assertEqual(resp.topics[b'test'][0].timestamp, 2)


class TestOffsetCommitFetchAPI(unittest2.TestCase):
maxDiff = None

Expand Down