Add Producer waitlist for pending send() items #814

Open · wants to merge 3 commits into master
2 changes: 2 additions & 0 deletions CHANGES/528.feature
@@ -0,0 +1,2 @@
+Change internal structure used for waiter in Producer.send() call. Now there should
+be no performance degradation when a large backlog of messages is pending (issue #528)
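For context on issue #528: previously every blocked send() awaited the same batch's drain waiter, so draining one batch woke the entire backlog at once and all but one caller immediately re-blocked, which is quadratic in the number of pending messages. A minimal sketch of the FIFO alternative this PR adopts (illustrative only; the Waitlist class below is not part of the PR):

import asyncio
import collections


class Waitlist:
    """Wake pending senders one at a time, in arrival order."""

    def __init__(self):
        self._queue = collections.deque()

    async def wait_turn(self):
        fut = asyncio.get_running_loop().create_future()
        self._queue.append(fut)
        await fut   # resolved exactly once, when this waiter is first in line

    def release_one(self):
        # Wake only the head of the queue. The old broadcast-style waiter
        # woke all N pending senders and N - 1 of them just re-blocked.
        while self._queue:
            fut = self._queue.popleft()
            if not fut.done():
                fut.set_result(None)
                break

Each drained batch now does O(1) wake-up work instead of O(backlog).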
211 changes: 124 additions & 87 deletions aiokafka/producer/message_accumulator.py
@@ -2,7 +2,10 @@
import collections
import copy
import time
+from dataclasses import dataclass
+from typing import Any, List, Optional, Tuple

+import async_timeout
from aiokafka.errors import (KafkaTimeoutError,
                             NotLeaderForPartitionError,
                             LeaderNotAvailableError,
@@ -111,10 +114,8 @@ def __init__(self, tp, builder, ttl):

        # Waiters
        # Set when messages are delivered to Kafka based on ACK setting
-        self.future = create_future()
+        self.deliver_future = create_future()
        self._msg_futures = []
-        # Set when sender takes this batch
-        self._drain_waiter = create_future()
        self._retry_count = 0

    @property
@@ -155,8 +156,8 @@ def done(self, base_offset, timestamp=None, log_start_offset=None,
            timestamp_type = 1

        # Set main batch future
-        if not self.future.done():
-            self.future.set_result(_record_metadata_class(
+        if not self.deliver_future.done():
+            self.deliver_future.set_result(_record_metadata_class(
                topic, partition, tp, base_offset, timestamp, timestamp_type,
                log_start_offset))

@@ -176,16 +177,16 @@ def done_noack(self):
    def done_noack(self):
        """ Resolve all pending futures to None """
        # Faster resolve for base_offset=None case.
-        if not self.future.done():
-            self.future.set_result(None)
+        if not self.deliver_future.done():
+            self.deliver_future.set_result(None)
        for future, _ in self._msg_futures:
            if future.done():
                continue
            future.set_result(None)

    def failure(self, exception):
-        if not self.future.done():
-            self.future.set_exception(exception)
+        if not self.deliver_future.done():
+            self.deliver_future.set_exception(exception)
        for future, _ in self._msg_futures:
            if future.done():
                continue
@@ -196,37 +197,13 @@ def failure(self, exception):
        # Consume exception to avoid warnings. We delegate this consumption
        # to user only in case of explicit batch API.
        if self._msg_futures:
-            self.future.exception()
-
-        # In case where sender fails and closes batches all waiters have to be
-        # reset also.
-        if not self._drain_waiter.done():
-            self._drain_waiter.set_exception(exception)
-
-    async def wait_drain(self, timeout=None):
-        """Wait until all message from this batch is processed"""
-        waiter = self._drain_waiter
-        await asyncio.wait([waiter], timeout=timeout)
-        if waiter.done():
-            waiter.result()  # Check for exception
+            self.deliver_future.exception()

    def expired(self):
        """Check that batch is expired or not"""
        return (time.monotonic() - self._ctime) > self._ttl

-    def drain_ready(self):
-        """Compress batch to be ready for send"""
-        if not self._drain_waiter.done():
-            self._drain_waiter.set_result(None)
-        self._retry_count += 1
-
-    def reset_drain(self):
-        """Reset drain waiter, until we will do another retry"""
-        assert self._drain_waiter.done()
-        self._drain_waiter = create_future()
-
    def set_producer_state(self, producer_id, producer_epoch, base_sequence):
-        assert not self._drain_waiter.done()
        self._builder._set_producer_state(
            producer_id, producer_epoch, base_sequence)

@@ -240,6 +217,24 @@ def is_empty(self):
    def retry_count(self):
        return self._retry_count

+    def inc_retry_count(self):
+        self._retry_count += 1
+
+
+HeadersType = List[Tuple[str, Any]]
+
+
+@dataclass
+class WaitlistHandle():
+
+    # Waitlist items are either pending batches or pending messages
+    message_attrs: Optional[Tuple[Any, Any, int, HeadersType]]
+    batch_builder: Optional[BatchBuilder]
+
+    # Future exposed to Producer.send(). Is not shielded, so can be
+    # cancelled before resolving.
+    send_future: "asyncio.Future[asyncio.Future[RecordMetadata]]"
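The nested annotation on send_future is the core of the design: the outer future resolves when the queued message or batch is finally placed into a batch, and its result is the inner delivery future that resolves on broker acknowledgement. A self-contained illustration of that two-step await (the names here are stand-ins, not PR code):

import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    send_future = loop.create_future()      # what a waitlisted send() awaits
    deliver_future = loop.create_future()   # what the broker ACK resolves

    async def waitlisted_send():
        slot = await send_future            # message placed into a batch
        return await slot                   # batch acknowledged by broker

    task = asyncio.create_task(waitlisted_send())
    send_future.set_result(deliver_future)  # _process_waitlist does this step
    deliver_future.set_result("record metadata")
    assert await task == "record metadata"


asyncio.run(demo())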


class MessageAccumulator:
    """Accumulator of messages batched by topic-partition
@@ -254,6 +249,7 @@ def __init__(
        loop = get_running_loop()
        self._loop = loop
        self._batches = collections.defaultdict(collections.deque)
+        self._waitlist = collections.defaultdict(collections.deque)
        self._pending_batches = set()
        self._cluster = cluster
        self._batch_size = batch_size
@@ -273,9 +269,9 @@ async def flush(self):
        waiters = []
        for batches in self._batches.values():
            for batch in list(batches):
-                waiters.append(batch.future)
+                waiters.append(batch.deliver_future)
        for batch in list(self._pending_batches):
-            waiters.append(batch.future)
+            waiters.append(batch.deliver_future)
        if waiters:
            await asyncio.wait(waiters)

@@ -286,9 +282,9 @@ async def flush_for_commit(self):
            # We force all buffers to close to finalize the transaction
            # scope. We should not add anything to this transaction.
            batch._builder.close()
-            waiters.append(batch.future)
+            waiters.append(batch.deliver_future)
        for batch in self._pending_batches:
-            waiters.append(batch.future)
+            waiters.append(batch.deliver_future)
        # Wait for all waiters to finish. We only wait for the scope we defined
        # above, other batches should not be delivered as part of this
        # transaction
@@ -309,38 +305,73 @@ async def close(self):
        await self.flush()

    async def add_message(
-        self, tp, key, value, timeout, timestamp_ms=None,
-        headers=[]
+        self, tp, key, value, timeout, timestamp_ms=None, headers=[]
    ):
        """ Add message to batch by topic-partition
        If batch is already full this method waits (`timeout` seconds maximum)
        until batch is drained by send task
        """
-        while True:
-            if self._closed:
-                # this can happen when producer is closing but try to send some
-                # messages in async task
-                raise ProducerClosed()
-            if self._exception is not None:
-                raise copy.copy(self._exception)
-
-            pending_batches = self._batches.get(tp)
-            if not pending_batches:
-                builder = self.create_builder()
-                batch = self._append_batch(builder, tp)
-            else:
-                batch = pending_batches[-1]
+        self._check_errors()

-            future = batch.append(key, value, timestamp_ms, headers=headers)
+        if not self._waitlist.get(tp):
+            future = self._try_add_message(
+                tp, key, value, timestamp_ms, headers)
            if future is not None:
                return future
-            # Batch is full, can't append data atm,
-            # waiting until batch per topic-partition is drained
-            start = time.monotonic()
-            await batch.wait_drain(timeout)
-            timeout -= time.monotonic() - start
-            if timeout <= 0:
-                raise KafkaTimeoutError()
+
+        # Batch is full, can't append data atm, enqueue data to be sent
+        # after batch for this partition is drained.
+        handle = WaitlistHandle(
+            message_attrs=(key, value, timestamp_ms, headers),
+            batch_builder=None,
+            send_future=self._loop.create_future())
+        self._waitlist[tp].append(handle)
+
+        try:
+            async with async_timeout.timeout(timeout):
+                return await handle.send_future
+        except asyncio.TimeoutError:
+            raise KafkaTimeoutError()
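A subtlety in the timeout path above: when async_timeout expires it cancels the awaiting task, which also cancels the send_future still sitting in the deque, so _process_waitlist later sees it as done() and discards it instead of sending a stale message. A runnable sketch of that behaviour, using the same async_timeout library the diff imports (the bare deque stands in for the accumulator's waitlist):

import asyncio
import collections

import async_timeout


async def demo_timeout():
    loop = asyncio.get_running_loop()
    waitlist = collections.deque()

    send_future = loop.create_future()
    waitlist.append(send_future)
    try:
        async with async_timeout.timeout(0.01):   # nothing resolves it in time
            await send_future
    except asyncio.TimeoutError:
        pass

    # The abandoned entry is now cancelled, i.e. done(); the drain loop can
    # recognise and drop it rather than deliver a message nobody waits for.
    assert waitlist[0].done()


asyncio.run(demo_timeout())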

+    def _check_errors(self):
+        if self._closed:
+            # this can happen when producer is closing but try to send some
+            # messages in async task
+            raise ProducerClosed()
+        if self._exception is not None:
+            raise copy.copy(self._exception)
+
+    def _try_add_message(self, tp, key, value, timestamp_ms, headers):
+        pending_batches = self._batches.get(tp)
+        if not pending_batches:
+            builder = self.create_builder()
+            batch = self._append_batch(builder, tp)
+        else:
+            batch = pending_batches[-1]
+        return batch.append(key, value, timestamp_ms, headers=headers)

+    def _process_waitlist(self, tp):
+        while self._waitlist.get(tp):
+            handle = self._waitlist[tp].popleft()
+            # We do not send messages that are no longer waited for, just
+            # clean them up.
+            if handle.send_future.done():
+                continue
+
+            if handle.batch_builder is None:
+                msg_future = self._try_add_message(tp, *handle.message_attrs)
+                if msg_future is not None:
+                    handle.send_future.set_result(msg_future)
+            else:
+                if not self._batches.get(tp):
+                    builder = handle.batch_builder
+                    batch = self._append_batch(builder, tp)
+                    handle.send_future.set_result(batch.deliver_future)
+
+            # Return item to waitlist if it was not processed
+            if not handle.send_future.done():
+                self._waitlist[tp].appendleft(handle)
+                break
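_process_waitlist maintains two invariants that are easy to miss in diff form: entries whose future is already done (cancelled or timed-out sends) are dropped silently, and the first entry that cannot be placed goes back to the head of the deque, so sends on one partition never overtake each other. A reduced model (Entry and try_place are stand-ins for WaitlistHandle and the batch-append calls):

from typing import Callable, Deque


class Entry:
    def __init__(self):
        self.done = False
        self.cancelled = False


def process_waitlist(queue: "Deque[Entry]",
                     try_place: "Callable[[Entry], bool]") -> None:
    while queue:
        entry = queue.popleft()
        if entry.cancelled:        # abandoned send(): drop it silently
            continue
        if try_place(entry):       # there is room: this waiter is served
            entry.done = True
            continue
        queue.appendleft(entry)    # no room yet: back to the head, so a
        break                      # later send() can never overtake this one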

    def data_waiter(self):
        """ Return waiter future that will be resolved when accumulator contain
@@ -361,27 +392,36 @@ def _pop_batch(self, tp):
                producer_id=self._txn_manager.producer_id,
                producer_epoch=self._txn_manager.producer_epoch,
                base_sequence=seq)
-        batch.drain_ready()
        if len(self._batches[tp]) == 0:
            del self._batches[tp]
        self._pending_batches.add(batch)

        if not_retry:
            def cb(fut, batch=batch, self=self):
                self._pending_batches.remove(batch)
-            batch.future.add_done_callback(cb)
+            batch.deliver_future.add_done_callback(cb)

+        batch.inc_retry_count()
+        # Populate next batch based on waitlist items (if any)
+        self._process_waitlist(tp)
        return batch

    def reenqueue(self, batch):
        tp = batch.tp
        self._batches[tp].appendleft(batch)
        self._pending_batches.remove(batch)
-        batch.reset_drain()

    def drain_by_nodes(self, ignore_nodes, muted_partitions=set()):
        """ Group batches by leader to partition nodes. """
        nodes = collections.defaultdict(dict)
        unknown_leaders_exist = False

+        # Reset the data waiter before processing batches to allow waitlist
+        # processing to reset it.
+        if not self._wait_data_future.done():
+            self._wait_data_future.set_result(None)
+        self._wait_data_future = self._loop.create_future()
+
        for tp in list(self._batches.keys()):
            # Just ignoring by node is not enough, as leader can change during
            # the cycle
@@ -413,13 +453,6 @@ def drain_by_nodes(self, ignore_nodes, muted_partitions=set()):
                # delivery future here, no message futures.
                batch.done_noack()

-        # all batches are drained from accumulator
-        # so create "wait data" future again for waiting new data in send
-        # task
-        if not self._wait_data_future.done():
-            self._wait_data_future.set_result(None)
-        self._wait_data_future = self._loop.create_future()
-
        return nodes, unknown_leaders_exist
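Together with the hunk above, this moves the reset of _wait_data_future from the end of drain_by_nodes() to its start. The ordering matters: _pop_batch() now calls _process_waitlist(), which may append a fresh batch and signal "new data" mid-drain; resetting first ensures that signal lands on the new future instead of being thrown away, so the sender task cannot sleep while data is pending. A simplified model of the handshake (illustrative class, not the real accumulator):

import asyncio
import collections


class AccumulatorSketch:
    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._loop = loop
        self._batches = collections.defaultdict(collections.deque)
        self._wait_data_future = loop.create_future()

    def _append_batch(self, tp, batch):
        self._batches[tp].append(batch)
        if not self._wait_data_future.done():
            self._wait_data_future.set_result(None)   # "new data" signal

    def drain(self):
        # Reset FIRST: a batch appended by waitlist processing below will
        # signal the *new* future, so the sender task cannot oversleep.
        if not self._wait_data_future.done():
            self._wait_data_future.set_result(None)
        self._wait_data_future = self._loop.create_future()
        for tp in list(self._batches):
            del self._batches[tp]   # stands in for _pop_batch(), which may
                                    # re-fill self._batches via the waitlist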

    def create_builder(self):
@@ -467,18 +500,22 @@ async def add_batch(self, builder, tp, timeout):
            aiokafka.errors.KafkaTimeoutError: the batch could not be added
                within the specified timeout.
        """
-        if self._closed:
-            raise ProducerClosed()
-        if self._exception is not None:
-            raise copy.copy(self._exception)
-
-        start = time.monotonic()
-        while timeout > 0:
-            pending = self._batches.get(tp)
-            if pending:
-                await pending[-1].wait_drain(timeout=timeout)
-                timeout -= time.monotonic() - start
-            else:
-                batch = self._append_batch(builder, tp)
-                return asyncio.shield(batch.future)
-        raise KafkaTimeoutError()
+        self._check_errors()
+
+        pending = self._batches.get(tp)
+        if not pending:
+            batch = self._append_batch(builder, tp)
+            return asyncio.shield(batch.deliver_future)
+
+        # Delay the send until there are no pending batches
+        handle = WaitlistHandle(
+            message_attrs=None,
+            batch_builder=builder,
+            send_future=self._loop.create_future())
+        self._waitlist[tp].append(handle)
+        try:
+            async with async_timeout.timeout(timeout):
+                batch_deliver_future = await handle.send_future
+                return asyncio.shield(batch_deliver_future)
+        except asyncio.TimeoutError:
+            raise KafkaTimeoutError()
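For reference, this is how the rewritten add_batch() is reached from user code; create_batch(), BatchBuilder.append() and send_batch() are the public AIOKafkaProducer APIs, while the surrounding coroutine is illustrative. The asyncio.shield() means cancelling the returned future does not cancel delivery of a batch that was already queued:

import asyncio

from aiokafka import AIOKafkaProducer


async def produce_batch(producer: AIOKafkaProducer, topic: str, partition: int):
    batch = producer.create_batch()
    batch.append(key=None, value=b"msg-1", timestamp=None)
    batch.append(key=None, value=b"msg-2", timestamp=None)
    # send_batch() may now wait in the partition's waitlist instead of
    # polling wait_drain() in a loop.
    deliver_future = await producer.send_batch(batch, topic, partition=partition)
    return await deliver_future   # RecordMetadata once the broker ACKs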
8 changes: 4 additions & 4 deletions aiokafka/producer/producer.py
@@ -464,10 +464,10 @@ async def send(
        tp = TopicPartition(topic, partition)
        log.debug("Sending (key=%s value=%s) to %s", key, value, tp)

-        fut = await self._message_accumulator.add_message(
+        deliver_future = await self._message_accumulator.add_message(
            tp, key_bytes, value_bytes, self._request_timeout_ms / 1000,
            timestamp_ms=timestamp_ms, headers=headers)
-        return fut
+        return deliver_future
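The rename makes the two-stage contract of send() explicit: the awaited add_message() returns once the message has been accepted into a batch (possibly after a stay in the new waitlist), and the future it hands back resolves on broker acknowledgement. Standard usage per the public aiokafka API (the broker address is a placeholder):

import asyncio

from aiokafka import AIOKafkaProducer


async def main():
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        deliver_future = await producer.send("my_topic", b"payload")
        record_metadata = await deliver_future   # broker acknowledgement
        print(record_metadata.offset)
    finally:
        await producer.stop()


asyncio.run(main())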

    async def send_and_wait(
        self, topic, value=None, key=None, partition=None,
@@ -515,9 +515,9 @@ async def send_batch(self, batch, topic, *, partition):

        tp = TopicPartition(topic, partition)
        log.debug("Sending batch to %s", tp)
-        future = await self._message_accumulator.add_batch(
+        deliver_future = await self._message_accumulator.add_batch(
            batch, tp, self._request_timeout_ms / 1000)
-        return future
+        return deliver_future

    def _ensure_transactional(self):
        if self._txn_manager is None or \