From 3cba05d786f40e844907bc37ac5f5c36f631c953 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 29 May 2024 14:24:15 +0200 Subject: [PATCH 01/13] utils: added decorator to simplify common error handling - we add a simple decorator which allows arguments for the exceptions to catch and also for the exception to later reraise from the caught exception - this simplifies the code within the version manager when we deal with versions that could be integer strings --- karapace/utils.py | 18 ++++++++++++++++++ tests/unit/test_utils.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 tests/unit/test_utils.py diff --git a/karapace/utils.py b/karapace/utils.py index 39a1ae2ec..b544a5e08 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -19,6 +19,7 @@ from types import MappingProxyType from typing import AnyStr, cast, IO, Literal, NoReturn, overload, TypeVar +import functools import importlib import kafka.client_async import logging @@ -214,6 +215,23 @@ def convert_to_int(object_: dict, key: str, content_type: str) -> None: ) +def catch_and_raise_error(to_catch: tuple[Exception], to_raise: Exception): + def wrapper(f): + @functools.wraps(f) + def catcher(*args, **kwargs): + try: + value = f(*args, **kwargs) + if not value: + raise to_raise + return value + except to_catch as exc: + raise to_raise from exc + + return catcher + + return wrapper + + class KarapaceKafkaClient(KafkaClient): def __init__(self, **configs): kafka.client_async.BrokerConnection = KarapaceBrokerConnection diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py new file mode 100644 index 000000000..836f96c32 --- /dev/null +++ b/tests/unit/test_utils.py @@ -0,0 +1,29 @@ +""" +karapace - Test utils + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from karapace.utils import catch_and_raise_error + +import pytest + + +def test_catch_and_raise_error(): + class RaiseMe(Exception): + pass + + @catch_and_raise_error(to_catch=(ValueError,), to_raise=RaiseMe) + def v(): + int("not a number") + + with pytest.raises(RaiseMe): + v() + + @catch_and_raise_error(to_catch=(ZeroDivisionError,), to_raise=RaiseMe) + def z(): + _ = 100 / 0 + + with pytest.raises(RaiseMe): + z() From 538d62bd4bc0aef26364ab7e49a4a9e24a0b247b Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 29 May 2024 14:41:55 +0200 Subject: [PATCH 02/13] refactor: consolidate version resolution and validation logic --- karapace/schema_models.py | 38 ++++++++++- tests/unit/test_schema_models.py | 111 +++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 tests/unit/test_schema_models.py diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 46e3832d5..b2c7b6b94 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -10,7 +10,7 @@ from jsonschema import Draft7Validator from jsonschema.exceptions import SchemaError from karapace.dependency import Dependency -from karapace.errors import InvalidSchema +from karapace.errors import InvalidSchema, InvalidVersion, VersionNotFoundException from karapace.protobuf.exception import ( Error as ProtobufError, IllegalArgumentException, @@ -23,8 +23,8 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_references import Reference from karapace.schema_type import SchemaType -from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject -from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError +from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version +from karapace.utils import assert_never, catch_and_raise_error, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Dict, Final, final, Mapping, Sequence import hashlib @@ -388,3 +388,35 @@ class SchemaVersion: schema_id: SchemaId schema: TypedSchema references: Sequence[Reference] | None + + +class SchemaVersionManager: + LATEST_SCHEMA_VERSION_TAG: Final = "latest" + MINUS_1_SCHEMA_VERSION_TAG: Final = "-1" + + @classmethod + def latest_schema_tag_condition(cls, version: Version): + return (str(version) == cls.LATEST_SCHEMA_VERSION_TAG) or (str(version) == cls.MINUS_1_SCHEMA_VERSION_TAG) + + @classmethod + @catch_and_raise_error(to_catch=(ValueError,), to_raise=VersionNotFoundException) + def resolve_version( + cls, + schema_versions: Mapping[ResolvedVersion, SchemaVersion], + version: Version, + ) -> ResolvedVersion | None: + max_version = max(schema_versions) + if cls.latest_schema_tag_condition(version): + return max_version + if (int(version) <= max_version) and (int(version) >= int(cls.MINUS_1_SCHEMA_VERSION_TAG)): + return ResolvedVersion(version) + return None + + @classmethod + @catch_and_raise_error(to_catch=(ValueError,), to_raise=InvalidVersion) + def validate_version(cls, version: Version) -> Version | str | None: + if cls.latest_schema_tag_condition(version): + return cls.LATEST_SCHEMA_VERSION_TAG + if int(version) > 0: + return version + return None diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py new file mode 100644 index 000000000..391f12630 --- /dev/null +++ b/tests/unit/test_schema_models.py @@ -0,0 +1,111 @@ +""" +karapace - Test schema models + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from avro.schema import Schema as AvroSchema +from karapace.errors import InvalidVersion, VersionNotFoundException +from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, SchemaVersionManager, TypedSchema +from karapace.schema_type import SchemaType +from karapace.typing import ResolvedVersion, Version +from typing import Callable + +import pytest + +# Schema versions factory fixture type +SVFCallable = Callable[[None], Callable[[ResolvedVersion, dict[str]], dict[ResolvedVersion, SchemaVersion]]] + + +class TestSchemaVersionManager: + @pytest.fixture + def avro_schema(self) -> str: + return '{"type":"record","name":"testRecord","fields":[{"type":"string","name":"test"}]}' + + @pytest.fixture + def avro_schema_parsed(self, avro_schema: str) -> AvroSchema: + return parse_avro_schema_definition(avro_schema) + + @pytest.fixture + def schema_versions_factory( + self, + avro_schema: str, + avro_schema_parsed: AvroSchema, + ) -> Callable[[ResolvedVersion, dict[str]], dict[ResolvedVersion, SchemaVersion]]: + def schema_versions(resolved_version: int, schema_version_data: dict[str] | None = None): + if schema_version_data is None: + schema_version_data = dict() + base_schema_version_data = dict( + subject="test-topic", + version=resolved_version, + deleted=False, + schema_id=1, + schema=TypedSchema( + schema_type=SchemaType.AVRO, + schema_str=avro_schema, + schema=avro_schema_parsed, + ), + references=None, + ) + return {ResolvedVersion(resolved_version): SchemaVersion(**{**base_schema_version_data, **schema_version_data})} + + return schema_versions + + def test_schema_version_manager_tags(self): + assert SchemaVersionManager.LATEST_SCHEMA_VERSION_TAG == "latest" + assert SchemaVersionManager.MINUS_1_SCHEMA_VERSION_TAG == "-1" + + @pytest.mark.parametrize( + "version, is_latest", + [("latest", True), ("-1", True), ("-20", False), (10, False)], + ) + def test_schema_version_manager_latest_schema_tag_condition( + self, + version: Version, + is_latest: bool, + ): + assert SchemaVersionManager.latest_schema_tag_condition(version) is is_latest + + @pytest.mark.parametrize("invalid_version", ["invalid_version", 0]) + def test_schema_version_manager_validate_version_invalid(self, invalid_version: str | int): + with pytest.raises(InvalidVersion): + SchemaVersionManager.validate_version(invalid_version) + + @pytest.mark.parametrize( + "version, validated_version", + [("latest", "latest"), (-1, "latest"), ("-1", "latest"), (10, 10)], + ) + def test_schema_version_manager_validate_version( + self, + version: Version, + validated_version: Version, + ): + assert SchemaVersionManager.validate_version(version) == validated_version + + @pytest.mark.parametrize( + "version, resolved_version", + [("-1", 10), (-1, 10), (1, 1), (10, 10), ("latest", 10)], + ) + def test_schema_version_manager_resolve_version( + self, + version: Version, + resolved_version: ResolvedVersion, + schema_versions_factory: SVFCallable, + ): + schema_versions = dict() + schema_versions.update(schema_versions_factory(1)) + schema_versions.update(schema_versions_factory(2)) + schema_versions.update(schema_versions_factory(10)) + assert SchemaVersionManager.resolve_version(schema_versions, version) == resolved_version + + @pytest.mark.parametrize("invalid_version", ["invalid_version", 0, -20, "-10", "100", 2000]) + def test_schema_version_manager_resolve_version_invalid( + self, + invalid_version: str | int, + schema_versions_factory: SVFCallable, + ): + schema_versions = dict() + schema_versions.update(schema_versions_factory(1)) + with pytest.raises(VersionNotFoundException): + SchemaVersionManager.resolve_version(schema_versions, invalid_version) From f83cd9d1b3f23c97ada8952b080f4136c46da989 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 29 May 2024 14:52:35 +0200 Subject: [PATCH 03/13] refactor: use `SchemaVersionManager` to handle version logic --- karapace/schema_models.py | 10 +++---- karapace/schema_registry.py | 48 ++++++++++---------------------- karapace/schema_registry_apis.py | 13 +++++++-- karapace/utils.py | 4 +-- tests/unit/test_schema_models.py | 15 +++++----- tests/unit/test_utils.py | 17 ++++------- 6 files changed, 44 insertions(+), 63 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index b2c7b6b94..d9020c351 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -24,7 +24,7 @@ from karapace.schema_references import Reference from karapace.schema_type import SchemaType from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version -from karapace.utils import assert_never, catch_and_raise_error, json_decode, json_encode, JSONDecodeError +from karapace.utils import assert_never, intstr_conversion_guard, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Dict, Final, final, Mapping, Sequence import hashlib @@ -395,11 +395,11 @@ class SchemaVersionManager: MINUS_1_SCHEMA_VERSION_TAG: Final = "-1" @classmethod - def latest_schema_tag_condition(cls, version: Version): + def latest_schema_tag_condition(cls, version: Version) -> bool: return (str(version) == cls.LATEST_SCHEMA_VERSION_TAG) or (str(version) == cls.MINUS_1_SCHEMA_VERSION_TAG) @classmethod - @catch_and_raise_error(to_catch=(ValueError,), to_raise=VersionNotFoundException) + @intstr_conversion_guard(to_raise=VersionNotFoundException()) def resolve_version( cls, schema_versions: Mapping[ResolvedVersion, SchemaVersion], @@ -409,11 +409,11 @@ def resolve_version( if cls.latest_schema_tag_condition(version): return max_version if (int(version) <= max_version) and (int(version) >= int(cls.MINUS_1_SCHEMA_VERSION_TAG)): - return ResolvedVersion(version) + return ResolvedVersion(int(version)) return None @classmethod - @catch_and_raise_error(to_catch=(ValueError,), to_raise=InvalidVersion) + @intstr_conversion_guard(to_raise=InvalidVersion()) def validate_version(cls, version: Version) -> Version | str | None: if cls.latest_schema_tag_condition(version): return cls.LATEST_SCHEMA_VERSION_TAG diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index aa6f1dabc..9521534e4 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -11,7 +11,6 @@ from karapace.dependency import Dependency from karapace.errors import ( IncompatibleSchema, - InvalidVersion, ReferenceExistsException, SchemasNotFoundException, SchemaVersionNotSoftDeletedException, @@ -26,11 +25,18 @@ from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher -from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import ( + ParsedTypedSchema, + SchemaType, + SchemaVersion, + SchemaVersionManager, + TypedSchema, + ValidatedTypedSchema, +) from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject, Version -from typing import Mapping, Sequence +from typing import Sequence import asyncio import logging @@ -38,31 +44,6 @@ LOG = logging.getLogger(__name__) -def _resolve_version( - schema_versions: Mapping[ResolvedVersion, SchemaVersion], - version: Version, -) -> ResolvedVersion: - max_version = max(schema_versions) - if isinstance(version, str) and version == "latest": - return max_version - resolved_version = ResolvedVersion(int(version)) - if resolved_version <= max_version: - return resolved_version - raise VersionNotFoundException() - - -def validate_version(version: Version) -> Version: - try: - version_number = int(version) - if version_number > 0: - return version - raise InvalidVersion(f"Invalid version {version_number}") - except ValueError as ex: - if version == "latest": - return version - raise InvalidVersion(f"Invalid version {version}") from ex - - class KarapaceSchemaRegistry: def __init__(self, config: Config) -> None: self.config = config @@ -82,6 +63,7 @@ def __init__(self, config: Config) -> None: master_coordinator=self.mc, database=self.database, ) + self.schema_version_manager = SchemaVersionManager() self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() @@ -222,7 +204,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version, for version_id, schema_version in schema_versions.items() if schema_version.deleted is False } - resolved_version = _resolve_version(schema_versions=schema_versions, version=version) + resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) schema_version = schema_versions.get(resolved_version, None) if not schema_version: @@ -261,11 +243,11 @@ def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[R return schemas def subject_version_get(self, subject: Subject, version: Version, *, include_deleted: bool = False) -> JsonObject: - validate_version(version) + self.schema_version_manager.validate_version(version) schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = _resolve_version(schema_versions=schema_versions, version=version) + resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: @@ -293,11 +275,11 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del async def subject_version_referencedby_get( self, subject: Subject, version: Version, *, include_deleted: bool = False ) -> list: - validate_version(version) + self.schema_version_manager.validate_version(version) schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = _resolve_version(schema_versions=schema_versions, version=version) + resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: raise VersionNotFoundException() diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 0216b5e5e..d972dd2b6 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -31,9 +31,16 @@ from karapace.karapace import KarapaceBase from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import ( + ParsedTypedSchema, + SchemaType, + SchemaVersion, + SchemaVersionManager, + TypedSchema, + ValidatedTypedSchema, +) from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping -from karapace.schema_registry import KarapaceSchemaRegistry, validate_version +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject from karapace.utils import JSONDecodeError from typing import Any @@ -814,7 +821,7 @@ async def subject_version_delete( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None ) -> None: self._check_authorization(user, Operation.Write, f"Subject:{subject}") - version = validate_version(version) + version = SchemaVersionManager.validate_version(version) permanent = request.query.get("permanent", "false").lower() == "true" are_we_master, master_url = await self.schema_registry.get_master() diff --git a/karapace/utils.py b/karapace/utils.py index b544a5e08..b2d6b55f9 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -215,7 +215,7 @@ def convert_to_int(object_: dict, key: str, content_type: str) -> None: ) -def catch_and_raise_error(to_catch: tuple[Exception], to_raise: Exception): +def intstr_conversion_guard(to_raise: BaseException): def wrapper(f): @functools.wraps(f) def catcher(*args, **kwargs): @@ -224,7 +224,7 @@ def catcher(*args, **kwargs): if not value: raise to_raise return value - except to_catch as exc: + except ValueError as exc: raise to_raise from exc return catcher diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py index 391f12630..9bbdbe981 100644 --- a/tests/unit/test_schema_models.py +++ b/tests/unit/test_schema_models.py @@ -10,12 +10,12 @@ from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, SchemaVersionManager, TypedSchema from karapace.schema_type import SchemaType from karapace.typing import ResolvedVersion, Version -from typing import Callable +from typing import Any, Callable, Dict, Optional import pytest # Schema versions factory fixture type -SVFCallable = Callable[[None], Callable[[ResolvedVersion, dict[str]], dict[ResolvedVersion, SchemaVersion]]] +SVFCallable = Callable[[None], Callable[[ResolvedVersion, Dict[str, Any]], Dict[ResolvedVersion, SchemaVersion]]] class TestSchemaVersionManager: @@ -32,10 +32,9 @@ def schema_versions_factory( self, avro_schema: str, avro_schema_parsed: AvroSchema, - ) -> Callable[[ResolvedVersion, dict[str]], dict[ResolvedVersion, SchemaVersion]]: - def schema_versions(resolved_version: int, schema_version_data: dict[str] | None = None): - if schema_version_data is None: - schema_version_data = dict() + ) -> Callable[[ResolvedVersion, Dict[str, Any]], Dict[ResolvedVersion, SchemaVersion]]: + def schema_versions(resolved_version: int, schema_version_data: Optional[Dict[str, Any]] = None): + schema_version_data = schema_version_data or dict() base_schema_version_data = dict( subject="test-topic", version=resolved_version, @@ -68,7 +67,7 @@ def test_schema_version_manager_latest_schema_tag_condition( assert SchemaVersionManager.latest_schema_tag_condition(version) is is_latest @pytest.mark.parametrize("invalid_version", ["invalid_version", 0]) - def test_schema_version_manager_validate_version_invalid(self, invalid_version: str | int): + def test_schema_version_manager_validate_version_invalid(self, invalid_version: Version): with pytest.raises(InvalidVersion): SchemaVersionManager.validate_version(invalid_version) @@ -102,7 +101,7 @@ def test_schema_version_manager_resolve_version( @pytest.mark.parametrize("invalid_version", ["invalid_version", 0, -20, "-10", "100", 2000]) def test_schema_version_manager_resolve_version_invalid( self, - invalid_version: str | int, + invalid_version: Version, schema_versions_factory: SVFCallable, ): schema_versions = dict() diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 836f96c32..2bc8c898e 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -5,25 +5,18 @@ See LICENSE for details """ -from karapace.utils import catch_and_raise_error +from karapace.utils import intstr_conversion_guard import pytest -def test_catch_and_raise_error(): +def test_intstr_conversion_guard(): class RaiseMe(Exception): pass - @catch_and_raise_error(to_catch=(ValueError,), to_raise=RaiseMe) - def v(): + @intstr_conversion_guard(to_raise=RaiseMe) + def raise_value_error(): int("not a number") with pytest.raises(RaiseMe): - v() - - @catch_and_raise_error(to_catch=(ZeroDivisionError,), to_raise=RaiseMe) - def z(): - _ = 100 / 0 - - with pytest.raises(RaiseMe): - z() + raise_value_error() From 702f1423b616b2aef54cdacbd3269f63ca35b21d Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 29 May 2024 16:13:31 +0200 Subject: [PATCH 04/13] (refactor) do not instantiate `SchemaVersionManager` in `KarapaceSchemaRegistry` --- karapace/schema_registry.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 9521534e4..733e18d62 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -63,7 +63,6 @@ def __init__(self, config: Config) -> None: master_coordinator=self.mc, database=self.database, ) - self.schema_version_manager = SchemaVersionManager() self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() @@ -204,7 +203,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version, for version_id, schema_version in schema_versions.items() if schema_version.deleted is False } - resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) + resolved_version = SchemaVersionManager.resolve_version(schema_versions=schema_versions, version=version) schema_version = schema_versions.get(resolved_version, None) if not schema_version: @@ -243,11 +242,11 @@ def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[R return schemas def subject_version_get(self, subject: Subject, version: Version, *, include_deleted: bool = False) -> JsonObject: - self.schema_version_manager.validate_version(version) + SchemaVersionManager.validate_version(version) schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) + resolved_version = SchemaVersionManager.resolve_version(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: @@ -275,11 +274,11 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del async def subject_version_referencedby_get( self, subject: Subject, version: Version, *, include_deleted: bool = False ) -> list: - self.schema_version_manager.validate_version(version) + SchemaVersionManager.validate_version(version) schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = self.schema_version_manager.resolve_version(schema_versions=schema_versions, version=version) + resolved_version = SchemaVersionManager.resolve_version(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: raise VersionNotFoundException() From 7f0b3402972de31db8aec4554954e5d1459f6ef5 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 3 Jun 2024 14:13:23 +0200 Subject: [PATCH 05/13] feature,tests: added `Version` class - this will replace the previously added `SchemaVersionManager` - it will handle the resolution and validation of versions - easy work between numeric and string version tags, keeping only the numeric version internally --- karapace/schema_models.py | 59 ++++++++++---------- karapace/utils.py | 2 +- tests/unit/test_schema_models.py | 94 +++++++++++++++++++------------- tests/unit/test_utils.py | 6 +- 4 files changed, 90 insertions(+), 71 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index d9020c351..7d544b7cc 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -23,9 +23,9 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_references import Reference from karapace.schema_type import SchemaType -from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version -from karapace.utils import assert_never, intstr_conversion_guard, json_decode, json_encode, JSONDecodeError -from typing import Any, cast, Dict, Final, final, Mapping, Sequence +from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject +from karapace.utils import assert_never, intstr_version_guard, json_decode, json_encode, JSONDecodeError +from typing import Any, cast, ClassVar, Dict, Final, final, Mapping, Sequence import hashlib import logging @@ -390,33 +390,36 @@ class SchemaVersion: references: Sequence[Reference] | None -class SchemaVersionManager: - LATEST_SCHEMA_VERSION_TAG: Final = "latest" - MINUS_1_SCHEMA_VERSION_TAG: Final = "-1" +@dataclass(slots=True, frozen=True) +class VersionTEMP: + tag: str | int - @classmethod - def latest_schema_tag_condition(cls, version: Version) -> bool: - return (str(version) == cls.LATEST_SCHEMA_VERSION_TAG) or (str(version) == cls.MINUS_1_SCHEMA_VERSION_TAG) + LATEST_VERSION_TAG: Final | ClassVar[str] = "latest" + MINUS_1_VERSION_TAG: Final | ClassVar[str] = "-1" - @classmethod - @intstr_conversion_guard(to_raise=VersionNotFoundException()) - def resolve_version( - cls, - schema_versions: Mapping[ResolvedVersion, SchemaVersion], - version: Version, - ) -> ResolvedVersion | None: + @property + def is_latest(self) -> bool: + return (str(self.tag) == self.LATEST_VERSION_TAG) or (str(self.tag) == self.MINUS_1_VERSION_TAG) + + @property + def version(self) -> int: + version = int(self.tag) + if version <= 0: + raise ValueError("Only numeric version tags are directly resolved") + return version + + @property + @intstr_version_guard(to_raise=InvalidVersion()) + def resolved(self) -> str: + if self.is_latest: + return self.LATEST_VERSION_TAG + return str(self.version) + + @intstr_version_guard(to_raise=VersionNotFoundException()) + def resolve_from_schema_versions(self, schema_versions: Mapping[int, SchemaVersion]) -> int | None: max_version = max(schema_versions) - if cls.latest_schema_tag_condition(version): + if self.is_latest: return max_version - if (int(version) <= max_version) and (int(version) >= int(cls.MINUS_1_SCHEMA_VERSION_TAG)): - return ResolvedVersion(int(version)) - return None - - @classmethod - @intstr_conversion_guard(to_raise=InvalidVersion()) - def validate_version(cls, version: Version) -> Version | str | None: - if cls.latest_schema_tag_condition(version): - return cls.LATEST_SCHEMA_VERSION_TAG - if int(version) > 0: - return version + if self.version <= max_version: + return self.version return None diff --git a/karapace/utils.py b/karapace/utils.py index b2d6b55f9..ffe73401b 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -215,7 +215,7 @@ def convert_to_int(object_: dict, key: str, content_type: str) -> None: ) -def intstr_conversion_guard(to_raise: BaseException): +def intstr_version_guard(to_raise: BaseException): def wrapper(f): @functools.wraps(f) def catcher(*args, **kwargs): diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py index 9bbdbe981..5bf1ab157 100644 --- a/tests/unit/test_schema_models.py +++ b/tests/unit/test_schema_models.py @@ -7,18 +7,21 @@ from avro.schema import Schema as AvroSchema from karapace.errors import InvalidVersion, VersionNotFoundException -from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, SchemaVersionManager, TypedSchema +from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema, VersionTEMP from karapace.schema_type import SchemaType -from karapace.typing import ResolvedVersion, Version -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, Optional, Union import pytest # Schema versions factory fixture type -SVFCallable = Callable[[None], Callable[[ResolvedVersion, Dict[str, Any]], Dict[ResolvedVersion, SchemaVersion]]] +SVFCallable = Callable[[None], Callable[[int, Dict[str, Any]], Dict[int, SchemaVersion]]] -class TestSchemaVersionManager: +class TestVersionTEMP: + @pytest.fixture + def version(self): + return VersionTEMP(1) + @pytest.fixture def avro_schema(self) -> str: return '{"type":"record","name":"testRecord","fields":[{"type":"string","name":"test"}]}' @@ -32,7 +35,7 @@ def schema_versions_factory( self, avro_schema: str, avro_schema_parsed: AvroSchema, - ) -> Callable[[ResolvedVersion, Dict[str, Any]], Dict[ResolvedVersion, SchemaVersion]]: + ) -> Callable[[int, Dict[str, Any]], Dict[int, SchemaVersion]]: def schema_versions(resolved_version: int, schema_version_data: Optional[Dict[str, Any]] = None): schema_version_data = schema_version_data or dict() base_schema_version_data = dict( @@ -47,64 +50,77 @@ def schema_versions(resolved_version: int, schema_version_data: Optional[Dict[st ), references=None, ) - return {ResolvedVersion(resolved_version): SchemaVersion(**{**base_schema_version_data, **schema_version_data})} + return {resolved_version: SchemaVersion(**{**base_schema_version_data, **schema_version_data})} return schema_versions - def test_schema_version_manager_tags(self): - assert SchemaVersionManager.LATEST_SCHEMA_VERSION_TAG == "latest" - assert SchemaVersionManager.MINUS_1_SCHEMA_VERSION_TAG == "-1" + def test_tags(self, version: VersionTEMP): + assert version.LATEST_VERSION_TAG == "latest" + assert version.MINUS_1_VERSION_TAG == "-1" @pytest.mark.parametrize( "version, is_latest", - [("latest", True), ("-1", True), ("-20", False), (10, False)], + [(VersionTEMP("latest"), True), (VersionTEMP("-1"), True), (VersionTEMP("-20"), False), (VersionTEMP(10), False)], ) - def test_schema_version_manager_latest_schema_tag_condition( - self, - version: Version, - is_latest: bool, - ): - assert SchemaVersionManager.latest_schema_tag_condition(version) is is_latest - - @pytest.mark.parametrize("invalid_version", ["invalid_version", 0]) - def test_schema_version_manager_validate_version_invalid(self, invalid_version: Version): - with pytest.raises(InvalidVersion): - SchemaVersionManager.validate_version(invalid_version) + def test_is_latest(self, version: VersionTEMP, is_latest: bool): + assert version.is_latest is is_latest @pytest.mark.parametrize( - "version, validated_version", - [("latest", "latest"), (-1, "latest"), ("-1", "latest"), (10, 10)], + "version, resolved_version", + [ + (VersionTEMP("latest"), "latest"), + (VersionTEMP(-1), "latest"), + (VersionTEMP("-1"), "latest"), + (VersionTEMP(10), "10"), + ], ) - def test_schema_version_manager_validate_version( - self, - version: Version, - validated_version: Version, - ): - assert SchemaVersionManager.validate_version(version) == validated_version + def test_resolved(self, version: VersionTEMP, resolved_version: Union[str, int]): + assert version.resolved == resolved_version + + @pytest.mark.parametrize("invalid_version", [VersionTEMP("invalid_version"), VersionTEMP(0)]) + def test_resolved_invalid(self, invalid_version: VersionTEMP): + with pytest.raises(InvalidVersion): + assert not invalid_version.resolved @pytest.mark.parametrize( "version, resolved_version", - [("-1", 10), (-1, 10), (1, 1), (10, 10), ("latest", 10)], + [ + (VersionTEMP("-1"), 10), + (VersionTEMP(-1), 10), + (VersionTEMP(1), 1), + (VersionTEMP(10), 10), + (VersionTEMP("latest"), 10), + ], ) - def test_schema_version_manager_resolve_version( + def test_resolve_from_schema_versions( self, - version: Version, - resolved_version: ResolvedVersion, + version: VersionTEMP, + resolved_version: int, schema_versions_factory: SVFCallable, ): schema_versions = dict() schema_versions.update(schema_versions_factory(1)) schema_versions.update(schema_versions_factory(2)) schema_versions.update(schema_versions_factory(10)) - assert SchemaVersionManager.resolve_version(schema_versions, version) == resolved_version + assert version.resolve_from_schema_versions(schema_versions) == resolved_version - @pytest.mark.parametrize("invalid_version", ["invalid_version", 0, -20, "-10", "100", 2000]) - def test_schema_version_manager_resolve_version_invalid( + @pytest.mark.parametrize( + "invalid_version", + [ + VersionTEMP("invalid_version"), + VersionTEMP(0), + VersionTEMP(-20), + VersionTEMP("-10"), + VersionTEMP("100"), + VersionTEMP(2000), + ], + ) + def test_resolve_from_schema_versions_invalid( self, - invalid_version: Version, + invalid_version: VersionTEMP, schema_versions_factory: SVFCallable, ): schema_versions = dict() schema_versions.update(schema_versions_factory(1)) with pytest.raises(VersionNotFoundException): - SchemaVersionManager.resolve_version(schema_versions, invalid_version) + invalid_version.resolve_from_schema_versions(schema_versions) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 2bc8c898e..d32af82ab 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -5,16 +5,16 @@ See LICENSE for details """ -from karapace.utils import intstr_conversion_guard +from karapace.utils import intstr_version_guard import pytest -def test_intstr_conversion_guard(): +def test_intstr_version_guard(): class RaiseMe(Exception): pass - @intstr_conversion_guard(to_raise=RaiseMe) + @intstr_version_guard(to_raise=RaiseMe) def raise_value_error(): int("not a number") From c27af64f8e2d8978bb37f5dd63727b2e37b07698 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 3 Jun 2024 15:53:16 +0200 Subject: [PATCH 06/13] refactor: use `Version` class in `KarapaceSchemaRegistry` & API controller - update the signature of `subject_version_get` - update the signature of `subject_version_delete_local` - update the signature of `subject_version_referencedby_get` - change the various references/dependencies to build the version class --- karapace/schema_models.py | 16 +++++++++++----- karapace/schema_registry.py | 21 ++++++++++----------- karapace/schema_registry_apis.py | 31 ++++++++++++++++++------------- tests/unit/test_schema_models.py | 14 +++++++++----- 4 files changed, 48 insertions(+), 34 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 7d544b7cc..43216a3b3 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -397,16 +397,22 @@ class VersionTEMP: LATEST_VERSION_TAG: Final | ClassVar[str] = "latest" MINUS_1_VERSION_TAG: Final | ClassVar[str] = "-1" + def validate(self): + try: + version = int(self.tag) + if (version < int(self.MINUS_1_VERSION_TAG)) or (version == 0): + raise InvalidVersion(f"Invalid version {self.tag}") + except ValueError as exc: + if not self.is_latest: + raise InvalidVersion(f"Invalid version {self.tag}") from exc + @property def is_latest(self) -> bool: return (str(self.tag) == self.LATEST_VERSION_TAG) or (str(self.tag) == self.MINUS_1_VERSION_TAG) @property def version(self) -> int: - version = int(self.tag) - if version <= 0: - raise ValueError("Only numeric version tags are directly resolved") - return version + return int(self.tag) @property @intstr_version_guard(to_raise=InvalidVersion()) @@ -420,6 +426,6 @@ def resolve_from_schema_versions(self, schema_versions: Mapping[int, SchemaVersi max_version = max(schema_versions) if self.is_latest: return max_version - if self.version <= max_version: + if self.version <= max_version and self.version in schema_versions: return self.version return None diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 733e18d62..df2c57944 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -29,13 +29,13 @@ ParsedTypedSchema, SchemaType, SchemaVersion, - SchemaVersionManager, TypedSchema, ValidatedTypedSchema, + VersionTEMP, ) from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference -from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject, Version +from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject from typing import Sequence import asyncio @@ -194,16 +194,16 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ return version_list - async def subject_version_delete_local(self, subject: Subject, version: Version, permanent: bool) -> ResolvedVersion: + async def subject_version_delete_local(self, subject: Subject, version: VersionTEMP, permanent: bool) -> int: async with self.schema_lock: schema_versions = self.subject_get(subject, include_deleted=True) - if not permanent and isinstance(version, str) and version == "latest": + if not permanent and version.is_latest: schema_versions = { version_id: schema_version for version_id, schema_version in schema_versions.items() if schema_version.deleted is False } - resolved_version = SchemaVersionManager.resolve_version(schema_versions=schema_versions, version=version) + resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions) schema_version = schema_versions.get(resolved_version, None) if not schema_version: @@ -241,12 +241,11 @@ def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[R raise SchemasNotFoundException return schemas - def subject_version_get(self, subject: Subject, version: Version, *, include_deleted: bool = False) -> JsonObject: - SchemaVersionManager.validate_version(version) + def subject_version_get(self, subject: Subject, version: VersionTEMP, *, include_deleted: bool = False) -> JsonObject: schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = SchemaVersionManager.resolve_version(schema_versions=schema_versions, version=version) + resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: @@ -272,13 +271,13 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del return ret async def subject_version_referencedby_get( - self, subject: Subject, version: Version, *, include_deleted: bool = False + self, subject: Subject, version: VersionTEMP, *, include_deleted: bool = False ) -> list: - SchemaVersionManager.validate_version(version) + version.validate() schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = SchemaVersionManager.resolve_version(schema_versions=schema_versions, version=version) + resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: raise VersionNotFoundException() diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index d972dd2b6..eaddfbbff 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -35,9 +35,9 @@ ParsedTypedSchema, SchemaType, SchemaVersion, - SchemaVersionManager, TypedSchema, ValidatedTypedSchema, + VersionTEMP, ) from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry @@ -386,6 +386,7 @@ async def compatibility_check( self._check_authorization(user, Operation.Read, f"Subject:{subject}") + version = VersionTEMP(version) body = request.json schema_type = self._validate_schema_type(content_type=content_type, data=body) references = self._validate_references(content_type, schema_type, body) @@ -409,12 +410,12 @@ async def compatibility_check( try: old = self.schema_registry.subject_version_get(subject=subject, version=version) except InvalidVersion: - self._invalid_version(content_type, version) + self._invalid_version(content_type, version.tag) except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", + "message": f"Version {version.tag} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -790,6 +791,7 @@ async def subject_version_get( ) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") + version = VersionTEMP(version) deleted = request.query.get("deleted", "false").lower() == "true" try: subject_data = self.schema_registry.subject_version_get(subject, version, include_deleted=deleted) @@ -809,19 +811,20 @@ async def subject_version_get( self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", + "message": f"Version {version.tag} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, ) except InvalidVersion: - self._invalid_version(content_type, version) + self._invalid_version(content_type, version.tag) async def subject_version_delete( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None ) -> None: self._check_authorization(user, Operation.Write, f"Subject:{subject}") - version = SchemaVersionManager.validate_version(version) + version = VersionTEMP(version) + version.validate() permanent = request.query.get("permanent", "false").lower() == "true" are_we_master, master_url = await self.schema_registry.get_master() @@ -842,7 +845,7 @@ async def subject_version_delete( self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", + "message": f"Version {version.tag} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -863,7 +866,7 @@ async def subject_version_delete( body={ "error_code": SchemaErrorCodes.SCHEMAVERSION_NOT_SOFT_DELETED.value, "message": ( - f"Subject '{subject}' Version {version} was not deleted " + f"Subject '{subject}' Version {version.tag} was not deleted " "first before being permanently deleted" ), }, @@ -885,7 +888,7 @@ async def subject_version_delete( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" + url = f"{master_url}/subjects/{subject}/versions/{version.tag}?permanent={permanent}" await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") async def subject_version_schema_get( @@ -893,16 +896,17 @@ async def subject_version_schema_get( ) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") + version = VersionTEMP(version) try: subject_data = self.schema_registry.subject_version_get(subject, version) self.r(subject_data["schema"], content_type) except InvalidVersion: - self._invalid_version(content_type, version) + self._invalid_version(content_type, version.tag) except VersionNotFoundException: self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", + "message": f"Version {version.tag} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -920,6 +924,7 @@ async def subject_version_schema_get( async def subject_version_referencedby_get(self, content_type, *, subject, version, user: User | None = None): self._check_authorization(user, Operation.Read, f"Subject:{subject}") + version = VersionTEMP(version) try: referenced_by = await self.schema_registry.subject_version_referencedby_get(subject, version) except (SubjectNotFoundException, SchemasNotFoundException): @@ -935,13 +940,13 @@ async def subject_version_referencedby_get(self, content_type, *, subject, versi self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version} not found.", + "message": f"Version {version.tag} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, ) except InvalidVersion: - self._invalid_version(content_type, version) + self._invalid_version(content_type, version.tag) self.r(referenced_by, content_type, status=HTTPStatus.OK) diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py index 5bf1ab157..3e24f656e 100644 --- a/tests/unit/test_schema_models.py +++ b/tests/unit/test_schema_models.py @@ -65,6 +65,15 @@ def test_tags(self, version: VersionTEMP): def test_is_latest(self, version: VersionTEMP, is_latest: bool): assert version.is_latest is is_latest + @pytest.mark.parametrize("version", [VersionTEMP("latest"), VersionTEMP(10), VersionTEMP(-1)]) + def test_validate(self, version: VersionTEMP): + version.validate() + + @pytest.mark.parametrize("invalid_version", [VersionTEMP("invalid_version"), VersionTEMP(0), VersionTEMP(-20)]) + def test_validate_invalid(self, invalid_version: VersionTEMP): + with pytest.raises(InvalidVersion): + assert not invalid_version.validate() + @pytest.mark.parametrize( "version, resolved_version", [ @@ -77,11 +86,6 @@ def test_is_latest(self, version: VersionTEMP, is_latest: bool): def test_resolved(self, version: VersionTEMP, resolved_version: Union[str, int]): assert version.resolved == resolved_version - @pytest.mark.parametrize("invalid_version", [VersionTEMP("invalid_version"), VersionTEMP(0)]) - def test_resolved_invalid(self, invalid_version: VersionTEMP): - with pytest.raises(InvalidVersion): - assert not invalid_version.resolved - @pytest.mark.parametrize( "version, resolved_version", [ From 833d8880b502122f1dade7d7ce97763dcf930cd6 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 3 Jun 2024 16:02:53 +0200 Subject: [PATCH 07/13] refactor: dropped usage of `ResolvedVersion` - replace `ResolvedVersion` references with builtin `int` to mimic the initial intention and encapsulate the `str`/`int` version logic to the `Version` class. --- karapace/errors.py | 3 +-- karapace/in_memory_database.py | 22 +++++++++++----------- karapace/schema_models.py | 4 ++-- karapace/schema_reader.py | 4 ++-- karapace/schema_references.py | 10 +++++----- karapace/schema_registry.py | 16 ++++++++-------- karapace/schema_registry_apis.py | 6 ++---- karapace/serialization.py | 20 ++++++++++---------- karapace/typing.py | 1 - tests/unit/test_protobuf_serialization.py | 10 +++++----- tests/unit/test_serialization.py | 10 +++------- 11 files changed, 49 insertions(+), 57 deletions(-) diff --git a/karapace/errors.py b/karapace/errors.py index 9d240070c..7853f2de1 100644 --- a/karapace/errors.py +++ b/karapace/errors.py @@ -3,7 +3,6 @@ See LICENSE for details """ from karapace.schema_references import Referents -from karapace.typing import ResolvedVersion class VersionNotFoundException(Exception): @@ -55,7 +54,7 @@ class SubjectNotSoftDeletedException(Exception): class ReferenceExistsException(Exception): - def __init__(self, referenced_by: Referents, version: ResolvedVersion) -> None: + def __init__(self, referenced_by: Referents, version: int) -> None: super().__init__() self.version = version self.referenced_by = referenced_by diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py index 704a1eb6a..3c7facc4c 100644 --- a/karapace/in_memory_database.py +++ b/karapace/in_memory_database.py @@ -9,7 +9,7 @@ from dataclasses import dataclass, field from karapace.schema_models import SchemaVersion, TypedSchema from karapace.schema_references import Reference, Referents -from karapace.typing import ResolvedVersion, SchemaId, Subject +from karapace.typing import SchemaId, Subject from threading import Lock, RLock from typing import Iterable, Sequence @@ -20,7 +20,7 @@ @dataclass class SubjectData: - schemas: dict[ResolvedVersion, SchemaVersion] = field(default_factory=dict) + schemas: dict[int, SchemaVersion] = field(default_factory=dict) compatibility: str | None = None @@ -31,7 +31,7 @@ def __init__(self) -> None: self.subjects: dict[Subject, SubjectData] = {} self.schemas: dict[SchemaId, TypedSchema] = {} self.schema_lock_thread = RLock() - self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {} + self.referenced_by: dict[tuple[Subject, int], Referents] = {} # Content based deduplication of schemas. This is used to reduce memory # usage when the same schema is produce multiple times to the same or @@ -100,15 +100,15 @@ def _delete_subject_from_schema_id_on_subject(self, *, subject: Subject) -> None def _get_from_hash_cache(self, *, typed_schema: TypedSchema) -> TypedSchema: return self._hash_to_schema.setdefault(typed_schema.fingerprint(), typed_schema) - def get_next_version(self, *, subject: Subject) -> ResolvedVersion: - return ResolvedVersion(max(self.subjects[subject].schemas) + 1) + def get_next_version(self, *, subject: Subject) -> int: + return max(self.subjects[subject].schemas) + 1 def insert_schema_version( self, *, subject: Subject, schema_id: SchemaId, - version: ResolvedVersion, + version: int, deleted: bool, schema: TypedSchema, references: Sequence[Reference] | None, @@ -217,7 +217,7 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]: subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False) ] - def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[ResolvedVersion, SchemaVersion]: + def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[int, SchemaVersion]: if subject not in self.subjects: return {} if include_deleted: @@ -229,7 +229,7 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di if schema_version.deleted is False } - def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None: + def delete_subject(self, *, subject: Subject, version: int) -> None: with self.schema_lock_thread: for schema_version in self.subjects[subject].schemas.values(): if schema_version.version <= version: @@ -241,7 +241,7 @@ def delete_subject_hard(self, *, subject: Subject) -> None: del self.subjects[subject] self._delete_subject_from_schema_id_on_subject(subject=subject) - def delete_subject_schema(self, *, subject: Subject, version: ResolvedVersion) -> None: + def delete_subject_schema(self, *, subject: Subject, version: int) -> None: with self.schema_lock_thread: self.subjects[subject].schemas.pop(version, None) @@ -263,7 +263,7 @@ def num_schema_versions(self) -> tuple[int, int]: soft_deleted_versions += 1 return (live_versions, soft_deleted_versions) - def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, schema_id: SchemaId) -> None: + def insert_referenced_by(self, *, subject: Subject, version: int, schema_id: SchemaId) -> None: with self.schema_lock_thread: referents = self.referenced_by.get((subject, version), None) if referents: @@ -271,7 +271,7 @@ def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, sc else: self.referenced_by[(subject, version)] = Referents([schema_id]) - def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Referents | None: + def get_referenced_by(self, subject: Subject, version: int) -> Referents | None: with self.schema_lock_thread: return self.referenced_by.get((subject, version), None) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 43216a3b3..01a22663a 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -23,7 +23,7 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_references import Reference from karapace.schema_type import SchemaType -from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject +from karapace.typing import JsonObject, SchemaId, Subject from karapace.utils import assert_never, intstr_version_guard, json_decode, json_encode, JSONDecodeError from typing import Any, cast, ClassVar, Dict, Final, final, Mapping, Sequence @@ -383,7 +383,7 @@ def parse( @dataclass class SchemaVersion: subject: Subject - version: ResolvedVersion + version: int deleted: bool schema_id: SchemaId schema: TypedSchema diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 88600f70c..d863fbe9e 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -39,7 +39,7 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject +from karapace.typing import JsonObject, SchemaId, Subject from karapace.utils import json_decode, JSONDecodeError from threading import Event, Thread from typing import Final, Mapping, Sequence @@ -602,7 +602,7 @@ def remove_referenced_by( def get_referenced_by( self, subject: Subject, - version: ResolvedVersion, + version: int, ) -> Referents | None: return self.database.get_referenced_by(subject, version) diff --git a/karapace/schema_references.py b/karapace/schema_references.py index 497bd61b1..746a583ba 100644 --- a/karapace/schema_references.py +++ b/karapace/schema_references.py @@ -8,7 +8,7 @@ from __future__ import annotations from karapace.dataclasses import default_dataclass -from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject +from karapace.typing import JsonData, JsonObject, SchemaId, Subject from typing import cast, List, Mapping, NewType, TypeVar Referents = NewType("Referents", List[SchemaId]) @@ -36,7 +36,7 @@ class LatestVersionReference: name: str subject: Subject - def resolve(self, version: ResolvedVersion) -> Reference: + def resolve(self, version: int) -> Reference: return Reference( name=self.name, subject=self.subject, @@ -48,7 +48,7 @@ def resolve(self, version: ResolvedVersion) -> Reference: class Reference: name: str subject: Subject - version: ResolvedVersion + version: int def __post_init__(self) -> None: assert self.version != -1 @@ -68,7 +68,7 @@ def from_dict(data: JsonObject) -> Reference: return Reference( name=str(data["name"]), subject=Subject(str(data["subject"])), - version=ResolvedVersion(cast(int, data["version"])), + version=int(cast(int, data["version"])), ) @@ -88,6 +88,6 @@ def reference_from_mapping( else Reference( name=name, subject=subject, - version=ResolvedVersion(version), + version=int(version), ) ) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index df2c57944..de897092c 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -35,7 +35,7 @@ ) from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference -from karapace.typing import JsonObject, Mode, ResolvedVersion, SchemaId, Subject +from karapace.typing import JsonObject, Mode, SchemaId, Subject from typing import Sequence import asyncio @@ -134,7 +134,7 @@ def schemas_get(self, schema_id: SchemaId, *, fetch_max_id: bool = False) -> Typ return schema - async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ResolvedVersion]: + async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[int]: async with self.schema_lock: schema_versions = self.subject_get(subject, include_deleted=True) @@ -180,7 +180,7 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ try: schema_versions_live = self.subject_get(subject, include_deleted=False) except SchemasNotFoundException: - latest_version_id = ResolvedVersion(0) + latest_version_id = int(0) version_list = [] else: version_list = list(schema_versions_live) @@ -231,7 +231,7 @@ async def subject_version_delete_local(self, subject: Subject, version: VersionT self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references) return resolved_version - def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[ResolvedVersion, SchemaVersion]: + def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[int, SchemaVersion]: subject_found = self.database.find_subject(subject=subject) if not subject_found: raise SubjectNotFoundException() @@ -319,7 +319,7 @@ async def write_new_schema_local( all_schema_versions = self.database.find_subject_schemas(subject=subject, include_deleted=True) if not all_schema_versions: - version = ResolvedVersion(1) + version = int(1) schema_id = self.database.get_schema_id(new_schema) LOG.debug( "Registering new subject: %r, id: %r with version: %r with schema %r, schema_id: %r", @@ -408,8 +408,8 @@ async def write_new_schema_local( def get_subject_versions_for_schema( self, schema_id: SchemaId, *, include_deleted: bool = False - ) -> list[dict[str, Subject | ResolvedVersion]]: - subject_versions: list[dict[str, Subject | ResolvedVersion]] = [] + ) -> list[dict[str, Subject | int]]: + subject_versions: list[dict[str, Subject | int]] = [] schema_versions = self.database.find_schema_versions_by_schema_id( schema_id=schema_id, include_deleted=include_deleted, @@ -467,7 +467,7 @@ def resolve_references( ) -> tuple[Sequence[Reference], dict[str, Dependency]] | tuple[None, None]: return self.schema_reader.resolve_references(references) if references else (None, None) - def send_delete_subject_message(self, subject: Subject, version: ResolvedVersion) -> None: + def send_delete_subject_message(self, subject: Subject, version: int) -> None: key = {"subject": subject, "magic": 0, "keytype": "DELETE_SUBJECT"} value = {"subject": subject, "version": version} self.producer.send_message(key=key, value=value) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index eaddfbbff..c83be6a85 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -41,7 +41,7 @@ ) from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry -from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject +from karapace.typing import JsonData, JsonObject, SchemaId, Subject from karapace.utils import JSONDecodeError from typing import Any @@ -340,9 +340,7 @@ async def close(self) -> None: if self._auth is not None: stack.push_async_callback(self._auth.close) - def _subject_get( - self, subject: str, content_type: str, include_deleted: bool = False - ) -> dict[ResolvedVersion, SchemaVersion]: + def _subject_get(self, subject: str, content_type: str, include_deleted: bool = False) -> dict[int, SchemaVersion]: try: schema_versions = self.schema_registry.subject_get(subject, include_deleted) except SubjectNotFoundException: diff --git a/karapace/serialization.py b/karapace/serialization.py index 3164283a5..8797dd780 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -18,7 +18,7 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping -from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType +from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType from karapace.utils import json_decode, json_encode from typing import Any, Callable, MutableMapping from urllib.parse import quote @@ -131,9 +131,9 @@ async def post_new_schema( async def _get_schema_recursive( self, subject: Subject, - explored_schemas: set[tuple[Subject, ResolvedVersion | None]], - version: ResolvedVersion | None = None, - ) -> tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]: + explored_schemas: set[tuple[Subject, int | None]], + version: int | None = None, + ) -> tuple[SchemaId, ValidatedTypedSchema, int]: if (subject, version) in explored_schemas: raise InvalidSchema( f"The schema has at least a cycle in dependencies, " @@ -174,7 +174,7 @@ async def _get_schema_recursive( references=references, dependencies=dependencies, ), - ResolvedVersion(json_result["version"]), + int(json_result["version"]), ) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e @@ -183,21 +183,21 @@ async def _get_schema_recursive( async def get_schema( self, subject: Subject, - version: ResolvedVersion | None = None, - ) -> tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]: + version: int | None = None, + ) -> tuple[SchemaId, ValidatedTypedSchema, int]: """ Retrieves the schema and its dependencies for the specified subject. Args: subject (Subject): The subject for which to retrieve the schema. - version (Optional[ResolvedVersion]): The specific version of the schema to retrieve. + version (Optional[int]): The specific version of the schema to retrieve. If None, the latest available schema will be returned. Returns: - Tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]: A tuple containing: + Tuple[SchemaId, ValidatedTypedSchema, int]: A tuple containing: - SchemaId: The ID of the retrieved schema. - ValidatedTypedSchema: The retrieved schema, validated and typed. - - ResolvedVersion: The version of the schema that was retrieved. + - int: The version of the schema that was retrieved. """ return await self._get_schema_recursive(subject, set(), version) diff --git a/karapace/typing.py b/karapace/typing.py index 2d1a2f180..0132adcea 100644 --- a/karapace/typing.py +++ b/karapace/typing.py @@ -18,7 +18,6 @@ Subject = NewType("Subject", str) Version = Union[int, str] -ResolvedVersion = NewType("ResolvedVersion", int) # note: the SchemaID is a unique id among all the schemas (and each version should be assigned to a different id) # basically the same SchemaID refer always to the same TypedSchema. SchemaId = NewType("SchemaId", int) diff --git a/tests/unit/test_protobuf_serialization.py b/tests/unit/test_protobuf_serialization.py index db039c64f..b7b84839e 100644 --- a/tests/unit/test_protobuf_serialization.py +++ b/tests/unit/test_protobuf_serialization.py @@ -14,7 +14,7 @@ SchemaRegistrySerializer, START_BYTE, ) -from karapace.typing import ResolvedVersion, Subject +from karapace.typing import Subject from pathlib import Path from tests.utils import schema_protobuf, test_fail_objects_protobuf, test_objects_protobuf from unittest.mock import call, Mock @@ -45,7 +45,7 @@ async def test_happy_flow(default_config_path: Path): mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result( - (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), ResolvedVersion(1)) + (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), int(1)) ) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future @@ -114,7 +114,7 @@ async def test_happy_flow_references(default_config_path: Path): schema_for_id_one_future.set_result((ref_schema, [Subject("stub")])) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result((1, ref_schema, ResolvedVersion(1))) + get_latest_schema_future.set_result((1, ref_schema, int(1))) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) @@ -201,7 +201,7 @@ async def test_happy_flow_references_two(default_config_path: Path): schema_for_id_one_future.set_result((ref_schema_two, [Subject("mock")])) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result((1, ref_schema_two, ResolvedVersion(1))) + get_latest_schema_future.set_result((1, ref_schema_two, int(1))) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) @@ -221,7 +221,7 @@ async def test_serialization_fails(default_config_path: Path): mock_protobuf_registry_client = Mock() get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result( - (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), ResolvedVersion(1)) + (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), int(1)) ) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index 62567bc84..d8c6a698f 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -16,7 +16,7 @@ START_BYTE, write_value, ) -from karapace.typing import NameStrategy, ResolvedVersion, Subject, SubjectType +from karapace.typing import NameStrategy, Subject, SubjectType from tests.utils import schema_avro_json, test_objects_avro from unittest.mock import call, Mock @@ -121,9 +121,7 @@ async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySeriali async def test_happy_flow(default_config_path: Path): mock_registry_client = Mock() get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result( - (1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), ResolvedVersion(1)) - ) + get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), int(1))) mock_registry_client.get_schema.return_value = get_latest_schema_future schema_for_id_one_future = asyncio.Future() schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")])) @@ -315,9 +313,7 @@ def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> No async def test_serialization_fails(default_config_path: Path): mock_registry_client = Mock() get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result( - (1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), ResolvedVersion(1)) - ) + get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), int(1))) mock_registry_client.get_schema.return_value = get_latest_schema_future serializer = await make_ser_deser(default_config_path, mock_registry_client) From 802e471fdb53c70e87d8df7e0089b666f943d720 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 3 Jun 2024 16:33:48 +0200 Subject: [PATCH 08/13] refactor: drop union version usage for new `Version` class --- karapace/dependency.py | 4 +-- karapace/schema_models.py | 2 +- karapace/schema_registry.py | 15 +++------ karapace/schema_registry_apis.py | 21 +++++------- karapace/typing.py | 2 +- tests/unit/test_schema_models.py | 56 ++++++++++++++++---------------- 6 files changed, 44 insertions(+), 56 deletions(-) diff --git a/karapace/dependency.py b/karapace/dependency.py index 074263af7..52b7e965e 100644 --- a/karapace/dependency.py +++ b/karapace/dependency.py @@ -8,7 +8,7 @@ from __future__ import annotations from karapace.schema_references import Reference -from karapace.typing import JsonData, Subject, Version +from karapace.typing import JsonData, Subject from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -26,7 +26,7 @@ def __init__( self, name: str, subject: Subject, - version: Version, + version: int, target_schema: ValidatedTypedSchema, ) -> None: self.name = name diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 01a22663a..9f42fbfa9 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -391,7 +391,7 @@ class SchemaVersion: @dataclass(slots=True, frozen=True) -class VersionTEMP: +class Version: tag: str | int LATEST_VERSION_TAG: Final | ClassVar[str] = "latest" diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index de897092c..92822e754 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -25,14 +25,7 @@ from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher -from karapace.schema_models import ( - ParsedTypedSchema, - SchemaType, - SchemaVersion, - TypedSchema, - ValidatedTypedSchema, - VersionTEMP, -) +from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Version from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference from karapace.typing import JsonObject, Mode, SchemaId, Subject @@ -194,7 +187,7 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ return version_list - async def subject_version_delete_local(self, subject: Subject, version: VersionTEMP, permanent: bool) -> int: + async def subject_version_delete_local(self, subject: Subject, version: Version, permanent: bool) -> int: async with self.schema_lock: schema_versions = self.subject_get(subject, include_deleted=True) if not permanent and version.is_latest: @@ -241,7 +234,7 @@ def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[i raise SchemasNotFoundException return schemas - def subject_version_get(self, subject: Subject, version: VersionTEMP, *, include_deleted: bool = False) -> JsonObject: + def subject_version_get(self, subject: Subject, version: Version, *, include_deleted: bool = False) -> JsonObject: schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() @@ -271,7 +264,7 @@ def subject_version_get(self, subject: Subject, version: VersionTEMP, *, include return ret async def subject_version_referencedby_get( - self, subject: Subject, version: VersionTEMP, *, include_deleted: bool = False + self, subject: Subject, version: Version, *, include_deleted: bool = False ) -> list: version.validate() schema_versions = self.subject_get(subject, include_deleted=include_deleted) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index c83be6a85..5055d169e 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -31,14 +31,7 @@ from karapace.karapace import KarapaceBase from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import ( - ParsedTypedSchema, - SchemaType, - SchemaVersion, - TypedSchema, - ValidatedTypedSchema, - VersionTEMP, -) +from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Version from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import JsonData, JsonObject, SchemaId, Subject @@ -384,7 +377,8 @@ async def compatibility_check( self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = VersionTEMP(version) + version = Version(version) + version.validate() body = request.json schema_type = self._validate_schema_type(content_type=content_type, data=body) references = self._validate_references(content_type, schema_type, body) @@ -789,7 +783,8 @@ async def subject_version_get( ) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = VersionTEMP(version) + version = Version(version) + version.validate() deleted = request.query.get("deleted", "false").lower() == "true" try: subject_data = self.schema_registry.subject_version_get(subject, version, include_deleted=deleted) @@ -821,7 +816,7 @@ async def subject_version_delete( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None ) -> None: self._check_authorization(user, Operation.Write, f"Subject:{subject}") - version = VersionTEMP(version) + version = Version(version) version.validate() permanent = request.query.get("permanent", "false").lower() == "true" @@ -894,7 +889,7 @@ async def subject_version_schema_get( ) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = VersionTEMP(version) + version = Version(version) try: subject_data = self.schema_registry.subject_version_get(subject, version) self.r(subject_data["schema"], content_type) @@ -922,7 +917,7 @@ async def subject_version_schema_get( async def subject_version_referencedby_get(self, content_type, *, subject, version, user: User | None = None): self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = VersionTEMP(version) + version = Version(version) try: referenced_by = await self.schema_registry.subject_version_referencedby_get(subject, version) except (SubjectNotFoundException, SchemasNotFoundException): diff --git a/karapace/typing.py b/karapace/typing.py index 0132adcea..922601056 100644 --- a/karapace/typing.py +++ b/karapace/typing.py @@ -17,7 +17,7 @@ ArgJsonData: TypeAlias = Union[JsonScalar, ArgJsonObject, ArgJsonArray] Subject = NewType("Subject", str) -Version = Union[int, str] + # note: the SchemaID is a unique id among all the schemas (and each version should be assigned to a different id) # basically the same SchemaID refer always to the same TypedSchema. SchemaId = NewType("SchemaId", int) diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py index 3e24f656e..6171ac9dc 100644 --- a/tests/unit/test_schema_models.py +++ b/tests/unit/test_schema_models.py @@ -7,7 +7,7 @@ from avro.schema import Schema as AvroSchema from karapace.errors import InvalidVersion, VersionNotFoundException -from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema, VersionTEMP +from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema, Version from karapace.schema_type import SchemaType from typing import Any, Callable, Dict, Optional, Union @@ -17,10 +17,10 @@ SVFCallable = Callable[[None], Callable[[int, Dict[str, Any]], Dict[int, SchemaVersion]]] -class TestVersionTEMP: +class TestVersion: @pytest.fixture def version(self): - return VersionTEMP(1) + return Version(1) @pytest.fixture def avro_schema(self) -> str: @@ -54,51 +54,51 @@ def schema_versions(resolved_version: int, schema_version_data: Optional[Dict[st return schema_versions - def test_tags(self, version: VersionTEMP): + def test_tags(self, version: Version): assert version.LATEST_VERSION_TAG == "latest" assert version.MINUS_1_VERSION_TAG == "-1" @pytest.mark.parametrize( "version, is_latest", - [(VersionTEMP("latest"), True), (VersionTEMP("-1"), True), (VersionTEMP("-20"), False), (VersionTEMP(10), False)], + [(Version("latest"), True), (Version("-1"), True), (Version("-20"), False), (Version(10), False)], ) - def test_is_latest(self, version: VersionTEMP, is_latest: bool): + def test_is_latest(self, version: Version, is_latest: bool): assert version.is_latest is is_latest - @pytest.mark.parametrize("version", [VersionTEMP("latest"), VersionTEMP(10), VersionTEMP(-1)]) - def test_validate(self, version: VersionTEMP): + @pytest.mark.parametrize("version", [Version("latest"), Version(10), Version(-1)]) + def test_validate(self, version: Version): version.validate() - @pytest.mark.parametrize("invalid_version", [VersionTEMP("invalid_version"), VersionTEMP(0), VersionTEMP(-20)]) - def test_validate_invalid(self, invalid_version: VersionTEMP): + @pytest.mark.parametrize("invalid_version", [Version("invalid_version"), Version(0), Version(-20)]) + def test_validate_invalid(self, invalid_version: Version): with pytest.raises(InvalidVersion): assert not invalid_version.validate() @pytest.mark.parametrize( "version, resolved_version", [ - (VersionTEMP("latest"), "latest"), - (VersionTEMP(-1), "latest"), - (VersionTEMP("-1"), "latest"), - (VersionTEMP(10), "10"), + (Version("latest"), "latest"), + (Version(-1), "latest"), + (Version("-1"), "latest"), + (Version(10), "10"), ], ) - def test_resolved(self, version: VersionTEMP, resolved_version: Union[str, int]): + def test_resolved(self, version: Version, resolved_version: Union[str, int]): assert version.resolved == resolved_version @pytest.mark.parametrize( "version, resolved_version", [ - (VersionTEMP("-1"), 10), - (VersionTEMP(-1), 10), - (VersionTEMP(1), 1), - (VersionTEMP(10), 10), - (VersionTEMP("latest"), 10), + (Version("-1"), 10), + (Version(-1), 10), + (Version(1), 1), + (Version(10), 10), + (Version("latest"), 10), ], ) def test_resolve_from_schema_versions( self, - version: VersionTEMP, + version: Version, resolved_version: int, schema_versions_factory: SVFCallable, ): @@ -111,17 +111,17 @@ def test_resolve_from_schema_versions( @pytest.mark.parametrize( "invalid_version", [ - VersionTEMP("invalid_version"), - VersionTEMP(0), - VersionTEMP(-20), - VersionTEMP("-10"), - VersionTEMP("100"), - VersionTEMP(2000), + Version("invalid_version"), + Version(0), + Version(-20), + Version("-10"), + Version("100"), + Version(2000), ], ) def test_resolve_from_schema_versions_invalid( self, - invalid_version: VersionTEMP, + invalid_version: Version, schema_versions_factory: SVFCallable, ): schema_versions = dict() From 1dc3eacfbc8177df7feb563e85d6cd6a0a40f9a5 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 3 Jun 2024 16:52:27 +0200 Subject: [PATCH 09/13] refactor: validate version on initialization --- karapace/schema_models.py | 3 +++ karapace/schema_registry.py | 1 - karapace/schema_registry_apis.py | 13 ++++++------- tests/unit/test_schema_models.py | 33 +++++++++++++++----------------- 4 files changed, 24 insertions(+), 26 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 9f42fbfa9..997db3a45 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -397,6 +397,9 @@ class Version: LATEST_VERSION_TAG: Final | ClassVar[str] = "latest" MINUS_1_VERSION_TAG: Final | ClassVar[str] = "-1" + def __post_init__(self): + self.validate() + def validate(self): try: version = int(self.tag) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 92822e754..8ab2ce26f 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -266,7 +266,6 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del async def subject_version_referencedby_get( self, subject: Subject, version: Version, *, include_deleted: bool = False ) -> list: - version.validate() schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 5055d169e..10e80d618 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -377,12 +377,11 @@ async def compatibility_check( self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = Version(version) - version.validate() body = request.json schema_type = self._validate_schema_type(content_type=content_type, data=body) references = self._validate_references(content_type, schema_type, body) try: + version = Version(version) references, new_schema_dependencies = self.schema_registry.resolve_references(references) new_schema = ValidatedTypedSchema.parse( schema_type=schema_type, @@ -783,10 +782,9 @@ async def subject_version_get( ) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = Version(version) - version.validate() deleted = request.query.get("deleted", "false").lower() == "true" try: + version = Version(version) subject_data = self.schema_registry.subject_version_get(subject, version, include_deleted=deleted) if "compatibility" in subject_data: del subject_data["compatibility"] @@ -816,13 +814,12 @@ async def subject_version_delete( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None ) -> None: self._check_authorization(user, Operation.Write, f"Subject:{subject}") - version = Version(version) - version.validate() permanent = request.query.get("permanent", "false").lower() == "true" are_we_master, master_url = await self.schema_registry.get_master() if are_we_master: try: + version = Version(version) resolved_version = await self.schema_registry.subject_version_delete_local(subject, version, permanent) self.r(str(resolved_version), content_type, status=HTTPStatus.OK) except (SubjectNotFoundException, SchemasNotFoundException): @@ -878,6 +875,8 @@ async def subject_version_delete( content_type=content_type, status=HTTPStatus.UNPROCESSABLE_ENTITY, ) + except InvalidVersion: + self._invalid_version(content_type, version.tag) elif not master_url: self.no_master_error(content_type) else: @@ -917,8 +916,8 @@ async def subject_version_schema_get( async def subject_version_referencedby_get(self, content_type, *, subject, version, user: User | None = None): self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = Version(version) try: + version = Version(version) referenced_by = await self.schema_registry.subject_version_referencedby_get(subject, version) except (SubjectNotFoundException, SchemasNotFoundException): self.r( diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py index 6171ac9dc..40e44c37f 100644 --- a/tests/unit/test_schema_models.py +++ b/tests/unit/test_schema_models.py @@ -60,19 +60,26 @@ def test_tags(self, version: Version): @pytest.mark.parametrize( "version, is_latest", - [(Version("latest"), True), (Version("-1"), True), (Version("-20"), False), (Version(10), False)], + [(Version("latest"), True), (Version("-1"), True), (Version(10), False)], ) def test_is_latest(self, version: Version, is_latest: bool): assert version.is_latest is is_latest @pytest.mark.parametrize("version", [Version("latest"), Version(10), Version(-1)]) def test_validate(self, version: Version): - version.validate() + assert version - @pytest.mark.parametrize("invalid_version", [Version("invalid_version"), Version(0), Version(-20)]) - def test_validate_invalid(self, invalid_version: Version): + def test_validate_invalid(self): with pytest.raises(InvalidVersion): - assert not invalid_version.validate() + Version("invalid_version") + with pytest.raises(InvalidVersion): + Version(0) + with pytest.raises(InvalidVersion): + Version("0") + with pytest.raises(InvalidVersion): + Version(-20) + with pytest.raises(InvalidVersion): + Version("-10") @pytest.mark.parametrize( "version, resolved_version", @@ -108,23 +115,13 @@ def test_resolve_from_schema_versions( schema_versions.update(schema_versions_factory(10)) assert version.resolve_from_schema_versions(schema_versions) == resolved_version - @pytest.mark.parametrize( - "invalid_version", - [ - Version("invalid_version"), - Version(0), - Version(-20), - Version("-10"), - Version("100"), - Version(2000), - ], - ) + @pytest.mark.parametrize("nonexisting_version", [Version("100"), Version(2000)]) def test_resolve_from_schema_versions_invalid( self, - invalid_version: Version, + nonexisting_version: Version, schema_versions_factory: SVFCallable, ): schema_versions = dict() schema_versions.update(schema_versions_factory(1)) with pytest.raises(VersionNotFoundException): - invalid_version.resolve_from_schema_versions(schema_versions) + nonexisting_version.resolve_from_schema_versions(schema_versions) From cf1facb3aab670c9ca25051c4658bae104ab78d2 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Mon, 3 Jun 2024 17:59:52 +0200 Subject: [PATCH 10/13] fix: invalid version errors should use path version string --- karapace/schema_models.py | 16 +++++++++---- karapace/schema_registry_apis.py | 29 ++++++++++++----------- tests/integration/test_schema.py | 25 ++++++++++++++++++- tests/integration/test_schema_protobuf.py | 2 +- tests/unit/test_schema_models.py | 8 +++++-- 5 files changed, 57 insertions(+), 23 deletions(-) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 997db3a45..96f520795 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -390,17 +390,17 @@ class SchemaVersion: references: Sequence[Reference] | None -@dataclass(slots=True, frozen=True) +@dataclass(frozen=True) class Version: tag: str | int - LATEST_VERSION_TAG: Final | ClassVar[str] = "latest" - MINUS_1_VERSION_TAG: Final | ClassVar[str] = "-1" + LATEST_VERSION_TAG: ClassVar[str] = "latest" + MINUS_1_VERSION_TAG: ClassVar[str] = "-1" - def __post_init__(self): + def __post_init__(self) -> None: self.validate() - def validate(self): + def validate(self) -> None: try: version = int(self.tag) if (version < int(self.MINUS_1_VERSION_TAG)) or (version == 0): @@ -432,3 +432,9 @@ def resolve_from_schema_versions(self, schema_versions: Mapping[int, SchemaVersi if self.version <= max_version and self.version in schema_versions: return self.version return None + + def __str__(self) -> str: + return str(self.tag) + + def __repr__(self) -> str: + return f"Version<{self.tag}>" diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 10e80d618..4d8fd1f7b 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -401,12 +401,12 @@ async def compatibility_check( try: old = self.schema_registry.subject_version_get(subject=subject, version=version) except InvalidVersion: - self._invalid_version(content_type, version.tag) + self._invalid_version(content_type, version) except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version.tag} not found.", + "message": f"Version {version} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -802,13 +802,13 @@ async def subject_version_get( self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version.tag} not found.", + "message": f"Version {version} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, ) except InvalidVersion: - self._invalid_version(content_type, version.tag) + self._invalid_version(content_type, version) async def subject_version_delete( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None @@ -835,7 +835,7 @@ async def subject_version_delete( self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version.tag} not found.", + "message": f"Version {version} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -845,7 +845,8 @@ async def subject_version_delete( body={ "error_code": SchemaErrorCodes.SCHEMAVERSION_SOFT_DELETED.value, "message": ( - f"Subject '{subject}' Version 1 was soft deleted.Set permanent=true to delete permanently" + f"Subject '{subject}' Version {version} was soft deleted. " + "Set permanent=true to delete permanently" ), }, content_type=content_type, @@ -856,7 +857,7 @@ async def subject_version_delete( body={ "error_code": SchemaErrorCodes.SCHEMAVERSION_NOT_SOFT_DELETED.value, "message": ( - f"Subject '{subject}' Version {version.tag} was not deleted " + f"Subject '{subject}' Version {version} was not deleted " "first before being permanently deleted" ), }, @@ -876,11 +877,11 @@ async def subject_version_delete( status=HTTPStatus.UNPROCESSABLE_ENTITY, ) except InvalidVersion: - self._invalid_version(content_type, version.tag) + self._invalid_version(content_type, version) elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/subjects/{subject}/versions/{version.tag}?permanent={permanent}" + url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") async def subject_version_schema_get( @@ -888,17 +889,17 @@ async def subject_version_schema_get( ) -> None: self._check_authorization(user, Operation.Read, f"Subject:{subject}") - version = Version(version) try: + version = Version(version) subject_data = self.schema_registry.subject_version_get(subject, version) self.r(subject_data["schema"], content_type) except InvalidVersion: - self._invalid_version(content_type, version.tag) + self._invalid_version(content_type, version) except VersionNotFoundException: self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version.tag} not found.", + "message": f"Version {version} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -932,13 +933,13 @@ async def subject_version_referencedby_get(self, content_type, *, subject, versi self.r( body={ "error_code": SchemaErrorCodes.VERSION_NOT_FOUND.value, - "message": f"Version {version.tag} not found.", + "message": f"Version {version} not found.", }, content_type=content_type, status=HTTPStatus.NOT_FOUND, ) except InvalidVersion: - self._invalid_version(content_type, version.tag) + self._invalid_version(content_type, version) self.r(referenced_by, content_type, status=HTTPStatus.OK) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index fa8d0ddae..4e45857fe 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -2323,6 +2323,29 @@ async def test_version_number_validation(registry_async_client: Client) -> None: ) +async def test_get_schema_version_by_latest_tags(registry_async_client: Client) -> None: + """ + Creates a subject and schema. Tests that the endpoints + `subjects/{subject}/versions/latest` and `subjects/{subject}/versions/-1` return the latest schema. + """ + subject = create_subject_name_factory("test_subject")() + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": '{"type": "string"}'}) + assert res.status_code == 200 + schema_id = res.json()["id"] + + res = await registry_async_client.get(f"subjects/{subject}/versions") + assert res.status_code == 200 + schema_version = res.json()[0] + + version_endpoints = {f"subjects/{subject}/versions/latest", f"subjects/{subject}/versions/-1"} + for endpoint in version_endpoints: + res = await registry_async_client.get(endpoint) + res_data = res.json() + assert res.status_code == 200 + assert res_data["id"] == schema_id + assert res_data["version"] == schema_version + + async def test_common_endpoints(registry_async_client: Client) -> None: res = await registry_async_client.get("") assert res.status_code == 200 @@ -2624,7 +2647,7 @@ async def test_schema_hard_delete_version(registry_async_client: Client) -> None assert res.status_code == 404 assert res.json()["error_code"] == 40406 assert ( - res.json()["message"] == f"Subject '{subject}' Version 1 was soft deleted.Set permanent=true to delete permanently" + res.json()["message"] == f"Subject '{subject}' Version 1 was soft deleted. Set permanent=true to delete permanently" ) res = await registry_async_client.get(f"subjects/{subject}/versions/1") diff --git a/tests/integration/test_schema_protobuf.py b/tests/integration/test_schema_protobuf.py index ede01737a..10df70637 100644 --- a/tests/integration/test_schema_protobuf.py +++ b/tests/integration/test_schema_protobuf.py @@ -291,7 +291,7 @@ async def test_protobuf_schema_references(registry_async_client: Client) -> None assert res.status_code == 200 res = await registry_async_client.delete("subjects/test_schema/versions/1") myjson = res.json() - match_msg = "Subject 'test_schema' Version 1 was soft deleted.Set permanent=true to delete permanently" + match_msg = "Subject 'test_schema' Version 1 was soft deleted. Set permanent=true to delete permanently" assert res.status_code == 404 diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py index 40e44c37f..4c15b46d1 100644 --- a/tests/unit/test_schema_models.py +++ b/tests/unit/test_schema_models.py @@ -9,7 +9,7 @@ from karapace.errors import InvalidVersion, VersionNotFoundException from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema, Version from karapace.schema_type import SchemaType -from typing import Any, Callable, Dict, Optional, Union +from typing import Any, Callable, Dict, Optional import pytest @@ -58,6 +58,10 @@ def test_tags(self, version: Version): assert version.LATEST_VERSION_TAG == "latest" assert version.MINUS_1_VERSION_TAG == "-1" + def test_text_formating(self, version: Version): + assert f"{version}" == "1" + assert f"{version!r}" == "Version<1>" + @pytest.mark.parametrize( "version, is_latest", [(Version("latest"), True), (Version("-1"), True), (Version(10), False)], @@ -90,7 +94,7 @@ def test_validate_invalid(self): (Version(10), "10"), ], ) - def test_resolved(self, version: Version, resolved_version: Union[str, int]): + def test_resolved(self, version: Version, resolved_version: str): assert version.resolved == resolved_version @pytest.mark.parametrize( From 6570e75a7d69b416b1daa1686893cbc7aff1d155 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 4 Jun 2024 15:52:35 +0200 Subject: [PATCH 11/13] (feature) introduce strongly typed `Version` class - this comes with the factory provider `Versioner` --- karapace/schema_models.py | 56 +--------- karapace/schema_versioning.py | 60 +++++++++++ karapace/utils.py | 18 ---- tests/unit/test_schema_models.py | 131 ----------------------- tests/unit/test_schema_versioning.py | 154 +++++++++++++++++++++++++++ tests/unit/test_utils.py | 22 ---- 6 files changed, 217 insertions(+), 224 deletions(-) create mode 100644 karapace/schema_versioning.py delete mode 100644 tests/unit/test_schema_models.py create mode 100644 tests/unit/test_schema_versioning.py delete mode 100644 tests/unit/test_utils.py diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 96f520795..86ccccbd9 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -10,7 +10,7 @@ from jsonschema import Draft7Validator from jsonschema.exceptions import SchemaError from karapace.dependency import Dependency -from karapace.errors import InvalidSchema, InvalidVersion, VersionNotFoundException +from karapace.errors import InvalidSchema from karapace.protobuf.exception import ( Error as ProtobufError, IllegalArgumentException, @@ -24,8 +24,8 @@ from karapace.schema_references import Reference from karapace.schema_type import SchemaType from karapace.typing import JsonObject, SchemaId, Subject -from karapace.utils import assert_never, intstr_version_guard, json_decode, json_encode, JSONDecodeError -from typing import Any, cast, ClassVar, Dict, Final, final, Mapping, Sequence +from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError +from typing import Any, cast, Dict, Final, final, Mapping, Sequence import hashlib import logging @@ -388,53 +388,3 @@ class SchemaVersion: schema_id: SchemaId schema: TypedSchema references: Sequence[Reference] | None - - -@dataclass(frozen=True) -class Version: - tag: str | int - - LATEST_VERSION_TAG: ClassVar[str] = "latest" - MINUS_1_VERSION_TAG: ClassVar[str] = "-1" - - def __post_init__(self) -> None: - self.validate() - - def validate(self) -> None: - try: - version = int(self.tag) - if (version < int(self.MINUS_1_VERSION_TAG)) or (version == 0): - raise InvalidVersion(f"Invalid version {self.tag}") - except ValueError as exc: - if not self.is_latest: - raise InvalidVersion(f"Invalid version {self.tag}") from exc - - @property - def is_latest(self) -> bool: - return (str(self.tag) == self.LATEST_VERSION_TAG) or (str(self.tag) == self.MINUS_1_VERSION_TAG) - - @property - def version(self) -> int: - return int(self.tag) - - @property - @intstr_version_guard(to_raise=InvalidVersion()) - def resolved(self) -> str: - if self.is_latest: - return self.LATEST_VERSION_TAG - return str(self.version) - - @intstr_version_guard(to_raise=VersionNotFoundException()) - def resolve_from_schema_versions(self, schema_versions: Mapping[int, SchemaVersion]) -> int | None: - max_version = max(schema_versions) - if self.is_latest: - return max_version - if self.version <= max_version and self.version in schema_versions: - return self.version - return None - - def __str__(self) -> str: - return str(self.tag) - - def __repr__(self) -> str: - return f"Version<{self.tag}>" diff --git a/karapace/schema_versioning.py b/karapace/schema_versioning.py new file mode 100644 index 000000000..265fb25aa --- /dev/null +++ b/karapace/schema_versioning.py @@ -0,0 +1,60 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from karapace.errors import InvalidVersion, VersionNotFoundException +from karapace.schema_models import SchemaVersion +from typing import ClassVar, Mapping, Union + +VersionTag = Union[str, int] + + +class Version(int): + LATEST_VERSION_TAG: ClassVar[str] = "latest" + MINUS_1_VERSION_TAG: ClassVar[int] = -1 + + @property + def is_latest(self) -> bool: + return self == self.MINUS_1_VERSION_TAG + + def from_schema_versions(self, schema_versions: Mapping[Version, SchemaVersion]) -> Version: + max_version = max(schema_versions) + if self.is_latest: + return max_version + if self <= max_version and self in schema_versions: + return self + raise VersionNotFoundException() + + @classmethod + def resolve_tag(cls, tag: VersionTag) -> int: + return cls.MINUS_1_VERSION_TAG if tag == cls.LATEST_VERSION_TAG else int(tag) + + @classmethod + def V(cls, tag: VersionTag) -> Version: + cls.validate_tag(tag=tag) + return Version(version=Version.resolve_tag(tag)) + + @classmethod + def validate_tag(cls, tag: VersionTag) -> None: + try: + version = cls.resolve_tag(tag=tag) + if (version < cls.MINUS_1_VERSION_TAG) or (version == 0): + raise InvalidVersion(f"Invalid version {tag}") + except ValueError as exc: + if tag != cls.LATEST_VERSION_TAG: + raise InvalidVersion(f"Invalid version {tag}") from exc + + def __new__(cls, version: int) -> Version: + if not isinstance(version, int): + raise InvalidVersion(f"Invalid version {version}") + if (version < cls.MINUS_1_VERSION_TAG) or (version == 0): + raise InvalidVersion(f"Invalid version {version}") + return super().__new__(cls, version) + + def __str__(self) -> str: + return f"{int(self)}" + + def __repr__(self) -> str: + return f"Version={int(self)}" diff --git a/karapace/utils.py b/karapace/utils.py index ffe73401b..39a1ae2ec 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -19,7 +19,6 @@ from types import MappingProxyType from typing import AnyStr, cast, IO, Literal, NoReturn, overload, TypeVar -import functools import importlib import kafka.client_async import logging @@ -215,23 +214,6 @@ def convert_to_int(object_: dict, key: str, content_type: str) -> None: ) -def intstr_version_guard(to_raise: BaseException): - def wrapper(f): - @functools.wraps(f) - def catcher(*args, **kwargs): - try: - value = f(*args, **kwargs) - if not value: - raise to_raise - return value - except ValueError as exc: - raise to_raise from exc - - return catcher - - return wrapper - - class KarapaceKafkaClient(KafkaClient): def __init__(self, **configs): kafka.client_async.BrokerConnection = KarapaceBrokerConnection diff --git a/tests/unit/test_schema_models.py b/tests/unit/test_schema_models.py deleted file mode 100644 index 4c15b46d1..000000000 --- a/tests/unit/test_schema_models.py +++ /dev/null @@ -1,131 +0,0 @@ -""" -karapace - Test schema models - -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from avro.schema import Schema as AvroSchema -from karapace.errors import InvalidVersion, VersionNotFoundException -from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema, Version -from karapace.schema_type import SchemaType -from typing import Any, Callable, Dict, Optional - -import pytest - -# Schema versions factory fixture type -SVFCallable = Callable[[None], Callable[[int, Dict[str, Any]], Dict[int, SchemaVersion]]] - - -class TestVersion: - @pytest.fixture - def version(self): - return Version(1) - - @pytest.fixture - def avro_schema(self) -> str: - return '{"type":"record","name":"testRecord","fields":[{"type":"string","name":"test"}]}' - - @pytest.fixture - def avro_schema_parsed(self, avro_schema: str) -> AvroSchema: - return parse_avro_schema_definition(avro_schema) - - @pytest.fixture - def schema_versions_factory( - self, - avro_schema: str, - avro_schema_parsed: AvroSchema, - ) -> Callable[[int, Dict[str, Any]], Dict[int, SchemaVersion]]: - def schema_versions(resolved_version: int, schema_version_data: Optional[Dict[str, Any]] = None): - schema_version_data = schema_version_data or dict() - base_schema_version_data = dict( - subject="test-topic", - version=resolved_version, - deleted=False, - schema_id=1, - schema=TypedSchema( - schema_type=SchemaType.AVRO, - schema_str=avro_schema, - schema=avro_schema_parsed, - ), - references=None, - ) - return {resolved_version: SchemaVersion(**{**base_schema_version_data, **schema_version_data})} - - return schema_versions - - def test_tags(self, version: Version): - assert version.LATEST_VERSION_TAG == "latest" - assert version.MINUS_1_VERSION_TAG == "-1" - - def test_text_formating(self, version: Version): - assert f"{version}" == "1" - assert f"{version!r}" == "Version<1>" - - @pytest.mark.parametrize( - "version, is_latest", - [(Version("latest"), True), (Version("-1"), True), (Version(10), False)], - ) - def test_is_latest(self, version: Version, is_latest: bool): - assert version.is_latest is is_latest - - @pytest.mark.parametrize("version", [Version("latest"), Version(10), Version(-1)]) - def test_validate(self, version: Version): - assert version - - def test_validate_invalid(self): - with pytest.raises(InvalidVersion): - Version("invalid_version") - with pytest.raises(InvalidVersion): - Version(0) - with pytest.raises(InvalidVersion): - Version("0") - with pytest.raises(InvalidVersion): - Version(-20) - with pytest.raises(InvalidVersion): - Version("-10") - - @pytest.mark.parametrize( - "version, resolved_version", - [ - (Version("latest"), "latest"), - (Version(-1), "latest"), - (Version("-1"), "latest"), - (Version(10), "10"), - ], - ) - def test_resolved(self, version: Version, resolved_version: str): - assert version.resolved == resolved_version - - @pytest.mark.parametrize( - "version, resolved_version", - [ - (Version("-1"), 10), - (Version(-1), 10), - (Version(1), 1), - (Version(10), 10), - (Version("latest"), 10), - ], - ) - def test_resolve_from_schema_versions( - self, - version: Version, - resolved_version: int, - schema_versions_factory: SVFCallable, - ): - schema_versions = dict() - schema_versions.update(schema_versions_factory(1)) - schema_versions.update(schema_versions_factory(2)) - schema_versions.update(schema_versions_factory(10)) - assert version.resolve_from_schema_versions(schema_versions) == resolved_version - - @pytest.mark.parametrize("nonexisting_version", [Version("100"), Version(2000)]) - def test_resolve_from_schema_versions_invalid( - self, - nonexisting_version: Version, - schema_versions_factory: SVFCallable, - ): - schema_versions = dict() - schema_versions.update(schema_versions_factory(1)) - with pytest.raises(VersionNotFoundException): - nonexisting_version.resolve_from_schema_versions(schema_versions) diff --git a/tests/unit/test_schema_versioning.py b/tests/unit/test_schema_versioning.py new file mode 100644 index 000000000..a59c6a3df --- /dev/null +++ b/tests/unit/test_schema_versioning.py @@ -0,0 +1,154 @@ +""" +karapace - Test schema models + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from avro.schema import Schema as AvroSchema +from karapace.errors import InvalidVersion, VersionNotFoundException +from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema +from karapace.schema_type import SchemaType +from karapace.schema_versioning import Version, VersionTag +from typing import Any, Callable, Dict, Optional + +import operator +import pytest + +# Schema versions factory fixture type +SVFCallable = Callable[[None], Callable[[int, Dict[str, Any]], Dict[int, SchemaVersion]]] + + +class TestVersion: + @pytest.fixture + def version(self): + return Version(1) + + @pytest.fixture + def avro_schema(self) -> str: + return '{"type":"record","name":"testRecord","fields":[{"type":"string","name":"test"}]}' + + @pytest.fixture + def avro_schema_parsed(self, avro_schema: str) -> AvroSchema: + return parse_avro_schema_definition(avro_schema) + + @pytest.fixture + def schema_versions_factory( + self, + avro_schema: str, + avro_schema_parsed: AvroSchema, + ) -> Callable[[Version, Dict[str, Any]], Dict[Version, SchemaVersion]]: + def schema_versions(version: Version, schema_version_data: Optional[Dict[str, Any]] = None): + schema_version_data = schema_version_data or dict() + base_schema_version_data = dict( + subject="test-topic", + version=version, + deleted=False, + schema_id=1, + schema=TypedSchema( + schema_type=SchemaType.AVRO, + schema_str=avro_schema, + schema=avro_schema_parsed, + ), + references=None, + ) + return {version: SchemaVersion(**{**base_schema_version_data, **schema_version_data})} + + return schema_versions + + def test_version(self, version: Version): + assert version == 1 + assert isinstance(version, Version) + assert issubclass(Version, int) + + def test_tags(self, version: Version): + assert version.LATEST_VERSION_TAG == "latest" + assert version.MINUS_1_VERSION_TAG == -1 + + @pytest.mark.parametrize("invalid_version", ["string", -10, 0]) + def test_invalid_version(self, invalid_version: VersionTag): + with pytest.raises(InvalidVersion): + Version(invalid_version) + + @pytest.mark.parametrize( + "version, is_latest", + [(Version(-1), True), (Version(1), False)], + ) + def test_is_latest(self, version: Version, is_latest: bool): + assert version.is_latest is is_latest + + @pytest.mark.parametrize("tag, resolved", [("latest", -1), (10, 10), ("20", 20)]) + def test_resolve_tag(self, tag: VersionTag, resolved: int): + assert Version.resolve_tag(tag=tag) == resolved + + def test_text_formating(self, version: Version): + assert f"{version}" == "1" + assert f"{version!r}" == "Version=1" + + @pytest.mark.parametrize( + "version, to_compare, comparer, valid", + [ + (Version(1), Version(1), operator.eq, True), + (Version(1), Version(2), operator.eq, False), + (Version(2), Version(1), operator.gt, True), + (Version(2), Version(1), operator.lt, False), + (Version(2), Version(2), operator.ge, True), + (Version(2), Version(1), operator.ge, True), + (Version(1), Version(1), operator.le, True), + (Version(1), Version(2), operator.le, True), + ], + ) + def test_comparisons( + self, + version: Version, + to_compare: Version, + comparer: Callable[[Version, Version], bool], + valid: bool, + ): + assert comparer(version, to_compare) is valid + + @pytest.mark.parametrize( + "version, resolved_version", + [ + (Version(-1), Version(10)), + (Version(1), Version(1)), + (Version(10), Version(10)), + ], + ) + def test_from_schema_versions( + self, + version: Version, + resolved_version: Version, + schema_versions_factory: SVFCallable, + ): + schema_versions = dict() + schema_versions.update(schema_versions_factory(Version(1))) + schema_versions.update(schema_versions_factory(Version(2))) + schema_versions.update(schema_versions_factory(Version(10))) + assert version.from_schema_versions(schema_versions) == resolved_version + + @pytest.mark.parametrize("nonexisting_version", [Version(100), Version(2000)]) + def test_from_schema_versions_nonexisting( + self, + nonexisting_version: Version, + schema_versions_factory: SVFCallable, + ): + schema_versions = dict() + schema_versions.update(schema_versions_factory(Version(1))) + with pytest.raises(VersionNotFoundException): + nonexisting_version.from_schema_versions(schema_versions) + + @pytest.mark.parametrize("tag, resolved", [("latest", -1), (10, 10), ("20", 20), (-1, -1), ("-1", -1)]) + def test_factory_V(self, tag: VersionTag, resolved: int): + version = Version.V(tag=tag) + assert version == resolved + assert isinstance(version, Version) + + @pytest.mark.parametrize("tag", ["latest", 10, -1, "-1"]) + def test_validate(self, tag: VersionTag, version: Version): + version.validate_tag(tag=tag) + + @pytest.mark.parametrize("tag", ["invalid_version", 0, -20, "0"]) + def test_validate_invalid(self, tag: VersionTag, version: Version): + with pytest.raises(InvalidVersion): + version.validate_tag(tag=tag) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py deleted file mode 100644 index d32af82ab..000000000 --- a/tests/unit/test_utils.py +++ /dev/null @@ -1,22 +0,0 @@ -""" -karapace - Test utils - -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from karapace.utils import intstr_version_guard - -import pytest - - -def test_intstr_version_guard(): - class RaiseMe(Exception): - pass - - @intstr_version_guard(to_raise=RaiseMe) - def raise_value_error(): - int("not a number") - - with pytest.raises(RaiseMe): - raise_value_error() From 9c02e377562ecab3d2d2f3a10b0a7d8bd07a4a3f Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 4 Jun 2024 17:02:00 +0200 Subject: [PATCH 12/13] refactor: use new strongly typed `Version` --- karapace/dependency.py | 4 +- karapace/errors.py | 13 +- karapace/in_memory_database.py | 26 ++-- karapace/schema_models.py | 36 ++++- karapace/schema_reader.py | 4 +- karapace/schema_references.py | 12 +- karapace/schema_registry.py | 28 ++-- karapace/schema_registry_apis.py | 16 +- karapace/schema_versioning.py | 60 -------- karapace/serialization.py | 22 +-- karapace/typing.py | 28 +++- tests/unit/test_protobuf_serialization.py | 10 +- ...ma_versioning.py => test_schema_models.py} | 141 ++++++++++-------- tests/unit/test_serialization.py | 6 +- 14 files changed, 210 insertions(+), 196 deletions(-) delete mode 100644 karapace/schema_versioning.py rename tests/unit/{test_schema_versioning.py => test_schema_models.py} (66%) diff --git a/karapace/dependency.py b/karapace/dependency.py index 52b7e965e..074263af7 100644 --- a/karapace/dependency.py +++ b/karapace/dependency.py @@ -8,7 +8,7 @@ from __future__ import annotations from karapace.schema_references import Reference -from karapace.typing import JsonData, Subject +from karapace.typing import JsonData, Subject, Version from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -26,7 +26,7 @@ def __init__( self, name: str, subject: Subject, - version: int, + version: Version, target_schema: ValidatedTypedSchema, ) -> None: self.name = name diff --git a/karapace/errors.py b/karapace/errors.py index 7853f2de1..b5c3ced38 100644 --- a/karapace/errors.py +++ b/karapace/errors.py @@ -2,7 +2,14 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.schema_references import Referents + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from karapace.schema_references import Referents + from karapace.typing import Version class VersionNotFoundException(Exception): @@ -54,10 +61,10 @@ class SubjectNotSoftDeletedException(Exception): class ReferenceExistsException(Exception): - def __init__(self, referenced_by: Referents, version: int) -> None: + def __init__(self, referenced_by: Referents, version: Version) -> None: super().__init__() - self.version = version self.referenced_by = referenced_by + self.version = version class SubjectSoftDeletedException(Exception): diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py index 3c7facc4c..81bf474d4 100644 --- a/karapace/in_memory_database.py +++ b/karapace/in_memory_database.py @@ -7,9 +7,9 @@ from __future__ import annotations from dataclasses import dataclass, field -from karapace.schema_models import SchemaVersion, TypedSchema +from karapace.schema_models import SchemaVersion, TypedSchema, Versioner from karapace.schema_references import Reference, Referents -from karapace.typing import SchemaId, Subject +from karapace.typing import SchemaId, Subject, Version from threading import Lock, RLock from typing import Iterable, Sequence @@ -20,7 +20,7 @@ @dataclass class SubjectData: - schemas: dict[int, SchemaVersion] = field(default_factory=dict) + schemas: dict[Version, SchemaVersion] = field(default_factory=dict) compatibility: str | None = None @@ -31,7 +31,7 @@ def __init__(self) -> None: self.subjects: dict[Subject, SubjectData] = {} self.schemas: dict[SchemaId, TypedSchema] = {} self.schema_lock_thread = RLock() - self.referenced_by: dict[tuple[Subject, int], Referents] = {} + self.referenced_by: dict[tuple[Subject, Version], Referents] = {} # Content based deduplication of schemas. This is used to reduce memory # usage when the same schema is produce multiple times to the same or @@ -100,15 +100,15 @@ def _delete_subject_from_schema_id_on_subject(self, *, subject: Subject) -> None def _get_from_hash_cache(self, *, typed_schema: TypedSchema) -> TypedSchema: return self._hash_to_schema.setdefault(typed_schema.fingerprint(), typed_schema) - def get_next_version(self, *, subject: Subject) -> int: - return max(self.subjects[subject].schemas) + 1 + def get_next_version(self, *, subject: Subject) -> Version: + return Versioner.V(max(self.subjects[subject].schemas) + 1) def insert_schema_version( self, *, subject: Subject, schema_id: SchemaId, - version: int, + version: Version, deleted: bool, schema: TypedSchema, references: Sequence[Reference] | None, @@ -217,19 +217,19 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]: subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False) ] - def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[int, SchemaVersion]: + def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]: if subject not in self.subjects: return {} if include_deleted: return self.subjects[subject].schemas with self.schema_lock_thread: return { - version_id: schema_version + Versioner.V(version_id): schema_version for version_id, schema_version in self.subjects[subject].schemas.items() if schema_version.deleted is False } - def delete_subject(self, *, subject: Subject, version: int) -> None: + def delete_subject(self, *, subject: Subject, version: Version) -> None: with self.schema_lock_thread: for schema_version in self.subjects[subject].schemas.values(): if schema_version.version <= version: @@ -241,7 +241,7 @@ def delete_subject_hard(self, *, subject: Subject) -> None: del self.subjects[subject] self._delete_subject_from_schema_id_on_subject(subject=subject) - def delete_subject_schema(self, *, subject: Subject, version: int) -> None: + def delete_subject_schema(self, *, subject: Subject, version: Version) -> None: with self.schema_lock_thread: self.subjects[subject].schemas.pop(version, None) @@ -263,7 +263,7 @@ def num_schema_versions(self) -> tuple[int, int]: soft_deleted_versions += 1 return (live_versions, soft_deleted_versions) - def insert_referenced_by(self, *, subject: Subject, version: int, schema_id: SchemaId) -> None: + def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None: with self.schema_lock_thread: referents = self.referenced_by.get((subject, version), None) if referents: @@ -271,7 +271,7 @@ def insert_referenced_by(self, *, subject: Subject, version: int, schema_id: Sch else: self.referenced_by[(subject, version)] = Referents([schema_id]) - def get_referenced_by(self, subject: Subject, version: int) -> Referents | None: + def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None: with self.schema_lock_thread: return self.referenced_by.get((subject, version), None) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index 86ccccbd9..d21917025 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -10,7 +10,7 @@ from jsonschema import Draft7Validator from jsonschema.exceptions import SchemaError from karapace.dependency import Dependency -from karapace.errors import InvalidSchema +from karapace.errors import InvalidSchema, InvalidVersion, VersionNotFoundException from karapace.protobuf.exception import ( Error as ProtobufError, IllegalArgumentException, @@ -23,7 +23,7 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_references import Reference from karapace.schema_type import SchemaType -from karapace.typing import JsonObject, SchemaId, Subject +from karapace.typing import JsonObject, SchemaId, Subject, Version, VersionTag from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError from typing import Any, cast, Dict, Final, final, Mapping, Sequence @@ -383,8 +383,38 @@ def parse( @dataclass class SchemaVersion: subject: Subject - version: int + version: Version deleted: bool schema_id: SchemaId schema: TypedSchema references: Sequence[Reference] | None + + +class Versioner: + @classmethod + def V(cls, tag: VersionTag) -> Version: + cls.validate_tag(tag=tag) + return Version(version=cls.resolve_tag(tag)) + + @classmethod + def validate_tag(cls, tag: VersionTag) -> None: + try: + version = cls.resolve_tag(tag=tag) + if (version < Version.MINUS_1_VERSION_TAG) or (version == 0): + raise InvalidVersion(f"Invalid version {tag}") + except ValueError as exc: + if tag != Version.LATEST_VERSION_TAG: + raise InvalidVersion(f"Invalid version {tag}") from exc + + @staticmethod + def resolve_tag(tag: VersionTag) -> int: + return Version.MINUS_1_VERSION_TAG if tag == Version.LATEST_VERSION_TAG else int(tag) + + @staticmethod + def from_schema_versions(schema_versions: Mapping[Version, SchemaVersion], version: Version) -> Version: + max_version = max(schema_versions) + if version.is_latest: + return max_version + if version in schema_versions and version <= max_version: + return version + raise VersionNotFoundException() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index d863fbe9e..993a83368 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -39,7 +39,7 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import JsonObject, SchemaId, Subject +from karapace.typing import JsonObject, SchemaId, Subject, Version from karapace.utils import json_decode, JSONDecodeError from threading import Event, Thread from typing import Final, Mapping, Sequence @@ -602,7 +602,7 @@ def remove_referenced_by( def get_referenced_by( self, subject: Subject, - version: int, + version: Version, ) -> Referents | None: return self.database.get_referenced_by(subject, version) diff --git a/karapace/schema_references.py b/karapace/schema_references.py index 746a583ba..9973b0ccb 100644 --- a/karapace/schema_references.py +++ b/karapace/schema_references.py @@ -8,7 +8,7 @@ from __future__ import annotations from karapace.dataclasses import default_dataclass -from karapace.typing import JsonData, JsonObject, SchemaId, Subject +from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from typing import cast, List, Mapping, NewType, TypeVar Referents = NewType("Referents", List[SchemaId]) @@ -36,7 +36,7 @@ class LatestVersionReference: name: str subject: Subject - def resolve(self, version: int) -> Reference: + def resolve(self, version: Version) -> Reference: return Reference( name=self.name, subject=self.subject, @@ -48,10 +48,10 @@ def resolve(self, version: int) -> Reference: class Reference: name: str subject: Subject - version: int + version: Version def __post_init__(self) -> None: - assert self.version != -1 + assert self.version != Version.MINUS_1_VERSION_TAG def __repr__(self) -> str: return f"{{name='{self.name}', subject='{self.subject}', version={self.version}}}" @@ -68,7 +68,7 @@ def from_dict(data: JsonObject) -> Reference: return Reference( name=str(data["name"]), subject=Subject(str(data["subject"])), - version=int(cast(int, data["version"])), + version=Version(cast(int, data["version"])), ) @@ -88,6 +88,6 @@ def reference_from_mapping( else Reference( name=name, subject=subject, - version=int(version), + version=Version(version), ) ) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 8ab2ce26f..2ad5f3059 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -25,10 +25,10 @@ from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer from karapace.offset_watcher import OffsetWatcher -from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Version +from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_reader import KafkaSchemaReader from karapace.schema_references import LatestVersionReference, Reference -from karapace.typing import JsonObject, Mode, SchemaId, Subject +from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version from typing import Sequence import asyncio @@ -127,7 +127,7 @@ def schemas_get(self, schema_id: SchemaId, *, fetch_max_id: bool = False) -> Typ return schema - async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[int]: + async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[Version]: async with self.schema_lock: schema_versions = self.subject_get(subject, include_deleted=True) @@ -173,7 +173,7 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ try: schema_versions_live = self.subject_get(subject, include_deleted=False) except SchemasNotFoundException: - latest_version_id = int(0) + latest_version_id = Versioner.V(Version.MINUS_1_VERSION_TAG) version_list = [] else: version_list = list(schema_versions_live) @@ -187,7 +187,7 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[ return version_list - async def subject_version_delete_local(self, subject: Subject, version: Version, permanent: bool) -> int: + async def subject_version_delete_local(self, subject: Subject, version: Version, permanent: bool) -> Version: async with self.schema_lock: schema_versions = self.subject_get(subject, include_deleted=True) if not permanent and version.is_latest: @@ -196,7 +196,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version, for version_id, schema_version in schema_versions.items() if schema_version.deleted is False } - resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions) + resolved_version = Versioner.from_schema_versions(schema_versions=schema_versions, version=version) schema_version = schema_versions.get(resolved_version, None) if not schema_version: @@ -224,7 +224,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version, self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references) return resolved_version - def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[int, SchemaVersion]: + def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[Version, SchemaVersion]: subject_found = self.database.find_subject(subject=subject) if not subject_found: raise SubjectNotFoundException() @@ -238,7 +238,7 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions) + resolved_version = Versioner.from_schema_versions(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: @@ -269,7 +269,7 @@ async def subject_version_referencedby_get( schema_versions = self.subject_get(subject, include_deleted=include_deleted) if not schema_versions: raise SubjectNotFoundException() - resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions) + resolved_version = Versioner.from_schema_versions(schema_versions=schema_versions, version=version) schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None) if not schema_data: raise VersionNotFoundException() @@ -311,7 +311,7 @@ async def write_new_schema_local( all_schema_versions = self.database.find_subject_schemas(subject=subject, include_deleted=True) if not all_schema_versions: - version = int(1) + version = Version(1) schema_id = self.database.get_schema_id(new_schema) LOG.debug( "Registering new subject: %r, id: %r with version: %r with schema %r, schema_id: %r", @@ -400,8 +400,8 @@ async def write_new_schema_local( def get_subject_versions_for_schema( self, schema_id: SchemaId, *, include_deleted: bool = False - ) -> list[dict[str, Subject | int]]: - subject_versions: list[dict[str, Subject | int]] = [] + ) -> list[dict[str, Subject | Version]]: + subject_versions: list[dict[str, Subject | Version]] = [] schema_versions = self.database.find_schema_versions_by_schema_id( schema_id=schema_id, include_deleted=include_deleted, @@ -423,7 +423,7 @@ def send_schema_message( subject: Subject, schema: TypedSchema | None, schema_id: int, - version: int, + version: Version, deleted: bool, references: Sequence[Reference] | None, ) -> None: @@ -459,7 +459,7 @@ def resolve_references( ) -> tuple[Sequence[Reference], dict[str, Dependency]] | tuple[None, None]: return self.schema_reader.resolve_references(references) if references else (None, None) - def send_delete_subject_message(self, subject: Subject, version: int) -> None: + def send_delete_subject_message(self, subject: Subject, version: Version) -> None: key = {"subject": subject, "magic": 0, "keytype": "DELETE_SUBJECT"} value = {"subject": subject, "version": version} self.producer.send_message(key=key, value=value) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 4d8fd1f7b..99d942bdd 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -31,10 +31,10 @@ from karapace.karapace import KarapaceBase from karapace.protobuf.exception import ProtobufUnresolvedDependencyException from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME -from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Version +from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry -from karapace.typing import JsonData, JsonObject, SchemaId, Subject +from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version from karapace.utils import JSONDecodeError from typing import Any @@ -333,7 +333,7 @@ async def close(self) -> None: if self._auth is not None: stack.push_async_callback(self._auth.close) - def _subject_get(self, subject: str, content_type: str, include_deleted: bool = False) -> dict[int, SchemaVersion]: + def _subject_get(self, subject: str, content_type: str, include_deleted: bool = False) -> dict[Version, SchemaVersion]: try: schema_versions = self.schema_registry.subject_get(subject, include_deleted) except SubjectNotFoundException: @@ -381,7 +381,7 @@ async def compatibility_check( schema_type = self._validate_schema_type(content_type=content_type, data=body) references = self._validate_references(content_type, schema_type, body) try: - version = Version(version) + version = Versioner.V(version) references, new_schema_dependencies = self.schema_registry.resolve_references(references) new_schema = ValidatedTypedSchema.parse( schema_type=schema_type, @@ -784,7 +784,7 @@ async def subject_version_get( deleted = request.query.get("deleted", "false").lower() == "true" try: - version = Version(version) + version = Versioner.V(version) subject_data = self.schema_registry.subject_version_get(subject, version, include_deleted=deleted) if "compatibility" in subject_data: del subject_data["compatibility"] @@ -819,7 +819,7 @@ async def subject_version_delete( are_we_master, master_url = await self.schema_registry.get_master() if are_we_master: try: - version = Version(version) + version = Versioner.V(version) resolved_version = await self.schema_registry.subject_version_delete_local(subject, version, permanent) self.r(str(resolved_version), content_type, status=HTTPStatus.OK) except (SubjectNotFoundException, SchemasNotFoundException): @@ -890,7 +890,7 @@ async def subject_version_schema_get( self._check_authorization(user, Operation.Read, f"Subject:{subject}") try: - version = Version(version) + version = Versioner.V(version) subject_data = self.schema_registry.subject_version_get(subject, version) self.r(subject_data["schema"], content_type) except InvalidVersion: @@ -918,7 +918,7 @@ async def subject_version_referencedby_get(self, content_type, *, subject, versi self._check_authorization(user, Operation.Read, f"Subject:{subject}") try: - version = Version(version) + version = Versioner.V(version) referenced_by = await self.schema_registry.subject_version_referencedby_get(subject, version) except (SubjectNotFoundException, SchemasNotFoundException): self.r( diff --git a/karapace/schema_versioning.py b/karapace/schema_versioning.py deleted file mode 100644 index 265fb25aa..000000000 --- a/karapace/schema_versioning.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -Copyright (c) 2023 Aiven Ltd -See LICENSE for details -""" -from __future__ import annotations - -from karapace.errors import InvalidVersion, VersionNotFoundException -from karapace.schema_models import SchemaVersion -from typing import ClassVar, Mapping, Union - -VersionTag = Union[str, int] - - -class Version(int): - LATEST_VERSION_TAG: ClassVar[str] = "latest" - MINUS_1_VERSION_TAG: ClassVar[int] = -1 - - @property - def is_latest(self) -> bool: - return self == self.MINUS_1_VERSION_TAG - - def from_schema_versions(self, schema_versions: Mapping[Version, SchemaVersion]) -> Version: - max_version = max(schema_versions) - if self.is_latest: - return max_version - if self <= max_version and self in schema_versions: - return self - raise VersionNotFoundException() - - @classmethod - def resolve_tag(cls, tag: VersionTag) -> int: - return cls.MINUS_1_VERSION_TAG if tag == cls.LATEST_VERSION_TAG else int(tag) - - @classmethod - def V(cls, tag: VersionTag) -> Version: - cls.validate_tag(tag=tag) - return Version(version=Version.resolve_tag(tag)) - - @classmethod - def validate_tag(cls, tag: VersionTag) -> None: - try: - version = cls.resolve_tag(tag=tag) - if (version < cls.MINUS_1_VERSION_TAG) or (version == 0): - raise InvalidVersion(f"Invalid version {tag}") - except ValueError as exc: - if tag != cls.LATEST_VERSION_TAG: - raise InvalidVersion(f"Invalid version {tag}") from exc - - def __new__(cls, version: int) -> Version: - if not isinstance(version, int): - raise InvalidVersion(f"Invalid version {version}") - if (version < cls.MINUS_1_VERSION_TAG) or (version == 0): - raise InvalidVersion(f"Invalid version {version}") - return super().__new__(cls, version) - - def __str__(self) -> str: - return f"{int(self)}" - - def __repr__(self) -> str: - return f"Version={int(self)}" diff --git a/karapace/serialization.py b/karapace/serialization.py index 8797dd780..81c51cabc 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -16,9 +16,9 @@ from karapace.protobuf.exception import ProtobufTypeException from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter from karapace.protobuf.schema import ProtobufSchema -from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema +from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping -from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType +from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType, Version from karapace.utils import json_decode, json_encode from typing import Any, Callable, MutableMapping from urllib.parse import quote @@ -131,9 +131,9 @@ async def post_new_schema( async def _get_schema_recursive( self, subject: Subject, - explored_schemas: set[tuple[Subject, int | None]], - version: int | None = None, - ) -> tuple[SchemaId, ValidatedTypedSchema, int]: + explored_schemas: set[tuple[Subject, Version | None]], + version: Version | None = None, + ) -> tuple[SchemaId, ValidatedTypedSchema, Version]: if (subject, version) in explored_schemas: raise InvalidSchema( f"The schema has at least a cycle in dependencies, " @@ -174,7 +174,7 @@ async def _get_schema_recursive( references=references, dependencies=dependencies, ), - int(json_result["version"]), + Versioner.V(json_result["version"]), ) except InvalidSchema as e: raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e @@ -183,21 +183,21 @@ async def _get_schema_recursive( async def get_schema( self, subject: Subject, - version: int | None = None, - ) -> tuple[SchemaId, ValidatedTypedSchema, int]: + version: Version | None = None, + ) -> tuple[SchemaId, ValidatedTypedSchema, Version]: """ Retrieves the schema and its dependencies for the specified subject. Args: subject (Subject): The subject for which to retrieve the schema. - version (Optional[int]): The specific version of the schema to retrieve. + version (Optional[Version]): The specific version of the schema to retrieve. If None, the latest available schema will be returned. Returns: - Tuple[SchemaId, ValidatedTypedSchema, int]: A tuple containing: + Tuple[SchemaId, ValidatedTypedSchema, Version]: A tuple containing: - SchemaId: The ID of the retrieved schema. - ValidatedTypedSchema: The retrieved schema, validated and typed. - - int: The version of the schema that was retrieved. + - Version: The version of the schema that was retrieved. """ return await self._get_schema_recursive(subject, set(), version) diff --git a/karapace/typing.py b/karapace/typing.py index 922601056..40b29fa2d 100644 --- a/karapace/typing.py +++ b/karapace/typing.py @@ -2,8 +2,11 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from enum import Enum, unique -from typing import Dict, List, Mapping, NewType, Sequence, Union +from karapace.errors import InvalidVersion +from typing import ClassVar, Dict, List, Mapping, NewType, Sequence, Union from typing_extensions import TypeAlias JsonArray: TypeAlias = List["JsonData"] @@ -17,6 +20,7 @@ ArgJsonData: TypeAlias = Union[JsonScalar, ArgJsonObject, ArgJsonArray] Subject = NewType("Subject", str) +VersionTag = Union[str, int] # note: the SchemaID is a unique id among all the schemas (and each version should be assigned to a different id) # basically the same SchemaID refer always to the same TypedSchema. @@ -53,3 +57,25 @@ class SubjectType(StrEnum, Enum): @unique class Mode(StrEnum): readwrite = "READWRITE" + + +class Version(int): + LATEST_VERSION_TAG: ClassVar[str] = "latest" + MINUS_1_VERSION_TAG: ClassVar[int] = -1 + + def __new__(cls, version: int) -> Version: + if not isinstance(version, int): + raise InvalidVersion(f"Invalid version {version}") + if (version < cls.MINUS_1_VERSION_TAG) or (version == 0): + raise InvalidVersion(f"Invalid version {version}") + return super().__new__(cls, version) + + def __str__(self) -> str: + return f"{int(self)}" + + def __repr__(self) -> str: + return f"Version={int(self)}" + + @property + def is_latest(self) -> bool: + return self == self.MINUS_1_VERSION_TAG diff --git a/tests/unit/test_protobuf_serialization.py b/tests/unit/test_protobuf_serialization.py index b7b84839e..ee2586d63 100644 --- a/tests/unit/test_protobuf_serialization.py +++ b/tests/unit/test_protobuf_serialization.py @@ -5,7 +5,7 @@ from karapace.config import read_config from karapace.dependency import Dependency from karapace.protobuf.kotlin_wrapper import trim_margin -from karapace.schema_models import ParsedTypedSchema, SchemaType +from karapace.schema_models import ParsedTypedSchema, SchemaType, Versioner from karapace.schema_references import Reference from karapace.serialization import ( InvalidMessageHeader, @@ -45,7 +45,7 @@ async def test_happy_flow(default_config_path: Path): mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result( - (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), int(1)) + (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), Versioner.V(1)) ) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future @@ -114,7 +114,7 @@ async def test_happy_flow_references(default_config_path: Path): schema_for_id_one_future.set_result((ref_schema, [Subject("stub")])) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result((1, ref_schema, int(1))) + get_latest_schema_future.set_result((1, ref_schema, Versioner.V(1))) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) @@ -201,7 +201,7 @@ async def test_happy_flow_references_two(default_config_path: Path): schema_for_id_one_future.set_result((ref_schema_two, [Subject("mock")])) mock_protobuf_registry_client.get_schema_for_id.return_value = schema_for_id_one_future get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result((1, ref_schema_two, int(1))) + get_latest_schema_future.set_result((1, ref_schema_two, Versioner.V(1))) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) @@ -221,7 +221,7 @@ async def test_serialization_fails(default_config_path: Path): mock_protobuf_registry_client = Mock() get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result( - (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), int(1)) + (1, ParsedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)), Versioner.V(1)) ) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future diff --git a/tests/unit/test_schema_versioning.py b/tests/unit/test_schema_models.py similarity index 66% rename from tests/unit/test_schema_versioning.py rename to tests/unit/test_schema_models.py index a59c6a3df..fc1590da9 100644 --- a/tests/unit/test_schema_versioning.py +++ b/tests/unit/test_schema_models.py @@ -7,9 +7,9 @@ from avro.schema import Schema as AvroSchema from karapace.errors import InvalidVersion, VersionNotFoundException -from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema +from karapace.schema_models import parse_avro_schema_definition, SchemaVersion, TypedSchema, Versioner from karapace.schema_type import SchemaType -from karapace.schema_versioning import Version, VersionTag +from karapace.typing import Version, VersionTag from typing import Any, Callable, Dict, Optional import operator @@ -22,39 +22,7 @@ class TestVersion: @pytest.fixture def version(self): - return Version(1) - - @pytest.fixture - def avro_schema(self) -> str: - return '{"type":"record","name":"testRecord","fields":[{"type":"string","name":"test"}]}' - - @pytest.fixture - def avro_schema_parsed(self, avro_schema: str) -> AvroSchema: - return parse_avro_schema_definition(avro_schema) - - @pytest.fixture - def schema_versions_factory( - self, - avro_schema: str, - avro_schema_parsed: AvroSchema, - ) -> Callable[[Version, Dict[str, Any]], Dict[Version, SchemaVersion]]: - def schema_versions(version: Version, schema_version_data: Optional[Dict[str, Any]] = None): - schema_version_data = schema_version_data or dict() - base_schema_version_data = dict( - subject="test-topic", - version=version, - deleted=False, - schema_id=1, - schema=TypedSchema( - schema_type=SchemaType.AVRO, - schema_str=avro_schema, - schema=avro_schema_parsed, - ), - references=None, - ) - return {version: SchemaVersion(**{**base_schema_version_data, **schema_version_data})} - - return schema_versions + return Versioner.V(1) def test_version(self, version: Version): assert version == 1 @@ -68,19 +36,15 @@ def test_tags(self, version: Version): @pytest.mark.parametrize("invalid_version", ["string", -10, 0]) def test_invalid_version(self, invalid_version: VersionTag): with pytest.raises(InvalidVersion): - Version(invalid_version) + Versioner.V(invalid_version) @pytest.mark.parametrize( "version, is_latest", - [(Version(-1), True), (Version(1), False)], + [(Versioner.V(-1), True), (Versioner.V(1), False)], ) def test_is_latest(self, version: Version, is_latest: bool): assert version.is_latest is is_latest - @pytest.mark.parametrize("tag, resolved", [("latest", -1), (10, 10), ("20", 20)]) - def test_resolve_tag(self, tag: VersionTag, resolved: int): - assert Version.resolve_tag(tag=tag) == resolved - def test_text_formating(self, version: Version): assert f"{version}" == "1" assert f"{version!r}" == "Version=1" @@ -88,14 +52,14 @@ def test_text_formating(self, version: Version): @pytest.mark.parametrize( "version, to_compare, comparer, valid", [ - (Version(1), Version(1), operator.eq, True), - (Version(1), Version(2), operator.eq, False), - (Version(2), Version(1), operator.gt, True), - (Version(2), Version(1), operator.lt, False), - (Version(2), Version(2), operator.ge, True), - (Version(2), Version(1), operator.ge, True), - (Version(1), Version(1), operator.le, True), - (Version(1), Version(2), operator.le, True), + (Versioner.V(1), Versioner.V(1), operator.eq, True), + (Versioner.V(1), Versioner.V(2), operator.eq, False), + (Versioner.V(2), Versioner.V(1), operator.gt, True), + (Versioner.V(2), Versioner.V(1), operator.lt, False), + (Versioner.V(2), Versioner.V(2), operator.ge, True), + (Versioner.V(2), Versioner.V(1), operator.ge, True), + (Versioner.V(1), Versioner.V(1), operator.le, True), + (Versioner.V(1), Versioner.V(2), operator.le, True), ], ) def test_comparisons( @@ -107,12 +71,50 @@ def test_comparisons( ): assert comparer(version, to_compare) is valid + +class TestVersioner: + @pytest.fixture + def avro_schema(self) -> str: + return '{"type":"record","name":"testRecord","fields":[{"type":"string","name":"test"}]}' + + @pytest.fixture + def avro_schema_parsed(self, avro_schema: str) -> AvroSchema: + return parse_avro_schema_definition(avro_schema) + + @pytest.fixture + def schema_versions_factory( + self, + avro_schema: str, + avro_schema_parsed: AvroSchema, + ) -> Callable[[Version, Dict[str, Any]], Dict[Version, SchemaVersion]]: + def schema_versions(version: Version, schema_version_data: Optional[Dict[str, Any]] = None): + schema_version_data = schema_version_data or dict() + base_schema_version_data = dict( + subject="test-topic", + version=version, + deleted=False, + schema_id=1, + schema=TypedSchema( + schema_type=SchemaType.AVRO, + schema_str=avro_schema, + schema=avro_schema_parsed, + ), + references=None, + ) + return {version: SchemaVersion(**{**base_schema_version_data, **schema_version_data})} + + return schema_versions + + @pytest.mark.parametrize("tag, resolved", [("latest", -1), (10, 10), ("20", 20)]) + def test_resolve_tag(self, tag: VersionTag, resolved: int): + assert Versioner.resolve_tag(tag=tag) == resolved + @pytest.mark.parametrize( "version, resolved_version", [ - (Version(-1), Version(10)), - (Version(1), Version(1)), - (Version(10), Version(10)), + (Versioner.V(-1), Versioner.V(10)), + (Versioner.V(1), Versioner.V(1)), + (Versioner.V(10), Versioner.V(10)), ], ) def test_from_schema_versions( @@ -122,33 +124,42 @@ def test_from_schema_versions( schema_versions_factory: SVFCallable, ): schema_versions = dict() - schema_versions.update(schema_versions_factory(Version(1))) - schema_versions.update(schema_versions_factory(Version(2))) - schema_versions.update(schema_versions_factory(Version(10))) - assert version.from_schema_versions(schema_versions) == resolved_version + schema_versions.update(schema_versions_factory(Versioner.V(1))) + schema_versions.update(schema_versions_factory(Versioner.V(2))) + schema_versions.update(schema_versions_factory(Versioner.V(10))) + assert Versioner.from_schema_versions(schema_versions, version) == resolved_version - @pytest.mark.parametrize("nonexisting_version", [Version(100), Version(2000)]) + @pytest.mark.parametrize("nonexisting_version", [Versioner.V(100), Versioner.V(2000)]) def test_from_schema_versions_nonexisting( self, nonexisting_version: Version, schema_versions_factory: SVFCallable, ): schema_versions = dict() - schema_versions.update(schema_versions_factory(Version(1))) + schema_versions.update(schema_versions_factory(Versioner.V(1))) with pytest.raises(VersionNotFoundException): - nonexisting_version.from_schema_versions(schema_versions) + Versioner.from_schema_versions(schema_versions, nonexisting_version) - @pytest.mark.parametrize("tag, resolved", [("latest", -1), (10, 10), ("20", 20), (-1, -1), ("-1", -1)]) + @pytest.mark.parametrize( + "tag, resolved", + [ + ("latest", Versioner.V(-1)), + (10, Versioner.V(10)), + ("20", Versioner.V(20)), + (-1, Versioner.V(-1)), + ("-1", Versioner.V(-1)), + ], + ) def test_factory_V(self, tag: VersionTag, resolved: int): - version = Version.V(tag=tag) + version = Versioner.V(tag=tag) assert version == resolved assert isinstance(version, Version) @pytest.mark.parametrize("tag", ["latest", 10, -1, "-1"]) - def test_validate(self, tag: VersionTag, version: Version): - version.validate_tag(tag=tag) + def test_validate(self, tag: VersionTag): + Versioner.validate_tag(tag=tag) @pytest.mark.parametrize("tag", ["invalid_version", 0, -20, "0"]) - def test_validate_invalid(self, tag: VersionTag, version: Version): + def test_validate_invalid(self, tag: VersionTag): with pytest.raises(InvalidVersion): - version.validate_tag(tag=tag) + Versioner.validate_tag(tag=tag) diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index d8c6a698f..a21d3bc00 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -4,7 +4,7 @@ """ from karapace.client import Path from karapace.config import DEFAULTS, read_config -from karapace.schema_models import SchemaType, ValidatedTypedSchema +from karapace.schema_models import SchemaType, ValidatedTypedSchema, Versioner from karapace.serialization import ( flatten_unions, get_subject_name, @@ -121,7 +121,7 @@ async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySeriali async def test_happy_flow(default_config_path: Path): mock_registry_client = Mock() get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), int(1))) + get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), Versioner.V(1))) mock_registry_client.get_schema.return_value = get_latest_schema_future schema_for_id_one_future = asyncio.Future() schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")])) @@ -313,7 +313,7 @@ def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> No async def test_serialization_fails(default_config_path: Path): mock_registry_client = Mock() get_latest_schema_future = asyncio.Future() - get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), int(1))) + get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), Versioner.V(1))) mock_registry_client.get_schema.return_value = get_latest_schema_future serializer = await make_ser_deser(default_config_path, mock_registry_client) From c36b130207a00405967503db26cf949394d7119d Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Wed, 5 Jun 2024 14:55:49 +0200 Subject: [PATCH 13/13] refactor: use inline version object - we create the `Version` objects in the API controller using the `Versioner.V()` factory method, but we've moved this statement inline as an argument to the dependent functions. This prevents us from overriding the `version` path parameter. --- karapace/schema_registry_apis.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 99d942bdd..ef4168313 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -381,7 +381,6 @@ async def compatibility_check( schema_type = self._validate_schema_type(content_type=content_type, data=body) references = self._validate_references(content_type, schema_type, body) try: - version = Versioner.V(version) references, new_schema_dependencies = self.schema_registry.resolve_references(references) new_schema = ValidatedTypedSchema.parse( schema_type=schema_type, @@ -399,7 +398,7 @@ async def compatibility_check( status=HTTPStatus.UNPROCESSABLE_ENTITY, ) try: - old = self.schema_registry.subject_version_get(subject=subject, version=version) + old = self.schema_registry.subject_version_get(subject=subject, version=Versioner.V(version)) except InvalidVersion: self._invalid_version(content_type, version) except (VersionNotFoundException, SchemasNotFoundException, SubjectNotFoundException): @@ -784,8 +783,7 @@ async def subject_version_get( deleted = request.query.get("deleted", "false").lower() == "true" try: - version = Versioner.V(version) - subject_data = self.schema_registry.subject_version_get(subject, version, include_deleted=deleted) + subject_data = self.schema_registry.subject_version_get(subject, Versioner.V(version), include_deleted=deleted) if "compatibility" in subject_data: del subject_data["compatibility"] self.r(subject_data, content_type) @@ -819,8 +817,9 @@ async def subject_version_delete( are_we_master, master_url = await self.schema_registry.get_master() if are_we_master: try: - version = Versioner.V(version) - resolved_version = await self.schema_registry.subject_version_delete_local(subject, version, permanent) + resolved_version = await self.schema_registry.subject_version_delete_local( + subject, Versioner.V(version), permanent + ) self.r(str(resolved_version), content_type, status=HTTPStatus.OK) except (SubjectNotFoundException, SchemasNotFoundException): self.r( @@ -890,8 +889,7 @@ async def subject_version_schema_get( self._check_authorization(user, Operation.Read, f"Subject:{subject}") try: - version = Versioner.V(version) - subject_data = self.schema_registry.subject_version_get(subject, version) + subject_data = self.schema_registry.subject_version_get(subject, Versioner.V(version)) self.r(subject_data["schema"], content_type) except InvalidVersion: self._invalid_version(content_type, version) @@ -918,8 +916,7 @@ async def subject_version_referencedby_get(self, content_type, *, subject, versi self._check_authorization(user, Operation.Read, f"Subject:{subject}") try: - version = Versioner.V(version) - referenced_by = await self.schema_registry.subject_version_referencedby_get(subject, version) + referenced_by = await self.schema_registry.subject_version_referencedby_get(subject, Versioner.V(version)) except (SubjectNotFoundException, SchemasNotFoundException): self.r( body={