diff --git a/karapace/schema_backup.py b/karapace/schema_backup.py
index 225d4b01a..8fa112bd5 100644
--- a/karapace/schema_backup.py
+++ b/karapace/schema_backup.py
@@ -7,7 +7,7 @@
 from enum import Enum
 from kafka import KafkaConsumer, KafkaProducer
 from kafka.admin import KafkaAdminClient
-from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
+from kafka.errors import TopicAlreadyExistsError
 from kafka.structs import PartitionMetadata
 from karapace import constants
 from karapace.anonymize_schemas import anonymize_avro
@@ -15,9 +15,10 @@
 from karapace.key_format import KeyFormatter
 from karapace.schema_reader import new_schema_topic_from_config
 from karapace.typing import JsonData
-from karapace.utils import json_decode, json_encode, KarapaceKafkaClient, Timeout
+from karapace.utils import json_decode, json_encode, KarapaceKafkaClient
 from pathlib import Path
 from tempfile import mkstemp
+from tenacity import retry, RetryCallState, stop_after_delay, wait_fixed
 from typing import AbstractSet, Callable, IO, Optional, TextIO, Tuple, Union
 
 import argparse
@@ -26,7 +27,6 @@
 import logging
 import os
 import sys
-import time
 
 LOG = logging.getLogger(__name__)
 
@@ -49,6 +49,28 @@ class PartitionCountError(BackupError):
     pass
 
 
+def __before_sleep(description: str) -> Callable[[RetryCallState], None]:
+    """Returns a function to print a user-friendly message before going to sleep in retries.
+
+    :param description: of the action, should compose well with _failed_ and _returned_ as next words.
+    :returns: a function that can be used in ``tenacity.retry``'s ``before_sleep`` argument for printing a user-friendly
+        message that explains which action failed, that a retry is going to happen, and how to abort if desired.
+    """
+
+    def before_sleep(it: RetryCallState) -> None:
+        nonlocal description
+        outcome = it.outcome
+        if outcome is None:
+            result = "did not complete yet"
+        elif outcome.failed:
+            result = f"failed ({outcome.exception()})"
+        else:
+            result = f"returned {outcome.result()!r}"
+        print(f"{description} {result}, retrying... (Ctrl+C to abort)", file=sys.stderr)
+
+    return before_sleep
+
+
 def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[PartitionMetadata]]) -> None:
     """Checks that the given topic has exactly one partition.
 
@@ -64,6 +86,81 @@ def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[Pa
         )
 
 
+@contextlib.contextmanager
+def _admin(config: Config) -> KafkaAdminClient:
+    """Creates an automatically closing Kafka admin client.
+
+    :param config: for the client.
+    :raises Exception: if client creation fails, concrete exception types are unknown, see Kafka implementation.
+ """ + + @retry( + before_sleep=__before_sleep("Kafka Admin client creation"), + reraise=True, + stop=stop_after_delay(60), # seconds + wait=wait_fixed(1), # seconds + ) + def __admin() -> KafkaAdminClient: + nonlocal config + return KafkaAdminClient( + api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, + bootstrap_servers=config["bootstrap_uri"], + client_id=config["client_id"], + security_protocol=config["security_protocol"], + ssl_cafile=config["ssl_cafile"], + ssl_certfile=config["ssl_certfile"], + ssl_keyfile=config["ssl_keyfile"], + kafka_client=KarapaceKafkaClient, + ) + + admin = __admin() + try: + yield admin + finally: + admin.close() + + +@retry( + before_sleep=__before_sleep("Schemas topic creation"), + reraise=True, + stop=stop_after_delay(60), # seconds + wait=wait_fixed(1), # seconds +) +def _maybe_create_topic(config: Config, name: Optional[str] = None) -> Optional[bool]: + """Creates the topic if the given name and the one in the config are the same. + + :param config: for the admin client. + :param name: of the topic to create. + :returns: ``True`` if the topic was created, ``False`` if it already exists, and ``None`` if the given name does not + match the name of the schema topic in the config, in which case nothing has been done. + :raises Exception: if topic creation fails, concrete exception types are unknown, see Kafka implementation. + """ + topic = new_schema_topic_from_config(config) + + if name is not None and topic.name != name: + LOG.warning( + "Not creating topic, because the name %r from the config and the name %r from the CLI differ.", + topic.name, + name, + ) + return None + + with _admin(config) as admin: + try: + admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) + LOG.info( + "Created topic %r (partition count: %s, replication factor: %s, config: %s)", + topic.name, + topic.num_partitions, + topic.replication_factor, + topic.topic_configs, + ) + return True + except TopicAlreadyExistsError: + LOG.debug("Topic %r already exists", topic.name) + return False + + @contextlib.contextmanager def _consumer(config: Config, topic: str) -> KafkaConsumer: """Creates an automatically closing Kafka consumer client. 
@@ -195,7 +292,6 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
         self.config = config
         self.backup_location = backup_path
         self.topic_name = topic_option or self.config["topic_name"]
-        self.admin_client = None
         self.timeout_ms = 1000
 
         # Schema key formatter
@@ -203,70 +299,11 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
         if self.topic_name == constants.DEFAULT_SCHEMA_TOPIC or self.config.get("force_key_correction", False):
             self.key_formatter = KeyFormatter()
 
-    def init_admin_client(self) -> None:
-        start_time = time.monotonic()
-        wait_time = constants.MINUTE
-        while True:
-            if time.monotonic() - start_time > wait_time:
-                raise Timeout(f"Timeout ({wait_time}) on creating admin client")
-
-            try:
-                self.admin_client = KafkaAdminClient(
-                    api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS,
-                    bootstrap_servers=self.config["bootstrap_uri"],
-                    client_id=self.config["client_id"],
-                    security_protocol=self.config["security_protocol"],
-                    ssl_cafile=self.config["ssl_cafile"],
-                    ssl_certfile=self.config["ssl_certfile"],
-                    ssl_keyfile=self.config["ssl_keyfile"],
-                    kafka_client=KarapaceKafkaClient,
-                )
-                break
-            except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
-                LOG.warning("No Brokers available yet, retrying init_admin_client()")
-            except:  # pylint: disable=bare-except
-                LOG.exception("Failed to initialize admin client, retrying init_admin_client()")
-
-            time.sleep(2.0)
-
-    def _create_schema_topic_if_needed(self) -> None:
-        if self.topic_name != self.config["topic_name"]:
-            LOG.info("Topic name overridden, not creating a topic with schema configuration")
-            return
-
-        self.init_admin_client()
-        start_time = time.monotonic()
-        wait_time = constants.MINUTE
-        while True:
-            if time.monotonic() - start_time > wait_time:
-                raise Timeout(f"Timeout ({wait_time}) on creating admin client")
-
-            schema_topic = new_schema_topic_from_config(self.config)
-            try:
-                LOG.info("Creating schema topic: %r", schema_topic)
-                self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
-                LOG.info("Topic: %r created successfully", self.config["topic_name"])
-                break
-            except TopicAlreadyExistsError:
-                LOG.info("Topic: %r already exists", self.config["topic_name"])
-                break
-            except:  # pylint: disable=bare-except
-                LOG.exception(
-                    "Failed to create topic: %r, retrying _create_schema_topic_if_needed()", self.config["topic_name"]
-                )
-                time.sleep(5)
-
-    def close(self) -> None:
-        LOG.info("Closing schema backup reader")
-        if self.admin_client:
-            self.admin_client.close()
-            self.admin_client = None
-
     def restore_backup(self) -> None:
         if not os.path.exists(self.backup_location):
             raise BackupError("Backup location doesn't exist")
 
-        self._create_schema_topic_if_needed()
+        _maybe_create_topic(self.config, self.topic_name)
 
         with _producer(self.config, self.topic_name) as producer:
             LOG.info("Starting backup restore for topic: %r", self.topic_name)
@@ -276,7 +313,6 @@ def restore_backup(self) -> None:
                     self._restore_backup_version_2(producer, fp)
                 else:
                     self._restore_backup_version_1_single_array(producer, fp)
-        self.close()
 
     def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None:
         key = self.encode_key(item[0])
@@ -321,7 +357,6 @@ def export(self, export_func, *, overwrite: Optional[bool] = None) -> None:
                                 fp.write(ser)
 
         LOG.info("Schema export written to %r", "stdout" if fp is sys.stdout else self.backup_location)
-        self.close()
 
     def encode_key(self, key: Optional[Union[JsonData, str]]) -> Optional[bytes]:
         if key == "null":
@@ -388,25 +423,31 @@ def parse_args():
     return parser.parse_args()
 
 
-def main() -> int:
-    args = parse_args()
-
-    with open(args.config, encoding="utf8") as handler:
-        config = read_config(handler)
-
-    sb = SchemaBackup(config, args.location, args.topic)
-
-    if args.command == "get":
-        sb.export(serialize_record, overwrite=args.overwrite)
-        return 0
-    if args.command == "restore":
-        sb.restore_backup()
-        return 0
-    if args.command == "export-anonymized-avro-schemas":
-        sb.export(anonymize_avro_schema_message, overwrite=args.overwrite)
-        return 0
-    return 1
+def main() -> None:
+    try:
+        args = parse_args()
+
+        with open(args.config, encoding="utf8") as handler:
+            config = read_config(handler)
+
+        sb = SchemaBackup(config, args.location, args.topic)
+
+        if args.command == "get":
+            sb.export(serialize_record, overwrite=args.overwrite)
+        elif args.command == "restore":
+            sb.restore_backup()
+        elif args.command == "export-anonymized-avro-schemas":
+            sb.export(anonymize_avro_schema_message, overwrite=args.overwrite)
+        else:
+            # Only reachable if a new subcommand was added that is not mapped above. There are other ways with argparse
+            # to handle this, but all rely on the programmer doing exactly the right thing. Only switching to another
+            # CLI framework would provide the ability to not handle this situation manually while ensuring that it is
+            # not possible to add a new subcommand without also providing a handler for it.
+            raise SystemExit(f"Entered unreachable code, unknown command: {args.command!r}")
+    except KeyboardInterrupt as e:
+        # Not an error -- user choice -- and thus should not end up in a Python stacktrace.
+        raise SystemExit(2) from e
 
 
 if __name__ == "__main__":
-    sys.exit(main())
+    main()
diff --git a/requirements.txt b/requirements.txt
index a5a9c362b..5e3d0085d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,6 +6,7 @@ jsonschema==3.2.0
 networkx==2.5
 protobuf==3.19.5
 python-dateutil==2.8.2
+tenacity==8.0.1
 ujson==5.7.0
 
 # Using 0.15.0 until following issue gets fixed.
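
The retry behaviour above is fully declarative via tenacity. As a minimal standalone sketch of the same policy, assuming the tenacity 8.0.1 API pinned in requirements.txt (`flaky_connect` and its error are hypothetical stand-ins for Kafka client creation):

import sys

from tenacity import retry, RetryCallState, stop_after_delay, wait_fixed


def _before_sleep(it: RetryCallState) -> None:
    # Invoked by tenacity after each failed attempt, just before waiting.
    outcome = it.outcome
    if outcome is None:
        result = "did not complete yet"
    elif outcome.failed:
        result = f"failed ({outcome.exception()})"
    else:
        result = f"returned {outcome.result()!r}"
    print(f"Connection attempt {result}, retrying... (Ctrl+C to abort)", file=sys.stderr)


@retry(
    before_sleep=_before_sleep,
    reraise=True,  # re-raise the last exception instead of wrapping it in RetryError
    stop=stop_after_delay(60),  # give up 60 seconds after the first attempt started
    wait=wait_fixed(1),  # sleep 1 second between attempts
)
def flaky_connect() -> None:
    raise ConnectionError("broker not ready")  # hypothetical: always fails in this sketch

Note that tenacity's default retry predicate only catches exceptions deriving from `Exception`, so a `KeyboardInterrupt` or `SystemExit` escapes immediately; the `test_interrupts` case below relies on exactly that.
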
diff --git a/tests/unit/test_schema_backup.py b/tests/unit/test_schema_backup.py
index 84c90a497..310784008 100644
--- a/tests/unit/test_schema_backup.py
+++ b/tests/unit/test_schema_backup.py
@@ -2,19 +2,83 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from kafka import KafkaConsumer, KafkaProducer
+from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
+from kafka.admin import NewTopic
+from kafka.errors import TopicAlreadyExistsError
 from kafka.structs import PartitionMetadata
 from karapace import config
 from karapace.config import Config
-from karapace.schema_backup import _consumer, _producer, _writer, PartitionCountError
+from karapace.constants import DEFAULT_SCHEMA_TOPIC
+from karapace.schema_backup import _admin, _consumer, _maybe_create_topic, _producer, _writer, PartitionCountError
 from pathlib import Path
 from types import FunctionType
-from typing import AbstractSet, Callable, Type, Union
+from typing import AbstractSet, Callable, List, Type, Union
 from unittest import mock
+from unittest.mock import MagicMock
 
 import pytest
 import sys
 
+ADMIN_NEW_FQN = f"{KafkaAdminClient.__module__}.{KafkaAdminClient.__qualname__}.__new__"
+
+
+class TestAdmin:
+    @mock.patch(ADMIN_NEW_FQN, autospec=True)
+    def test_auto_closing(self, admin_new: MagicMock):
+        admin_mock = admin_new.return_value
+        with _admin(config.DEFAULTS) as admin:
+            assert admin is admin_mock
+        assert admin_mock.close.call_count == 1
+
+    @mock.patch("time.sleep", autospec=True)
+    @mock.patch(ADMIN_NEW_FQN, autospec=True)
+    def test_retry(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None:
+        admin_mock = admin_new.return_value
+        admin_new.side_effect = [Exception("1"), Exception("2"), admin_mock]
+        with _admin(config.DEFAULTS) as admin:
+            assert admin is admin_mock
+        assert sleep_mock.call_count == 2  # proof that we waited between retries
+        assert admin_mock.close.call_count == 1
+
+    @pytest.mark.parametrize("e", (KeyboardInterrupt, SystemExit))
+    @mock.patch("time.sleep", autospec=True)
+    @mock.patch(ADMIN_NEW_FQN, autospec=True)
+    def test_interrupts(self, admin_new: MagicMock, sleep_mock: MagicMock, e: Type[BaseException]) -> None:
+        admin_new.side_effect = [e()]
+        with pytest.raises(e):
+            with _admin(config.DEFAULTS):
+                pass
+        assert sleep_mock.call_count == 0  # proof that we did not retry
+
+
+class TestMaybeCreateTopic:
+    @mock.patch(ADMIN_NEW_FQN, autospec=True)
+    def test_ok(self, admin_new: MagicMock) -> None:
+        assert _maybe_create_topic(config.DEFAULTS) is True
+        create_topics: MagicMock = admin_new.return_value.create_topics
+        assert create_topics.call_count == 1
+        topic_list: List[NewTopic] = create_topics.call_args[0][0]
+        assert len(topic_list) == 1
+        assert topic_list[0].name == DEFAULT_SCHEMA_TOPIC
+
+    @mock.patch(ADMIN_NEW_FQN, autospec=True)
+    def test_exists(self, admin_new: MagicMock) -> None:
+        create_topics: MagicMock = admin_new.return_value.create_topics
+        create_topics.side_effect = TopicAlreadyExistsError()
+        assert _maybe_create_topic(config.DEFAULTS) is False
+
+    @mock.patch("time.sleep", autospec=True)
+    @mock.patch(ADMIN_NEW_FQN, autospec=True)
+    def test_retry(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None:
+        create_topics: MagicMock = admin_new.return_value.create_topics
+        create_topics.side_effect = [Exception("1"), Exception("2"), None]
+        assert _maybe_create_topic(config.DEFAULTS) is True
+        assert sleep_mock.call_count == 2  # proof that we waited between retries
+
+    def test_name_override(self) -> None:
+        assert "custom-name" != DEFAULT_SCHEMA_TOPIC
+        assert _maybe_create_topic(config.DEFAULTS, "custom-name") is None
+
+
 class TestClients:
     @staticmethod
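
The `ADMIN_NEW_FQN` patching above is worth a note: the new helpers construct `KafkaAdminClient` inside `karapace.schema_backup`, so the tests intercept instance creation at the class's own `__new__` instead of patching an imported name. A rough standalone sketch of the mechanism (not part of the patch):

from unittest import mock

from kafka import KafkaAdminClient

ADMIN_NEW_FQN = f"{KafkaAdminClient.__module__}.{KafkaAdminClient.__qualname__}.__new__"

with mock.patch(ADMIN_NEW_FQN, autospec=True) as admin_new:
    # __new__ now returns a MagicMock, which is not a KafkaAdminClient instance,
    # so Python skips __init__ and no broker connection is ever attempted.
    client = KafkaAdminClient(bootstrap_servers="localhost:9092")
    assert client is admin_new.return_value

Because the patch targets the class itself, it works regardless of how callers imported the name, which is what lets the unit tests drive `_admin` and `_maybe_create_topic` without a running Kafka.
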