diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 70aaa3a77..93e37f711 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -121,6 +121,10 @@ def __init__( Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator self.timeout_s = 0.2 + # Consumer default is 1 message for each consume call + # For good startup performance the consumption of multiple + # records for each consume round is essential. + self.max_messages_to_process = 1000 self.config = config self.database = database @@ -146,6 +150,7 @@ def __init__( self.offset = OFFSET_UNINITIALIZED self._highest_offset = OFFSET_UNINITIALIZED self.ready = False + self.prev_processed_offset = 0 # This event controls when the Reader should stop running, it will be # set by another thread (e.g. `KarapaceSchemaRegistry`) @@ -157,6 +162,8 @@ def __init__( self.processed_canonical_keys_total = 0 self.processed_deprecated_karapace_keys_total = 0 self.last_check = time.monotonic() + self.start_time = time.monotonic() + self.startup_previous_processed_offset = 0 def close(self) -> None: LOG.info("Closing schema_reader") @@ -282,15 +289,21 @@ def _is_ready(self) -> bool: cur_time = time.monotonic() time_from_last_check = cur_time - self.last_check progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2) + startup_processed_message_per_second = (self.offset - self.startup_previous_processed_offset) / time_from_last_check LOG.info( - "Replay progress (%s): %s/%s (%s %%)", + "Replay progress (%s): %s/%s (%s %%) (recs/s %s)", round(time_from_last_check, 2), self.offset, self._highest_offset, progress_pct, + startup_processed_message_per_second, ) self.last_check = cur_time - return self.offset >= self._highest_offset + self.startup_previous_processed_offset = self.offset + ready = self.offset >= self._highest_offset + if ready: + LOG.info("Ready in %s seconds", time.monotonic() - self.start_time) + return ready def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) @@ -307,7 +320,7 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s) + msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process) if self.ready is False: self.ready = self._is_ready()