Skip to content

Commit

Permalink
Decide startup readiness based on the highest record offset
Browse files Browse the repository at this point in the history
Karapace relied on poll returning an empty topic-messages dictionary.
This change instead marks the state as ready once the highest record
offset has been caught up to.
  • Loading branch information
jjaakola-aiven committed Nov 1, 2022
1 parent 6d0bf5a commit 79e9e73
Showing 1 changed file with 50 additions and 6 deletions.
56 changes: 50 additions & 6 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
See LICENSE for details
"""
from contextlib import closing, ExitStack
from kafka import KafkaConsumer
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaConfigurationError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.errors import (
KafkaConfigurationError,
KafkaTimeoutError,
NoBrokersAvailable,
NodeNotReadyError,
TopicAlreadyExistsError,
)
from karapace import constants
from karapace.config import Config
from karapace.errors import InvalidSchema
Expand All @@ -31,7 +37,9 @@

# The value `0` is a valid offset and it represents the first message produced
# to a topic, therefore it can not be used.
OFFSET_EMPTY = -1
# On startup with pristine schema topic the beginning and end offset is -1
# therefore it can not be used.
OFFSET_UNINITIALIZED = -2
LOG = logging.getLogger(__name__)

KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS = 2.0
Expand Down Expand Up @@ -156,7 +164,7 @@ def __init__(
# the topic has not been compacted yet, waiting allows this to consume
# the soft delete message and return the correct data instead of the
# old stale version that has not been deleted yet.)
self.offset = OFFSET_EMPTY
self.offset = OFFSET_UNINITIALIZED
self.ready = False

# This event controls when the Reader should stop running, it will be
Expand Down Expand Up @@ -247,18 +255,54 @@ def run(self) -> None:
self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)

while not self._stop.is_set():
if self.offset == OFFSET_UNINITIALIZED:
# Handles also a unusual case of purged schemas topic where starting offset can be > 0
self.offset = self._get_beginning_offset()
try:
self.handle_messages()
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.exception("Unexpected exception in schema reader loop")

def _get_beginning_offset(self) -> int:
    """Return the offset just before the first record of the schemas topic.

    The broker reports the first *available* offset; one is subtracted so
    that consuming the first record advances ``self.offset`` to the actual
    beginning. On any failure ``OFFSET_UNINITIALIZED`` is returned so the
    caller retries on the next loop iteration.
    """
    try:
        partition = TopicPartition(self.config["topic_name"], 0)
        response = self.consumer.beginning_offsets([partition])
        # Step back one from the reported first offset so a successful
        # consume of the very first record matches it on startup.
        first_available = next(iter(response.values()))
        return first_available - 1
    except KafkaTimeoutError:
        LOG.exception("Reading begin offsets timed out.")
    except Exception as e:  # pylint: disable=broad-except
        self.stats.unexpected_exception(ex=e, where="_get_beginning_offset")
        LOG.exception("Unexpected exception when reading begin offsets.")
    return OFFSET_UNINITIALIZED

def _is_ready(self) -> bool:
    """Return True once the reader has consumed the schemas topic up to
    the highest record offset reported by the broker.

    After readiness is reached the cached ``self.ready`` flag
    short-circuits further broker round trips.
    """
    if self.ready:
        return True

    try:
        offsets = self.consumer.end_offsets([TopicPartition(self.config["topic_name"], 0)])
        # Offset in the response is the offset for the next upcoming message.
        # Reduce by one for actual highest offset.
        highest_offset = list(offsets.values())[0] - 1
        # Use >= rather than == so readiness still latches if the consumed
        # offset ever moves past this snapshot of the end offset (e.g.
        # records appended between the poll and this query).
        return self.offset >= highest_offset
    except KafkaTimeoutError:
        LOG.exception("Reading end offsets timed out.")
        return False
    except Exception as e:  # pylint: disable=broad-except
        self.stats.unexpected_exception(ex=e, where="_is_ready")
        LOG.exception("Unexpected exception when reading end offsets.")
        return False

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"

raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms)
if self.ready is False and not raw_msgs:
self.ready = True
if self.ready is False:
self.ready = self._is_ready()

watch_offsets = False
if self.master_coordinator is not None:
are_we_master, _ = self.master_coordinator.get_master_info()
Expand Down

0 comments on commit 79e9e73

Please sign in to comment.