
Commit

fix: consume multiple schema messages on each consume call
By default the consumer returns only a single message per consume call.
This makes startup very slow due to a lot of per-call overhead.
jjaakola-aiven committed May 7, 2024
1 parent ee94d17 commit 038ea67
Showing 1 changed file with 16 additions and 3 deletions.
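
For context on the fix: the slow path was one consume call per schema record, and the change requests a batch of records per call instead. A minimal sketch of that difference, assuming a confluent-kafka style consumer against a local broker (the broker address, group id, and topic handling below are illustrative, not Karapace's actual wiring):

from confluent_kafka import Consumer

# Illustrative configuration only; Karapace sets up its consumer elsewhere.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # assumed broker address
        "group.id": "schema-reader-sketch",     # hypothetical group id
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["_schemas"])

# Before: the default num_messages=1 yields at most one record per call, so
# replaying N schema records costs N consume calls' worth of overhead.
single = consumer.consume(timeout=0.2)

# After: ask for up to 1000 already-fetched records per call, so the replay
# loop needs roughly N/1000 consume rounds instead of N.
batch = consumer.consume(timeout=0.2, num_messages=1000)
for msg in batch:
    if msg.error() is None:
        print(msg.offset(), msg.key())

consumer.close()

The diff itself simply forwards such a batch size (max_messages_to_process = 1000) to the existing consume() call in handle_messages().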
19 changes: 16 additions & 3 deletions karapace/schema_reader.py
@@ -121,6 +121,10 @@ def __init__(
         Thread.__init__(self, name="schema-reader")
         self.master_coordinator = master_coordinator
         self.timeout_s = 0.2
+        # Consumer default is 1 message for each consume call
+        # For good startup performance the consumption of multiple
+        # records for each consume round is essential.
+        self.max_messages_to_process = 1000
         self.config = config
 
         self.database = database
@@ -146,6 +150,7 @@ def __init__(
         self.offset = OFFSET_UNINITIALIZED
         self._highest_offset = OFFSET_UNINITIALIZED
         self.ready = False
+        self.prev_processed_offset = 0
 
         # This event controls when the Reader should stop running, it will be
         # set by another thread (e.g. `KarapaceSchemaRegistry`)
@@ -157,6 +162,8 @@ def __init__(
         self.processed_canonical_keys_total = 0
         self.processed_deprecated_karapace_keys_total = 0
         self.last_check = time.monotonic()
+        self.start_time = time.monotonic()
+        self.startup_previous_processed_offset = 0
 
     def close(self) -> None:
         LOG.info("Closing schema_reader")
@@ -282,15 +289,21 @@ def _is_ready(self) -> bool:
         cur_time = time.monotonic()
         time_from_last_check = cur_time - self.last_check
         progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2)
+        startup_processed_message_per_second = (self.offset - self.startup_previous_processed_offset) / time_from_last_check
         LOG.info(
-            "Replay progress (%s): %s/%s (%s %%)",
+            "Replay progress (%s): %s/%s (%s %%) (recs/s %s)",
             round(time_from_last_check, 2),
             self.offset,
             self._highest_offset,
             progress_pct,
+            startup_processed_message_per_second,
         )
         self.last_check = cur_time
-        return self.offset >= self._highest_offset
+        self.startup_previous_processed_offset = self.offset
+        ready = self.offset >= self._highest_offset
+        if ready:
+            LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
+        return ready
 
     def highest_offset(self) -> int:
         return max(self._highest_offset, self._offset_watcher.greatest_offset())
@@ -307,7 +320,7 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None:
     def handle_messages(self) -> None:
         assert self.consumer is not None, "Thread must be started"
 
-        msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s)
+        msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process)
         if self.ready is False:
             self.ready = self._is_ready()
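
The _is_ready() hunk above also extends the replay progress log with a records-per-second figure and reports total startup time once the reader has caught up with the highest offset. A small illustrative sketch of that arithmetic (the offsets and interval are made-up numbers, not measurements):

def replay_progress(offset: int, previous_offset: int, highest_offset: int, elapsed_s: float) -> tuple[float, float, bool]:
    """Return (progress percentage, records per second, ready) for one check interval."""
    progress_pct = 0.0 if not highest_offset else round(offset / highest_offset * 100, 2)
    recs_per_second = (offset - previous_offset) / elapsed_s
    return progress_pct, recs_per_second, offset >= highest_offset

# Assumed numbers: 12,000 of 50,000 records replayed, 4,000 of them during the
# last 0.8 s check interval -> 24.0 % done at 5000.0 recs/s, not yet ready.
pct, rate, ready = replay_progress(offset=12_000, previous_offset=8_000, highest_offset=50_000, elapsed_s=0.8)
print(f"Replay progress (0.8): 12000/50000 ({pct} %) (recs/s {rate}) ready={ready}")

In the actual change these values feed the "Replay progress" log line, and startup_previous_processed_offset is updated after every check so the rate reflects only the latest interval.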
