From e9902ce2bf073778167728a61e43ef6ceeae7239 Mon Sep 17 00:00:00 2001
From: Jarkko Jaakola
Date: Tue, 1 Nov 2022 14:20:45 +0200
Subject: [PATCH] Decide startup readiness based on the highest record offset

Karapace previously relied on poll() returning an empty topic-messages
dictionary to decide readiness. This change marks the state ready once
the reader has caught up with the highest record offset.
---
 karapace/schema_reader.py        | 54 ++++++++++++++++--
 tests/unit/test_schema_reader.py | 94 +++++++++++++++++++++++++++++++-
 2 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index a3e051b75..2154dc75b 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -5,9 +5,15 @@
 See LICENSE for details
 """
 from contextlib import closing, ExitStack
-from kafka import KafkaConsumer
+from kafka import KafkaConsumer, TopicPartition
 from kafka.admin import KafkaAdminClient, NewTopic
-from kafka.errors import KafkaConfigurationError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
+from kafka.errors import (
+    KafkaConfigurationError,
+    KafkaTimeoutError,
+    NoBrokersAvailable,
+    NodeNotReadyError,
+    TopicAlreadyExistsError,
+)
 from karapace import constants
 from karapace.config import Config
 from karapace.errors import InvalidSchema
@@ -31,6 +37,7 @@
 
 # The value `0` is a valid offset and it represents the first message produced
 # to a topic, therefore it can not be used.
+OFFSET_UNINITIALIZED = -2
 OFFSET_EMPTY = -1
 
 LOG = logging.getLogger(__name__)
@@ -156,7 +163,7 @@ def __init__(
         # the topic has not been compacted yet, waiting allows this to consume
         # the soft delete message and return the correct data instead of the
         # old stale version that has not been deleted yet.)
-        self.offset = OFFSET_EMPTY
+        self.offset = OFFSET_UNINITIALIZED
         self.ready = False
 
         # This event controls when the Reader should stop running, it will be
@@ -247,18 +254,55 @@ def run(self) -> None:
             self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)
 
         while not self._stop.is_set():
+            if self.offset == OFFSET_UNINITIALIZED:
+                # Also handles the unusual case of a purged schemas topic,
+                # where the starting offset can be > 0 with no records to process.
+                self.offset = self._get_beginning_offset()
             try:
                 self.handle_messages()
             except Exception as e:  # pylint: disable=broad-except
                 self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
                 LOG.exception("Unexpected exception in schema reader loop")
 
+    def _get_beginning_offset(self) -> int:
+        try:
+            offsets = self.consumer.beginning_offsets([TopicPartition(self.config["topic_name"], 0)])
+            # The response contains the first available record offset of the partition.
+            # Reduce by one so the comparison against the highest offset works on startup.
+            beginning_offset = list(offsets.values())[0] - 1
+            return beginning_offset
+        except KafkaTimeoutError:
+            LOG.exception("Reading beginning offsets timed out.")
+        except Exception as e:  # pylint: disable=broad-except
+            self.stats.unexpected_exception(ex=e, where="_get_beginning_offset")
+            LOG.exception("Unexpected exception when reading beginning offsets.")
+        return OFFSET_UNINITIALIZED
+
+    def _is_ready(self) -> bool:
+        if self.ready:
+            return True
+
+        try:
+            offsets = self.consumer.end_offsets([TopicPartition(self.config["topic_name"], 0)])
+            # Offset in the response is the offset of the next upcoming message.
+            # Reduce by one for the actual highest offset.
+            highest_offset = list(offsets.values())[0] - 1
+            return self.offset >= highest_offset
+        except KafkaTimeoutError:
+            LOG.exception("Reading end offsets timed out.")
+            return False
+        except Exception as e:  # pylint: disable=broad-except
+            self.stats.unexpected_exception(ex=e, where="_is_ready")
+            LOG.exception("Unexpected exception when reading end offsets.")
+            return False
+
     def handle_messages(self) -> None:
         assert self.consumer is not None, "Thread must be started"
         raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms)
-        if self.ready is False and not raw_msgs:
-            self.ready = True
+        if self.ready is False:
+            self.ready = self._is_ready()
+
         watch_offsets = False
         if self.master_coordinator is not None:
             are_we_master, _ = self.master_coordinator.get_master_info()
diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py
index 048fb56ca..803e5b6fd 100644
--- a/tests/unit/test_schema_reader.py
+++ b/tests/unit/test_schema_reader.py
@@ -1,6 +1,17 @@
+"""
+karapace - Test schema reader
+Copyright (c) 2022 Aiven Ltd
+See LICENSE for details
+"""
+
 from concurrent.futures import ThreadPoolExecutor
-from karapace.schema_reader import OffsetsWatcher
+from dataclasses import dataclass
+from karapace.config import DEFAULTS
+from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED, OffsetsWatcher
+from tests.base_testcase import BaseTestCase
+from unittest.mock import Mock
+
+import pytest
 import random
 import time
@@ -52,3 +63,84 @@ def produce() -> None:
         ), "Expected greatest offset is not one less than total count"
         assert produced_cnt == 100, "Did not produce expected amount of records"
         assert consumed_cnt == 100, "Did not consume expected amount of records"
+
+
+@dataclass
+class ReadinessTestCase(BaseTestCase):
+    cur_offset: int
+    end_offset: int
+    expected: bool
+
+
+@pytest.mark.parametrize(
+    "testcase",
+    [
+        ReadinessTestCase(
+            test_name="Empty schemas topic",
+            cur_offset=OFFSET_EMPTY,
+            end_offset=0,
+            expected=True,
+        ),
+        ReadinessTestCase(
+            test_name="Schema topic with data, beginning offset is 0",
+            cur_offset=OFFSET_EMPTY,
+            end_offset=100,
+            expected=False,
+        ),
+        ReadinessTestCase(
+            test_name="Schema topic with single record",
+            cur_offset=OFFSET_EMPTY,
+            end_offset=1,
+            expected=False,
+        ),
+        ReadinessTestCase(
+            test_name="Beginning offset cannot be resolved",
+            cur_offset=OFFSET_UNINITIALIZED,
+            end_offset=0,
+            expected=False,
+        ),
+        ReadinessTestCase(
+            test_name="Purged/compacted schemas topic, begin offset n > 0, end offset n+1",
+            cur_offset=90,
+            end_offset=91,
+            expected=True,
+        ),
+        ReadinessTestCase(
+            test_name="Schema topic with single record and replayed",
+            cur_offset=0,
+            end_offset=0,
+            expected=True,
+        ),
+        ReadinessTestCase(
+            test_name="Schema topic with data but compacted or purged, cur offset 10",
+            cur_offset=10,
+            end_offset=100,
+            expected=False,
+        ),
+        ReadinessTestCase(
+            test_name="Schema topic with data, cur offset is highest",
+            cur_offset=99,
+            end_offset=100,
+            expected=True,
+        ),
+        ReadinessTestCase(
+            test_name="Schema topic with data, cur offset is greater than highest",
+            cur_offset=101,
+            end_offset=100,
+            expected=True,
+        ),
+    ],
+)
+def test_readiness_check(testcase: ReadinessTestCase) -> None:
+    key_formatter_mock = Mock()
+    consumer_mock = Mock()
+    consumer_mock.poll.return_value = {}
+    # end_offsets returns a dict of {partition: offset}; the end offset is
+    # the offset of the next upcoming record.
+    consumer_mock.end_offsets.return_value = {0: testcase.end_offset}
+
+    schema_reader = KafkaSchemaReader(
+        config=DEFAULTS,
+        key_formatter=key_formatter_mock,
+        master_coordinator=None,
+    )
+    schema_reader.consumer = consumer_mock
+    schema_reader.offset = testcase.cur_offset
+
+    schema_reader.handle_messages()
+
+    assert schema_reader.ready is testcase.expected
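
Reviewer note, not part of the patch: a minimal standalone sketch of the
offset arithmetic the readiness check relies on. It assumes the kafka-python
semantics stated in the comments above, namely that end_offsets() reports the
offset of the next upcoming record, so the highest already-produced offset is
end_offset - 1 and an empty topic reports an end offset of 0. The helper name
is_caught_up is hypothetical and exists only for illustration:

    # Sketch only; mirrors the comparison in KafkaSchemaReader._is_ready().
    OFFSET_UNINITIALIZED = -2  # beginning offset has not been resolved yet
    OFFSET_EMPTY = -1          # no record has been consumed yet

    def is_caught_up(current_offset: int, end_offset: int) -> bool:
        # end_offset is the next upcoming record's offset, so the highest
        # already-produced offset is end_offset - 1. An empty topic has
        # end_offset == 0, hence its highest offset is -1 == OFFSET_EMPTY.
        highest_offset = end_offset - 1
        return current_offset >= highest_offset

    # A few of the parametrized cases from test_readiness_check:
    assert is_caught_up(OFFSET_EMPTY, 0)              # empty topic: ready at once
    assert not is_caught_up(OFFSET_EMPTY, 100)        # 100 records still to consume
    assert not is_caught_up(OFFSET_UNINITIALIZED, 0)  # offsets unresolved: not ready
    assert is_caught_up(90, 91)                       # purged topic, caught up at 90
    assert is_caught_up(99, 100)                      # consumed the highest offset

Because OFFSET_UNINITIALIZED is -2 and end offsets are never negative, the
unresolved case falls out of the same comparison without a special branch.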