Skip to content

Commit

Permalink
feat: rebalance listener added to Consumer constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Aug 19, 2022
1 parent fb52ec2 commit 38d896c
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 58 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ target/
# PyCharm
.idea

# VSCode
.vscode

# virtualenvs
venv/
.venv/

kafka_2*

tests/ssl_cert
Expand Down
1 change: 1 addition & 0 deletions CHANGES/842.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rebalance listener added to Consumer constructor
13 changes: 11 additions & 2 deletions aiokafka/abc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import abc

from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
import aiokafka
from aiokafka.structs import TopicPartition
from typing import List, Optional


class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
Expand Down Expand Up @@ -45,8 +49,13 @@ class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
called to load the state.
"""

def __init__(
self, consumer: Optional["aiokafka.consumer.AIOKafkaConsumer"] = None
) -> None:
self.consumer = consumer

@abc.abstractmethod
def on_partitions_revoked(self, revoked):
def on_partitions_revoked(self, revoked: List[TopicPartition]) -> None:
"""
A coroutine or function the user can implement to provide cleanup or
custom state save on the start of a rebalance operation.
Expand All @@ -67,7 +76,7 @@ def on_partitions_revoked(self, revoked):
pass

@abc.abstractmethod
def on_partitions_assigned(self, assigned):
def on_partitions_assigned(self, assigned: List[TopicPartition]) -> None:
"""
A coroutine or function the user can implement to provide load of
custom consumer state or cache warmup on completion of a successful
Expand Down
51 changes: 42 additions & 9 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import re
import sys
from typing import Iterable, Optional, Pattern
import traceback
import warnings

Expand Down Expand Up @@ -133,6 +134,27 @@ class AIOKafkaConsumer:
group member. If API methods block waiting for messages, that time
does not count against this timeout. See `KIP-62`_ for more
information. Default 300000
listener (ConsumerRebalanceListener): Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
rebalance_timeout_ms (int): The maximum time server will wait for this
consumer to rejoin the group in a case of rebalance. In Java client
this behaviour is bound to `max.poll.interval.ms` configuration,
Expand Down Expand Up @@ -242,6 +264,7 @@ def __init__(self, *topics, loop=None,
metadata_max_age_ms=5 * 60 * 1000,
partition_assignment_strategy=(RoundRobinPartitionAssignor,),
max_poll_interval_ms=300000,
listener: Optional[ConsumerRebalanceListener] = None,
rebalance_timeout_ms=None,
session_timeout_ms=10000,
heartbeat_interval_ms=3000,
Expand Down Expand Up @@ -321,10 +344,14 @@ def __init__(self, *topics, loop=None,
self._source_traceback = traceback.extract_stack(sys._getframe(1))
self._closed = False

if listener is not None:
listener.consumer = self
self.listener = listener

if topics:
topics = self._validate_topics(topics)
self._client.set_topics(topics)
self._subscription.subscribe(topics=topics)
self.subscribe(topics)

def __del__(self, _warnings=warnings):
if self._closed is False:
Expand Down Expand Up @@ -1008,7 +1035,12 @@ async def end_offsets(self, partitions):
partitions, self._request_timeout_ms)
return offsets

def subscribe(self, topics=(), pattern=None, listener=None):
def subscribe(
self,
topics: Iterable[str] = (),
pattern: Optional[Pattern] = None,
listener: Optional[ConsumerRebalanceListener] = None
) -> None:
""" Subscribe to a list of topics, or a topic regex pattern.
Partitions will be dynamically assigned via a group coordinator.
Expand Down Expand Up @@ -1046,26 +1078,27 @@ def subscribe(self, topics=(), pattern=None, listener=None):
IllegalStateError: if called after previously calling :meth:`assign`
ValueError: if neither topics or pattern is provided or both
are provided
TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
"""
if not (topics or pattern):
raise ValueError(
"You should provide either `topics` or `pattern`")
if topics and pattern:
raise ValueError(
"You can't provide both `topics` and `pattern`")
if listener is not None and \
not isinstance(listener, ConsumerRebalanceListener):
raise TypeError(
"listener should be an instance of ConsumerRebalanceListener")

# Override the self.listener as the user wants this
if listener is not None:
listener.consumer = self
self.listener = listener

if pattern is not None:
try:
pattern = re.compile(pattern)
except re.error as err:
raise ValueError(
f"{pattern!r} is not a valid pattern: {err}")
self._subscription.subscribe_pattern(
pattern=pattern, listener=listener)
pattern=pattern, listener=self.listener)
# NOTE: set_topics will trigger a rebalance, so the coordinator
# will get the initial subscription shortly by ``metadata_changed``
# handler.
Expand All @@ -1074,7 +1107,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
elif topics:
topics = self._validate_topics(topics)
self._subscription.subscribe(
topics=topics, listener=listener)
topics=topics, listener=self.listener)
self._client.set_topics(self._subscription.subscription.topics)
if self._group_id is None:
# We have reset the assignment, but client.set_topics will
Expand Down
39 changes: 32 additions & 7 deletions aiokafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from asyncio import shield, Event, Future
from enum import Enum

from typing import Dict, FrozenSet, Iterable, List, Pattern, Set
from typing import Dict, FrozenSet, Iterable, List, Optional, Pattern, Set

from aiokafka.errors import IllegalStateError
from aiokafka.structs import OffsetAndMetadata, TopicPartition
Expand Down Expand Up @@ -136,24 +136,50 @@ def _notify_assignment_waiters(self):
waiter.set_result(None)
self._assignment_waiters.clear()

def _validate_rebalance_listener(
self,
listener: Optional[ConsumerRebalanceListener] = None
) -> None:
""" Validates a ConsumerRebalanceListener.
Arguments:
listener (ConsumerRebalanceListener): Optionally include listener
callback, which will be called before and after each rebalance
operation.
Raises:
TypeError: if listener is not a :class:`.ConsumerRebalanceListener`
"""
if listener is not None and \
not isinstance(listener, ConsumerRebalanceListener):
raise TypeError(
"listener should be an instance of ConsumerRebalanceListener")

# Consumer callable API:

def subscribe(self, topics: Set[str], listener=None):
def subscribe(
self,
topics: Set[str],
listener: Optional[ConsumerRebalanceListener] = None
) -> None:
""" Subscribe to a list (or tuple) of topics
Caller: Consumer.
Affects: SubscriptionState.subscription
"""
assert isinstance(topics, set)
assert (listener is None or
isinstance(listener, ConsumerRebalanceListener))

self._validate_rebalance_listener(listener=listener)
self._set_subscription_type(SubscriptionType.AUTO_TOPICS)

self._change_subscription(Subscription(topics, loop=self._loop))
self._listener = listener
self._notify_subscription_waiters()

def subscribe_pattern(self, pattern: Pattern, listener=None):
def subscribe_pattern(
self,
pattern: Pattern,
listener: Optional[ConsumerRebalanceListener] = None
) -> None:
""" Subscribe to all topics matching a regex pattern.
Subsequent calls `subscribe_from_pattern()` by Coordinator will provide
the actual subscription topics.
Expand All @@ -162,8 +188,7 @@ def subscribe_pattern(self, pattern: Pattern, listener=None):
Affects: SubscriptionState.subscribed_pattern
"""
assert hasattr(pattern, "match"), "Expected Pattern type"
assert (listener is None or
isinstance(listener, ConsumerRebalanceListener))
self._validate_rebalance_listener(listener=listener)
self._set_subscription_type(SubscriptionType.AUTO_PATTERN)

self._subscribed_pattern = pattern
Expand Down
Loading

0 comments on commit 38d896c

Please sign in to comment.