Stopping multiple consumers in a short window freezes on GroupCoordinator._heartbeat_routine() #773
I think it may be happening when `stop()` is called after a rebalance, when that consumer might no longer be part of the group, but I'm not sure. I'm trying to write a script that will reproduce it reliably.
This code reproduces the issue. I've been able to trigger it on Python 3.8.10 and 3.9.6, but it does not occur on Python 3.8.5. This was running in an Ubuntu 20.04 Docker container.

```python
import asyncio
import json

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

EVENTS_PER_SECOND = 25


async def producer_task():
    producer = AIOKafkaProducer(
        loop=asyncio.get_running_loop(),
        bootstrap_servers="<bootstrap-servers>",
    )
    data = json.dumps(
        {
            "foobar": "test",
            "barfoo": "testing",
            "raboof": "1388383131",
            "fraboo": "paaonsasj",
            "39u13913u31": "931uf0ih91f3",
            "9fh-131f3f13f": {"timestamp": "2021-08-22T22:58:18.654101Z"},
        },
        indent=2,
    ).encode("utf-8")
    await producer.start()
    count = 0
    # The more events produced, the more likely we'll hang
    while True:
        await producer.send_and_wait("<topic>", data)
        await asyncio.sleep(1.0 / EVENTS_PER_SECOND)
        count += 1


def display_task_statuses(ltasks):
    # ltasks is a list of (consumer, task) pairs
    d = {"pending": 0, "stopped": 0}
    for _, task in ltasks:
        if not task.done():
            d["pending"] += 1
        else:
            d["stopped"] += 1
    print("CONSUMER TASKS STATUSES", d)


async def consumer_task(consumer: AIOKafkaConsumer, all_consumers: list):
    count = 0
    while True:
        try:
            msg = await consumer.getone()
            # print(f"{consumer._group_id} Got message #{msg.offset}")
            await consumer.commit()
            display_task_statuses(all_consumers)
            count += 1
        except asyncio.CancelledError:
            raise


async def start_and_stop_consumers():
    consumers = []
    loop = asyncio.get_running_loop()
    loop.create_task(producer_task())
    # Allow some events to be produced first
    await asyncio.sleep(5.0)
    # The more consumers, the more likely consumer.stop() will hang
    for i in range(200):
        consumer = AIOKafkaConsumer(
            loop=loop,
            bootstrap_servers="<bootstrap-servers>",
            group_id=f"our-consumer-group-{i}",
            enable_auto_commit=False,
            max_poll_interval_ms=2590000,
        )
        consumer.subscribe(["<topic>"])
        await consumer.start()
        await consumer.seek_to_end()
        consumers.append(
            (
                consumer,
                loop.create_task(
                    consumer_task(consumer, consumers), name=f"our-consumer-{i}-task"
                ),
            )
        )
        print(f"Started consumer {i}")
    for consumer, task in consumers:
        # We expect stop() to hang indefinitely for a random consumer after a few minutes.
        # Watch the pending vs. stopped consumer task counts being printed: `stopped` will stop increasing.
        await consumer.stop()
        task.cancel()
        print(f"Stopped consumer {consumer._group_id}")
    # If everything works as in CPython 3.8.5, the last printed line should read
    # "Stopped consumer our-consumer-group-199"


if __name__ == "__main__":
    asyncio.run(start_and_stop_consumers())
```
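To see where a hanging `stop()` is stuck without attaching a debugger, one option is to run each `stop()` as its own task and, if it has not finished within a deadline, print that task's stack. This is a minimal sketch, assuming a 30-second deadline is long enough for a healthy shutdown; `stop_with_watchdog` is a hypothetical helper, not part of aiokafka. It uses `asyncio.wait()` rather than `asyncio.wait_for()` because `asyncio.wait()` never cancels the task it is waiting on, so the stuck task stays alive for inspection and the loop can move on to the next consumer.

```python
import asyncio
import sys


async def stop_with_watchdog(stop_coro, name: str, deadline: float = 30.0) -> bool:
    """Await `stop_coro`, dumping its stack if it runs past `deadline` seconds.

    Hypothetical debugging helper, not part of aiokafka. Returns True if the
    awaitable finished in time. On timeout it returns False and leaves the
    task running, so it can still be inspected (or awaited again) later.
    """
    task = asyncio.ensure_future(stop_coro)
    # Unlike asyncio.wait_for(), asyncio.wait() never cancels on timeout;
    # unfinished tasks are simply returned in the "pending" set.
    done, _pending = await asyncio.wait({task}, timeout=deadline)
    if task in done:
        task.result()  # re-raise any exception raised by stop()
        return True
    print(f"{name}: stop() still pending after {deadline}s; suspended at:", file=sys.stderr)
    task.print_stack(file=sys.stderr)
    return False


# Usage in the reproduction's shutdown loop, in place of `await consumer.stop()`:
#
#     for consumer, task in consumers:
#         await stop_with_watchdog(consumer.stop(), consumer._group_id)
#         task.cancel()
```

Because a timed-out stop is reported rather than awaited forever, a single run lists every consumer whose `stop()` got stuck, not just the first one.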
This behaviour suggests that it could be related to the bug in …
Wow. This sounds consistent with the behaviour I've been seeing around other …
It's possible to monkey-patch …
Thanks. The biggest blocker is actually the `AIOKafkaConsumer.stop()` call hanging and leaving unreleased resources behind. I've mentioned that in a comment on python/cpython#26097. Thank you for the suggestion about monkey-patching!
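One general way a shutdown like this can hang, independent of aiokafka, is a background loop whose cancellation gets lost: the stopping code calls `cancel()` on the loop's task and then awaits it, but the task never finishes. The sketch below is a hypothetical stand-in (`heartbeat_loop` is not aiokafka's `_heartbeat_routine`, and `cancel_and_wait` is an illustrative helper); it does not claim to reproduce the CPython issue referenced above, only the symptom: a cancelled task that is still pending.

```python
import asyncio


async def heartbeat_loop(swallow_cancels: int) -> None:
    """Hypothetical background loop; NOT aiokafka's _heartbeat_routine.

    The first `swallow_cancels` cancellations are caught and ignored,
    simulating a loop whose cancellation gets lost.
    """
    ignored = 0
    while True:
        try:
            await asyncio.sleep(0.01)  # stand-in for "send a heartbeat"
        except asyncio.CancelledError:
            if ignored >= swallow_cancels:
                raise  # cancellation ends the task, as it should
            ignored += 1  # buggy path: drop the cancellation and keep looping


async def cancel_and_wait(task: "asyncio.Task", grace: float = 0.2) -> bool:
    """Cancel `task` and report whether it finished within `grace` seconds."""
    task.cancel()
    done, _pending = await asyncio.wait({task}, timeout=grace)
    return task in done


async def main() -> None:
    healthy = asyncio.ensure_future(heartbeat_loop(swallow_cancels=0))
    leaky = asyncio.ensure_future(heartbeat_loop(swallow_cancels=1))
    await asyncio.sleep(0.05)  # let both loops start running
    print(await cancel_and_wait(healthy))  # True
    print(await cancel_and_wait(leaky))    # False: the cancel() was swallowed
    # A plain `await leaky` here would block forever, like the stuck stop().
    print(await cancel_and_wait(leaky))    # True: the second cancel is not ignored


if __name__ == "__main__":
    asyncio.run(main())
```

Waiting with a bounded `asyncio.wait()` instead of a bare `await` is what turns a silent hang into a detectable `False`.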
Describe the bug
I have several hundred consumers subscribed to the same single topic, with auto-commit disabled. Each consumer belongs to its own consumer group. When I call `AIOKafkaConsumer.stop()` on each consumer in a for loop, the task doing the stopping eventually halts completely. When I inspect that task, I find it is stuck waiting on `GroupCoordinator._heartbeat_routine()`, and it stays that way indefinitely.

Expected behaviour
`GroupCoordinator._heartbeat_routine()` should not prevent `AIOKafkaConsumer.stop()` from completing.

Environment (please complete the following information):
- aiokafka version (`python -c "import aiokafka; print(aiokafka.__version__)"`): 0.7.1
- kafka-python version (`python -c "import kafka; print(kafka.__version__)"`): 2.0.2
- Kafka broker version (`kafka-topics.sh --version`): 2.6.0

Reproducible example