-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This was completely broken previously because it didn't lookup the group coordinator of the consumer group. Also added basic error handling/raising. Note: I added the `group_coordinator_id` as an optional kwarg. As best I can tell, the Java client doesn't include this and instead looks it up every time. However, if we add this, it allows the caller the flexibility to bypass the network round trip of the lookup if for some reason they already know the `group_coordinator_id`.
- Loading branch information
1 parent
665f1e4
commit 8dab14b
Showing
1 changed file
with
50 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -557,23 +557,60 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal | |
# describe delegation_token protocol not yet implemented | ||
# Note: send the request to the least_loaded_node() | ||
|
||
def describe_consumer_groups(self, group_ids): | ||
def describe_consumer_groups(self, group_ids, group_coordinator_id=None): | ||
"""Describe a set of consumer groups. | ||
:param group_ids: A list of consumer group id names | ||
:return: Appropriate version of DescribeGroupsResponse class | ||
Any errors are immediately raised. | ||
:param group_ids: A list of consumer group IDs. These are typically the | ||
group names as strings. | ||
:param group_coordinator_id: The node_id of the groups' coordinator | ||
broker. If set to None, it will query the cluster for each group to | ||
find that group's coordinator. Explicitly specifying this can be | ||
useful for avoiding extra network round trips if you already know | ||
the group coordinator. This is only useful when all the group_ids | ||
have the same coordinator, otherwise it will error. Default: None. | ||
:return: A list of group descriptions. For now the group descriptions | ||
are the raw results from the DescribeGroupsResponse. Long-term, we | ||
plan to change this to return namedtuples as well as decoding the | ||
partition assignments. | ||
""" | ||
group_descriptions = [] | ||
version = self._matching_api_version(DescribeGroupsRequest) | ||
if version <= 1: | ||
request = DescribeGroupsRequest[version]( | ||
groups = group_ids | ||
) | ||
else: | ||
raise NotImplementedError( | ||
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin." | ||
.format(version)) | ||
# TODO this is completely broken, as it needs to send to the group coordinator | ||
# return self._send(request) | ||
for group_id in group_ids: | ||
if group_coordinator_id is None: | ||
this_groups_coordinator_id = self._find_group_coordinator_id(group_id) | ||
if version <= 1: | ||
# Note: KAFKA-6788 A potential optimization is to group the | ||
# request per coordinator and send one request with a list of | ||
# all consumer groups. Java still hasn't implemented this | ||
# because the error checking is hard to get right when some | ||
# groups error and others don't. | ||
request = DescribeGroupsRequest[version](groups=(group_id,)) | ||
response = self._send_request_to_node(this_groups_coordinator_id, request) | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
jeffwidman
Author
Collaborator
|
||
assert len(response.groups) == 1 | ||
# TODO need to implement converting the response tuple into | ||
# a more accessible interface like a namedtuple and then stop | ||
# hardcoding tuple indices here. Several Java examples, | ||
# including KafkaAdminClient.java | ||
group_description = response.groups[0] | ||
error_code = group_description[0] | ||
error_type = Errors.for_code(error_code) | ||
# Java has the note: KAFKA-6789, we can retry based on the error code | ||
if error_type is not Errors.NoError: | ||
raise error_type( | ||
"Request '{}' failed with response '{}'." | ||
.format(request, response)) | ||
# TODO Java checks the group protocol type, and if consumer | ||
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes | ||
# the members' partition assignments... that hasn't yet been | ||
# implemented here so just return the raw struct results | ||
group_descriptions.append(group_description) | ||
else: | ||
raise NotImplementedError( | ||
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin." | ||
.format(version)) | ||
return group_descriptions | ||
|
||
def list_consumer_groups(self, broker_ids=None): | ||
"""List all consumer groups known to the cluster. | ||
|
@jeffwidman I might just be missing something here, but if the
group_coordinator_id
argument is set, it doesn't look likethis_groups_coordinator_id
will be set. Bug?Sorry for the drive-by comment, got stuck on this while reviewing commits for fixing my PR