Skip to content

Commit

Permalink
feat: rebalance listener added to Consumer constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Jul 6, 2022
1 parent fb52ec2 commit 704815f
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 17 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ target/
# PyCharm
.idea

# VSCode
.vscode

# virtualenvs
venv/
.venv/

kafka_2*

tests/ssl_cert
Expand Down
1 change: 1 addition & 0 deletions CHANGES/842.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rebalance listener added to Consumer constructor
5 changes: 5 additions & 0 deletions aiokafka/abc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import abc
import typing

from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener

RebalanceListenerCT = typing.TypeVar(
"RebalanceListenerCT", bound="ConsumerRebalanceListener")


class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
"""
Expand Down
42 changes: 33 additions & 9 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import logging
import re
import sys
from typing import Iterable, Optional, Pattern
import traceback
import warnings

from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.abc import RebalanceListenerCT
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import (
TopicAuthorizationFailedError, OffsetOutOfRangeError,
Expand Down Expand Up @@ -133,6 +134,27 @@ class AIOKafkaConsumer:
group member. If API methods block waiting for messages, that time
does not count against this timeout. See `KIP-62`_ for more
information. Default 300000
listener (RebalanceListenerCT): Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
rebalance_timeout_ms (int): The maximum time server will wait for this
consumer to rejoin the group in a case of rebalance. In Java client
this behaviour is bound to `max.poll.interval.ms` configuration,
Expand Down Expand Up @@ -242,6 +264,7 @@ def __init__(self, *topics, loop=None,
metadata_max_age_ms=5 * 60 * 1000,
partition_assignment_strategy=(RoundRobinPartitionAssignor,),
max_poll_interval_ms=300000,
listener: Optional[RebalanceListenerCT] = None,
rebalance_timeout_ms=None,
session_timeout_ms=10000,
heartbeat_interval_ms=3000,
Expand Down Expand Up @@ -324,7 +347,7 @@ def __init__(self, *topics, loop=None,
if topics:
topics = self._validate_topics(topics)
self._client.set_topics(topics)
self._subscription.subscribe(topics=topics)
self._subscription.subscribe(topics=topics, listener=listener)

def __del__(self, _warnings=warnings):
if self._closed is False:
Expand Down Expand Up @@ -1008,7 +1031,12 @@ async def end_offsets(self, partitions):
partitions, self._request_timeout_ms)
return offsets

def subscribe(self, topics=(), pattern=None, listener=None):
def subscribe(
self,
topics: Iterable[str] = (),
pattern: Optional[Pattern] = None,
listener: Optional[RebalanceListenerCT] = None
) -> None:
""" Subscribe to a list of topics, or a topic regex pattern.
Partitions will be dynamically assigned via a group coordinator.
Expand All @@ -1021,7 +1049,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
topics (list): List of topics for subscription.
pattern (str): Pattern to match available topics. You must provide
either topics or pattern, but not both.
listener (ConsumerRebalanceListener): Optionally include listener
listener (RebalanceListenerCT): Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
Expand All @@ -1046,18 +1074,14 @@ def subscribe(self, topics=(), pattern=None, listener=None):
IllegalStateError: if called after previously calling :meth:`assign`
ValueError: if neither topics or pattern is provided or both
are provided
TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
"""
if not (topics or pattern):
raise ValueError(
"You should provide either `topics` or `pattern`")
if topics and pattern:
raise ValueError(
"You can't provide both `topics` and `pattern`")
if listener is not None and \
not isinstance(listener, ConsumerRebalanceListener):
raise TypeError(
"listener should be an instance of ConsumerRebalanceListener")

if pattern is not None:
try:
pattern = re.compile(pattern)
Expand Down
41 changes: 33 additions & 8 deletions aiokafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from asyncio import shield, Event, Future
from enum import Enum

from typing import Dict, FrozenSet, Iterable, List, Pattern, Set
from typing import Dict, FrozenSet, Iterable, List, Optional, Pattern, Set

from aiokafka.errors import IllegalStateError
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.abc import ConsumerRebalanceListener, RebalanceListenerCT
from aiokafka.util import create_future, get_running_loop

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -136,24 +136,50 @@ def _notify_assignment_waiters(self):
waiter.set_result(None)
self._assignment_waiters.clear()

def _validate_rebalance_listener(
self,
listener: Optional[RebalanceListenerCT] = None
) -> None:
""" Validates a ConsumerRebalanceListener.
Arguments:
listener (RebalanceListenerCT): Optionally include listener
callback, which will be called before and after each rebalance
operation.
Raises:
TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
"""
if listener is not None and \
not isinstance(listener, ConsumerRebalanceListener):
raise TypeError(
"listener should be an instance of ConsumerRebalanceListener")

# Consumer callable API:

def subscribe(self, topics: Set[str], listener=None):
def subscribe(
self,
topics: Set[str],
listener: Optional[RebalanceListenerCT] = None
) -> None:
""" Subscribe to a list (or tuple) of topics
Caller: Consumer.
Affects: SubscriptionState.subscription
"""
assert isinstance(topics, set)
assert (listener is None or
isinstance(listener, ConsumerRebalanceListener))

self._validate_rebalance_listener(listener=listener)
self._set_subscription_type(SubscriptionType.AUTO_TOPICS)

self._change_subscription(Subscription(topics, loop=self._loop))
self._listener = listener
self._notify_subscription_waiters()

def subscribe_pattern(self, pattern: Pattern, listener=None):
def subscribe_pattern(
self,
pattern: Pattern,
listener: Optional[RebalanceListenerCT] = None
) -> None:
""" Subscribe to all topics matching a regex pattern.
Subsequent calls `subscribe_from_pattern()` by Coordinator will provide
the actual subscription topics.
Expand All @@ -162,8 +188,7 @@ def subscribe_pattern(self, pattern: Pattern, listener=None):
Affects: SubscriptionState.subscribed_pattern
"""
assert hasattr(pattern, "match"), "Expected Pattern type"
assert (listener is None or
isinstance(listener, ConsumerRebalanceListener))
self._validate_rebalance_listener(listener=listener)
self._set_subscription_type(SubscriptionType.AUTO_PATTERN)

self._subscribed_pattern = pattern
Expand Down
5 changes: 5 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ def get_version(release):
'sphinx.ext.intersphinx',
'sphinx.ext.viewcode',
'sphinx.ext.napoleon',
'sphinx_autodoc_typehints',
'alabaster',
]


# Napoleon settings
napoleon_use_param = False

try:
import sphinxcontrib.spelling # noqa
extensions.append('sphinxcontrib.spelling')
Expand Down
1 change: 1 addition & 0 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
Sphinx==4.3.0
sphinxcontrib-asyncio==0.3.0
sphinxcontrib-spelling==7.2.1
sphinx-autodoc-typehints
alabaster==0.7.12
-e .

0 comments on commit 704815f

Please sign in to comment.