diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index 5ce863037..950ae3dc1 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -4,9 +4,10 @@ import logging import socket from kafka.client_async import KafkaClient, selectors +import kafka.errors as Errors from kafka.errors import ( - IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError, - NodeNotReadyError, NotControllerError) + IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, + UnrecognizedBrokerVersion) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, @@ -230,17 +231,22 @@ def _validate_timeout(self, timeout_ms): return timeout_ms or self.config['request_timeout_ms'] def _refresh_controller_id(self): - """Determine the kafka cluster controller - """ - response = self._send_request_to_node( - self._client.least_loaded_node(), - MetadataRequest[1]([]) - ) - self._controller_id = response.controller_id - version = self._client.check_version(self._controller_id) - if version < (0, 10, 0): - raise IncompatibleBrokerVersion( - "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0." + """Determine the kafka cluster controller.""" + version = self._matching_api_version(MetadataRequest) + if 1 <= version <= 6: + request = MetadataRequest[version]() + response = self._send_request_to_node(self._client.least_loaded_node(), request) + controller_id = response.controller_id + # verify the controller is new enough to support our requests + controller_version = self._client.check_version(controller_id) + if controller_version < (0, 10, 0): + raise IncompatibleBrokerVersion( + "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0." + .format(controller_version)) + self._controller_id = controller_id + else: + raise UnrecognizedBrokerVersion( + "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}." .format(version)) def _send_request_to_node(self, node, request): @@ -261,22 +267,34 @@ def _send_request_to_node(self, node, request): else: raise future.exception # pylint: disable-msg=raising-bad-type - def _send(self, request): - """Send a kafka protocol message to the cluster controller. Will block until the message result is received. + def _send_request_to_controller(self, request): + """Send a kafka protocol message to the cluster controller. + + Will block until the message result is received. :param request: The message to send - :return The kafka protocol response for the message - :exception NodeNotReadyError: If the controller connection can't be established + :return: The kafka protocol response for the message """ - remaining_tries = 2 - while remaining_tries > 0: - remaining_tries = remaining_tries - 1 - try: - return self._send_request_to_node(self._controller_id, request) - except (NotControllerError, KafkaConnectionError) as e: - # controller changed? refresh it - self._refresh_controller_id() - raise NodeNotReadyError(self._controller_id) + tries = 2 # in case our cached self._controller_id is outdated + while tries: + tries -= 1 + response = self._send_request_to_node(self._controller_id, request) + # DeleteTopicsResponse returns topic_error_codes rather than topic_errors + for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes): + error_type = Errors.for_code(error_code) + if tries and isinstance(error_type, NotControllerError): + # No need to inspect the rest of the errors for + # non-retriable errors because NotControllerError should + # either be thrown for all errors or no errors. + self._refresh_controller_id() + break + elif error_type is not Errors.NoError: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + else: + return response + raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered") @staticmethod def _convert_new_topic_request(new_topic): @@ -322,7 +340,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None): raise NotImplementedError( "Support for CreateTopics v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + return self._send_request_to_controller(request) def delete_topics(self, topics, timeout_ms=None): """Delete topics from the cluster @@ -342,19 +360,25 @@ def delete_topics(self, topics, timeout_ms=None): raise NotImplementedError( "Support for DeleteTopics v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + return self._send_request_to_controller(request) # list topics functionality is in ClusterMetadata + # Note: if implemented here, send the request to the least_loaded_node() # describe topics functionality is in ClusterMetadata + # Note: if implemented here, send the request to the controller # describe cluster functionality is in ClusterMetadata + # Note: if implemented here, send the request to the least_loaded_node() - # describe_acls protocol not implemented + # describe_acls protocol not yet implemented + # Note: send the request to the least_loaded_node() - # create_acls protocol not implemented + # create_acls protocol not yet implemented + # Note: send the request to the least_loaded_node() - # delete_acls protocol not implemented + # delete_acls protocol not yet implemented + # Note: send the request to the least_loaded_node() @staticmethod def _convert_describe_config_resource_request(config_resource): @@ -394,7 +418,7 @@ def describe_configs(self, config_resources, include_synonyms=None): raise NotImplementedError( "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + return self._send_request_to_node(self._client.least_loaded_node(), request) @staticmethod def _convert_alter_config_resource_request(config_resource): @@ -409,6 +433,12 @@ def _convert_alter_config_resource_request(config_resource): def alter_configs(self, config_resources): """Alter configuration parameters of one or more kafka resources. + Warning: + This is currently broken for BROKER resources because those must be + sent to that specific broker, versus this always picks the + least-loaded node. See the comment in the source code for details. + We would happily accept a PR fixing this. + :param config_resources: An array of ConfigResource objects. :return: Appropriate version of AlterConfigsResponse class """ @@ -421,11 +451,19 @@ def alter_configs(self, config_resources): raise NotImplementedError( "Support for AlterConfigs v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + # TODO the Java client has the note: + # // We must make a separate AlterConfigs request for every BROKER resource we want to alter + # // and send the request to that specific broker. Other resources are grouped together into + # // a single request that may be sent to any broker. + # + # So this is currently broken as it always sends to the least_loaded_node() + return self._send_request_to_node(self._client.least_loaded_node(), request) - # alter replica logs dir protocol not implemented + # alter replica logs dir protocol not yet implemented + # Note: have to lookup the broker with the replica assignment and send the request to that broker - # describe log dirs protocol not implemented + # describe log dirs protocol not yet implemented + # Note: have to lookup the broker with the replica assignment and send the request to that broker @staticmethod def _convert_create_partitions_request(topic_name, new_partitions): @@ -458,17 +496,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non raise NotImplementedError( "Support for CreatePartitions v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + return self._send_request_to_controller(request) - # delete records protocol not implemented + # delete records protocol not yet implemented + # Note: send the request to the partition leaders - # create delegation token protocol not implemented + # create delegation token protocol not yet implemented + # Note: send the request to the least_loaded_node() - # renew delegation token protocol not implemented + # renew delegation token protocol not yet implemented + # Note: send the request to the least_loaded_node() - # expire delegation_token protocol not implemented + # expire delegation_token protocol not yet implemented + # Note: send the request to the least_loaded_node() - # describe delegation_token protocol not implemented + # describe delegation_token protocol not yet implemented + # Note: send the request to the least_loaded_node() def describe_consumer_groups(self, group_ids): """Describe a set of consumer groups. @@ -485,7 +528,8 @@ def describe_consumer_groups(self, group_ids): raise NotImplementedError( "Support for DescribeGroups v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + # TODO this is completely broken, as it needs to send to the group coordinator + # return self._send(request) def list_consumer_groups(self): """List all consumer groups known to the cluster. @@ -499,6 +543,8 @@ def list_consumer_groups(self): raise NotImplementedError( "Support for ListGroups v{} has not yet been added to KafkaAdmin." .format(version)) - return self._send(request) + # TODO this is completely broken, as it needs to send to the group coordinator + # return self._send(request) - # delete groups protocol not implemented + # delete groups protocol not yet implemented + # Note: send the request to the group's coordinator.