Skip to content

Commit

Permalink
Stop using broker-errors for client-side problems
Browse files Browse the repository at this point in the history
`UnsupportedVersionError` is intended to indicate a server-side error:
https://github.com/dpkp/kafka-python/blob/ba7372e44ffa1ee49fb4d5efbd67534393e944db/kafka/errors.py#L375-L378

So we should not be raising it for client-side errors. I realize that
semantically this seems like the appropriate error to raise. However,
this is confusing when debugging... for a real-life example, see
Parsely/pykafka#697. So I strongly feel that
server-side errors should be kept separate from client-side errors,
even if all the client is doing is proactively protecting against
hitting a situation where the broker would return this error.
  • Loading branch information
jeffwidman committed Nov 17, 2018
1 parent 21d68c9 commit 6c11911
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 34 deletions.
66 changes: 33 additions & 33 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import socket
from kafka.client_async import KafkaClient, selectors
from kafka.errors import (
KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError)
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
Expand Down Expand Up @@ -223,8 +224,8 @@ def _matching_api_version(self, operation):
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
# max library version is less than min broker version. Not sure any brokers
# actually set a min version greater than 0 right now, tho. But maybe in the future?
raise UnsupportedVersionError(
"Could not find matching protocol version for {}"
raise IncompatibleBrokerVersion(
"No version of the '{}' kafka protocol is supported by both the client and broker."
.format(operation.__name__))
return version

Expand All @@ -246,9 +247,9 @@ def _refresh_controller_id(self):
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
raise UnsupportedVersionError(
"Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
.format(version))
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(version))

def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
Expand Down Expand Up @@ -311,9 +312,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if validate_only:
raise UnsupportedVersionError(
"validate_only not supported on cluster version {}"
.format(self.config['api_version']))
raise IncompatibleBrokerVersion(
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = CreateTopicsRequest[version](
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout = timeout_ms
Expand All @@ -326,10 +327,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
validate_only = validate_only
)
else:
raise UnsupportedVersionError(
"missing implementation of CreateTopics for library supported version {}"
.format(version)
)
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

def delete_topics(self, topics, timeout_ms=None):
Expand All @@ -347,9 +347,9 @@ def delete_topics(self, topics, timeout_ms=None):
timeout = timeout_ms
)
else:
raise UnsupportedVersionError(
"missing implementation of DeleteTopics for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# list topics functionality is in ClusterMetadata
Expand Down Expand Up @@ -386,8 +386,8 @@ def describe_configs(self, config_resources, include_synonyms=None):
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise UnsupportedVersionError(
"include_synonyms not supported on cluster version {}"
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = DescribeConfigsRequest[version](
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
Expand All @@ -399,9 +399,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
include_synonyms = include_synonyms
)
else:
raise UnsupportedVersionError(
"missing implementation of DescribeConfigs for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

@staticmethod
Expand All @@ -426,9 +426,9 @@ def alter_configs(self, config_resources):
resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
else:
raise UnsupportedVersionError(
"missing implementation of AlterConfigs for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# alter replica logs dir protocol not implemented
Expand Down Expand Up @@ -463,9 +463,9 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
validate_only = validate_only
)
else:
raise UnsupportedVersionError(
"missing implementation of CreatePartitions for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# delete records protocol not implemented
Expand All @@ -490,9 +490,9 @@ def describe_consumer_groups(self, group_ids):
groups = group_ids
)
else:
raise UnsupportedVersionError(
"missing implementation of DescribeGroups for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

def list_consumer_groups(self):
Expand All @@ -504,9 +504,9 @@ def list_consumer_groups(self):
if version <= 1:
request = ListGroupsRequest[version]()
else:
raise UnsupportedVersionError(
"missing implementation of ListGroups for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# delete groups protocol not implemented
2 changes: 1 addition & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ def get_api_versions(self):
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
return self._api_versions;
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
Expand Down
4 changes: 4 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class UnrecognizedBrokerVersion(KafkaError):
pass


class IncompatibleBrokerVersion(KafkaError):
pass


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
super(CommitFailedError, self).__init__(
Expand Down

0 comments on commit 6c11911

Please sign in to comment.