Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix KafkaAdmin send to controller #1640

Merged
merged 1 commit into from
Nov 18, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 90 additions & 44 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging
import socket
from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
UnrecognizedBrokerVersion)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
Expand Down Expand Up @@ -230,17 +231,22 @@ def _validate_timeout(self, timeout_ms):
return timeout_ms or self.config['request_timeout_ms']

def _refresh_controller_id(self):
"""Determine the kafka cluster controller
"""
response = self._send_request_to_node(
self._client.least_loaded_node(),
MetadataRequest[1]([])
)
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
"""Determine the kafka cluster controller."""
version = self._matching_api_version(MetadataRequest)
if 1 <= version <= 6:
request = MetadataRequest[version]()
response = self._send_request_to_node(self._client.least_loaded_node(), request)
controller_id = response.controller_id
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(controller_id)
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(controller_version))
self._controller_id = controller_id
else:
raise UnrecognizedBrokerVersion(
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))

def _send_request_to_node(self, node, request):
Expand All @@ -261,22 +267,34 @@ def _send_request_to_node(self, node, request):
else:
raise future.exception # pylint: disable-msg=raising-bad-type

def _send(self, request):
"""Send a kafka protocol message to the cluster controller. Will block until the message result is received.
def _send_request_to_controller(self, request):
"""Send a kafka protocol message to the cluster controller.

Will block until the message result is received.

:param request: The message to send
:return The kafka protocol response for the message
:exception NodeNotReadyError: If the controller connection can't be established
:return: The kafka protocol response for the message
"""
remaining_tries = 2
while remaining_tries > 0:
remaining_tries = remaining_tries - 1
try:
return self._send_request_to_node(self._controller_id, request)
except (NotControllerError, KafkaConnectionError) as e:
# controller changed? refresh it
self._refresh_controller_id()
raise NodeNotReadyError(self._controller_id)
tries = 2 # in case our cached self._controller_id is outdated
while tries:
tries -= 1
response = self._send_request_to_node(self._controller_id, request)
# DeleteTopicsResponse returns topic_error_codes rather than topic_errors
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
error_type = Errors.for_code(error_code)
if tries and isinstance(error_type, NotControllerError):
# No need to inspect the rest of the errors for
# non-retriable errors because NotControllerError should
# either be thrown for all errors or no errors.
self._refresh_controller_id()
break
elif error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
else:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice use of for / else !

return response
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

@staticmethod
def _convert_new_topic_request(new_topic):
Expand Down Expand Up @@ -322,7 +340,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_controller(request)

def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster
Expand All @@ -342,19 +360,25 @@ def delete_topics(self, topics, timeout_ms=None):
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_controller(request)

# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the controller

# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()

# describe_acls protocol not implemented
# describe_acls protocol not yet implemented
# Note: send the request to the least_loaded_node()

# create_acls protocol not implemented
# create_acls protocol not yet implemented
# Note: send the request to the least_loaded_node()

# delete_acls protocol not implemented
# delete_acls protocol not yet implemented
# Note: send the request to the least_loaded_node()

@staticmethod
def _convert_describe_config_resource_request(config_resource):
Expand Down Expand Up @@ -394,7 +418,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_node(self._client.least_loaded_node(), request)

@staticmethod
def _convert_alter_config_resource_request(config_resource):
Expand All @@ -409,6 +433,12 @@ def _convert_alter_config_resource_request(config_resource):
def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more kafka resources.

Warning:
This is currently broken for BROKER resources because those must be
sent to that specific broker, versus this always picks the
least-loaded node. See the comment in the source code for details.
We would happily accept a PR fixing this.

:param config_resources: An array of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class
"""
Expand All @@ -421,11 +451,19 @@ def alter_configs(self, config_resources):
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
# TODO the Java client has the note:
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
# // and send the request to that specific broker. Other resources are grouped together into
# // a single request that may be sent to any broker.
#
# So this is currently broken as it always sends to the least_loaded_node()
return self._send_request_to_node(self._client.least_loaded_node(), request)

# alter replica logs dir protocol not implemented
# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker

# describe log dirs protocol not implemented
# describe log dirs protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker

@staticmethod
def _convert_create_partitions_request(topic_name, new_partitions):
Expand Down Expand Up @@ -458,17 +496,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
return self._send_request_to_controller(request)

# delete records protocol not implemented
# delete records protocol not yet implemented
# Note: send the request to the partition leaders

# create delegation token protocol not implemented
# create delegation token protocol not yet implemented
# Note: send the request to the least_loaded_node()

# renew delegation token protocol not implemented
# renew delegation token protocol not yet implemented
# Note: send the request to the least_loaded_node()

# expire delegation_token protocol not implemented
# expire delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()

# describe delegation_token protocol not implemented
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()

def describe_consumer_groups(self, group_ids):
"""Describe a set of consumer groups.
Expand All @@ -485,7 +528,8 @@ def describe_consumer_groups(self, group_ids):
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

def list_consumer_groups(self):
"""List all consumer groups known to the cluster.
Expand All @@ -499,6 +543,8 @@ def list_consumer_groups(self):
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send(request)
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

# delete groups protocol not implemented
# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.