diff --git a/.travis.yml b/.travis.yml index c4e2f1306..99b966979 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,31 +6,31 @@ cache: - $HOME/.ccache matrix: include: - - env: TOX_ENV=py27 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py27 KAFKA_VERSION=2.0.0 python: 2.7 - env: TOX_ENV=py27 KAFKA_VERSION=1.0.1 python: 2.7 - - env: TOX_ENV=py34 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py34 KAFKA_VERSION=2.0.0 python: 3.4 - env: TOX_ENV=py34 KAFKA_VERSION=1.0.1 python: 3.4 - - env: TOX_ENV=py35 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py35 KAFKA_VERSION=2.0.0 python: 3.5 - env: TOX_ENV=py35 KAFKA_VERSION=1.0.1 python: 3.5 - - env: TOX_ENV=py36 KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py36 KAFKA_VERSION=2.0.0 python: 3.6 - env: TOX_ENV=py36 KAFKA_VERSION=1.0.1 python: 3.6 - - env: TOX_ENV=pypy KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=pypy KAFKA_VERSION=2.0.0 python: pypy - env: TOX_ENV=pypy KAFKA_VERSION=1.0.1 python: pypy - - env: TOX_ENV=py27-gevent KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py27-gevent KAFKA_VERSION=2.0.0 python: 2.7 - env: TOX_ENV=py27-gevent KAFKA_VERSION=1.0.1 python: 2.7 - - env: TOX_ENV=py36-gevent KAFKA_VERSION=0.8.2.2 + - env: TOX_ENV=py36-gevent KAFKA_VERSION=2.0.0 python: 3.6 - env: TOX_ENV=py36-gevent KAFKA_VERSION=1.0.1 python: 3.6 @@ -38,6 +38,7 @@ env: global: - PATH="/usr/lib/ccache:$PATH" - KAFKA_BIN="$HOME/kafka-bin" + - PYTHONUNBUFFERED=1 addons: apt: @@ -68,9 +69,9 @@ install: fi - pip install -U pip setuptools - pip install codecov kazoo tox testinstances - - wget https://github.com/edenhill/librdkafka/archive/v0.9.5.tar.gz - - tar -xzf v0.9.5.tar.gz - - cd librdkafka-0.9.5/ && ./configure --prefix=$HOME + - wget https://github.com/edenhill/librdkafka/archive/v0.11.3.tar.gz + - tar -xzf v0.11.3.tar.gz + - cd librdkafka-0.11.3/ && ./configure --prefix=$HOME - make -j 2 && make -j 2 install && cd - before_script: @@ -79,10 +80,28 @@ before_script: - export LD_LIBRARY_PATH=$HOME/lib:$LD_LIBRARY_PATH - export CFLAGS="-coverage" - TEMPFILE=`tempfile` - - python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE & - - while true; do sleep 1; echo "Waiting for cluster..."; if [[ `grep ZOOKEEPER $TEMPFILE` ]]; then break; fi; done + - KAFKA_LOGS=`tempfile` + - python -m pykafka.test.kafka_instance 3 --kafka-version $KAFKA_VERSION --download-dir $KAFKA_BIN --export-hosts $TEMPFILE --log-level INFO 2>$KAFKA_LOGS & + - |
      kafka_pid=$!
      start=$(date +%s)
      until grep ZOOKEEPER $TEMPFILE 1>/dev/null 2>/dev/null; do
        sleep 1
        echo "Waiting for cluster..."
        if [[ $(($(date +%s) - start)) -gt 300 ]]; then
          echo "Timeout waiting for cluster!"
          cat $KAFKA_LOGS
          exit 1
        fi;
        if ! kill -0 "$kafka_pid" >/dev/null 2>&1 ; then
          echo "Kafka test cluster died during startup!"
          cat $KAFKA_LOGS
          exit 2
        fi
      done - export `grep BROKERS $TEMPFILE` - export `grep BROKERS_SSL $TEMPFILE` + - export `grep BROKERS_SASL $TEMPFILE` - export `grep ZOOKEEPER $TEMPFILE` script: diff --git a/README.rst b/README.rst index 244055d57..9767e1edd 100644 --- a/README.rst +++ b/README.rst @@ -65,6 +65,13 @@ for further details): >>> client = KafkaClient(hosts="127.0.0.1:<port>,...", ... ssl_config=config) +Or, for a SASL-authenticated connection, you might write (see e.g. the ``PlainAuthenticator`` docs for further details): + +.. sourcecode:: python + + >>> from pykafka import KafkaClient, PlainAuthenticator + >>> authenticator = PlainAuthenticator(user='alice', password='alice-secret') + >>> client = KafkaClient(hosts="127.0.0.1:<port>,...", sasl_authenticator=authenticator) If the cluster you've connected to has any topics defined on it, you can list them with: @@ -178,8 +185,10 @@ of the librdkafka shared objects. You can find this location with `locate librdk After that, all that's needed is that you pass an extra parameter ``use_rdkafka=True`` to ``topic.get_producer()``, -``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``. Note -that some configuration options may have different optimal values; it may be +``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``. +If you're using SASL-authenticated connections, make sure to pass the ``security_protocol`` +parameter to your authenticator so librdkafka knows which endpoint to authenticate with. +Note that some configuration options may have different optimal values; it may be worthwhile to consult librdkafka's `configuration notes`_ for this. .. _0.9.1: https://github.com/edenhill/librdkafka/releases/tag/0.9.1 diff --git a/doc/api/sasl_authenticators.rst b/doc/api/sasl_authenticators.rst new file mode 100644 index 000000000..3e676e2c1 --- /dev/null +++ b/doc/api/sasl_authenticators.rst @@ -0,0 +1,5 @@ +pykafka.sasl_authenticators +=========================== + +.. automodule:: pykafka.sasl_authenticators + :members: diff --git a/pykafka/__init__.py b/pykafka/__init__.py index d9d0a595e..7f28c8543 100644 --- a/pykafka/__init__.py +++ b/pykafka/__init__.py @@ -11,6 +11,7 @@ from .balancedconsumer import BalancedConsumer from .managedbalancedconsumer import ManagedBalancedConsumer from .membershipprotocol import RangeProtocol, RoundRobinProtocol +from .sasl_authenticators import PlainAuthenticator, ScramAuthenticator __version__ = "2.8.1-dev.2" @@ -28,6 +29,8 @@ "ManagedBalancedConsumer", "RangeProtocol", "RoundRobinProtocol", + "PlainAuthenticator", + "ScramAuthenticator" ] logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/pykafka/broker.py b/pykafka/broker.py index bfcdfd15c..fc644c0da 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -62,6 +62,7 @@ def __init__(self, source_host='', source_port=0, ssl_config=None, + sasl_authenticator=None, broker_version="0.9.0", api_versions=None): """Create a Broker instance. :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly.
@@ -117,6 +120,7 @@ def __init__(self, self._req_handlers = {} self._broker_version = broker_version self._api_versions = api_versions + self._sasl_authenticator = sasl_authenticator try: self.connect() except SocketDisconnectedError: @@ -144,6 +148,7 @@ def from_metadata(cls, source_host='', source_port=0, ssl_config=None, + sasl_authenticator=None, broker_version="0.9.0", api_versions=None): """Create a Broker using BrokerMetadata @@ -169,6 +174,8 @@ def from_metadata(cls, :type source_port: int :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly. @@ -184,6 +191,7 @@ def from_metadata(cls, source_host=source_host, source_port=source_port, ssl_config=ssl_config, + sasl_authenticator=sasl_authenticator, broker_version=broker_version, api_versions=api_versions) @@ -203,6 +211,22 @@ def offsets_channel_connected(self): return self._offsets_channel_connection.connected return False + @property + def authenticated(self): + """Returns True if this object's main connection to the Kafka broker + is authenticated + """ + return self._connection.authenticated + + @property + def offsets_channel_authenticated(self): + """Returns True if this object's offsets channel connection to the + Kafka broker is authenticated + """ + if self._offsets_channel_connection: + return self._offsets_channel_connection.authenticated + return False + @property def id(self): """The broker's ID within the Kafka cluster""" @@ -246,7 +270,8 @@ def connect(self, attempts=3): buffer_size=self._buffer_size, source_host=self._source_host, source_port=self._source_port, - ssl_config=self._ssl_config) + ssl_config=self._ssl_config, + sasl_authenticator=self._sasl_authenticator) self._connection.connect(self._socket_timeout_ms, attempts=attempts) self._req_handler = RequestHandler(self._handler, self._connection) self._req_handler.start() @@ -262,7 +287,8 @@ def connect_offsets_channel(self, attempts=3): self.host, self.port, self._handler, buffer_size=self._buffer_size, source_host=self._source_host, source_port=self._source_port, - ssl_config=self._ssl_config) + ssl_config=self._ssl_config, + sasl_authenticator=self._sasl_authenticator) self._offsets_channel_connection.connect(self._offsets_channel_socket_timeout_ms, attempts=attempts) self._offsets_channel_req_handler = RequestHandler( diff --git a/pykafka/client.py b/pykafka/client.py index 193e00f79..5b66d5742 100644 --- a/pykafka/client.py +++ b/pykafka/client.py @@ -88,6 +88,7 @@ def __init__(self, exclude_internal_topics=True, source_address='', ssl_config=None, + sasl_authenticator=None, broker_version='0.9.0'): """Create a connection to a Kafka cluster. @@ -118,6 +119,8 @@ def __init__(self, :type source_address: str `'host:port'` :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. 
If this parameter doesn't match the actual broker version, some pykafka features may not work properly. @@ -139,6 +142,7 @@ def __init__(self, source_address=self._source_address, zookeeper_hosts=zookeeper_hosts, ssl_config=ssl_config, + sasl_authenticator=sasl_authenticator, broker_version=broker_version) self.brokers = self.cluster.brokers self.topics = self.cluster.topics diff --git a/pykafka/cluster.py b/pykafka/cluster.py index 28c3fdfc2..a43293c63 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -175,6 +175,7 @@ def __init__(self, source_address='', zookeeper_hosts=None, ssl_config=None, + sasl_authenticator=None, broker_version='0.9.0'): """Create a new Cluster instance. @@ -199,6 +200,8 @@ def __init__(self, :type source_address: str `'host:port'` :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. + :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` :param broker_version: The protocol version of the cluster being connected to. If this parameter doesn't match the actual broker version, some pykafka features may not work properly. @@ -214,6 +217,7 @@ def __init__(self, self._source_host = self._source_address.split(':')[0] self._source_port = 0 self._ssl_config = ssl_config + self.sasl_authenticator = sasl_authenticator self._zookeeper_connect = zookeeper_hosts self._max_connection_retries = 3 self._max_connection_retries_offset_mgr = 8 @@ -284,6 +288,7 @@ def _request_random_broker(self, broker_connects, req_fn): source_host=self._source_host, source_port=self._source_port, ssl_config=self._ssl_config, + sasl_authenticator=self.sasl_authenticator, broker_version=self._broker_version, api_versions=self._api_versions) response = req_fn(broker) @@ -402,6 +407,7 @@ def _update_brokers(self, broker_metadata, controller_id): source_host=self._source_host, source_port=self._source_port, ssl_config=self._ssl_config, + sasl_authenticator=self.sasl_authenticator, broker_version=self._broker_version, api_versions=self._api_versions) elif not self._brokers[id_].connected: diff --git a/pykafka/connection.py b/pykafka/connection.py index 203fae024..5289c13d5 100644 --- a/pykafka/connection.py +++ b/pykafka/connection.py @@ -117,7 +117,8 @@ def __init__(self, buffer_size=1024 * 1024, source_host='', source_port=0, - ssl_config=None): + ssl_config=None, + sasl_authenticator=None): """Initialize a socket connection to Kafka. :param host: The host to which to connect @@ -139,6 +140,8 @@ def __init__(self, :type source_port: int :param ssl_config: Config object for SSL connection :type ssl_config: :class:`pykafka.connection.SslConfig` + :param sasl_authenticator: Authenticator to use for authentication using sasl. 
+ :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator` """ self._buff = bytearray(buffer_size) self.host = host @@ -149,6 +152,8 @@ def __init__(self, self.source_port = source_port self._wrap_socket = ( ssl_config.wrap_socket if ssl_config else lambda x: x) + self._sasl_authenticator = sasl_authenticator + self.authenticated = sasl_authenticator is None def __del__(self): """Close this connection when the object is deleted.""" @@ -161,6 +166,7 @@ def connected(self): def connect(self, timeout, attempts=1): """Connect to the broker, retrying if specified.""" + self.authenticated = False log.debug("Connecting to %s:%s", self.host, self.port) for attempt in range(0, attempts): try: @@ -172,6 +178,9 @@ ) ) log.debug("Successfully connected to %s:%s", self.host, self.port) + if self._sasl_authenticator is not None: + self._sasl_authenticator.authenticate(self) + self.authenticated = True return except (self._handler.SockErr, self._handler.GaiError) as err: log.info("Attempt %s: failed to connect to %s:%s", attempt, self.host, self.port) @@ -193,6 +202,7 @@ def disconnect(self): pass finally: self._socket = None + self.authenticated = False def reconnect(self): """Disconnect from the broker, then reconnect""" @@ -203,6 +213,7 @@ def request(self, request): """Send a request over the socket connection""" bytes_ = request.get_bytes() if not self._socket: + self.authenticated = False raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port)) try: self._socket.sendall(bytes_) @@ -211,7 +222,7 @@ self.disconnect() raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port)) - def response(self): + def response_raw(self): """Wait for a response from the broker""" size = bytes() expected_len = 4 # Size => int32 @@ -231,5 +242,9 @@ except SocketDisconnectedError: self.disconnect() raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port)) + return self._buff[:size] + + def response(self): # Drop CorrelationId => int32 - return buffer(self._buff[4:4 + size]) + return buffer(self.response_raw()[4:]) + diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py index 70bc5dc9f..e5bf911a9 100644 --- a/pykafka/exceptions.py +++ b/pykafka/exceptions.py @@ -91,6 +91,11 @@ def __init__(self, partition, *args, **kwargs): self.partition = partition +class AuthenticationException(KafkaException): + """Indicates that something went wrong during authentication.""" + pass + + """ Protocol Client Exceptions https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes @@ -254,6 +259,26 @@ class GroupAuthorizationFailed(ProtocolClientError): ERROR_CODE = 30 +class ClusterAuthorizationFailed(ProtocolClientError): + """Cluster authorization failed.""" + ERROR_CODE = 31 + + +class UnsupportedSaslMechanism(ProtocolClientError, AuthenticationException): + """The broker does not support the requested SASL mechanism.""" + ERROR_CODE = 33 + + +class IllegalSaslState(ProtocolClientError, AuthenticationException): + """Request is not valid given the current SASL state.""" + ERROR_CODE = 34 + + +class SaslAuthenticationFailed(ProtocolClientError, AuthenticationException): + """SASL authentication failed.""" + ERROR_CODE = 58 + + ERROR_CODES = dict( (exc.ERROR_CODE, exc) for exc in (UnknownError, @@ -276,7 +301,11 @@ InvalidSessionTimeout, RebalanceInProgress, TopicAuthorizationFailed, - GroupAuthorizationFailed) + GroupAuthorizationFailed, + ClusterAuthorizationFailed, + UnsupportedSaslMechanism, + IllegalSaslState, + SaslAuthenticationFailed) ) diff --git a/pykafka/protocol/__init__.py b/pykafka/protocol/__init__.py index f6bd62d29..6722a0828 100644 --- a/pykafka/protocol/__init__.py +++ b/pykafka/protocol/__init__.py @@ -26,6 +26,8 @@ OffsetFetchRequestV1, OffsetFetchResponseV1, OffsetFetchRequestV2, OffsetFetchResponseV2) from .produce import ProduceRequest, ProduceResponse, ProducePartitionResponse +from .sasl import (SaslHandshakeRequest, SaslHandshakeRequestV1, SaslHandshakeResponse, SaslHandshakeResponseV1, + SaslAuthenticateRequest, SaslAuthenticateRequestV1, SaslAuthenticateResponse, SaslAuthenticateResponseV1) """ Author: Keith Bourgoin, Emmett Butler @@ -44,18 +46,18 @@ Each message is encoded as either a Request or Response: RequestOrResponse => Size (RequestMessage | ResponseMessage) -  Size => int32 + Size => int32 RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage -  ApiKey => int16 -  ApiVersion => int16 -  CorrelationId => int32 -  ClientId => string -  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest + ApiKey => int16 + ApiVersion => int16 + CorrelationId => int32 + ClientId => string + RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest Response => CorrelationId ResponseMessage -  CorrelationId => int32 -  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse + CorrelationId => int32 + ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse """ __all__ = ["MetadataRequest", "MetadataResponse", "ProduceRequest", "ProduceResponse", "PartitionFetchRequest", "FetchRequest", "FetchPartitionResponse", @@ -79,4 +81,7 @@ "OffsetFetchResponseV1", "OffsetFetchRequestV2", "OffsetFetchResponseV2", "MetadataRequestV2", "MetadataResponseV2", "MetadataRequestV3", "MetadataResponseV3", "MetadataRequestV4", "MetadataResponseV4", - "MetadataRequestV5", "MetadataResponseV5"] + "MetadataRequestV5", "MetadataResponseV5", "SaslHandshakeRequest", + "SaslHandshakeRequestV1", "SaslHandshakeResponse", "SaslHandshakeResponseV1", + "SaslAuthenticateRequest", "SaslAuthenticateRequestV1", "SaslAuthenticateResponse", + "SaslAuthenticateResponseV1"] diff --git a/pykafka/protocol/sasl.py b/pykafka/protocol/sasl.py new file mode 100644 index 000000000..a0634e01a --- /dev/null +++ b/pykafka/protocol/sasl.py @@ -0,0 +1,183 @@ +import struct + +from pykafka.utils import struct_helpers +from .base import Request, Response + + +class SaslHandshakeRequest(Request): + """A SASL handshake request. + Specification:: + + SaslHandshake Request (Version: 0) => mechanism + mechanism => STRING + """ + API_KEY = 17 + API_VERSION = 0 + + @classmethod + def get_versions(cls): + return {0: SaslHandshakeRequest, 1: SaslHandshakeRequestV1} + + def __init__(self, mechanism): + self.mechanism = mechanism.encode() + + def __len__(self): + return self.HEADER_LEN + 2 + len(self.mechanism) + + def get_bytes(self): + """Create new sasl handshake request""" + output = bytearray(len(self)) + self._write_header(output, api_version=self.API_VERSION) + offset = self.HEADER_LEN + fmt = '!h%ds' % len(self.mechanism) + struct.pack_into(fmt, output, offset, len(self.mechanism), self.mechanism) + return output + + +class SaslHandshakeRequestV1(SaslHandshakeRequest): + """A SASL handshake request. + Specification:: + + SaslHandshake Request (Version: 1) => mechanism + mechanism => STRING + """ + API_VERSION = 1 + + +class SaslHandshakeResponse(Response): + """A SASL handshake response. + Specification:: + + SaslHandshake Response (Version: 0) => error_code [mechanisms] + error_code => INT16 + mechanisms => STRING + """ + API_KEY = 17 + + @classmethod + def get_versions(cls): + return {0: SaslHandshakeResponse, 1: SaslHandshakeResponseV1} + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h [S]' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code = response[0] + self.mechanisms = response[1] + + +class SaslHandshakeResponseV1(SaslHandshakeResponse): + """A SASL handshake response. + Specification:: + + SaslHandshake Response (Version: 1) => error_code [mechanisms] + error_code => INT16 + mechanisms => STRING + """ + API_VERSION = 1 + + +class SaslAuthenticateRequest(Request): + """A SASL authenticate request + Specification:: + + SaslAuthenticate Request (Version: 0) => auth_bytes + auth_bytes => BYTES + """ + API_KEY = 36 + API_VERSION = 0 + + @classmethod + def get_versions(cls): + return {0: SaslAuthenticateRequest, 1: SaslAuthenticateRequestV1} + + def __init__(self, auth_bytes): + self.auth_bytes = auth_bytes + + def __len__(self): + if self.auth_bytes is not None: + return self.HEADER_LEN + 4 + len(self.auth_bytes) + return self.HEADER_LEN + 4 + + def get_bytes(self): + """Create new sasl authenticate request""" + output = bytearray(len(self)) + self._write_header(output, api_version=self.API_VERSION) + offset = self.HEADER_LEN + if self.auth_bytes is not None: + fmt = '!i%ds' % len(self.auth_bytes) + struct.pack_into(fmt, output, offset, len(self.auth_bytes), self.auth_bytes) + else: + fmt = '!i' + struct.pack_into(fmt, output, offset, -1) + return output + + +class SaslAuthenticateRequestV1(SaslAuthenticateRequest): + """A SASL authenticate request + Specification:: + + SaslAuthenticate Request (Version: 1) => auth_bytes + auth_bytes => BYTES + """ + API_VERSION = 1 + + +class SaslAuthenticateResponse(Response): + """A SASL authenticate response + Specification:: + + SaslAuthenticate Response (Version: 0) => error_code error_message auth_bytes + error_code => INT16 + error_message => NULLABLE_STRING + auth_bytes => BYTES + """ + API_KEY = 36 + + @classmethod + def get_versions(cls): + return {0: SaslAuthenticateResponse, 1: SaslAuthenticateResponseV1} + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h S Y' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code =
response[0] + self.error_message = response[1].decode() if response[1] is not None else None + self.auth_bytes = response[2] + + +class SaslAuthenticateResponseV1(SaslAuthenticateResponse): + """A SASL authenticate response + Specification:: + + SaslAuthenticate Response (Version: 1) => error_code error_message auth_bytes session_lifetime_ms + error_code => INT16 + error_message => NULLABLE_STRING + auth_bytes => BYTES + session_lifetime_ms => INT64 + """ + + def __init__(self, buff): + """Deserialize into a new Response + + :param buff: Serialized message + :type buff: :class:`bytearray` + """ + fmt = 'h S Y q' + response = struct_helpers.unpack_from(fmt, buff, 0) + + self.error_code = response[0] + self.error_message = response[1].decode() if response[1] is not None else None + self.auth_bytes = response[2] + self.session_lifetime_ms = response[3] diff --git a/pykafka/rdkafka/producer.py b/pykafka/rdkafka/producer.py index cdb9b1e27..86ff1025c 100644 --- a/pykafka/rdkafka/producer.py +++ b/pykafka/rdkafka/producer.py @@ -57,6 +57,7 @@ def __init__(self, self._rdk_producer = None self._poller_thread = None self._stop_poller_thread = cluster.handler.Event() + self._sasl_conf = {} if cluster.sasl_authenticator is None else cluster.sasl_authenticator.get_rd_kafka_opts() # super() must come last because it calls start() super(RdKafkaProducer, self).__init__(**callargs) @@ -191,6 +192,10 @@ def _mk_rdkafka_config_lists(self): # "partitioner" # dealt with in pykafka # "opaque" } + + # append configurations necessary for sasl authentication + conf.update(self._sasl_conf) + # librdkafka expects all config values as strings: conf = [(key, str(conf[key])) for key in conf] topic_conf = [(key, str(topic_conf[key])) for key in topic_conf] diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index d3444463d..d286c89aa 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -64,6 +64,7 @@ def __init__(self, self._stop_poller_thread = cluster.handler.Event() self._broker_version = cluster._broker_version self._fetch_error_backoff_ms = valid_int(fetch_error_backoff_ms) + self._sasl_conf = {} if cluster.sasl_authenticator is None else cluster.sasl_authenticator.get_rd_kafka_opts() # super() must come last for the case where auto_start=True super(RdKafkaSimpleConsumer, self).__init__(**callargs) @@ -248,9 +249,10 @@ def _mk_rdkafka_config_lists(self): # queued.max.messages.kbytes so for now we infer the implied # maximum (which, with default settings, is ~2GB per partition): "queued.min.messages": self._queued_max_messages, - "queued.max.messages.kbytes": str( - self._queued_max_messages - * self._fetch_message_max_bytes // 1024), + "queued.max.messages.kbytes": min( + 2097151, + self._queued_max_messages * self._fetch_message_max_bytes // 1024 + ), "fetch.wait.max.ms": self._fetch_wait_max_ms, "fetch.message.max.bytes": self._fetch_message_max_bytes, @@ -285,6 +287,10 @@ def _mk_rdkafka_config_lists(self): ##"offset.store.sync.interval.ms" ##"offset.store.method" } + + # append configurations necessary for sasl authentication + conf.update(self._sasl_conf) + # librdkafka expects all config values as strings: conf = [(key, str(conf[key])) for key in conf] topic_conf = [(key, str(topic_conf[key])) for key in topic_conf] diff --git a/pykafka/sasl_authenticators.py b/pykafka/sasl_authenticators.py new file mode 100644 index 000000000..7f573ef07 --- /dev/null +++ b/pykafka/sasl_authenticators.py @@ -0,0 +1,343 @@ +import base64 +import hashlib 
+import hmac +import logging +import struct +from uuid import uuid4 + +import six + +from .exceptions import AuthenticationException, ERROR_CODES, UnsupportedSaslMechanism +from .protocol import ( + SaslHandshakeRequest, + SaslHandshakeResponse, + ApiVersionsRequest, + ApiVersionsResponse, + SaslAuthenticateRequest, + SaslAuthenticateResponse, +) + +log = logging.getLogger(__name__) + + +if six.PY2: + def xor_bytes(left, right): + return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) +else: + def xor_bytes(left, right): + return bytes(lb ^ rb for lb, rb in zip(left, right)) + + +class BytesWrapper(object): + """ + Class that implements :meth:`get_bytes` and wraps some payload so it can be used for + :meth:`connection.BrokerConnection.request` during the legacy SASL authentication sequence. + """ + + def __init__(self, payload): + """ + Create a new BytesWrapper. + + :param payload: The payload to wrap + :type payload: bytes + """ + self.payload = payload + + def get_bytes(self): + return struct.pack("!i", len(self.payload)) + self.payload + + +class BaseAuthenticator(object): + """ + Base class for authentication mechanisms. + Subclasses are supposed to implement: + 1. :meth:`BaseAuthenticator.get_rd_kafka_opts` which should return a dictionary + whose items will be appended to the config given to librdkafka consumers and producers. + 2. :meth:`BaseAuthenticator.exchange_tokens` which is supposed to use + :meth:`BaseAuthenticator.send_and_receive` to send and receive the byte strings necessary to authenticate + with the broker. + """ + + MAX_AUTH_VERSION = 1 + MAX_HANDSHAKE_VERSION = 1 + + def __init__(self, mechanism, security_protocol=None): + """ + Base class for SASL authentication mechanisms. + + :param mechanism: The mechanism this authenticator is supposed to use. + :type mechanism: str + :param security_protocol: The security protocol determining the broker endpoint this + authenticator is supposed to authenticate with. + Only used for rdkafka based consumers and producers. + """ + + self.mechanism = mechanism + self.handshake_version = None + self.auth_version = None + self.security_protocol = security_protocol + self._broker_connection = None + + def get_rd_kafka_opts(self): + """ + Creates the config entries necessary for librdkafka to successfully authenticate with the broker. + + :return: Dictionary to enrich config for librdkafka based consumers and producers. + """ + raise NotImplementedError() + + def authenticate(self, broker_connection): + """ + Runs the authentication sequence on the given broker connection. + + .. warning:: + This is not thread safe! + + :param broker_connection: The broker connection to authenticate with. + :type broker_connection: :class:`pykafka.connection.BrokerConnection` + """ + self._broker_connection = broker_connection + if self.handshake_version is None: + self._fetch_api_versions() + log.debug( + "Authenticating to {}:{} using mechanism {}.".format( + self._broker_connection.host, self._broker_connection.port, self.mechanism + ) + ) + self._initialize_authentication() + self.exchange_tokens() + log.debug("Authentication successful.") + + def _initialize_authentication(self): + """ + Initializes the authentication sequence. + """ + self._broker_connection.request(SaslHandshakeRequest.get_versions()[self.handshake_version](self.mechanism)) + response = SaslHandshakeResponse.get_versions()[self.handshake_version](self._broker_connection.response()) + if response.error_code != 0: + if response.error_code == UnsupportedSaslMechanism.ERROR_CODE: + msg = "Broker only supports sasl mechanisms {}, requested was {}" + raise UnsupportedSaslMechanism(msg.format(",".join(response.mechanisms), self.mechanism)) + raise ERROR_CODES[response.error_code]("Authentication Handshake failed") + + def exchange_tokens(self): + """ + Runs the authentication sequence. Implementation varies among SASL mechanisms and has to be supplied by + subclasses. See also :meth:`PlainAuthenticator.exchange_tokens` or :meth:`ScramAuthenticator.exchange_tokens` + for exemplary implementations. + """ + raise NotImplementedError() + + def send_and_receive(self, token): + """ + Sends the given token to the broker and receives the broker's response. + This will automatically use the appropriate mechanism to do so. + I.e. use SaslAuthenticateRequest if the server supports it or just send the bytes directly if it doesn't. + + :param token: The token to be sent to the broker. + :type token: bytes + :return: bytes, the server's response + """ + self._send_token(token) + return self._receive_token() + + def _send_token(self, token): + log.debug("Sending auth token") + if self.handshake_version == 0: + req = BytesWrapper(token) + else: + req = SaslAuthenticateRequest.get_versions()[self.auth_version](token) + self._broker_connection.request(req) + + def _receive_token(self): + log.debug("Receiving auth token") + if self.handshake_version == 0: + return self._broker_connection.response_raw() + + data = self._broker_connection.response() + response = SaslAuthenticateResponse.get_versions()[self.auth_version](data) + if response.error_code != 0: + raise ERROR_CODES[response.error_code](response.error_message) + return response.auth_bytes + + def _fetch_api_versions(self): + """ + The api version request can be run without authentication in order to determine which authentication api + versions to use. That's what this method does. + """ + log.debug("Fetching SASL authentication api versions.") + self._broker_connection.request(ApiVersionsRequest()) + response = ApiVersionsResponse(self._broker_connection.response()) + + self.handshake_version = response.api_versions[SaslHandshakeRequest.API_KEY].max + self.auth_version = response.api_versions.get(SaslAuthenticateRequest.API_KEY, None) + + self.handshake_version = min(self.MAX_HANDSHAKE_VERSION, self.handshake_version) + if self.auth_version is not None: + self.auth_version = min(self.auth_version.max, self.MAX_AUTH_VERSION) + log.debug( + "Determined handshake api version {} and authenticate api version {}".format( + self.handshake_version, self.auth_version + ) + ) + + +class ScramAuthenticator(BaseAuthenticator): + """ + Authenticates with Kafka using the salted challenge response authentication mechanism. + """ + + MECHANISMS = {"SCRAM-SHA-256": ("sha256", hashlib.sha256), "SCRAM-SHA-512": ("sha512", hashlib.sha512)} + + def __init__(self, mechanism, user, password, security_protocol=None): + """ + Create new ScramAuthenticator + + :param mechanism: The mechanism this authenticator is supposed to use. + :type mechanism: str, one of 'SCRAM-SHA-256' or 'SCRAM-SHA-512' + :param user: The user to authenticate as. + :type user: str + :param password: The user's password. + :type password: str + :param security_protocol: The security protocol determining the broker endpoint this + authenticator is supposed to authenticate with. + Only used for rdkafka based consumers and producers. + """ + super(ScramAuthenticator, self).__init__(mechanism, security_protocol) + self.nonce = None + self.auth_message = None + self.salted_password = None + self.user = user + self.password = password.encode() + self.hashname, self.hashfunc = self.MECHANISMS[mechanism] + self.mechanism = mechanism + self.stored_key = None + self.client_key = None + self.client_signature = None + self.client_proof = None + self.server_key = None + self.server_signature = None + + def client_first_message(self): + """ + Create and return the client first message. This will also reset all internal variables. + :return: str, the client first message + """ + self.nonce = str(uuid4()).replace("-", "") + client_first_bare = "n={},r={}".format(self.user, self.nonce) + self.auth_message = client_first_bare + return "n,," + client_first_bare + + def process_server_first_message(self, server_first_message): + """ + Parse and process the server's first message. This extracts all necessary information from the server's + first response, such as the iteration count and salt, and uses it to prepare the client final message. + + :param server_first_message: The first message sent by the server + :type server_first_message: str + """ + self.auth_message += "," + server_first_message + params = dict(pair.split("=", 1) for pair in server_first_message.split(",")) + server_nonce = params["r"] + if not server_nonce.startswith(self.nonce): + raise AuthenticationException("Server nonce did not start with client nonce!") + self.nonce = server_nonce + self.auth_message += ",c=biws,r=" + self.nonce + + salt = base64.b64decode(params["s"].encode()) + iterations = int(params["i"]) + self._create_salted_password(salt, iterations) + + self.client_key = self._hmac(self.salted_password, b"Client Key") + self.stored_key = self.hashfunc(self.client_key).digest() + self.client_signature = self._hmac(self.stored_key, self.auth_message.encode()) + self.client_proof = xor_bytes(self.client_key, self.client_signature) + self.server_key = self._hmac(self.salted_password, b"Server Key") + self.server_signature = self._hmac(self.server_key, self.auth_message.encode()) + + def _hmac(self, key, msg): + """ + Run the hmac algorithm on `key` and `msg` using the appropriate digest method for the configured SCRAM + mechanism. + :param key: The key for the hmac algorithm + :type key: bytes + :param msg: The message for the hmac algorithm + :type msg: bytes + :return: bytes, the result of applying hmac on `key` and `msg` + """ + return hmac.new(key, msg, digestmod=self.hashfunc).digest() + + def _create_salted_password(self, salt, iterations): + self.salted_password = hashlib.pbkdf2_hmac(self.hashname, self.password, salt, iterations) + + def client_final_message(self): + """ + Create and return the client final message. + :return: str, the client final message + """ + return "c=biws,r={},p={}".format(self.nonce, base64.b64encode(self.client_proof).decode()) + + def process_server_final_message(self, server_final_message): + """ + Parse and process the server's final message. This will run validation on the server's response to make sure + that everything is all right. + + :param server_final_message: The final message sent by the server + :type server_final_message: str + """ + params = dict(pair.split("=", 1) for pair in server_final_message.split(",")) + if self.server_signature != base64.b64decode(params["v"].encode()): + raise AuthenticationException("Server sent wrong signature!") + + def get_rd_kafka_opts(self): + return { + "sasl.mechanisms": self.mechanism, + "sasl.username": self.user, + "sasl.password": self.password.decode(), + "security.protocol": self.security_protocol, + } + + def exchange_tokens(self): + client_first = self.client_first_message() + server_first = self.send_and_receive(client_first.encode()).decode() + self.process_server_first_message(server_first) + + client_final = self.client_final_message() + server_final = self.send_and_receive(client_final.encode()).decode() + self.process_server_final_message(server_final) + + +class PlainAuthenticator(BaseAuthenticator): + """ + Authenticates with Kafka using the PLAIN mechanism, i.e. sending user and password in plaintext. + """ + + def __init__(self, user, password, security_protocol=None): + """ + Create new PlainAuthenticator. + + :param user: The user to authenticate as. + :type user: str + :param password: The user's password. + :type password: str + :param security_protocol: The security protocol determining the broker endpoint this + authenticator is supposed to authenticate with. + Only used for rdkafka based consumers and producers. + """ + super(PlainAuthenticator, self).__init__("PLAIN", security_protocol) + self.user = user + self.password = password + + def get_rd_kafka_opts(self): + return { + "sasl.mechanisms": self.mechanism, + "sasl.username": self.user, + "sasl.password": self.password, + "security.protocol": self.security_protocol, + } + + def exchange_tokens(self): + token = "\0".join([self.user, self.user, self.password]).encode() + response = self.send_and_receive(token) + if response != b"": + raise AuthenticationException("Server sent unexpected response!") diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index eefe8ed5c..7ea085980 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -32,13 +32,16 @@ from testinstances.managed_instance import ManagedInstance from pykafka.utils.compat import range, get_bytes, get_string +SASL_USER = 'alice' +SASL_PASSWORD = 'alice-secret' log = logging.getLogger(__name__) _kafka_properties = """ # Configurable settings broker.id={broker_id} -{port_config} +listeners={listeners} +advertised.listeners={listeners} zookeeper.connect={zk_connstr} log.dirs={data_dir} @@ -57,7 +60,6 @@ """ _kafka_ssl_properties = """ -listeners=PLAINTEXT://localhost:{port},SSL://localhost:{ssl_port} ssl.keystore.location={keystore_path} ssl.keystore.password={store_pass} ssl.key.password={store_pass} @@ -71,25 +73,50 @@ maxClientCnxns=0 """ +_kafka_sasl_properties = """ +sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 +security.inter.broker.protocol=SASL_PLAINTEXT +sasl.mechanism.inter.broker.protocol=PLAIN +""" + +_kafka_server_jaas_config = """ +KafkaServer {{ + org.apache.kafka.common.security.plain.PlainLoginModule required + username="{user}" + password="{password}" + user_{user}="{password}"; + + org.apache.kafka.common.security.scram.ScramLoginModule required + username="{user}" + password="{password}"; +}}; + +Client {{ +}}; +""" + + class KafkaConnection(object): """Connection to a Kafka cluster. Provides handy access to the shell scripts Kafka is bundled with. """ - def __init__(self, bin_dir, brokers, zookeeper, brokers_ssl=None): + def __init__(self, bin_dir, brokers, zookeeper, brokers_ssl=None, brokers_sasl=None): """Create a connection to the cluster. :param bin_dir: Location of downloaded kafka bin :param brokers: Comma-separated list of brokers - :param zookeeper: Connection straing for ZK + :param zookeeper: Connection string for ZK :param brokers_ssl: Comma-separated list of hosts with ssl-ports + :param brokers_sasl: Comma-separated list of hosts with sasl-ports """ self._bin_dir = bin_dir self.brokers = brokers self.zookeeper = zookeeper self.brokers_ssl = brokers_ssl + self.brokers_sasl = brokers_sasl self.certs = CertManager(bin_dir) if brokers_ssl is not None else None def _run_topics_sh(self, args): @@ -109,6 +136,22 @@ def create_topic(self, topic_name, num_partitions, replication_factor): '--replication-factor', replication_factor]) time.sleep(2) + def _run_configs_sh(self, args): + """Run kafka-configs.sh with the provided list of arguments.""" + binfile = os.path.join(self._bin_dir, 'bin/kafka-configs.sh') + cmd = [binfile, '--zookeeper', self.zookeeper] + args + cmd = [get_string(c) for c in cmd] # execv needs only strings + log.debug('running: %s', ' '.join(cmd)) + return subprocess.check_output(cmd) + + def create_scram_user(self, user, password): + self._run_configs_sh([ + '--alter', + '--entity-type', 'users', + '--entity-name', user, + '--add-config', 'SCRAM-SHA-256=[password={pw}],SCRAM-SHA-512=[password={pw}]'.format(pw=password) + ]) + def delete_topic(self, topic_name): self._run_topics_sh(['--delete', '--topic', topic_name]) @@ -151,21 +194,32 @@ def __init__(self, use_gevent=False): """Start a KafkaInstance with the given settings""" self._num_instances = num_instances - self._kafka_version = kafka_version + self._kafka_version = tuple(int(v) for v in kafka_version.split('.')) self._scala_version = scala_version self._bin_dir = bin_dir self._processes = [] self._broker_procs = [] self._brokers_started = 0 # incremented by _start_broker + self._all_log_files = {} + self._all_procs = {} self.zookeeper = None self.brokers = None self.brokers_ssl = None + self.brokers_sasl = None + self.sasl_enabled = self._kafka_version >= (0, 10, 2) # SASL Scram only supported since 0.10.2 self.certs = self._gen_ssl_certs() # TODO: Need a better name so multiple can run at once. # other ManagedInstances use things like 'name-port' ManagedInstance.__init__(self, name, use_gevent=use_gevent) self.connection = KafkaConnection( bin_dir, self.brokers, self.zookeeper, self.brokers_ssl) + if self.sasl_enabled: + log.info("Creating scram user {}".format(SASL_USER)) + self.connection.create_scram_user(SASL_USER, SASL_PASSWORD) + log.info("Waiting for cluster to fetch the user data.") + # We could wait for the line "Processing override for entityPath: users/alice" but that's not completely + # reliable either + time.sleep(5) def _init_dirs(self): """Set up directories in the temp folder.""" @@ -182,8 +236,7 @@ def _init_dirs(self): def _download_kafka(self): """Make sure the Kafka code has been downloaded to the right dir.""" - binfile = os.path.join(self._bin_dir, - 'bin/kafka-server-start.sh') + binfile = os.path.join(self._bin_dir, 'bin/kafka-server-start.sh') if os.path.exists(binfile): return # already there @@ -201,7 +254,7 @@ def _download_kafka(self): url_fmt = 'https://archive.apache.org/dist/kafka/{kafka_version}/kafka_{scala_version}-{kafka_version}.tgz' url = url_fmt.format( scala_version=self._scala_version, kafka_version='.'.join(str(v) for v in self._kafka_version) ) p1 = subprocess.Popen(['curl', '-vs', url], stdout=subprocess.PIPE) p2 = subprocess.Popen(['tar', 'xvz', '-C', self._bin_dir, @@ -210,9 +263,24 @@ p1.stdout.close() output, err = p2.communicate() os.chdir(curr_dir) - + if (1, 0, 0) <= self._kafka_version < (1, 1, 1): + # java version parsing is broken for some java versions in this kafka version range + log.info("Fixing java version parser in kafka-run-class.sh") + self._fix_run_class_sh() log.info('Downloaded Kafka to %s', self._bin_dir) + def _fix_run_class_sh(self): + run_class_sh = os.path.join(self._bin_dir, 'bin/kafka-run-class.sh') + parser_line = " JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version \"([0-9]*).*$/\\1/p')\n" + with open(run_class_sh, 'r') as o: + script_lines = o.readlines() + with open(run_class_sh, 'w') as o: + for line in script_lines: + if line.strip().startswith("JAVA_MAJOR_VERSION="): + o.write(parser_line) + else: + o.write(line) + def _is_port_free(self, port): """Check to see if a port is open""" try: @@ -243,12 +311,19 @@ def _add_ssl_broker(self, ssl_broker_port): brokers_ssl.append(new_broker_ssl) self.brokers_ssl = ",".join(brokers_ssl) + def _add_sasl_broker(self, sasl_broker_port): + if sasl_broker_port: + new_broker_sasl = "localhost:{}".format(sasl_broker_port) + brokers_sasl = self.brokers_sasl.split(",") if self.brokers_sasl else [] + brokers_sasl.append(new_broker_sasl) + self.brokers_sasl = ",".join(brokers_sasl) + def _gen_ssl_certs(self): """Attempt generating ssl certificates for testing :returns: :class:`CertManager` or None upon failure """ - if self._kafka_version >= "0.9": # no SSL support in earlier versions + if self._kafka_version >= (0, 9): # no SSL support in earlier versions try: return CertManager(self._bin_dir) except: # eg. because openssl or other tools not installed @@ -263,17 +338,32 @@ def _start_process(self): zk_port = self._start_zookeeper() self.zookeeper = 'localhost:{port}'.format(port=zk_port) - broker_ports, broker_ssl_ports = self._start_brokers() + broker_ports, broker_ssl_ports, broker_sasl_ports = self._start_brokers() # Process is started when the port isn't free anymore - all_ports = [zk_port] + broker_ports - for i in range(10): + all_ports = [zk_port] + broker_ports + broker_ssl_ports + broker_sasl_ports + + for _ in range(180): if all(not self._is_port_free(port) for port in all_ports): log.info('Kafka cluster started.') return # hooray! success + if any(proc.poll() is not None for proc in self._all_procs.values()): + msg = "One or more processes terminated already!" + for name, proc in self._all_procs.items(): + returncode = proc.poll() + if returncode is None: + msg += "\n {} is still running".format(name) + else: + msg += "\n {} exited with {}".format(name, returncode) + log.error("{} exited with {}".format(name, returncode)) + log.error("{} logs:".format(name)) + with open(self._all_log_files[name], 'r') as o: + for line in o: + log.error(line.strip()) + + raise ProcessNotStartingError(msg) log.info('Waiting for cluster to start....') time.sleep(6) # waits up to 18 minutes total (180 iterations of 6s) - # If it got this far, it's an error raise ProcessNotStartingError('Unable to start Kafka cluster.') @@ -292,21 +382,17 @@ def _start_log_watcher(self): watch_thread.daemon = True watch_thread.start() - def _start_broker_proc(self, port, ssl_port=None): + def _start_broker_proc(self, port, ssl_port=None, sasl_port=None): """Start a broker proc and maintain handlers Returns a proc handler for the new broker. """ # make port config for new broker - if self.certs is not None: - port_config = _kafka_ssl_properties.format( - port=port, - ssl_port=ssl_port, - keystore_path=self.certs.keystore, - truststore_path=self.certs.truststore, - store_pass=self.certs.broker_pass) - else: - port_config = "port={}".format(port) + listeners = ['PLAINTEXT://localhost:{}'.format(port)] + if ssl_port is not None: + listeners.append('SSL://localhost:{}'.format(ssl_port)) + if sasl_port is not None: + listeners.append('SASL_PLAINTEXT://localhost:{}'.format(sasl_port)) self._brokers_started += 1 i = self._brokers_started @@ -314,38 +400,54 @@ # write conf file for the new broker conf = os.path.join(self._conf_dir, 'kafka_{instance}.properties'.format(instance=i)) + jaas_conf = os.path.join(self._conf_dir, 'kafka_{instance}_jaas.conf'.format(instance=i)) with open(conf, 'w') as f: f.write(_kafka_properties.format( broker_id=i, - port_config=port_config, + listeners=','.join(listeners), zk_connstr=self.zookeeper, data_dir=self._data_dir + '_{instance}'.format(instance=i), )) - + if ssl_port: + f.write(_kafka_ssl_properties.format( + port=port, + keystore_path=self.certs.keystore, + truststore_path=self.certs.truststore, + store_pass=self.certs.broker_pass) + ) + if sasl_port: + f.write(_kafka_sasl_properties) + with open(jaas_conf, 'w') as o: + o.write(_kafka_server_jaas_config.format(user=SASL_USER, password=SASL_PASSWORD)) # start process and append to self._broker_procs binfile = os.path.join(self._bin_dir, 'bin/kafka-server-start.sh') logfile = os.path.join(self._log_dir, 'kafka_{instance}.log'.format(instance=i)) new_proc = (utils.Popen( args=[binfile, conf], stderr=utils.STDOUT, - stdout=open(logfile, 'w'), - use_gevent=self.use_gevent + stdout=open(logfile, 'w', buffering=1), +
use_gevent=self.use_gevent, + env={} if sasl_port is None else {'KAFKA_OPTS': '-Djava.security.auth.login.config={}'.format(jaas_conf)} )) self._broker_procs.append(new_proc) + self._all_procs["kafka_{}".format(i)] = new_proc + self._all_log_files["kafka_{}".format(i)] = logfile # add localhost:port to internal list of (ssl)brokers self._add_broker(port) self._add_ssl_broker(ssl_port) + self._add_sasl_broker(sasl_port) return new_proc - def _start_brokers(self): """Start all brokers and return used ports.""" ports = self._port_generator(9092) used_ports = [] used_ssl_ports = [] + used_sasl_ports = [] ssl_port = None + sasl_port = None for i in range(self._num_instances): port = next(ports) used_ports.append(port) @@ -354,9 +456,12 @@ def _start_brokers(self): if self.certs is not None: ssl_port = next(ports) used_ssl_ports.append(ssl_port) # to return at end - self._start_broker_proc(port, ssl_port) + if self.sasl_enabled: + sasl_port = next(ports) + used_sasl_ports.append(sasl_port) + self._start_broker_proc(port, ssl_port, sasl_port) - return used_ports, used_ssl_ports + return used_ports, used_ssl_ports, used_sasl_ports def _start_zookeeper(self): port = next(self._port_generator(2181)) @@ -372,9 +477,12 @@ def _start_zookeeper(self): self._zk_proc = utils.Popen( args=[binfile, conf], stderr=utils.STDOUT, - stdout=open(logfile, 'w'), + stdout=open(logfile, 'w', buffering=1), use_gevent=self.use_gevent ) + + self._all_procs["zookeeper"] = self._zk_proc + self._all_log_files["zookeeper"] = logfile return port def terminate(self): @@ -486,6 +594,7 @@ def _gen_broker_keystore(self): '-keystore', self.keystore, '-storepass:env', 'BROKER_PASS', '-keypass:env', 'BROKER_PASS', + '-keyalg', 'RSA', '-noprompt'] subprocess.check_call(cmd, env=env) cmd = ['keytool', '-certreq', '-alias', 'broker', @@ -555,8 +664,18 @@ def _gen_client_cert(self): help='Scala version for kafka build') parser.add_argument('--export-hosts', type=str, help='Write host strings to given file path') + parser.add_argument( + "--log-level", + choices=["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help='Set log level to given value, if "NOTSET" (default) no logging is active.', + default="NOTSET", + ) args = parser.parse_args() + if args.log_level != "NOTSET": + logging.basicConfig(level=getattr(logging, args.log_level), + format='%(threadName)10s - %(levelname)-8s - %(name)-12s - %(message)s') + _exiting = False def _catch_sigint(signum, frame): global _exiting @@ -570,6 +689,9 @@ def _catch_sigint(signum, frame): bin_dir=args.download_dir) print('Cluster started.') print('Brokers: {brokers}'.format(brokers=cluster.brokers)) + print('SSL Brokers: {brokers}'.format(brokers=cluster.brokers_ssl)) + print('SASL Brokers: {brokers}'.format(brokers=cluster.brokers_sasl)) + print('Log dir: {log_dirs}'.format(log_dirs=cluster._log_dir)) print('Zookeeper: {zk}'.format(zk=cluster.zookeeper)) print('Waiting for SIGINT to exit.') @@ -578,6 +700,8 @@ def _catch_sigint(signum, frame): f.write('BROKERS={}\n'.format(cluster.brokers)) if cluster.brokers_ssl: f.write('BROKERS_SSL={}\n'.format(cluster.brokers_ssl)) + if cluster.brokers_sasl: + f.write('BROKERS_SASL={}\n'.format(cluster.brokers_sasl)) f.write('ZOOKEEPER={}\n'.format(cluster.zookeeper)) while True: diff --git a/pykafka/test/utils.py b/pykafka/test/utils.py index eafbcaee9..eca6af474 100644 --- a/pykafka/test/utils.py +++ b/pykafka/test/utils.py @@ -18,7 +18,20 @@ def get_cluster(): return KafkaConnection(os.environ['KAFKA_BIN'], os.environ['BROKERS'], 
os.environ['ZOOKEEPER'], - os.environ.get('BROKERS_SSL', None)) + os.environ.get('BROKERS_SSL', None), + os.environ.get('BROKERS_SASL', None)) + elif os.environ.get('HOSTS_FILE', None): + # Broker is already running. Use that. + hosts = {} + with open(os.environ['HOSTS_FILE'], 'r') as o: + for line in o: + name, host = line.split('=', 1) + hosts[name.strip()] = host.strip() + return KafkaConnection(os.environ['KAFKA_BIN'], + hosts['BROKERS'], + hosts['ZOOKEEPER'], + hosts.get('BROKERS_SSL', None), + hosts.get('BROKERS_SASL', None)) else: return KafkaInstance(num_instances=3) diff --git a/test-requirements.txt b/test-requirements.txt index 793ff30e3..2e02f79b8 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,5 @@ lz4==2.1.10 lz4tools==1.3.1.2 -pytest==4.6.2 pytest-cov python-snappy mock diff --git a/tests/conftest.py b/tests/conftest.py index 5c136b325..815594883 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,2 +1,57 @@ import logging +import os + +import pytest + +from pykafka import PlainAuthenticator, ScramAuthenticator +from pykafka.test.kafka_instance import SASL_PASSWORD, SASL_USER +from pykafka.test.utils import get_cluster, stop_cluster + logging.basicConfig(level=logging.DEBUG) + +KAFKA_VERSION = tuple(int(v) for v in os.environ.get("KAFKA_VERSION", "0.8.0").split(".")) + + +@pytest.fixture +def kafka_version(): + return KAFKA_VERSION + + +@pytest.fixture( + params=[ + pytest.param( + "PLAIN", marks=pytest.mark.skipif(KAFKA_VERSION < (0, 10), reason="Requires KAFKA_VERSION >= 0.10") + ), + pytest.param( + "SCRAM-SHA-256", + marks=pytest.mark.skipif(KAFKA_VERSION < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + pytest.param( + "SCRAM-SHA-512", + marks=pytest.mark.skipif(KAFKA_VERSION < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + ] +) +def authenticator(request): + sasl_mechanism = request.param + if sasl_mechanism.startswith("SCRAM"): + return ScramAuthenticator( + sasl_mechanism, user=SASL_USER, password=SASL_PASSWORD, security_protocol="SASL_PLAINTEXT" + ) + else: + return PlainAuthenticator(user=SASL_USER, password=SASL_PASSWORD, security_protocol="SASL_PLAINTEXT") + + +@pytest.fixture(scope="session") +def kafka(): + kafka = get_cluster() + yield kafka + stop_cluster(kafka) + + +@pytest.fixture +def sasl_kafka(kafka): + if not kafka.brokers_sasl: + pytest.skip("Cluster has no SASL endpoint.") + else: + yield kafka diff --git a/tests/pykafka/__init__.py b/tests/pykafka_tests/__init__.py similarity index 100% rename from tests/pykafka/__init__.py rename to tests/pykafka_tests/__init__.py diff --git a/tests/pykafka/rdkafka/__init__.py b/tests/pykafka_tests/rdkafka/__init__.py similarity index 100% rename from tests/pykafka/rdkafka/__init__.py rename to tests/pykafka_tests/rdkafka/__init__.py diff --git a/tests/pykafka/rdkafka/test_rd_kafka_consumer.py b/tests/pykafka_tests/rdkafka/test_rd_kafka_consumer.py similarity index 100% rename from tests/pykafka/rdkafka/test_rd_kafka_consumer.py rename to tests/pykafka_tests/rdkafka/test_rd_kafka_consumer.py diff --git a/tests/pykafka_tests/rdkafka/test_sasl.py b/tests/pykafka_tests/rdkafka/test_sasl.py new file mode 100644 index 000000000..72b208951 --- /dev/null +++ b/tests/pykafka_tests/rdkafka/test_sasl.py @@ -0,0 +1,25 @@ +import pytest +from uuid import uuid4 + +from pykafka import KafkaClient +try: + from pykafka.rdkafka import _rd_kafka + RDKAFKA = True +except ImportError: + RDKAFKA = False # C extension not built + + +@pytest.mark.skipif(not RDKAFKA, 
reason="C extension for librdkafka not built.") +def test_sasl_roundtrip_rdkafka(sasl_kafka, authenticator, kafka_version): + client = KafkaClient(sasl_kafka.brokers_sasl, sasl_authenticator=authenticator, + broker_version='.'.join(str(v) for v in kafka_version)) + + topic_name = uuid4().hex.encode() + payload = uuid4().hex.encode() + topic = client.topics[topic_name] + + producer = topic.get_producer(use_rdkafka=True, sync=True) + producer.produce(payload) + + consumer = topic.get_simple_consumer(use_rdkafka=True, consumer_timeout_ms=5000) + assert consumer.consume().value == payload \ No newline at end of file diff --git a/tests/pykafka/rdkafka/test_simple_consumer.py b/tests/pykafka_tests/rdkafka/test_simple_consumer.py similarity index 97% rename from tests/pykafka/rdkafka/test_simple_consumer.py rename to tests/pykafka_tests/rdkafka/test_simple_consumer.py index ed3489468..d53a80d34 100644 --- a/tests/pykafka/rdkafka/test_simple_consumer.py +++ b/tests/pykafka_tests/rdkafka/test_simple_consumer.py @@ -1,6 +1,6 @@ import pytest -from tests.pykafka import test_simpleconsumer, test_balancedconsumer +from tests.pykafka_tests import test_simpleconsumer, test_balancedconsumer from pykafka.utils.compat import range try: from pykafka.rdkafka import _rd_kafka # noqa diff --git a/tests/pykafka/rdkafka/test_ssl.py b/tests/pykafka_tests/rdkafka/test_ssl.py similarity index 88% rename from tests/pykafka/rdkafka/test_ssl.py rename to tests/pykafka_tests/rdkafka/test_ssl.py index 9e501d7f1..3145b0aff 100644 --- a/tests/pykafka/rdkafka/test_ssl.py +++ b/tests/pykafka_tests/rdkafka/test_ssl.py @@ -2,7 +2,7 @@ import pytest -from tests.pykafka import test_ssl +from tests.pykafka_tests import test_ssl @pytest.mark.skipif(platform.python_implementation() == "PyPy", diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka_tests/test_balancedconsumer.py similarity index 100% rename from tests/pykafka/test_balancedconsumer.py rename to tests/pykafka_tests/test_balancedconsumer.py diff --git a/tests/pykafka/test_cluster.py b/tests/pykafka_tests/test_cluster.py similarity index 100% rename from tests/pykafka/test_cluster.py rename to tests/pykafka_tests/test_cluster.py diff --git a/tests/pykafka/test_connection.py b/tests/pykafka_tests/test_connection.py similarity index 100% rename from tests/pykafka/test_connection.py rename to tests/pykafka_tests/test_connection.py diff --git a/tests/pykafka/test_partition.py b/tests/pykafka_tests/test_partition.py similarity index 100% rename from tests/pykafka/test_partition.py rename to tests/pykafka_tests/test_partition.py diff --git a/tests/pykafka/test_partitioners.py b/tests/pykafka_tests/test_partitioners.py similarity index 100% rename from tests/pykafka/test_partitioners.py rename to tests/pykafka_tests/test_partitioners.py diff --git a/tests/pykafka/test_producer.py b/tests/pykafka_tests/test_producer.py similarity index 100% rename from tests/pykafka/test_producer.py rename to tests/pykafka_tests/test_producer.py diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka_tests/test_protocol.py similarity index 100% rename from tests/pykafka/test_protocol.py rename to tests/pykafka_tests/test_protocol.py diff --git a/tests/pykafka_tests/test_sasl.py b/tests/pykafka_tests/test_sasl.py new file mode 100644 index 000000000..f0266cd46 --- /dev/null +++ b/tests/pykafka_tests/test_sasl.py @@ -0,0 +1,26 @@ +from uuid import uuid4 + +from pykafka import KafkaClient + + +def test_sasl_roundtrip(sasl_kafka, authenticator, kafka_version): + """Test producing 
then consuming + + This exercises the pure-python producer and consumer end to end over a + SASL-authenticated connection; the rdkafka variants are covered separately. + """ + client = KafkaClient( + sasl_kafka.brokers_sasl, + sasl_authenticator=authenticator, + broker_version=".".join(str(v) for v in kafka_version), + ) + + topic_name = uuid4().hex.encode() + payload = uuid4().hex.encode() + topic = client.topics[topic_name] + + producer = topic.get_producer(use_rdkafka=False, sync=True) + producer.produce(payload) + + consumer = topic.get_simple_consumer(use_rdkafka=False, consumer_timeout_ms=5000) + assert consumer.consume().value == payload diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka_tests/test_simpleconsumer.py similarity index 100% rename from tests/pykafka/test_simpleconsumer.py rename to tests/pykafka_tests/test_simpleconsumer.py diff --git a/tests/pykafka/test_ssl.py b/tests/pykafka_tests/test_ssl.py similarity index 100% rename from tests/pykafka/test_ssl.py rename to tests/pykafka_tests/test_ssl.py diff --git a/tests/pykafka/utils/__init__.py b/tests/pykafka_tests/utils/__init__.py similarity index 100% rename from tests/pykafka/utils/__init__.py rename to tests/pykafka_tests/utils/__init__.py diff --git a/tests/pykafka/utils/test_compression.py b/tests/pykafka_tests/utils/test_compression.py similarity index 100% rename from tests/pykafka/utils/test_compression.py rename to tests/pykafka_tests/utils/test_compression.py diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka_tests/utils/test_struct_helpers.py similarity index 100% rename from tests/pykafka/utils/test_struct_helpers.py rename to tests/pykafka_tests/utils/test_struct_helpers.py diff --git a/tox.ini b/tox.ini index 853c680e0..f2afe3a01 100644 --- a/tox.ini +++ b/tox.ini @@ -8,4 +8,4 @@ deps = gevent: gevent==1.3.6 commands = py.test {posargs} -passenv = BROKERS BROKERS_SSL ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS +passenv = BROKERS BROKERS_SSL BROKERS_SASL ZOOKEEPER KAFKA_BIN KAFKA_VERSION C_INCLUDE_PATH LIBRARY_PATH LD_LIBRARY_PATH CFLAGS HOSTS_FILE
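As a closing usage note, a minimal end-to-end sketch of the API added by this patch might look like the following. The broker address, topic name, and credentials are placeholders (the ``alice``/``alice-secret`` pair mirrors the test harness defaults), so adapt them to your cluster. ``security_protocol`` is only forwarded to librdkafka via ``get_rd_kafka_opts()`` when ``use_rdkafka=True`` components are created; the pure-Python path negotiates everything through the SASL handshake itself.

.. sourcecode:: python

    >>> from pykafka import KafkaClient, ScramAuthenticator
    >>> # assumed: a broker with a SASL_PLAINTEXT listener on localhost:9094
    >>> authenticator = ScramAuthenticator("SCRAM-SHA-512", user="alice",
    ...                                    password="alice-secret",
    ...                                    security_protocol="SASL_PLAINTEXT")
    >>> client = KafkaClient(hosts="localhost:9094", sasl_authenticator=authenticator)
    >>> topic = client.topics[b"test.sasl.topic"]
    >>> with topic.get_producer(sync=True) as producer:
    ...     producer.produce(b"hello")
    >>> consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
    >>> consumer.consume().value
    b'hello'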