Skip to content

Commit

Permalink
Stop using broker-errors for client-side problems
Browse files Browse the repository at this point in the history
`UnsupportedVersionError` is intended to indicate a server-side error:
https://github.com/dpkp/kafka-python/blob/ba7372e44ffa1ee49fb4d5efbd67534393e944db/kafka/errors.py#L375-L378

So we should not be raising it for client-side errors. I realize that
semantically this seems like the appropriate error to raise. However,
this is confusing when debugging... for a real-life example, see
Parsely/pykafka#697. So I strongly feel that
server-side errors should be kept separate from client-side errors,
even if all the client is doing is proactively protecting against
hitting a situation where the broker would return this error.
  • Loading branch information
jeffwidman committed Nov 18, 2018
1 parent 7bd6b5d commit f3105a4
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 38 deletions.
76 changes: 39 additions & 37 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import socket
from kafka.client_async import KafkaClient, selectors
from kafka.errors import (
KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError)
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
Expand All @@ -25,9 +26,11 @@ class KafkaAdmin(object):
nicer, more pythonic objects. Unfortunately, this will likely break
those interfaces.
The KafkaAdmin class will negotiate for the latest version of each message protocol format supported
by both the kafka-python client library and the kafka broker. Usage of optional fields from protocol
versions that are not supported by the broker will result in UnsupportedVersionError exceptions.
The KafkaAdmin class will negotiate for the latest version of each message
protocol format supported by both the kafka-python client library and the
kafka broker. Usage of optional fields from protocol versions that are not
supported by the broker will result in IncompatibleBrokerVersion exceptions.
Use of this class requires a minimum broker version >= 0.10.0.0.
Expand Down Expand Up @@ -223,8 +226,8 @@ def _matching_api_version(self, operation):
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
# max library version is less than min broker version. Not sure any brokers
# actually set a min version greater than 0 right now, tho. But maybe in the future?
raise UnsupportedVersionError(
"Could not find matching protocol version for {}"
raise IncompatibleBrokerVersion(
"No version of the '{}' kafka protocol is supported by both the client and broker."
.format(operation.__name__))
return version

Expand All @@ -246,9 +249,9 @@ def _refresh_controller_id(self):
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
raise UnsupportedVersionError(
"Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
.format(version))
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(version))

def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
Expand Down Expand Up @@ -311,9 +314,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if validate_only:
raise UnsupportedVersionError(
"validate_only not supported on cluster version {}"
.format(self.config['api_version']))
raise IncompatibleBrokerVersion(
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = CreateTopicsRequest[version](
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout = timeout_ms
Expand All @@ -326,10 +329,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
validate_only = validate_only
)
else:
raise UnsupportedVersionError(
"missing implementation of CreateTopics for library supported version {}"
.format(version)
)
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

def delete_topics(self, topics, timeout_ms=None):
Expand All @@ -347,9 +349,9 @@ def delete_topics(self, topics, timeout_ms=None):
timeout = timeout_ms
)
else:
raise UnsupportedVersionError(
"missing implementation of DeleteTopics for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# list topics functionality is in ClusterMetadata
Expand Down Expand Up @@ -386,9 +388,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise UnsupportedVersionError(
"include_synonyms not supported on cluster version {}"
.format(self.config['api_version']))
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = DescribeConfigsRequest[version](
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
)
Expand All @@ -399,9 +401,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
include_synonyms = include_synonyms
)
else:
raise UnsupportedVersionError(
"missing implementation of DescribeConfigs for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

@staticmethod
Expand All @@ -426,9 +428,9 @@ def alter_configs(self, config_resources):
resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
else:
raise UnsupportedVersionError(
"missing implementation of AlterConfigs for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# alter replica logs dir protocol not implemented
Expand Down Expand Up @@ -463,9 +465,9 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
validate_only = validate_only
)
else:
raise UnsupportedVersionError(
"missing implementation of CreatePartitions for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# delete records protocol not implemented
Expand All @@ -490,9 +492,9 @@ def describe_consumer_groups(self, group_ids):
groups = group_ids
)
else:
raise UnsupportedVersionError(
"missing implementation of DescribeGroups for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

def list_consumer_groups(self):
Expand All @@ -504,9 +506,9 @@ def list_consumer_groups(self):
if version <= 1:
request = ListGroupsRequest[version]()
else:
raise UnsupportedVersionError(
"missing implementation of ListGroups for library supported version {}"
.format(version))
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)

# delete groups protocol not implemented
2 changes: 1 addition & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ def get_api_versions(self):
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
return self._api_versions;
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
Expand Down
4 changes: 4 additions & 0 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class UnrecognizedBrokerVersion(KafkaError):
pass


class IncompatibleBrokerVersion(KafkaError):
pass


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
super(CommitFailedError, self).__init__(
Expand Down

0 comments on commit f3105a4

Please sign in to comment.