Skip to content

Commit

Permalink
Decide startup readiness based on the highest record offset
Browse files Browse the repository at this point in the history
Karapace relied on poll returning an empty topic-messages dictionary
to detect readiness. This change instead marks the state as ready once
the highest record offset has been caught up.
  • Loading branch information
jjaakola-aiven committed Nov 3, 2022
1 parent 6d0bf5a commit 254c2bc
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 6 deletions.
54 changes: 49 additions & 5 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
See LICENSE for details
"""
from contextlib import closing, ExitStack
from kafka import KafkaConsumer
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaConfigurationError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.errors import (
KafkaConfigurationError,
KafkaTimeoutError,
NoBrokersAvailable,
NodeNotReadyError,
TopicAlreadyExistsError,
)
from karapace import constants
from karapace.config import Config
from karapace.errors import InvalidSchema
Expand All @@ -31,6 +37,7 @@

# The value `0` is a valid offset and it represents the first message produced
# to a topic, therefore it can not be used.
OFFSET_UNINITIALIZED = -2
OFFSET_EMPTY = -1
LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -156,7 +163,7 @@ def __init__(
# the topic has not been compacted yet, waiting allows this to consume
# the soft delete message and return the correct data instead of the
# old stale version that has not been deleted yet.)
self.offset = OFFSET_EMPTY
self.offset = OFFSET_UNINITIALIZED
self.ready = False

# This event controls when the Reader should stop running, it will be
Expand Down Expand Up @@ -247,18 +254,55 @@ def run(self) -> None:
self._stop.wait(timeout=SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS)

while not self._stop.is_set():
if self.offset == OFFSET_UNINITIALIZED:
# Handles also a unusual case of purged schemas topic where starting offset can be > 0
# and no records to process.
self.offset = self._get_beginning_offset()
try:
self.handle_messages()
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.exception("Unexpected exception in schema reader loop")

def _get_beginning_offset(self) -> int:
    """Return the offset just before the first available record of the schemas topic.

    The result is used as the reader's starting offset so that readiness
    comparisons work even for a purged/compacted topic whose beginning
    offset is greater than zero.  On any failure the sentinel
    ``OFFSET_UNINITIALIZED`` is returned so the lookup is retried on the
    next loop iteration.
    """
    try:
        beginning = self.consumer.beginning_offsets([TopicPartition(self.config["topic_name"], 0)])
        # The broker reports the offset of the first available record;
        # subtract one so the startup comparison against `self.offset`
        # behaves like "nothing consumed yet".
        return next(iter(beginning.values())) - 1
    except KafkaTimeoutError:
        LOG.exception("Reading begin offsets timed out.")
    except Exception as e:  # pylint: disable=broad-except
        self.stats.unexpected_exception(ex=e, where="_get_beginning_offset")
        LOG.exception("Unexpected exception when reading begin offsets.")
    return OFFSET_UNINITIALIZED

def _is_ready(self) -> bool:
    """Return True once the reader has caught up with the topic's highest record offset.

    Readiness is sticky: once ``self.ready`` is set this short-circuits.
    Transient offset-lookup failures report not-ready and are retried on
    the next call.
    """
    # Already caught up previously — no need to query the broker again.
    if self.ready:
        return True

    partition = TopicPartition(self.config["topic_name"], 0)
    try:
        end_offsets = self.consumer.end_offsets([partition])
        # end_offsets() reports the offset of the next record to be
        # produced, so the highest existing record offset is one less.
        highest = next(iter(end_offsets.values())) - 1
        return self.offset >= highest
    except KafkaTimeoutError:
        LOG.exception("Reading end offsets timed out.")
        return False
    except Exception as e:  # pylint: disable=broad-except
        self.stats.unexpected_exception(ex=e, where="_is_ready")
        LOG.exception("Unexpected exception when reading end offsets.")
        return False

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"

raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms)
if self.ready is False and not raw_msgs:
self.ready = True
if self.ready is False:
self.ready = self._is_ready()

watch_offsets = False
if self.master_coordinator is not None:
are_we_master, _ = self.master_coordinator.get_master_info()
Expand Down
94 changes: 93 additions & 1 deletion tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
"""
karapace - Test schema reader
Copyright (c) 2022 Aiven Ltd
See LICENSE for details
"""

from concurrent.futures import ThreadPoolExecutor
from karapace.schema_reader import OffsetsWatcher
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED, OffsetsWatcher
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

import pytest
import random
import time

Expand Down Expand Up @@ -52,3 +63,84 @@ def produce() -> None:
), "Expected greatest offset is not one less than total count"
assert produced_cnt == 100, "Did not produce expected amount of records"
assert consumed_cnt == 100, "Did not consume expected amount of records"


@dataclass
class ReadinessTestCase(BaseTestCase):
    """Parameters for one readiness-check scenario run against KafkaSchemaReader."""

    # The reader's current offset; may also be one of the sentinels
    # OFFSET_EMPTY (-1) or OFFSET_UNINITIALIZED (-2).
    cur_offset: int
    # Value the mocked consumer.end_offsets() returns, i.e. the offset of
    # the next upcoming record (one past the highest existing record).
    end_offset: int
    # Expected value of schema_reader.ready after handle_messages().
    expected: bool


@pytest.mark.parametrize(
    "testcase",
    [
        ReadinessTestCase(
            test_name="Empty schemas topic",
            cur_offset=OFFSET_EMPTY,
            end_offset=0,
            expected=True,
        ),
        ReadinessTestCase(
            test_name="Schema topic with data, beginning offset is 0",
            cur_offset=OFFSET_EMPTY,
            end_offset=100,
            expected=False,
        ),
        ReadinessTestCase(
            test_name="Schema topic with single record",
            cur_offset=OFFSET_EMPTY,
            end_offset=1,
            expected=False,
        ),
        ReadinessTestCase(
            test_name="Beginning offset cannot be resolved.",
            cur_offset=OFFSET_UNINITIALIZED,
            end_offset=0,
            expected=False,
        ),
        ReadinessTestCase(
            test_name="Purged/compacted schemas topic, begin offset n > 0, end offset n+1",
            cur_offset=90,
            end_offset=91,
            expected=True,
        ),
        ReadinessTestCase(
            test_name="Schema topic with single record and replayed",
            cur_offset=0,
            end_offset=0,
            expected=True,
        ),
        ReadinessTestCase(
            test_name="Schema topic with data but compacted or purged, cur offset 10",
            cur_offset=10,
            end_offset=100,
            expected=False,
        ),
        ReadinessTestCase(
            test_name="Schema topic with data, cur offset is highest",
            cur_offset=99,
            end_offset=100,
            expected=True,
        ),
        ReadinessTestCase(
            test_name="Schema topic with data, cur offset is greater than highest",
            cur_offset=101,
            end_offset=100,
            expected=True,
        ),
    ],
)
def test_readiness_check(testcase: ReadinessTestCase) -> None:
    """Verify handle_messages() flips `ready` only once the reader's offset
    has caught up with the topic's highest record offset."""
    key_formatter_mock = Mock()
    consumer_mock = Mock()
    # No records returned: readiness must now be decided by offsets alone,
    # not by an empty poll() result.
    consumer_mock.poll.return_value = {}
    # Return dict {partition: offsets}, end offset is the next upcoming record offset
    consumer_mock.end_offsets.return_value = {0: testcase.end_offset}

    schema_reader = KafkaSchemaReader(config=DEFAULTS, key_formatter=key_formatter_mock, master_coordinator=None)
    schema_reader.consumer = consumer_mock
    schema_reader.offset = testcase.cur_offset

    schema_reader.handle_messages()
    assert schema_reader.ready is testcase.expected

0 comments on commit 254c2bc

Please sign in to comment.