Skip to content

Commit

Permalink
Fix send to controller
Browse files Browse the repository at this point in the history
The controller send error handling was completely broken.
It also pinned the metadata version unnecessarily.

Additionally, several of the methods were sending to the controller
but either that was unnecessary, or just plain wrong. So updated
following the pattern of the Java Admin client.
  • Loading branch information
jeffwidman committed Nov 17, 2018
1 parent dd8d304 commit 240da06
Showing 1 changed file with 85 additions and 39 deletions.
124 changes: 85 additions & 39 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging
import socket
from kafka.client_async import KafkaClient, selectors
from kafka import errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
UnrecognizedBrokerVersion)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
Expand Down Expand Up @@ -240,17 +241,22 @@ def _validate_timeout(self, timeout_ms):
return timeout_ms or self.config['request_timeout_ms']

def _refresh_controller_id(self):
"""Determine the kafka cluster controller
"""
response = self._send_request_to_node(
self._client.least_loaded_node(),
MetadataRequest[1]([])
)
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
"""Determine the kafka cluster controller."""
version = self._matching_api_version(MetadataRequest)
if 1 <= version <= 6:
request = MetadataRequest[version]()
response = self._send_request_to_node(self._client.least_loaded_node(), request)
controller_id = response.controller_id
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(controller_id)
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(controller_version))
self._controller_id = controller_id
else:
raise UnrecognizedBrokerVersion(
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))

def _send_request_to_node(self, node, request):
Expand All @@ -271,22 +277,34 @@ def _send_request_to_node(self, node, request):
else:
raise future.exception # pylint: disable-msg=raising-bad-type

def _send(self, request):
"""Send a kafka protocol message to the cluster controller. Will block until the message result is received.
def _send_request_to_controller(self, request):
"""Send a kafka protocol message to the cluster controller.
Will block until the message result is received.
:param request: The message to send
:return The kafka protocol response for the message
:exception NodeNotReadyError: If the controller connection can't be established
:return: The kafka protocol response for the message
"""
remaining_tries = 2
while remaining_tries > 0:
remaining_tries = remaining_tries - 1
try:
return self._send_request_to_node(self._controller_id, request)
except (NotControllerError, KafkaConnectionError) as e:
# controller changed? refresh it
self._refresh_controller_id()
raise NodeNotReadyError(self._controller_id)
tries = 2 # in case our cached self._controller_id is outdated
while tries:
tries -= 1
response = self._send_request_to_node(self._controller_id, request)
# DeleteTopicsResponse returns topic_error_codes rather than topic_errors
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
error_type = Errors.for_code(error_code)
if tries and isinstance(error_type, NotControllerError):
# No need to inspect the rest of the errors for
# non-retriable errors because NotControllerError should
# either be thrown for all errors or no errors.
self._refresh_controller_id()
break
elif error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
else:
return response
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

@staticmethod
def _convert_new_topic_request(new_topic):
Expand Down Expand Up @@ -332,7 +350,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_controller(request)

def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster
Expand All @@ -352,19 +370,25 @@ def delete_topics(self, topics, timeout_ms=None):
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_controller(request)

# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the controller

# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe_acls protocol not implemented
# describe_acls protocol not yet implemented
# Note: send the request to the least_loaded_node()

# create_acls protocol not implemented
# create_acls protocol not yet implemented
# Note: send the request to the least_loaded_node()

# delete_acls protocol not implemented
# delete_acls protocol not yet implemented
# Note: send the request to the least_loaded_node()

@staticmethod
def _convert_describe_config_resource_request(config_resource):
Expand Down Expand Up @@ -404,7 +428,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_node(self._client.least_loaded_node(), request)

@staticmethod
def _convert_alter_config_resource_request(config_resource):
Expand All @@ -419,6 +443,12 @@ def _convert_alter_config_resource_request(config_resource):
def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more kafka resources.
Warning:
This is currently broken for BROKER resources because those must be
sent to that specific broker, versus this always picks the
least-loaded node. See the comment in the source code for details.
We would happily accept a PR fixing this.
:param config_resources: An array of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class
"""
Expand All @@ -431,11 +461,19 @@ def alter_configs(self, config_resources):
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
# TODO the Java client has the note:
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
# // and send the request to that specific broker. Other resources are grouped together into
# // a single request that may be sent to any broker.
#
# So this is currently broken as it always sends to the least_loaded_node()
return self._send_request_to_node(self._client.least_loaded_node(), request)

# alter replica logs dir protocol not implemented
# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker

# describe log dirs protocol not implemented
# describe log dirs protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker

@staticmethod
def _convert_create_partitions_request(topic_name, new_partitions):
Expand Down Expand Up @@ -468,17 +506,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_controller(request)

# delete records protocol not implemented
# delete records protocol not yet implemented
# Note: send the request to the partition leaders

# create delegation token protocol not implemented
# Note: send the request to the least_loaded_node()

# renew delegation token protocol not implemented
# Note: send the request to the least_loaded_node()

# expire delegation_token protocol not implemented
# Note: send the request to the least_loaded_node()

# describe delegation_token protocol not implemented
# Note: send the request to the least_loaded_node()

def describe_consumer_groups(self, group_ids):
"""Describe a set of consumer groups.
Expand All @@ -495,7 +538,8 @@ def describe_consumer_groups(self, group_ids):
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

def list_consumer_groups(self):
"""List all consumer groups known to the cluster.
Expand All @@ -509,6 +553,8 @@ def list_consumer_groups(self):
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

# delete groups protocol not implemented
# Note: send the request to the group's coordinator.

0 comments on commit 240da06

Please sign in to comment.