Add group coordinator lookup
We need a way to send a request to the group coordinator.

I spent a day and a half trying to implement a `_send_request_to_group_coordinator()`
that included:
1. Caching the value of the group coordinator so that it wouldn't have to be
repeatedly looked up on every call. This is particularly important because
`list_consumer_groups()`, `list_consumer_group_offsets()`, and
`describe_consumer_groups()` will frequently be used by monitoring scripts.
I know that across the production clusters I support, using a cached value
would save ~1M calls per day.
2. Clean and consistent error handling. This is difficult because the
responses are inconsistent about error codes: some have a top-level error
code, while others bury it within the description of the actual item.
3. Avoiding tight coupling between this method and the request/response
classes. The custom error-parsing logic is non-standard, so it should live
in the callers, not here.

So finally I gave up and went with this simpler solution, making it so that
callers can optionally bypass the lookup if they somehow already know the
group coordinator (a rough caller sketch follows below).
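
For illustration only, not part of this commit: a sketch of how a caller inside the admin client could combine the new lookup with that optional bypass. The `describe_consumer_groups()` signature shown here, including the `group_coordinator_id` parameter, is an assumption made for the example rather than code from this change.

```python
# Hypothetical caller sketch: not code from this commit.
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
    """Sketch: describe consumer groups, optionally skipping the coordinator lookup."""
    results = []
    for group_id in group_ids:
        if group_coordinator_id is not None:
            # The caller already knows the coordinator's node_id; skip the lookup.
            node_id = group_coordinator_id
        else:
            node_id = self._find_group_coordinator_id(group_id)
        # DescribeGroupsRequest is imported at the top of kafka/admin/kafka.py.
        request = DescribeGroupsRequest[0]([group_id])
        results.append(self._send_request_to_node(node_id, request))
    return results
```

Letting callers pass in a coordinator they already know means a monitoring script can resolve it once and reuse it across the repeated calls mentioned above.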
jeffwidman committed Nov 18, 2018
1 parent e596b26 commit a0ed7ac
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions kafka/admin/kafka.py
@@ -12,6 +12,7 @@
from kafka.protocol.admin import (
    CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
    ListGroupsRequest, DescribeGroupsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.version import __version__

@@ -259,6 +260,44 @@ def _refresh_controller_id(self):
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))

    def _find_group_coordinator_id(self, group_id):
        """Find the broker node_id of the coordinator of the given group.

        Sends a FindCoordinatorRequest message to the cluster. Will block until
        the FindCoordinatorResponse is received. Any errors are immediately
        raised.

        :param group_id: The consumer group ID. This is typically the group
            name as a string.
        :return: The node_id of the broker that is the coordinator.
        """
        # Note: Java may change how this is implemented in KAFKA-6791.
        #
        # TODO add support for dynamically picking the version of
        # GroupCoordinatorRequest, which was renamed to FindCoordinatorRequest.
        # When I experimented with this, GroupCoordinatorResponse_v1 didn't
        # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
        gc_request = GroupCoordinatorRequest[0](group_id)
        gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
        # Use the extra error checking in add_group_coordinator() rather than
        # immediately returning the group coordinator.
        success = self._client.cluster.add_group_coordinator(group_id, gc_response)
        if not success:
            error_type = Errors.for_code(gc_response.error_code)
            assert error_type is not Errors.NoError
            # Note: when error_type.retriable, Java will retry... see
            # KafkaAdminClient's handleFindCoordinatorError method.
            raise error_type(
                "Could not identify group coordinator for group_id '{}' from response '{}'."
                .format(group_id, gc_response))
        group_coordinator = self._client.cluster.coordinator_for_group(group_id)
        # This will be None if the coordinator was never populated, which should
        # never happen here.
        assert group_coordinator is not None
        # This will be -1 if add_group_coordinator() failed... but by this point
        # the error should have been raised.
        assert group_coordinator != -1
        return group_coordinator

    def _send_request_to_node(self, node, request):
        """Send a kafka protocol message to a specific broker. Will block until the message result is received.
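The note inside the method about retriable errors refers to the Java client's handleFindCoordinatorError, which retries; this change simply raises. A hedged sketch of what caller-side retries could look like, assuming kafka-python exceptions expose a `retriable` attribute; the wrapper name, retry count, and backoff values are invented for illustration:

```python
import time

import kafka.errors as Errors

# Illustrative retry wrapper, not part of this commit: roughly mirrors the Java
# client's behavior of retrying the coordinator lookup on retriable errors.
def find_coordinator_with_retry(admin, group_id, retries=3, backoff_s=0.5):
    for attempt in range(retries):
        try:
            return admin._find_group_coordinator_id(group_id)
        except Errors.KafkaError as exc:
            # Give up on non-retriable errors, or once retries are exhausted.
            if not getattr(exc, 'retriable', False) or attempt == retries - 1:
                raise
            time.sleep(backoff_s * (attempt + 1))
```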
