diff --git a/.travis.yml b/.travis.yml index 7ac18c5de..dde2a7e5c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,6 @@ sudo: false cache: directories: - $HOME/.cache/pip - - $HOME/kafka-bin - $HOME/.ccache python: - "2.7" @@ -18,7 +17,7 @@ env: - KAFKA_BIN="$HOME/kafka-bin" matrix: - KAFKA_VERSION=0.8.2.2 - - KAFKA_VERSION=0.9.0.0 + - KAFKA_VERSION=0.9.0.1 addons: apt: diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index 310bebf97..719410305 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -36,7 +36,7 @@ from .exceptions import KafkaException, PartitionOwnedError, ConsumerStoppedException from .handlers import GEventHandler from .simpleconsumer import SimpleConsumer -from .utils.compat import range, get_bytes, itervalues, iteritems +from .utils.compat import range, get_bytes, itervalues, iteritems, get_string try: from . import rdkafka except ImportError: @@ -210,6 +210,7 @@ def __init__(self, self._zookeeper_connection_timeout_ms = zookeeper_connection_timeout_ms self._reset_offset_on_start = reset_offset_on_start self._post_rebalance_callback = post_rebalance_callback + self._generation_id = -1 self._running = False self._worker_exception = None self._worker_trace_logged = False @@ -223,10 +224,10 @@ def __init__(self, self._rebalancing_lock = cluster.handler.Lock() self._consumer = None - self._consumer_id = "{hostname}:{uuid}".format( + self._consumer_id = get_bytes("{hostname}:{uuid}".format( hostname=socket.gethostname(), uuid=uuid4() - ) + )) self._setting_watches = True self._topic_path = '/consumers/{group}/owners/{topic}'.format( @@ -289,7 +290,7 @@ def held_offsets(self): return self._consumer.held_offsets def start(self): - """Open connections and join a cluster.""" + """Open connections and join a consumer group.""" try: if self._zookeeper is None: self._setup_zookeeper(self._zookeeper_connect, @@ -347,7 +348,29 @@ def _setup_zookeeper(self, zookeeper_connect, timeout): def _setup_internal_consumer(self, partitions=None, start=True): """Instantiate an internal SimpleConsumer instance""" - self._consumer = self._get_internal_consumer(partitions=partitions, start=start) + if partitions is None: + partitions = [] + # Only re-create internal consumer if something changed. + if partitions != self._partitions: + cns = self._get_internal_consumer(partitions=list(partitions), start=start) + if self._post_rebalance_callback is not None: + old_offsets = (self._consumer.held_offsets + if self._consumer else dict()) + new_offsets = cns.held_offsets + try: + reset_offsets = self._post_rebalance_callback( + self, old_offsets, new_offsets) + except Exception: + log.exception("post rebalance callback threw an exception") + self._worker_exception = sys.exc_info() + return False + + if reset_offsets: + cns.reset_offsets(partition_offsets=[ + (cns.partitions[id_], offset) for + (id_, offset) in iteritems(reset_offsets)]) + self._consumer = cns + return True def _get_internal_consumer(self, partitions=None, start=True): """Instantiate a SimpleConsumer for internal use. 
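The refactored `_setup_internal_consumer` above only rebuilds the internal `SimpleConsumer` when the partition set changes, and it invokes `post_rebalance_callback` with the old and new offset dicts, optionally resetting offsets from the callback's return value. A minimal sketch of a compatible callback follows; it is an illustration only (the group/topic context and the decision to return `None` are assumptions, not part of this patch).

```python
import logging

log = logging.getLogger(__name__)


def on_rebalance(consumer, old_offsets, new_offsets):
    """Example post_rebalance_callback.

    Both arguments are dicts mapping partition id to the most recently known
    offset, as passed by _setup_internal_consumer above.
    """
    gained = set(new_offsets) - set(old_offsets)
    lost = set(old_offsets) - set(new_offsets)
    log.info("Rebalance: gained partitions %s, lost partitions %s", gained, lost)
    # Returning None keeps the offsets the consumer resolved on its own.
    # Returning a {partition_id: offset} dict would make the consumer call
    # reset_offsets() with those values before resuming, as shown in the hunk above.
    return None
```

Such a function would be passed as `post_rebalance_callback=on_rebalance` when constructing a `BalancedConsumer`.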
@@ -385,10 +408,12 @@ def _get_internal_consumer(self, partitions=None, start=True): auto_offset_reset=self._auto_offset_reset, reset_offset_on_start=reset_offset_on_start, auto_start=start, - compacted_topic=self._is_compacted_topic + compacted_topic=self._is_compacted_topic, + generation_id=self._generation_id, + consumer_id=self._consumer_id ) - def _decide_partitions(self, participants): + def _decide_partitions(self, participants, consumer_id=None): """Decide which partitions belong to this consumer. Uses the consumer rebalancing algorithm described here @@ -402,6 +427,8 @@ def _decide_partitions(self, participants): :param participants: Sorted list of ids of all other consumers in this consumer group. :type participants: Iterable of `bytes` + :param consumer_id: The ID of the consumer for which to generate a partition + assignment. Defaults to `self._consumer_id` """ # Freeze and sort partitions so we always have the same results p_to_str = lambda p: '-'.join([str(p.topic.name), str(p.leader.id), str(p.id)]) @@ -410,7 +437,7 @@ def _decide_partitions(self, participants): # get start point, # of partitions, and remainder participants = sorted(participants) # just make sure it's sorted. - idx = participants.index(self._consumer_id) + idx = participants.index(consumer_id or self._consumer_id) parts_per_consumer = len(all_parts) // len(participants) remainder_ppc = len(all_parts) % len(participants) @@ -420,15 +447,16 @@ def _decide_partitions(self, participants): # assign partitions from i*N to (i+1)*N - 1 to consumer Ci new_partitions = itertools.islice(all_parts, start, start + num_parts) new_partitions = set(new_partitions) - log.info('Balancing %i participants for %i partitions.\nOwning %i partitions.', - len(participants), len(all_parts), len(new_partitions)) + log.info('%s: Balancing %i participants for %i partitions. Owning %i partitions.', + self._consumer_id, len(participants), len(all_parts), + len(new_partitions)) log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions]) return new_partitions def _get_participants(self): """Use zookeeper to get the other consumers of this topic. - :return: A sorted list of the ids of the other consumers of this + :return: A sorted list of the ids of other consumers of this consumer's topic """ try: @@ -443,9 +471,9 @@ def _get_participants(self): try: topic, stat = self._zookeeper.get("%s/%s" % (self._consumer_id_path, id_)) if topic == self._topic.name: - participants.append(id_) + participants.append(get_bytes(id_)) except NoNodeException: - pass # disappeared between ``get_children`` and ``get`` + pass # node disappeared between ``get_children`` and ``get`` participants = sorted(participants) return participants @@ -520,11 +548,50 @@ def _path_self(self): """Path where this consumer should be registered in zookeeper""" return '{path}/{id_}'.format( path=self._consumer_id_path, - id_=self._consumer_id + # get_string is necessary to avoid writing literal "b'" to zookeeper + id_=get_string(self._consumer_id) ) + def _update_member_assignment(self): + """Decide and assign new partitions for this consumer""" + for i in range(self._rebalance_max_retries): + try: + # If retrying, be sure to make sure the + # partition allocation is correct. 
+ participants = self._get_participants() + if self._consumer_id not in participants: + # situation that only occurs if our zk session expired + self._add_self() + participants.append(self._consumer_id) + + new_partitions = self._decide_partitions(participants) + if not new_partitions: + log.warning("No partitions assigned to consumer %s", + self._consumer_id) + + # Update zk with any changes: + # Note that we explicitly fetch our set of held partitions + # from zk, rather than assuming it will be identical to + # `self.partitions`. This covers the (rare) situation + # where due to an interrupted connection our zk session + # has expired, in which case we'd hold zero partitions on + # zk, but `self._partitions` may be outdated and non-empty + current_zk_parts = self._get_held_partitions() + self._remove_partitions(current_zk_parts - new_partitions) + self._add_partitions(new_partitions - current_zk_parts) + if self._setup_internal_consumer(new_partitions): + log.info('Rebalancing Complete.') + break + except PartitionOwnedError as ex: + if i == self._rebalance_max_retries - 1: + log.warning('Failed to acquire partition %s after %d retries.', + ex.partition, i) + raise + log.info('Unable to acquire partition %s. Retrying', ex.partition) + self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000)) + def _rebalance(self): - """Claim partitions for this consumer. + """Start the rebalancing process for this consumer This method is called whenever a zookeeper watch is triggered. """ @@ -536,65 +603,8 @@ def _rebalance(self): if not self._running: raise ConsumerStoppedException log.info('Rebalancing consumer "%s" for topic "%s".' % ( - self._consumer_id, self._topic.name) - ) - - for i in range(self._rebalance_max_retries): - try: - # If retrying, be sure to make sure the - # partition allocation is correct. - participants = self._get_participants() - if self._consumer_id not in participants: - # situation that only occurs if our zk session expired - self._add_self() - participants.append(self._consumer_id) - - new_partitions = self._decide_partitions(participants) - if not new_partitions: - log.warning("No partitions assigned to consumer %s", - self._consumer_id) - - # Update zk with any changes: - # Note that we explicitly fetch our set of held partitions - # from zk, rather than assuming it will be identical to - # `self.partitions`. This covers the (rare) situation - # where due to an interrupted connection our zk session - # has expired, in which case we'd hold zero partitions on - # zk, but `self._partitions` may be outdated and non-empty - current_zk_parts = self._get_held_partitions() - self._remove_partitions(current_zk_parts - new_partitions) - self._add_partitions(new_partitions - current_zk_parts) - - # Only re-create internal consumer if something changed. 
- if new_partitions != self._partitions: - cns = self._get_internal_consumer(list(new_partitions)) - if self._post_rebalance_callback is not None: - old_offsets = (self._consumer.held_offsets - if self._consumer else dict()) - new_offsets = cns.held_offsets - try: - reset_offsets = self._post_rebalance_callback( - self, old_offsets, new_offsets) - except Exception as ex: - log.exception("post rebalance callback threw an exception") - self._worker_exception = sys.exc_info() - break - - if reset_offsets: - cns.reset_offsets(partition_offsets=[ - (cns.partitions[id_], offset) for - (id_, offset) in iteritems(reset_offsets)]) - self._consumer = cns - - log.info('Rebalancing Complete.') - break - except PartitionOwnedError as ex: - if i == self._rebalance_max_retries - 1: - log.warning('Failed to acquire partition %s after %d retries.', - ex.partition, i) - raise - log.info('Unable to acquire partition %s. Retrying', ex.partition) - self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000)) + self._consumer_id, self._topic.name)) + self._update_member_assignment() def _path_from_partition(self, p): """Given a partition, return its path in zookeeper. diff --git a/pykafka/broker.py b/pykafka/broker.py index 821fd4156..a4a6c5419 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -24,12 +24,12 @@ from .exceptions import LeaderNotAvailable, SocketDisconnectedError from .handlers import RequestHandler from .protocol import ( - FetchRequest, FetchResponse, OffsetRequest, - OffsetResponse, MetadataRequest, MetadataResponse, - OffsetCommitRequest, OffsetCommitResponse, - OffsetFetchRequest, OffsetFetchResponse, - ProduceResponse) -from .utils.compat import range, iteritems + FetchRequest, FetchResponse, OffsetRequest, OffsetResponse, MetadataRequest, + MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, + OffsetFetchResponse, ProduceResponse, JoinGroupRequest, JoinGroupResponse, + SyncGroupRequest, SyncGroupResponse, HeartbeatRequest, HeartbeatResponse, + LeaveGroupRequest, LeaveGroupResponse) +from .utils.compat import range, iteritems, get_bytes log = logging.getLogger(__name__) @@ -89,6 +89,7 @@ def __init__(self, self._socket_timeout_ms = socket_timeout_ms self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms self._buffer_size = buffer_size + self._req_handlers = {} self.connect() def __repr__(self): @@ -194,6 +195,7 @@ def connect(self): :class:`pykafka.handlers.RequestHandler` for this broker """ self._connection = BrokerConnection(self.host, self.port, + self._handler, buffer_size=self._buffer_size, source_host=self._source_host, source_port=self._source_port) @@ -209,7 +211,8 @@ def connect_offsets_channel(self): channel """ self._offsets_channel_connection = BrokerConnection( - self.host, self.port, buffer_size=self._buffer_size, + self.host, self.port, self._handler, + buffer_size=self._buffer_size, source_host=self._source_host, source_port=self._source_port) self._offsets_channel_connection.connect(self._offsets_channel_socket_timeout_ms) self._offsets_channel_req_handler = RequestHandler( @@ -217,6 +220,32 @@ def connect_offsets_channel(self): ) self._offsets_channel_req_handler.start() + def _get_unique_req_handler(self, connection_id): + """Return a RequestHandler instance unique to the given connection_id + + In some applications, for example the Group Membership API, requests running + in the same process must be interleaved. 
When both of these requests are + using the same RequestHandler instance, the requests are queued and the + interleaving semantics are not upheld. This method behaves identically to + self._req_handler if there is only one connection_id per KafkaClient. + If a single KafkaClient needs to use more than one connection_id, this + method maintains a dictionary of connections unique to those ids. + + :param connection_id: The unique identifier of the connection to return + :type connection_id: str + """ + if len(self._req_handlers) == 0: + self._req_handlers[connection_id] = self._req_handler + elif connection_id not in self._req_handlers: + conn = BrokerConnection( + self.host, self.port, self._handler, buffer_size=self._buffer_size, + source_host=self._source_host, source_port=self._source_port) + conn.connect(self._socket_timeout_ms) + handler = RequestHandler(self._handler, conn) + handler.start() + self._req_handlers[connection_id] = handler + return self._req_handlers[connection_id] + def fetch_messages(self, partition_requests, timeout=30000, @@ -327,7 +356,7 @@ def commit_consumer_group_offsets(self, if not self.offsets_channel_connected: self.connect_offsets_channel() req = OffsetCommitRequest(consumer_group, - consumer_group_generation_id, + get_bytes(consumer_group_generation_id), consumer_id, partition_requests=preqs) return self._offsets_channel_req_handler.request(req).get(OffsetCommitResponse) @@ -348,3 +377,82 @@ def fetch_consumer_group_offsets(self, consumer_group, preqs): self.connect_offsets_channel() req = OffsetFetchRequest(consumer_group, partition_requests=preqs) return self._offsets_channel_req_handler.request(req).get(OffsetFetchResponse) + + ########################## + # Group Membership API # + ########################## + + def join_group(self, connection_id, consumer_group, member_id): + """Send a JoinGroupRequest + + :param connection_id: The unique identifier of the connection on which to make + this request + :type connection_id: str + :param consumer_group: The name of the consumer group to join + :type consumer_group: bytes + :param member_id: The ID of the consumer joining the group + :type member_id: bytes + """ + handler = self._get_unique_req_handler(connection_id) + future = handler.request(JoinGroupRequest(consumer_group, member_id)) + self._handler.sleep() + return future.get(JoinGroupResponse) + + def leave_group(self, connection_id, consumer_group, member_id): + """Send a LeaveGroupRequest + + :param connection_id: The unique identifier of the connection on which to make + this request + :type connection_id: str + :param consumer_group: The name of the consumer group to leave + :type consumer_group: bytes + :param member_id: The ID of the consumer leaving the group + :type member_id: bytes + """ + handler = self._get_unique_req_handler(connection_id) + future = handler.request(LeaveGroupRequest(consumer_group, member_id)) + return future.get(LeaveGroupResponse) + + def sync_group(self, connection_id, consumer_group, generation_id, member_id, group_assignment): + """Send a SyncGroupRequest + + :param connection_id: The unique identifier of the connection on which to make + this request + :type connection_id: str + :param consumer_group: The name of the consumer group to which this consumer + belongs + :type consumer_group: bytes + :param generation_id: The current generation for the consumer group + :type generation_id: int + :param member_id: The ID of the consumer syncing + :type member_id: bytes + :param group_assignment: A sequence of 
:class:`pykafka.protocol.MemberAssignment` + instances indicating the partition assignments for each member of the group. + When `sync_group` is called by a member other than the leader of the group, + `group_assignment` should be an empty sequence. + :type group_assignment: iterable of :class:`pykafka.protocol.MemberAssignment` + """ + handler = self._get_unique_req_handler(connection_id) + future = handler.request( + SyncGroupRequest(consumer_group, generation_id, member_id, group_assignment)) + return future.get(SyncGroupResponse) + + def heartbeat(self, connection_id, consumer_group, generation_id, member_id): + """Send a HeartbeatRequest + + :param connection_id: The unique identifier of the connection on which to make + this request + :type connection_id: str + :param consumer_group: The name of the consumer group to which this consumer + belongs + :type consumer_group: bytes + :param generation_id: The current generation for the consumer group + :type generation_id: int + :param member_id: The ID of the consumer sending this heartbeat + :type member_id: bytes + """ + handler = self._get_unique_req_handler(connection_id) + future = handler.request( + HeartbeatRequest(consumer_group, generation_id, member_id)) + self._handler.sleep() + return future.get(HeartbeatResponse) diff --git a/pykafka/cli/kafka_tools.py b/pykafka/cli/kafka_tools.py index dafd4d2c8..37a28ccc7 100644 --- a/pykafka/cli/kafka_tools.py +++ b/pykafka/cli/kafka_tools.py @@ -227,7 +227,7 @@ def reset_offsets(client, args): for partition_id, res in offsets.iteritems()] # Send them to the appropriate broker. - broker = client.cluster.get_offset_manager(args.consumer_group) + broker = client.cluster.get_group_coordinator(args.consumer_group) broker.commit_consumer_group_offsets( args.consumer_group, 1, 'kafka-tools', reqs ) diff --git a/pykafka/cluster.py b/pykafka/cluster.py index d73ead3e8..d394ed773 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -28,11 +28,11 @@ from .broker import Broker from .exceptions import (ERROR_CODES, - ConsumerCoordinatorNotAvailable, + GroupCoordinatorNotAvailable, NoBrokersAvailableError, SocketDisconnectedError, LeaderNotAvailable) -from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse +from .protocol import GroupCoordinatorRequest, GroupCoordinatorResponse from .topic import Topic from .utils.compat import iteritems, range @@ -366,8 +366,8 @@ def _update_brokers(self, broker_metadata): # needed. raise Exception('Broker host/port change detected! %s', broker) - def get_offset_manager(self, consumer_group): - """Get the broker designated as the offset manager for this consumer group. + def get_group_coordinator(self, consumer_group): + """Get the broker designated as the group coordinator for this consumer group. 
Based on Step 1 at https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka @@ -385,11 +385,11 @@ def get_offset_manager(self, consumer_group): log.debug("Retrying offset manager discovery") time.sleep(i * 2) - req = ConsumerMetadataRequest(consumer_group) + req = GroupCoordinatorRequest(consumer_group) future = broker.handler.request(req) try: - res = future.get(ConsumerMetadataResponse) - except ConsumerCoordinatorNotAvailable: + res = future.get(GroupCoordinatorResponse) + except GroupCoordinatorNotAvailable: log.error('Error discovering offset manager.') if i == self._max_connection_retries - 1: raise diff --git a/pykafka/connection.py b/pykafka/connection.py index efbcebb8a..960aa13fa 100644 --- a/pykafka/connection.py +++ b/pykafka/connection.py @@ -38,6 +38,7 @@ class BrokerConnection(object): def __init__(self, host, port, + handler, buffer_size=1024 * 1024, source_host='', source_port=0): @@ -47,6 +48,9 @@ def __init__(self, :type host: str :param port: The port on the host to which to connect :type port: int + :param handler: The :class:`pykafka.handlers.Handler` instance to use when + creating a connection + :type handler: :class:`pykafka.handlers.Handler` :param buffer_size: The size (in bytes) of the buffer in which to hold response data. :type buffer_size: int @@ -60,6 +64,7 @@ def __init__(self, self._buff = bytearray(buffer_size) self.host = host self.port = port + self._handler = handler self._socket = None self.source_host = source_host self.source_port = source_port @@ -76,7 +81,7 @@ def connected(self): def connect(self, timeout): """Connect to the broker.""" log.debug("Connecting to %s:%s", self.host, self.port) - self._socket = socket.create_connection( + self._socket = self._handler.Socket.create_connection( (self.host, self.port), timeout / 1000, (self.source_host, self.source_port) diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py index e39c655be..c78d2bc45 100644 --- a/pykafka/exceptions.py +++ b/pykafka/exceptions.py @@ -159,20 +159,62 @@ class GroupLoadInProgress(ProtocolClientError): ERROR_CODE = 14 -class ConsumerCoordinatorNotAvailable(ProtocolClientError): +class GroupCoordinatorNotAvailable(ProtocolClientError): """The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created. """ ERROR_CODE = 15 -class NotCoordinatorForConsumer(ProtocolClientError): +class NotCoordinatorForGroup(ProtocolClientError): """The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for. """ ERROR_CODE = 16 +class IllegalGeneration(ProtocolClientError): + """Returned from group membership requests (such as heartbeats) when the generation + id provided in the request is not the current generation + """ + ERROR_CODE = 22 + + +class InconsistentGroupProtocol(ProtocolClientError): + """Returned in join group when the member provides a protocol type or set of protocols + which is not compatible with the current group. + """ + ERROR_CODE = 23 + + +class UnknownMemberId(ProtocolClientError): + """Returned from group requests (offset commits/fetches, heartbeats, etc) when the + memberId is not in the current generation. 
+ """ + ERROR_CODE = 25 + + +class InvalidSessionTimeout(ProtocolClientError): + """Returned in join group when the requested session timeout is outside of the allowed + range on the broker + """ + ERROR_CODE = 26 + + +class RebalanceInProgress(ProtocolClientError): + """Returned in heartbeat requests when the coordinator has begun rebalancing the + group. This indicates to the client that it should rejoin the group. + """ + ERROR_CODE = 27 + + +class GroupAuthorizationFailed(ProtocolClientError): + """Returned by the broker when the client is not authorized to access a particular + groupId. + """ + ERROR_CODE = 30 + + ERROR_CODES = dict( (exc.ERROR_CODE, exc) for exc in (UnknownError, @@ -186,8 +228,14 @@ class NotCoordinatorForConsumer(ProtocolClientError): MessageSizeTooLarge, OffsetMetadataTooLarge, GroupLoadInProgress, - ConsumerCoordinatorNotAvailable, - NotCoordinatorForConsumer) + GroupCoordinatorNotAvailable, + NotCoordinatorForGroup, + IllegalGeneration, + InconsistentGroupProtocol, + UnknownMemberId, + InvalidSessionTimeout, + RebalanceInProgress, + GroupAuthorizationFailed) ) diff --git a/pykafka/handlers.py b/pykafka/handlers.py index 551d5e775..6a0db24ed 100644 --- a/pykafka/handlers.py +++ b/pykafka/handlers.py @@ -23,8 +23,9 @@ import gevent.event import gevent.lock import gevent.queue -import gevent.lock +import gevent.socket as gsocket import logging +import socket as pysocket import threading import time @@ -81,6 +82,7 @@ class ThreadingHandler(Handler): Event = threading.Event Lock = threading.Lock Semaphore = Semaphore + Socket = pysocket _workers_spawned = 0 def sleep(self, seconds=0): @@ -112,6 +114,7 @@ class GEventHandler(Handler): Lock = gevent.lock.RLock # fixme RLock = gevent.lock.RLock Semaphore = gevent.lock.Semaphore + Socket = gsocket def sleep(self, seconds=0): gevent.sleep(seconds) diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py new file mode 100644 index 000000000..e75c250b8 --- /dev/null +++ b/pykafka/managedbalancedconsumer.py @@ -0,0 +1,377 @@ +from __future__ import division +""" +Author: Emmett Butler +""" +__license__ = """ +Copyright 2016 Parse.ly, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +__all__ = ["ManagedBalancedConsumer"] +import logging +import sys +import uuid +import weakref + +from .balancedconsumer import BalancedConsumer +from .common import OffsetType +from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup, + GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress) +from .protocol import MemberAssignment +from .utils.compat import iterkeys + +log = logging.getLogger(__name__) + + +class ManagedBalancedConsumer(BalancedConsumer): + """A self-balancing consumer that uses Kafka 0.9's Group Membership API + + Implements the Group Management API semantics for Kafka 0.9 compatibility + + Maintains a single instance of SimpleConsumer, periodically using the + consumer rebalancing algorithm to reassign partitions to this + SimpleConsumer. 
+ + This class overrides the functionality of + :class:`pykafka.balancedconsumer.BalancedConsumer` that deals with ZooKeeper and + inherits other functionality directly. + """ + def __init__(self, + topic, + cluster, + consumer_group, + fetch_message_max_bytes=1024 * 1024, + num_consumer_fetchers=1, + auto_commit_enable=False, + auto_commit_interval_ms=60 * 1000, + queued_max_messages=2000, + fetch_min_bytes=1, + fetch_wait_max_ms=100, + offsets_channel_backoff_ms=1000, + offsets_commit_max_retries=5, + auto_offset_reset=OffsetType.EARLIEST, + consumer_timeout_ms=-1, + rebalance_max_retries=5, + rebalance_backoff_ms=2 * 1000, + auto_start=True, + reset_offset_on_start=False, + post_rebalance_callback=None, + use_rdkafka=False, + compacted_topic=True, + heartbeat_interval_ms=3000): + """Create a ManagedBalancedConsumer instance + + :param topic: The topic this consumer should consume + :type topic: :class:`pykafka.topic.Topic` + :param cluster: The cluster to which this consumer should connect + :type cluster: :class:`pykafka.cluster.Cluster` + :param consumer_group: The name of the consumer group this consumer + should join. + :type consumer_group: bytes + :param fetch_message_max_bytes: The number of bytes of messages to + attempt to fetch with each fetch request + :type fetch_message_max_bytes: int + :param num_consumer_fetchers: The number of workers used to make + FetchRequests + :type num_consumer_fetchers: int + :param auto_commit_enable: If true, periodically commit to kafka the + offset of messages already fetched by this consumer. This also + requires that `consumer_group` is not `None`. + :type auto_commit_enable: bool + :param auto_commit_interval_ms: The frequency (in milliseconds) at which + the consumer's offsets are committed to kafka. This setting is + ignored if `auto_commit_enable` is `False`. + :type auto_commit_interval_ms: int + :param queued_max_messages: The maximum number of messages buffered for + consumption in the internal + :class:`pykafka.simpleconsumer.SimpleConsumer` + :type queued_max_messages: int + :param fetch_min_bytes: The minimum amount of data (in bytes) that the + server should return for a fetch request. If insufficient data is + available, the request will block until sufficient data is available. + :type fetch_min_bytes: int + :param fetch_wait_max_ms: The maximum amount of time (in milliseconds) + that the server will block before answering a fetch request if + there isn't sufficient data to immediately satisfy `fetch_min_bytes`. + :type fetch_wait_max_ms: int + :param offsets_channel_backoff_ms: Backoff time to retry failed offset + commits and fetches. + :type offsets_channel_backoff_ms: int + :param offsets_commit_max_retries: The number of times the offset commit + worker should retry before raising an error. + :type offsets_commit_max_retries: int + :param auto_offset_reset: What to do if an offset is out of range. This + setting indicates how to reset the consumer's internal offset + counter when an `OffsetOutOfRangeError` is encountered. + :type auto_offset_reset: :class:`pykafka.common.OffsetType` + :param consumer_timeout_ms: Amount of time (in milliseconds) the + consumer may spend without messages available for consumption + before returning None. + :type consumer_timeout_ms: int + :param rebalance_max_retries: The number of times the rebalance should + retry before raising an error. + :type rebalance_max_retries: int + :param rebalance_backoff_ms: Backoff time (in milliseconds) between + retries during rebalance. 
+ :type rebalance_backoff_ms: int + :param auto_start: Whether the consumer should start after __init__ is complete. + If false, it can be started with `start()`. + :type auto_start: bool + :param reset_offset_on_start: Whether the consumer should reset its + internal offset counter to `self._auto_offset_reset` and commit that + offset immediately upon starting up + :type reset_offset_on_start: bool + :param post_rebalance_callback: A function to be called when a rebalance is + in progress. This function should accept three arguments: the + :class:`pykafka.balancedconsumer.BalancedConsumer` instance that just + completed its rebalance, a dict of partitions that it owned before the + rebalance, and a dict of partitions it owns after the rebalance. These dicts + map partition ids to the most recently known offsets for those partitions. + This function can optionally return a dictionary mapping partition ids to + offsets. If it does, the consumer will reset its offsets to the supplied + values before continuing consumption. + Note that the BalancedConsumer is in a poorly defined state at + the time this callback runs, so that accessing its properties + (such as `held_offsets` or `partitions`) might yield confusing + results. Instead, the callback should really rely on the + provided partition-id dicts, which are well-defined. + :type post_rebalance_callback: function + :param use_rdkafka: Use librdkafka-backed consumer if available + :type use_rdkafka: bool + :param compacted_topic: Set to read from a compacted topic. Forces + consumer to use less stringent message ordering logic because compacted + topics do not provide offsets in stict incrementing order. + :type compacted_topic: bool + :param heartbeat_interval_ms: The amount of time in milliseconds to wait between + heartbeat requests + :type heartbeat_interval_ms: int + """ + + self._cluster = cluster + if not isinstance(consumer_group, bytes): + raise TypeError("consumer_group must be a bytes object") + self._consumer_group = consumer_group + self._topic = topic + + self._auto_commit_enable = auto_commit_enable + self._auto_commit_interval_ms = auto_commit_interval_ms + self._fetch_message_max_bytes = fetch_message_max_bytes + self._fetch_min_bytes = fetch_min_bytes + self._num_consumer_fetchers = num_consumer_fetchers + self._queued_max_messages = queued_max_messages + self._fetch_wait_max_ms = fetch_wait_max_ms + self._consumer_timeout_ms = consumer_timeout_ms + self._offsets_channel_backoff_ms = offsets_channel_backoff_ms + self._offsets_commit_max_retries = offsets_commit_max_retries + self._auto_offset_reset = auto_offset_reset + self._reset_offset_on_start = reset_offset_on_start + self._rebalance_max_retries = rebalance_max_retries + self._rebalance_backoff_ms = rebalance_backoff_ms + self._post_rebalance_callback = post_rebalance_callback + self._is_compacted_topic = compacted_topic + self._heartbeat_interval_ms = heartbeat_interval_ms + if use_rdkafka is True: + raise ImportError("use_rdkafka is not available for {}".format( + self.__class__.__name__)) + self._use_rdkafka = use_rdkafka + + self._generation_id = -1 + self._rebalancing_lock = cluster.handler.Lock() + # ManagedBalancedConsumers in the same process cannot share connections. 
+ # This connection hash is passed to Broker calls that use the group + # membership API + self._connection_id = uuid.uuid4() + self._consumer = None + self._group_coordinator = None + self._consumer_id = b'' + self._worker_trace_logged = False + self._worker_exception = None + self._default_error_handlers = self._build_default_error_handlers() + + if auto_start is True: + self.start() + + def _setup_heartbeat_worker(self): + """Start the heartbeat worker""" + self = weakref.proxy(self) + + def fetcher(): + while True: + try: + if not self._running: + break + + log.info("Sending heartbeat from consumer '%s'", self._consumer_id) + res = self._group_coordinator.heartbeat(self._connection_id, + self._consumer_group, + self._generation_id, + self._consumer_id) + if res.error_code != 0: + log.info("Error code %d encountered on heartbeat.", + res.error_code) + self._handle_error(res.error_code) + self._rebalance() + + self._cluster.handler.sleep(self._heartbeat_interval_ms / 1000) + except ReferenceError: + break + except Exception: + # surface all exceptions to the main thread + self._worker_exception = sys.exc_info() + break + log.debug("Heartbeat worker exiting") + log.info("Starting heartbeat worker") + return self._cluster.handler.spawn( + fetcher, name="pykafka.ManagedBalancedConsumer.heartbeats") + + def start(self): + """Start this consumer. + + Must be called before consume() if `auto_start=False`. + """ + try: + self._running = True + self._group_coordinator = self._cluster.get_group_coordinator( + self._consumer_group) + self._rebalance() + self._setup_heartbeat_worker() + except Exception: + log.exception("Stopping consumer in response to error") + self.stop() + + def stop(self): + """Stop this consumer + + Should be called as part of a graceful shutdown + """ + self._running = False + if self._consumer is not None: + self._consumer.stop() + if self._group_coordinator is not None: + self._group_coordinator.leave_group(self._connection_id, + self._consumer_group, + self._consumer_id) + + def _update_member_assignment(self): + """Join a managed consumer group and start consuming assigned partitions + + Equivalent to + `pykafka.balancedconsumer.BalancedConsumer._update_member_assignment`, + but uses the Kafka 0.9 Group Membership API instead of ZooKeeper to manage + group state + """ + for i in range(self._rebalance_max_retries): + try: + members = self._join_group() + # generate partition assignments for each group member + # if this is not the leader, join_result.members will be empty + group_assignments = [ + MemberAssignment([ + (self._topic.name, + [p.id for p in self._decide_partitions( + iterkeys(members), consumer_id=member_id)]) + ], member_id=member_id) for member_id in members] + + assignment = self._sync_group(group_assignments) + self._setup_internal_consumer( + partitions=[self._topic.partitions[pid] for pid in assignment[0][1]]) + log.debug("Successfully rebalanced consumer '%s'", self._consumer_id) + break + except Exception as ex: + if i == self._rebalance_max_retries - 1: + log.warning('Failed to rebalance s after %d retries.', i) + raise + log.exception(ex) + log.info('Unable to complete rebalancing. 
Retrying') + self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000)) + self._raise_worker_exceptions() + + def _build_default_error_handlers(self): + """Set up default responses to common error codes""" + self = weakref.proxy(self) + + def _handle_GroupCoordinatorNotAvailable(): + self._group_coordinator = self._cluster.get_group_coordinator( + self._consumer_group) + + def _handle_NotCoordinatorForGroup(): + self._group_coordinator = self._cluster.get_group_coordinator( + self._consumer_group) + + return { + GroupCoordinatorNotAvailable.ERROR_CODE: _handle_GroupCoordinatorNotAvailable, + NotCoordinatorForGroup.ERROR_CODE: _handle_NotCoordinatorForGroup, + GroupLoadInProgress.ERROR_CODE: None, + RebalanceInProgress.ERROR_CODE: None, + IllegalGeneration.ERROR_CODE: None + } + + def _handle_error(self, error_code): + """Call the appropriate handler function for the given error code + + :param error_code: The error code returned from a Group Membership API request + :type error_code: int + """ + if error_code not in self._default_error_handlers: + raise ERROR_CODES[error_code]() + if self._default_error_handlers[error_code] is not None: + self._default_error_handlers[error_code]() + + def _join_group(self): + """Send a JoinGroupRequest. + + Assigns a member id and tells the coordinator about this consumer. + """ + log.info("Sending JoinGroupRequest for consumer id '%s'", self._consumer_id) + for i in range(self._cluster._max_connection_retries): + join_result = self._group_coordinator.join_group(self._connection_id, + self._consumer_group, + self._consumer_id) + if join_result.error_code == 0: + break + log.info("Error code %d encountered during JoinGroupRequest for" + " generation '%s'", join_result.error_code, self._generation_id) + if i == self._cluster._max_connection_retries - 1: + raise ERROR_CODES[join_result.error_code] + self._handle_error(join_result.error_code) + self._cluster.handler.sleep(i * 2) + self._generation_id = join_result.generation_id + self._consumer_id = join_result.member_id + return join_result.members + + def _sync_group(self, group_assignments): + """Send a SyncGroupRequest. + + If this consumer is the group leader, this call informs the other consumers + of their partition assignments. For all consumers including the leader, this call + is used to fetch partition assignments. + + The group leader *could* tell itself its own assignment instead of using the + result of this request, but it does the latter to ensure consistency. 
+ """ + log.info("Sending SyncGroupRequest for consumer id '%s'", self._consumer_id) + for i in range(self._cluster._max_connection_retries): + sync_result = self._group_coordinator.sync_group(self._connection_id, + self._consumer_group, + self._generation_id, + self._consumer_id, + group_assignments) + if sync_result.error_code == 0: + break + log.info("Error code %d encountered during SyncGroupRequest", + sync_result.error_code) + if i == self._cluster._max_connection_retries - 1: + raise ERROR_CODES[sync_result.error_code] + self._handle_error(sync_result.error_code) + self._cluster.handler.sleep(i * 2) + return sync_result.member_assignment.partition_assignment diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 54f9e058e..ebaaa5626 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -49,8 +49,8 @@ "OffsetRequest", "OffsetResponse", "OffsetCommitRequest", "FetchRequest", "FetchResponse", "PartitionFetchRequest", "OffsetCommitResponse", "OffsetFetchRequest", "OffsetFetchResponse", - "PartitionOffsetRequest", "ConsumerMetadataRequest", - "ConsumerMetadataResponse", "PartitionOffsetCommitRequest", + "PartitionOffsetRequest", "GroupCoordinatorRequest", + "GroupCoordinatorResponse", "PartitionOffsetCommitRequest", "PartitionOffsetFetchRequest", "Request", "Response", "Message", "MessageSet" ] @@ -868,16 +868,16 @@ def __init__(self, buff): partition[2], partition[1]) -class ConsumerMetadataRequest(Request): +class GroupCoordinatorRequest(Request): """A consumer metadata request Specification:: - ConsumerMetadataRequest => ConsumerGroup + GroupCoordinatorRequest => ConsumerGroup ConsumerGroup => string """ def __init__(self, consumer_group): - """Create a new consumer metadata request""" + """Create a new group coordinator request""" self.consumer_group = consumer_group def __len__(self): @@ -904,12 +904,12 @@ def get_bytes(self): return output -class ConsumerMetadataResponse(Response): - """A consumer metadata response +class GroupCoordinatorResponse(Response): + """A group coordinator response Specification:: - ConsumerMetadataResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort + GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort ErrorCode => int16 CoordinatorId => int32 CoordinatorHost => string @@ -1190,3 +1190,416 @@ def __init__(self, buff): partition[2], partition[3]) self.topics[topic_name][partition[0]] = pres + + +### +# Group Membership API +### +class ConsumerGroupProtocolMetadata(object): + """ + Protocol specification:: + + ProtocolMetadata => Version Subscription UserData + Version => int16 + Subscription => [Topic] + Topic => string + UserData => bytes + """ + def __init__(self): + self.version = 0 + self.topic_names = [b"dummytopic"] + self.user_data = b"testuserdata" + + def __len__(self): + # version + len(topic names) + size = 2 + 4 + for topic_name in self.topic_names: + # len(topic_name) + topic_name + size += 2 + len(topic_name) + # len(user data) + user data + size += 4 + len(self.user_data) + return size + + def get_bytes(self): + output = bytearray(len(self)) + offset = 0 + fmt = '!hi' + struct.pack_into(fmt, output, offset, self.version, len(self.topic_names)) + offset += struct.calcsize(fmt) + for topic_name in self.topic_names: + fmt = '!h%ds' % len(topic_name) + struct.pack_into(fmt, output, offset, len(topic_name), topic_name) + offset += struct.calcsize(fmt) + fmt = '!i%ds' % len(self.user_data) + struct.pack_into(fmt, output, offset, len(self.user_data), self.user_data) + offset += 
struct.calcsize(fmt) + return output + + +GroupMembershipProtocol = namedtuple( + 'GroupMembershipProtocol', ['protocol_type', 'protocol_name', 'metadata'] +) + + +ConsumerGroupProtocol = GroupMembershipProtocol(b"consumer", b"pykafkaassignmentstrategy", + ConsumerGroupProtocolMetadata()) + + +class JoinGroupRequest(Request): + """A group join request + + Specification:: + + JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols + GroupId => string + SessionTimeout => int32 + MemberId => string + ProtocolType => string + GroupProtocols => [ProtocolName ProtocolMetadata] + ProtocolName => string + ProtocolMetadata => bytes + """ + def __init__(self, group_id, member_id, session_timeout=30000): + """Create a new group join request""" + self.protocol = ConsumerGroupProtocol + self.group_id = group_id + self.session_timeout = session_timeout + self.member_id = member_id + self.protocol_type = self.protocol.protocol_type + self.group_protocols = [(self.protocol.protocol_name, + bytes(self.protocol.metadata.get_bytes()))] + + def __len__(self): + """Length of the serialized message, in bytes""" + # Header + group id + session timeout + size = self.HEADER_LEN + 2 + len(self.group_id) + 4 + # + member id + protocol type + len(group protocols) + size += 2 + len(self.member_id) + 2 + len(self.protocol_type) + 4 + # metadata tuples + for name, metadata in self.group_protocols: + size += 2 + len(name) + 4 + len(metadata) + return size + + @property + def API_KEY(self): + """API_KEY for this request, from the Kafka docs""" + return 11 + + def get_bytes(self): + """Serialize the message + + :returns: Serialized message + :rtype: :class:`bytearray` + """ + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + fmt = '!h%dsih%dsh%dsi' % (len(self.group_id), len(self.member_id), + len(self.protocol_type)) + struct.pack_into(fmt, output, offset, len(self.group_id), self.group_id, + self.session_timeout, len(self.member_id), self.member_id, + len(self.protocol_type), self.protocol_type, + len(self.group_protocols)) + offset += struct.calcsize(fmt) + for protocol_name, protocol_metadata in self.group_protocols: + fmt = '!h%dsi%ds' % (len(protocol_name), len(protocol_metadata)) + struct.pack_into(fmt, output, offset, len(protocol_name), protocol_name, + len(protocol_metadata), protocol_metadata) + offset += struct.calcsize(fmt) + return output + + +class JoinGroupResponse(Response): + """A group join response + + Specification:: + + JoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members + ErrorCode => int16 + GenerationId => int32 + GroupProtocol => string + LeaderId => string + MemberId => string + Members => [MemberId MemberMetadata] + MemberId => string + MemberMetadata => bytes + """ + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'hiSSS[SY ]' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code = response[0] + self.generation_id = response[1] + self.group_protocol = response[2] + self.leader_id = response[3] + self.member_id = response[4] + # TODO - parse metadata bytestring into ConsumerGroupProtocolMetadata? 
+ self.members = {_id: meta for _id, meta in response[5]} + + +class MemberAssignment(object): + """ + Protocol specification:: + + MemberAssignment => Version PartitionAssignment + Version => int16 + PartitionAssignment => [Topic [Partition]] + Topic => string + Partition => int32 + UserData => bytes + """ + def __init__(self, partition_assignment, member_id=None, version=1): + self.member_id = member_id + self.version = version + self.partition_assignment = partition_assignment + + @classmethod + def from_bytestring(cls, buff): + if len(buff) == 0: + return cls(tuple()) + fmt = 'h [S [i ] ]' + response = struct_helpers.unpack_from(fmt, buff, 0) + + version = response[0] + partition_assignment = response[1] + return cls(partition_assignment, version=version) + + def __len__(self): + # version + len(partition assignment) + size = 2 + 4 + for topic_name, partitions in self.partition_assignment: + # len(topic_name) + topic_name + len(partitions) + size += 2 + len(topic_name) + 4 + size += 4 * len(partitions) + return size + + def get_bytes(self): + output = bytearray(len(self)) + offset = 0 + fmt = '!hi' + struct.pack_into(fmt, output, offset, self.version, + len(self.partition_assignment)) + offset += struct.calcsize(fmt) + for topic_name, partitions in self.partition_assignment: + fmt = '!h%dsi' % len(topic_name) + struct.pack_into(fmt, output, offset, len(topic_name), topic_name, + len(partitions)) + offset += struct.calcsize(fmt) + for partition_id in partitions: + fmt = '!i' + struct.pack_into(fmt, output, offset, partition_id) + offset += struct.calcsize(fmt) + return output + + +class SyncGroupRequest(Request): + """A group sync request + + Specification:: + + SyncGroupRequest => GroupId GenerationId MemberId GroupAssignment + GroupId => string + GenerationId => int32 + MemberId => string + GroupAssignment => [MemberId MemberAssignment] + MemberId => string + MemberAssignment => bytes + """ + def __init__(self, group_id, generation_id, member_id, group_assignment): + """Create a new group join request""" + self.group_id = group_id + self.generation_id = generation_id + self.member_id = member_id + self.group_assignment = group_assignment + + def __len__(self): + """Length of the serialized message, in bytes""" + # Header + len(group id) + group id + generation id + size = self.HEADER_LEN + 2 + len(self.group_id) + 4 + # + len(member id) + member id + len(group assignment) + size += 2 + len(self.member_id) + 4 + # group assignment tuples + for member_assignment in self.group_assignment: + # + len(member id) + member id + len(member assignment) + member assignment + size += 2 + len(member_assignment.member_id) + 4 + len(member_assignment) + return size + + @property + def API_KEY(self): + """API_KEY for this request, from the Kafka docs""" + return 14 + + def get_bytes(self): + """Serialize the message + + :returns: Serialized message + :rtype: :class:`bytearray` + """ + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + fmt = '!h%dsih%dsi' % (len(self.group_id), len(self.member_id)) + struct.pack_into(fmt, output, offset, len(self.group_id), self.group_id, + self.generation_id, len(self.member_id), self.member_id, + len(self.group_assignment)) + offset += struct.calcsize(fmt) + for member_assignment in self.group_assignment: + assignment_bytes = bytes(member_assignment.get_bytes()) + fmt = '!h%dsi%ds' % (len(member_assignment.member_id), len(assignment_bytes)) + struct.pack_into(fmt, output, offset, len(member_assignment.member_id), + 
member_assignment.member_id, len(assignment_bytes), + assignment_bytes) + offset += struct.calcsize(fmt) + return output + + +class SyncGroupResponse(Response): + """A group sync response + + Specification:: + + SyncGroupResponse => ErrorCode MemberAssignment + ErrorCode => int16 + MemberAssignment => bytes + """ + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'hY' + response = struct_helpers.unpack_from(fmt, buff, 0) + self.error_code = response[0] + self.member_assignment = MemberAssignment.from_bytestring(response[1]) + + +class HeartbeatRequest(Request): + """A group heartbeat request + + Specification:: + + HeartbeatRequest => GroupId GenerationId MemberId + GroupId => string + GenerationId => int32 + MemberId => string + """ + def __init__(self, group_id, generation_id, member_id): + """Create a new heartbeat request""" + self.group_id = group_id + self.generation_id = generation_id + self.member_id = member_id + + def __len__(self): + """Length of the serialized message, in bytes""" + # Header + len(group id) + group id + generation id + size = self.HEADER_LEN + 2 + len(self.group_id) + 4 + # + len(member id) + member id + size += 2 + len(self.member_id) + return size + + @property + def API_KEY(self): + """API_KEY for this request, from the Kafka docs""" + return 12 + + def get_bytes(self): + """Serialize the message + + :returns: Serialized message + :rtype: :class:`bytearray` + """ + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + fmt = '!h%dsih%ds' % (len(self.group_id), len(self.member_id)) + struct.pack_into(fmt, output, offset, len(self.group_id), self.group_id, + self.generation_id, len(self.member_id), self.member_id) + offset += struct.calcsize(fmt) + return output + + +class HeartbeatResponse(Response): + """A group heartbeat response + + Specification:: + + HeartbeatResponse => ErrorCode + ErrorCode => int16 + """ + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h' + response = struct_helpers.unpack_from(fmt, buff, 0) + self.error_code = response[0] + + +class LeaveGroupRequest(Request): + """A group exit request + + Specification:: + + LeaveGroupRequest => GroupId MemberId + GroupId => string + MemberId => string + """ + def __init__(self, group_id, member_id): + """Create a new group join request""" + self.group_id = group_id + self.member_id = member_id + + def __len__(self): + """Length of the serialized message, in bytes""" + # Header + len(group id) + group id + size = self.HEADER_LEN + 2 + len(self.group_id) + # + len(member id) + member id + size += 2 + len(self.member_id) + return size + + @property + def API_KEY(self): + """API_KEY for this request, from the Kafka docs""" + return 13 + + def get_bytes(self): + """Serialize the message + + :returns: Serialized message + :rtype: :class:`bytearray` + """ + output = bytearray(len(self)) + self._write_header(output) + offset = self.HEADER_LEN + fmt = '!h%dsh%ds' % (len(self.group_id), len(self.member_id)) + struct.pack_into(fmt, output, offset, len(self.group_id), self.group_id, + len(self.member_id), self.member_id) + offset += struct.calcsize(fmt) + return output + + +class LeaveGroupResponse(Response): + """A group exit response + + Specification:: + + LeaveGroupResponse => ErrorCode + ErrorCode => int16 + """ + def __init__(self, buff): + """Deserialize into a new Response + + 
:param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h' + response = struct_helpers.unpack_from(fmt, buff, 0) + self.error_code = response[0] diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index 25e41a19b..8b70c6005 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -45,7 +45,9 @@ def __init__(self, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, - compacted_topic=False): + compacted_topic=False, + generation_id=-1, + consumer_id=b''): callargs = {k: v for k, v in vars().items() if k not in ("self", "__class__")} self._rdk_consumer = None diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 20b15df94..287c47bf4 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -32,10 +32,11 @@ range, iterkeys) from .exceptions import (OffsetOutOfRangeError, UnknownTopicOrPartition, OffsetMetadataTooLarge, GroupLoadInProgress, - NotCoordinatorForConsumer, SocketDisconnectedError, + NotCoordinatorForGroup, SocketDisconnectedError, ConsumerStoppedException, KafkaException, NotLeaderForPartition, OffsetRequestFailedError, - RequestTimedOut, ERROR_CODES) + RequestTimedOut, UnknownMemberId, RebalanceInProgress, + ERROR_CODES) from .protocol import (PartitionFetchRequest, PartitionOffsetCommitRequest, PartitionOffsetFetchRequest, PartitionOffsetRequest) from .utils.error_handlers import (handle_partition_responses, raise_error, @@ -67,7 +68,9 @@ def __init__(self, consumer_timeout_ms=-1, auto_start=True, reset_offset_on_start=False, - compacted_topic=False): + compacted_topic=False, + generation_id=-1, + consumer_id=b''): """Create a SimpleConsumer. Settings and default values are taken from the Scala @@ -136,6 +139,11 @@ def __init__(self, consumer to use less stringent message ordering logic because compacted topics do not provide offsets in stict incrementing order. 
:type compacted_topic: bool + :param generation_id: The generation id with which to make group requests + :type generation_id: int + :param consumer_id: The identifying string to use for this consumer on group + requests + :type consumer_id: bytes """ self._cluster = cluster if not (isinstance(consumer_group, bytes) or consumer_group is None): @@ -157,6 +165,8 @@ def __init__(self, self._auto_start = auto_start self._reset_offset_on_start = reset_offset_on_start self._is_compacted_topic = compacted_topic + self._generation_id = generation_id + self._consumer_id = consumer_id # incremented for any message arrival from any partition # the initial value is 0 (no messages waiting) @@ -169,7 +179,7 @@ def __init__(self, self._worker_trace_logged = False self._update_lock = self._cluster.handler.Lock() - self._discover_offset_manager() + self._discover_group_coordinator() if partitions is not None: self._partitions = {p: OwnedPartition(p, self._cluster.handler, @@ -219,7 +229,7 @@ def _update(self): with self._update_lock: self._cluster.update() self._setup_partitions_by_leader() - self._discover_offset_manager() + self._discover_group_coordinator() def start(self): """Begin communicating with Kafka, including setting up worker threads @@ -260,8 +270,8 @@ def _handle_OffsetOutOfRangeError(parts): def _handle_RequestTimedOut(parts): log.info("Continuing in response to RequestTimedOut") - def _handle_NotCoordinatorForConsumer(parts): - log.info("Updating cluster in response to NotCoordinatorForConsumer") + def _handle_NotCoordinatorForGroup(parts): + log.info("Updating cluster in response to NotCoordinatorForGroup") self._update() def _handle_NotLeaderForPartition(parts): @@ -271,23 +281,31 @@ def _handle_NotLeaderForPartition(parts): def _handle_GroupLoadInProgress(parts): log.info("Continuing in response to GroupLoadInProgress") + def _handle_UnknownMemberId(parts): + log.info("Continuing in response to UnknownMemberId") + + def _handle_RebalanceInProgress(parts): + log.info("Continuing in response to RebalanceInProgress") + return { UnknownTopicOrPartition.ERROR_CODE: lambda p: raise_error(UnknownTopicOrPartition), OffsetOutOfRangeError.ERROR_CODE: _handle_OffsetOutOfRangeError, NotLeaderForPartition.ERROR_CODE: _handle_NotLeaderForPartition, OffsetMetadataTooLarge.ERROR_CODE: lambda p: raise_error(OffsetMetadataTooLarge), - NotCoordinatorForConsumer.ERROR_CODE: _handle_NotCoordinatorForConsumer, + NotCoordinatorForGroup.ERROR_CODE: _handle_NotCoordinatorForGroup, RequestTimedOut.ERROR_CODE: _handle_RequestTimedOut, - GroupLoadInProgress.ERROR_CODE: _handle_GroupLoadInProgress + GroupLoadInProgress.ERROR_CODE: _handle_GroupLoadInProgress, + UnknownMemberId.ERROR_CODE: _handle_UnknownMemberId, + RebalanceInProgress.ERROR_CODE: _handle_RebalanceInProgress } - def _discover_offset_manager(self): - """Set the offset manager for this consumer. + def _discover_group_coordinator(self): + """Set the group coordinator for this consumer. 
If a consumer group is not supplied to __init__, this method does nothing """ if self._consumer_group is not None: - self._offset_manager = self._cluster.get_offset_manager(self._consumer_group) + self._group_coordinator = self._cluster.get_group_coordinator(self._consumer_group) @property def topic(self): @@ -427,19 +445,19 @@ def commit_offsets(self): reqs = [p.build_offset_commit_request() for p in self._partitions.values()] log.debug("Committing offsets for %d partitions to broker id %s", len(reqs), - self._offset_manager.id) + self._group_coordinator.id) for i in range(self._offsets_commit_max_retries): if i > 0: log.debug("Retrying") self._cluster.handler.sleep(i * (self._offsets_channel_backoff_ms / 1000)) try: - response = self._offset_manager.commit_consumer_group_offsets( - self._consumer_group, -1, b'pykafka', reqs) + response = self._group_coordinator.commit_consumer_group_offsets( + self._consumer_group, self._generation_id, self._consumer_id, reqs) except (SocketDisconnectedError, IOError): - log.error("Error committing offsets for topic '%s' " + log.error("Error committing offsets for topic '%s' from consumer id '%s'" "(SocketDisconnectedError)", - self._topic.name) + self._topic.name, self._consumer_id) if i >= self._offsets_commit_max_retries - 1: raise self._update() @@ -452,8 +470,8 @@ def commit_offsets(self): if (len(parts_by_error) == 1 and 0 in parts_by_error) or \ len(parts_by_error) == 0: break - log.error("Error committing offsets for topic '%s' (errors: %s)", - self._topic.name, + log.error("Error committing offsets for topic '%s' from consumer id '%s'" + "(errors: %s)", self._topic.name, self._consumer_id, {ERROR_CODES[err]: [op.partition.id for op, _ in parts] for err, parts in iteritems(parts_by_error)}) @@ -510,13 +528,13 @@ def _handle_success(parts): success_responses = [] log.debug("Fetching offsets for %d partitions from broker id %s", len(reqs), - self._offset_manager.id) + self._group_coordinator.id) for i in range(self._offsets_fetch_max_retries): if i > 0: log.debug("Retrying offset fetch") - res = self._offset_manager.fetch_consumer_group_offsets(self._consumer_group, reqs) + res = self._group_coordinator.fetch_consumer_group_offsets(self._consumer_group, reqs) parts_by_error = handle_partition_responses( self._default_error_handlers, response=res, diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index c25364f6d..52c112843 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -132,7 +132,7 @@ class KafkaInstance(ManagedInstance): def __init__(self, num_instances=1, kafka_version='0.8.2.1', - scala_version='2.10', + scala_version='2.11', bin_dir='/tmp/kafka-bin', name='kafka', use_gevent=False): @@ -348,7 +348,7 @@ def produce_messages(self, topic_name, messages): help='Download destination for Kafka') parser.add_argument('--kafka-version', type=str, default='0.8.2.1', help='Kafka version to download') - parser.add_argument('--scala-version', type=str, default='2.10', + parser.add_argument('--scala-version', type=str, default='2.11', help='Scala version for kafka build') args = parser.parse_args() diff --git a/pykafka/topic.py b/pykafka/topic.py index 526e177db..9625ac434 100644 --- a/pykafka/topic.py +++ b/pykafka/topic.py @@ -24,6 +24,7 @@ from .common import OffsetType from .exceptions import LeaderNotAvailable from .handlers import GEventHandler +from .managedbalancedconsumer import ManagedBalancedConsumer from .partition import Partition from .producer import Producer from .protocol import 
PartitionOffsetRequest @@ -186,13 +187,20 @@ def get_simple_consumer(self, consumer_group=consumer_group, **kwargs) - def get_balanced_consumer(self, consumer_group, **kwargs): + def get_balanced_consumer(self, consumer_group, managed=False, **kwargs): """Return a BalancedConsumer of this topic :param consumer_group: The name of the consumer group to join :type consumer_group: bytes + :param managed: If True, manage the consumer group with Kafka using the 0.9 + group management api (requires Kafka >=0.9) + :type managed: bool """ - if "zookeeper_connect" not in kwargs and \ - self._cluster._zookeeper_connect is not None: - kwargs['zookeeper_connect'] = self._cluster._zookeeper_connect - return BalancedConsumer(self, self._cluster, consumer_group, **kwargs) + if not managed: + if "zookeeper_connect" not in kwargs and \ + self._cluster._zookeeper_connect is not None: + kwargs['zookeeper_connect'] = self._cluster._zookeeper_connect + cls = BalancedConsumer + else: + cls = ManagedBalancedConsumer + return cls(self, self._cluster, consumer_group, **kwargs) diff --git a/tests/pykafka/__init__.py b/tests/pykafka/__init__.py index e69de29bb..349916e5c 100644 --- a/tests/pykafka/__init__.py +++ b/tests/pykafka/__init__.py @@ -0,0 +1,48 @@ +import pytest + + +def patch_subclass(parent, skip_condition): + """Work around a pytest.mark.skipif bug + + https://github.com/pytest-dev/pytest/issues/568 + + The issue causes all subclasses of a TestCase subclass to be skipped if any one + of them is skipped. + + This fix circumvents the issue by overriding Python's existing subclassing mechanism. + Instead of having `cls` be a subclass of `parent`, this decorator adds each attribute + of `parent` to `cls` without using Python inheritance. When appropriate, it also adds + a boolean condition under which to skip tests for the decorated class.
+ + :param parent: The "superclass" from which the decorated class should inherit + its non-overridden attributes + :type parent: unittest2.TestCase + :param skip_condition: A boolean condition that, when True, will cause all tests in + the decorated class to be skipped + :type skip_condition: bool + """ + def patcher(cls): + def build_skipped_method(method, cls, cond=None): + if cond is None: + cond = False + if hasattr(method, "skip_condition"): + cond = cond or method.skip_condition(cls) + + @pytest.mark.skipif(cond, reason="") + def _wrapper(self): + return method(self) + return _wrapper + + # two passes required so that skips have access to all class attributes + for attr in parent.__dict__: + if attr in cls.__dict__: + continue + if not attr.startswith("test_"): + setattr(cls, attr, parent.__dict__[attr]) + + for attr in parent.__dict__: + if attr.startswith("test_"): + setattr(cls, attr, build_skipped_method(parent.__dict__[attr], + cls, skip_condition)) + return cls + return patcher diff --git a/tests/pykafka/rdkafka/test_simple_consumer.py b/tests/pykafka/rdkafka/test_simple_consumer.py index fb6b9a841..6f3fccfc6 100644 --- a/tests/pykafka/rdkafka/test_simple_consumer.py +++ b/tests/pykafka/rdkafka/test_simple_consumer.py @@ -1,8 +1,9 @@ import platform +import unittest2 import pytest -from tests.pykafka import test_simpleconsumer, test_balancedconsumer +from tests.pykafka import test_simpleconsumer, test_balancedconsumer, patch_subclass from pykafka.utils.compat import range @@ -68,8 +69,7 @@ def _latest_partition_offsets_by_reading(consumer, n_reads): return latest_offs -@pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="Unresolved crashes") -class RdkBalancedConsumerIntegrationTests( - test_balancedconsumer.BalancedConsumerIntegrationTests): +@patch_subclass(test_balancedconsumer.BalancedConsumerIntegrationTests, + platform.python_implementation() == "PyPy") +class RdkBalancedConsumerIntegrationTests(unittest2.TestCase): USE_RDKAFKA = True diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index bcc4e16ac..0ab92b94e 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -1,5 +1,7 @@ import math import mock +import os +import pkg_resources import platform import pytest import time @@ -12,35 +14,41 @@ from pykafka import KafkaClient from pykafka.balancedconsumer import BalancedConsumer, OffsetType from pykafka.exceptions import ConsumerStoppedException +from pykafka.managedbalancedconsumer import ManagedBalancedConsumer from pykafka.test.utils import get_cluster, stop_cluster from pykafka.utils.compat import range, iterkeys, iteritems +from tests.pykafka import patch_subclass -def buildMockConsumer(num_partitions=10, num_participants=1, timeout=2000): - consumer_group = b'testgroup' - topic = mock.Mock() - topic.name = 'testtopic' - topic.partitions = {} - for k in range(num_partitions): - part = mock.Mock(name='part-{part}'.format(part=k)) - part.id = k - part.topic = topic - part.leader = mock.Mock() - part.leader.id = k % num_participants - topic.partitions[k] = part - - cluster = mock.MagicMock() - zk = mock.MagicMock() - return BalancedConsumer(topic, cluster, consumer_group, - zookeeper=zk, auto_start=False, use_rdkafka=False, - consumer_timeout_ms=timeout), topic +kafka_version = pkg_resources.parse_version(os.environ.get('KAFKA_VERSION', '0.8')) +version_09 = pkg_resources.parse_version("0.9.0.0") class TestBalancedConsumer(unittest2.TestCase): @classmethod def 
setUpClass(cls): cls._consumer_timeout = 2000 - cls._mock_consumer, _ = buildMockConsumer(timeout=cls._consumer_timeout) + cls._mock_consumer, _ = TestBalancedConsumer.buildMockConsumer(timeout=cls._consumer_timeout) + + @classmethod + def buildMockConsumer(self, num_partitions=10, num_participants=1, timeout=2000): + consumer_group = b'testgroup' + topic = mock.Mock() + topic.name = 'testtopic' + topic.partitions = {} + for k in range(num_partitions): + part = mock.Mock(name='part-{part}'.format(part=k)) + part.id = k + part.topic = topic + part.leader = mock.Mock() + part.leader.id = k % num_participants + topic.partitions[k] = part + + cluster = mock.MagicMock() + zk = mock.MagicMock() + return BalancedConsumer(topic, cluster, consumer_group, + zookeeper=zk, auto_start=False, use_rdkafka=False, + consumer_timeout_ms=timeout), topic def test_consume_returns(self): """Ensure that consume() returns in the amount of time it's supposed to @@ -56,7 +64,7 @@ def test_consume_graceful_stop(self): """Ensure that stopping a consumer while consuming from Kafka does not end in an infinite loop when timeout is not used. """ - consumer, _ = buildMockConsumer(timeout=-1) + consumer, _ = self.buildMockConsumer(timeout=-1) consumer._setup_internal_consumer(start=False) consumer._consumer._partitions_by_id = {1: "dummy"} @@ -73,8 +81,8 @@ def test_decide_partitions(self): num_partitions = 100 - i participants = sorted(['test-debian:{p}'.format(p=p) for p in range(num_participants)]) - cns, topic = buildMockConsumer(num_partitions=num_partitions, - num_participants=num_participants) + cns, topic = self.buildMockConsumer(num_partitions=num_partitions, + num_participants=num_participants) # Simulate each participant to ensure they're correct assigned_parts = [] @@ -101,10 +109,34 @@ def test_decide_partitions(self): self.assertListEqual(assigned_parts, all_partitions) +class TestManagedBalancedConsumer(TestBalancedConsumer): + @classmethod + def buildMockConsumer(self, num_partitions=10, num_participants=1, timeout=2000): + consumer_group = b'testgroup' + topic = mock.Mock() + topic.name = 'testtopic' + topic.partitions = {} + for k in range(num_partitions): + part = mock.Mock(name='part-{part}'.format(part=k)) + part.id = k + part.topic = topic + part.leader = mock.Mock() + part.leader.id = k % num_participants + topic.partitions[k] = part + + cluster = mock.MagicMock() + cns = ManagedBalancedConsumer(topic, cluster, consumer_group, + auto_start=False, use_rdkafka=False, + consumer_timeout_ms=timeout) + cns._group_coordinator = mock.MagicMock() + return cns, topic + + class BalancedConsumerIntegrationTests(unittest2.TestCase): maxDiff = None USE_RDKAFKA = False USE_GEVENT = False + MANAGED_CONSUMER = False @classmethod def setUpClass(cls): @@ -128,6 +160,16 @@ def get_zk(self): return KazooClient(self.kafka.zookeeper) return KazooClient(self.kafka.zookeeper, handler=SequentialGeventHandler()) + def get_balanced_consumer(self, consumer_group, **kwargs): + if self.MANAGED_CONSUMER: + kwargs.pop("zookeeper", None) + kwargs.pop("zookeeper_connect", None) + return self.client.topics[self.topic_name].get_balanced_consumer( + consumer_group, + managed=self.MANAGED_CONSUMER, + **kwargs + ) + def test_rebalance_callbacks(self): def on_rebalance(cns, old_partition_offsets, new_partition_offsets): self.assertTrue(len(new_partition_offsets) > 0) @@ -140,13 +182,13 @@ def on_rebalance(cns, old_partition_offsets, new_partition_offsets): self.offset_reset = 50 try: consumer_group = b'test_rebalance_callbacks' - 
consumer_a = self.client.topics[self.topic_name].get_balanced_consumer( + consumer_a = self.get_balanced_consumer( consumer_group, zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.EARLIEST, post_rebalance_callback=on_rebalance, use_rdkafka=self.USE_RDKAFKA) - consumer_b = self.client.topics[self.topic_name].get_balanced_consumer( + consumer_b = self.get_balanced_consumer( consumer_group, zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.EARLIEST, @@ -170,13 +212,13 @@ def on_rebalance(cns, old_partition_offsets, new_partition_offsets): self.offset_reset = 50 try: consumer_group = b'test_rebalance_callbacks_error' - consumer_a = self.client.topics[self.topic_name].get_balanced_consumer( + consumer_a = self.get_balanced_consumer( consumer_group, zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.EARLIEST, post_rebalance_callback=on_rebalance, use_rdkafka=self.USE_RDKAFKA) - consumer_b = self.client.topics[self.topic_name].get_balanced_consumer( + consumer_b = self.get_balanced_consumer( consumer_group, zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.EARLIEST, @@ -195,13 +237,12 @@ def on_rebalance(cns, old_partition_offsets, new_partition_offsets): def test_consume_earliest(self): try: - topic = self.client.topics[self.topic_name] - consumer_a = topic.get_balanced_consumer( + consumer_a = self.get_balanced_consumer( b'test_consume_earliest', zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.EARLIEST, use_rdkafka=self.USE_RDKAFKA) - consumer_b = topic.get_balanced_consumer( + consumer_b = self.get_balanced_consumer( b'test_consume_earliest', zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.EARLIEST, @@ -233,13 +274,12 @@ def test_consume_earliest(self): def test_consume_latest(self): try: - topic = self.client.topics[self.topic_name] - consumer_a = topic.get_balanced_consumer( + consumer_a = self.get_balanced_consumer( b'test_consume_latest', zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.LATEST, use_rdkafka=self.USE_RDKAFKA) - consumer_b = topic.get_balanced_consumer( + consumer_b = self.get_balanced_consumer( b'test_consume_latest', zookeeper_connect=self.kafka.zookeeper, auto_offset_reset=OffsetType.LATEST, @@ -286,23 +326,28 @@ def test_external_kazoo_client(self): zk = KazooClient(self.kafka.zookeeper) zk.start() - consumer = self.client.topics[self.topic_name].get_balanced_consumer( + consumer = self.get_balanced_consumer( b'test_external_kazoo_client', zookeeper=zk, consumer_timeout_ms=10, use_rdkafka=self.USE_RDKAFKA) [msg for msg in consumer] consumer.stop() + test_external_kazoo_client.skip_condition = lambda cls: cls.MANAGED_CONSUMER def test_no_partitions(self): """Ensure a consumer assigned no partitions doesn't fail""" - consumer = self.client.topics[self.topic_name].get_balanced_consumer( + + def _decide_dummy(p, consumer_id=None): + return set() + consumer = self.get_balanced_consumer( b'test_no_partitions', zookeeper_connect=self.kafka.zookeeper, auto_start=False, consumer_timeout_ms=50, use_rdkafka=self.USE_RDKAFKA) - consumer._decide_partitions = lambda p: set() + + consumer._decide_partitions = _decide_dummy consumer.start() res = consumer.consume() self.assertEqual(res, None) @@ -319,18 +364,17 @@ def test_zk_conn_lost(self): zk = self.get_zk() zk.start() try: - topic = self.client.topics[self.topic_name] consumer_group = b'test_zk_conn_lost' - consumer = topic.get_balanced_consumer(consumer_group, - zookeeper=zk, - 
use_rdkafka=self.USE_RDKAFKA) + consumer = self.get_balanced_consumer(consumer_group, + zookeeper=zk, + use_rdkafka=self.USE_RDKAFKA) self.assertTrue(check_partitions(consumer)) with consumer._rebalancing_lock: zk.stop() # expires session, dropping all our nodes # Start a second consumer on a different zk connection - other_consumer = topic.get_balanced_consumer( + other_consumer = self.get_balanced_consumer( consumer_group, use_rdkafka=self.USE_RDKAFKA) # Slightly contrived: we'll grab a lock to keep _rebalance() from @@ -353,6 +397,7 @@ def test_zk_conn_lost(self): zk.stop() except: pass + test_zk_conn_lost.skip_condition = lambda cls: cls.MANAGED_CONSUMER def wait_for_rebalancing(self, *balanced_consumers): """Test helper that loops while rebalancing is ongoing @@ -374,9 +419,22 @@ def wait_for_rebalancing(self, *balanced_consumers): raise AssertionError("Rebalancing failed") -@pytest.mark.skipif(platform.python_implementation() == "PyPy", - reason="Unresolved crashes") -class BalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests): +@patch_subclass(BalancedConsumerIntegrationTests, + platform.python_implementation() == "PyPy") +class BalancedConsumerGEventIntegrationTests(unittest2.TestCase): + USE_GEVENT = True + + +@patch_subclass(BalancedConsumerIntegrationTests, kafka_version < version_09) +class ManagedBalancedConsumerIntegrationTests(unittest2.TestCase): + MANAGED_CONSUMER = True + + +@patch_subclass( + BalancedConsumerIntegrationTests, + platform.python_implementation() == "PyPy" or kafka_version < version_09) +class ManagedBalancedConsumerGEventIntegrationTests(unittest2.TestCase): + MANAGED_CONSUMER = True USE_GEVENT = True diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py index 7fc120761..d4f5a7ec7 100644 --- a/tests/pykafka/test_protocol.py +++ b/tests/pykafka/test_protocol.py @@ -79,7 +79,7 @@ def test_snappy_compression(self): def test_partition_error(self): # Response has a UnknownTopicOrPartition error for test/0 response = protocol.ProduceResponse( - buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02') + buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02') ) self.assertEqual(response.topics[b'test'][0].err, 3) @@ -207,7 +207,7 @@ class TestOffsetCommitFetchAPI(unittest2.TestCase): maxDiff = None def test_consumer_metadata_request(self): - req = protocol.ConsumerMetadataRequest(b'test') + req = protocol.GroupCoordinatorRequest(b'test') msg = req.get_bytes() self.assertEqual( msg, @@ -215,7 +215,7 @@ def test_consumer_metadata_request(self): ) def test_consumer_metadata_response(self): - response = protocol.ConsumerMetadataResponse( + response = protocol.GroupCoordinatorResponse( buffer(b'\x00\x00\x00\x00\x00\x00\x00\remmett-debian\x00\x00#\x84') ) self.assertEqual(response.coordinator_id, 0) @@ -256,5 +256,187 @@ def test_offset_fetch_response(self): self.assertEqual(response.topics[b'emmett.dummy'][0].offset, 1) +class TestGroupMembershipAPI(unittest2.TestCase): + maxDiff = None + + def test_consumer_group_protocol_metadata(self): + meta = protocol.ConsumerGroupProtocolMetadata() + msg = meta.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x00\x00\x00' # version + b'\x00\x01' # len(subscription) + b'\x00\n' # len(topic name) + b'dummytopic' # topic name + b'\x00\x00\x00\x0c' # len(userdata) + b'testuserdata') # userdata + ) + + def test_join_group_request(self): + req = 
protocol.JoinGroupRequest(b'dummygroup', member_id=b'testmember') + msg = req.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x00\x00|\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x07pykafka' # header + b'\x00\n' # len(groupid) + b'dummygroup' # groupid + b'\x00\x00u0' # session timeout + b'\x00\n' # len(memberid) + b'testmember' # memberid + b'\x00\x08' # len(protocol type) + b'consumer' # protocol type + b'\x00\x00\x00\x01' # len(group protocols) + b'\x00\x19' # len(protocol name) + b'pykafkaassignmentstrategy' # protocol name + b'\x00\x00\x00"' # len(protocol metadata) + b'\x00\x00\x00\x00\x00\x01\x00\ndummytopic\x00\x00\x00\x0ctestuserdata' # protocol metadata + ) + ) + + def test_join_group_response(self): + response = protocol.JoinGroupResponse( + bytearray( + b'\x00\x00' # error code + b'\x00\x00\x00\x01' # generation id + b'\x00\x17' # len (group protocol) + b'dummyassignmentstrategy' # group protocol + b'\x00,' # len(leader id) + b'pykafka-b2361322-674c-4e26-9194-305962636e57' # leader id + b'\x00,' # len(member id) + b'pykafka-b2361322-674c-4e26-9194-305962636e57' # member id + b'\x00\x00\x00\x01' # leb(members) + b'\x00,' # len(member id) + b'pykafka-b2361322-674c-4e26-9194-305962636e57' # member id + b'\x00\x00\x00"' # len(member metadata) + b'\x00\x00\x00\x00\x00\x01\x00\ndummytopic\x00\x00\x00\x0ctestuserdata\x00\x00\x00\x00' # member metadata + ) + ) + self.assertEqual(response.generation_id, 1) + self.assertEqual(response.group_protocol, b'dummyassignmentstrategy') + self.assertEqual(response.leader_id, + b'pykafka-b2361322-674c-4e26-9194-305962636e57') + self.assertEqual(response.member_id, + b'pykafka-b2361322-674c-4e26-9194-305962636e57') + self.assertEqual(response.members, + {b'pykafka-b2361322-674c-4e26-9194-305962636e57': b'\x00\x00\x00\x00\x00\x01\x00\ndummytopic\x00\x00\x00\x0ctestuserdata'}) + + def test_member_assignment_construction(self): + assignment = protocol.MemberAssignment([(b"mytopic1", [3, 5, 7, 9]), + (b"mytopic2", [2, 4, 6, 8])]) + msg = assignment.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x01' # version + b'\x00\x00\x00\x02' # len(partition assignment) + b'\x00\x08' # len(topic) + b'mytopic1' # topic + b'\x00\x00\x00\x04' # len(partitions) + b'\x00\x00\x00\x03' # partition + b'\x00\x00\x00\x05' # partition + b'\x00\x00\x00\x07' # partition + b'\x00\x00\x00\t' # partition + b'\x00\x08' # len(topic) + b'mytopic2' # topic + b'\x00\x00\x00\x04' # len(partitions) + b'\x00\x00\x00\x02' # partition + b'\x00\x00\x00\x04' # partition + b'\x00\x00\x00\x06' # partition + b'\x00\x00\x00\x08' # partition + ) + ) + + def test_sync_group_request(self): + req = protocol.SyncGroupRequest( + b'dummygroup', 1, b'testmember1', + [ + protocol.MemberAssignment([(b"mytopic1", [3, 5, 7, 9]), + (b"mytopic2", [3, 5, 7, 9])], member_id=b"a"), + protocol.MemberAssignment([(b"mytopic1", [2, 4, 6, 8]), + (b"mytopic2", [2, 4, 6, 8])], member_id=b"b") + ]) + msg = req.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x00\x00\xc4\x00\x0e\x00\x00\x00\x00\x00\x00\x00\x07pykafka' # header + b'\x00\n' # len(group id) + b'dummygroup' # group id + b'\x00\x00\x00\x01' # generation id + b'\x00\x0b' # len(member id) + b'testmember1' # member id + b'\x00\x00\x00\x02' # len(group assignment) + b'\x00\x01' # len(member id) + b'a' # member id + b'\x00\x00\x00B' # len(member assignment) + 
b'\x00\x01\x00\x00\x00\x02\x00\x08mytopic1\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x05\x00\x00\x00\x07\x00\x00\x00\t\x00\x08mytopic2\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x05\x00\x00\x00\x07\x00\x00\x00\t' # member assignment + b'\x00\x01' # len(member id) + b'b' # member id + b'\x00\x00\x00B' # len(member assignment) + b'\x00\x01\x00\x00\x00\x02\x00\x08mytopic1\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x04\x00\x00\x00\x06\x00\x00\x00\x08\x00\x08mytopic2\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x04\x00\x00\x00\x06\x00\x00\x00\x08' # member assignment + ) + ) + + def test_sync_group_response(self): + response = protocol.SyncGroupResponse( + bytearray( + b'\x00\x00' # error code + b'\x00\x00\x00H' # len(member assignment) + b'\x00\x01\x00\x00\x00\x01\x00\x14testtopic_replicated\x00\x00\x00\n\x00\x00\x00\x06\x00\x00\x00\x07\x00\x00\x00\x08\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04\x00\x00\x00\x05,pyk' # member assignment + ) + ) + self.assertEqual(response.error_code, 0) + expected_assignment = [(b'testtopic_replicated', [6, 7, 8, 9, 0, 1, 2, 3, 4, 5])] + self.assertEqual(response.member_assignment.partition_assignment, + expected_assignment) + + def test_heartbeat_request(self): + req = protocol.HeartbeatRequest(b'dummygroup', 1, b'testmember') + msg = req.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x00\x00-\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x07pykafka' # header + b'\x00\n' # len(group id) + b'dummygroup' # group id + b'\x00\x00\x00\x01' # generation id + b'\x00\n' # len(member id) + b'testmember' # member id + ) + ) + + def test_heartbeat_response(self): + response = protocol.HeartbeatResponse( + bytearray( + b'\x00\x00' # error code + ) + ) + self.assertEqual(response.error_code, 0) + + def test_leave_group_request(self): + req = protocol.LeaveGroupRequest(b'dummygroup', b'testmember') + msg = req.get_bytes() + self.assertEqual( + msg, + bytearray( + b'\x00\x00\x00)\x00\r\x00\x00\x00\x00\x00\x00\x00\x07pykafka' # header + b'\x00\n' # len(group id) + b'dummygroup' # group id + b'\x00\n' # len(member id) + b'testmember' # member id + ) + ) + + def test_leave_group_response(self): + response = protocol.LeaveGroupResponse( + bytearray( + b'\x00\x00' # error code + ) + ) + self.assertEqual(response.error_code, 0) + + if __name__ == '__main__': unittest2.main() diff --git a/tox.ini b/tox.ini index bf420cf0e..d4ee950c0 100644 --- a/tox.ini +++ b/tox.ini @@ -6,4 +6,4 @@ commands = pip install -r test-requirements.txt pip install -e . py.test {posargs} -passenv = BROKERS ZOOKEEPER KAFKA_BIN C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS +passenv = BROKERS ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS
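A note on the new entry point in pykafka/topic.py: get_balanced_consumer() now accepts a managed flag that selects ManagedBalancedConsumer (Kafka 0.9 group management) instead of the ZooKeeper-based BalancedConsumer, and only wires up zookeeper_connect for the latter. A minimal usage sketch, assuming a local broker at 127.0.0.1:9092 and illustrative topic and group names:

    from pykafka import KafkaClient

    client = KafkaClient(hosts="127.0.0.1:9092")        # assumed broker address
    topic = client.topics[b"testtopic"]                 # illustrative topic name

    # managed=True routes to ManagedBalancedConsumer, so no zookeeper_connect is
    # needed; the broker-side group coordinator tracks membership instead.
    consumer = topic.get_balanced_consumer(b"testgroup", managed=True)
    try:
        for message in consumer:
            if message is not None:
                print("%d: %s" % (message.offset, message.value))
    finally:
        consumer.stop()

The flag defaults to False, so existing ZooKeeper-based callers are unaffected.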
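The generation_id and consumer_id arguments threaded through SimpleConsumer (and the rdkafka wrapper) replace the hard-coded -1 / b'pykafka' pair previously sent with offset commits, so the coordinator can tie a commit to a specific group member and generation. They are normally filled in by ManagedBalancedConsumer; the sketch below passes placeholders by hand purely to show the plumbing, and a real coordinator would reject such made-up values:

    from pykafka import KafkaClient

    client = KafkaClient(hosts="127.0.0.1:9092")           # assumed broker address
    topic = client.topics[b"testtopic"]                    # illustrative topic name

    # In the managed path these two values come from a JoinGroupResponse.
    consumer = topic.get_simple_consumer(
        consumer_group=b"testgroup",
        generation_id=3,                        # group generation from the coordinator
        consumer_id=b"pykafka-example-member",  # member id assigned by the coordinator
    )
    consumer.commit_offsets()    # the commit request now carries both identifiers
    consumer.stop()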
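The new TestGroupMembershipAPI cases cover the four request/response pairs behind Kafka 0.9 group membership. The sketch below only serializes the requests (no broker connection is made) to show the order in which a managed consumer uses them; the member ids and topic are illustrative:

    from pykafka import protocol

    group_id = b"dummygroup"

    # 1. JoinGroup: announce membership; a brand-new member sends an empty id and
    #    gets its real member id and the group generation back in the response.
    join = protocol.JoinGroupRequest(group_id, member_id=b"")

    # 2. SyncGroup: the elected leader submits one MemberAssignment per member;
    #    followers send an empty assignment list and receive theirs in the response.
    assignment = protocol.MemberAssignment([(b"dummytopic", [0, 1, 2])],
                                           member_id=b"pykafka-member-a")
    sync = protocol.SyncGroupRequest(group_id, 1, b"pykafka-member-a", [assignment])

    # 3. Heartbeat: sent periodically to stay in generation 1 of the group; a
    #    RebalanceInProgress error here tells the member to re-join.
    heartbeat = protocol.HeartbeatRequest(group_id, 1, b"pykafka-member-a")

    # 4. LeaveGroup: a clean shutdown, which triggers an immediate rebalance.
    leave = protocol.LeaveGroupRequest(group_id, b"pykafka-member-a")

    for req in (join, sync, heartbeat, leave):
        assert isinstance(req.get_bytes(), bytearray)

The UnknownMemberId and RebalanceInProgress handlers added to SimpleConsumer deal with the commit-time errors these identifiers can produce once a member is evicted or a rebalance begins.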
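test_decide_partitions checks that the simulated participants end up with disjoint assignments that together cover every partition. For readers following along, here is a standalone restatement of the range-assignment arithmetic being verified; it is a sketch, not pykafka's own code:

    def range_assign(partition_ids, participants, consumer_id):
        """Sort partitions and members, give each member a contiguous slice,
        and hand one extra partition each to the first `remainder` members."""
        all_parts = sorted(partition_ids)
        participants = sorted(participants)
        idx = participants.index(consumer_id)
        per_member = len(all_parts) // len(participants)
        remainder = len(all_parts) % len(participants)
        start = per_member * idx + min(idx, remainder)
        count = per_member + (1 if idx < remainder else 0)
        return set(all_parts[start:start + count])

    # 10 partitions over 3 members come out as slices of 4, 3 and 3 partitions.
    members = [b"cns-a", b"cns-b", b"cns-c"]
    assert range_assign(range(10), members, b"cns-a") == {0, 1, 2, 3}
    assert range_assign(range(10), members, b"cns-b") == {4, 5, 6}
    assert range_assign(range(10), members, b"cns-c") == {7, 8, 9}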
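test_rebalance_callbacks relies on the post_rebalance_callback contract: the callback receives the old and new {partition_id: offset} mappings and may return a mapping of offsets to reset to, which the consumer applies before it resumes fetching (returning None keeps the committed offsets). A sketch of such a callback; the reset offset of 50 mirrors the test and is otherwise arbitrary:

    from pykafka import KafkaClient


    def on_rebalance(consumer, old_partition_offsets, new_partition_offsets):
        # Rewind every newly assigned partition to offset 50.
        return dict((partition_id, 50) for partition_id in new_partition_offsets)


    client = KafkaClient(hosts="127.0.0.1:9092")            # assumed broker address
    consumer = client.topics[b"testtopic"].get_balanced_consumer(
        b"test_rebalance_callbacks",
        managed=True,                 # the ZooKeeper path takes zookeeper_connect instead
        post_rebalance_callback=on_rebalance)

If the callback raises, the consumer records it as a worker exception rather than swallowing it, which is what test_rebalance_callbacks_error exercises.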
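The patch_subclass helper under tests/pykafka replaces plain inheritance for the rdkafka, gevent, and managed-consumer test variants, since pytest issue #568 would otherwise skip every sibling class when one is skipped. A usage sketch; ParentTests and the SKIP_SLOW environment switch are hypothetical stand-ins for the real test classes and skip conditions:

    import os

    import unittest2

    from tests.pykafka import patch_subclass


    class ParentTests(unittest2.TestCase):
        def test_always_passes(self):
            self.assertTrue(True)


    # Instead of subclassing ParentTests, the decorator copies its attributes onto
    # ChildTests and wraps every test_* method in pytest.mark.skipif(condition).
    @patch_subclass(ParentTests, os.environ.get("SKIP_SLOW") == "1")
    class ChildTests(unittest2.TestCase):
        pass

Individual tests can additionally opt out per variant by attaching a skip_condition callable, as test_external_kazoo_client and test_zk_conn_lost do to keep the ZooKeeper-only scenarios out of the managed-consumer runs.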
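The balanced-consumer test module now reads KAFKA_VERSION from the environment and compares it against 0.9.0.0 with pkg_resources.parse_version to decide whether the managed-consumer suites run. Parsing gives numeric ordering, which a plain string comparison would not, as this small check illustrates:

    import pkg_resources

    # Lexicographic comparison mis-orders multi-digit components...
    assert "0.10.0.0" < "0.9.0.0"
    # ...while parsed versions compare numerically, keeping the 0.9 gate correct.
    assert (pkg_resources.parse_version("0.10.0.0") >
            pkg_resources.parse_version("0.9.0.0"))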