Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: consolidate schema version resolution and validation logic #885

Merged
merged 13 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions karapace/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.schema_references import Referents
from karapace.typing import ResolvedVersion

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from karapace.schema_references import Referents
from karapace.typing import Version


class VersionNotFoundException(Exception):
Expand Down Expand Up @@ -55,10 +61,10 @@ class SubjectNotSoftDeletedException(Exception):


class ReferenceExistsException(Exception):
def __init__(self, referenced_by: Referents, version: ResolvedVersion) -> None:
def __init__(self, referenced_by: Referents, version: Version) -> None:
super().__init__()
self.version = version
self.referenced_by = referenced_by
self.version = version


class SubjectSoftDeletedException(Exception):
Expand Down
26 changes: 13 additions & 13 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from __future__ import annotations

from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_models import SchemaVersion, TypedSchema, Versioner
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.typing import SchemaId, Subject, Version
from threading import Lock, RLock
from typing import Iterable, Sequence

Expand All @@ -20,7 +20,7 @@

@dataclass
class SubjectData:
schemas: dict[ResolvedVersion, SchemaVersion] = field(default_factory=dict)
schemas: dict[Version, SchemaVersion] = field(default_factory=dict)
compatibility: str | None = None


Expand All @@ -31,7 +31,7 @@ def __init__(self) -> None:
self.subjects: dict[Subject, SubjectData] = {}
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.referenced_by: dict[tuple[Subject, Version], Referents] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produced multiple times to the same or
Expand Down Expand Up @@ -100,15 +100,15 @@ def _delete_subject_from_schema_id_on_subject(self, *, subject: Subject) -> None
def _get_from_hash_cache(self, *, typed_schema: TypedSchema) -> TypedSchema:
return self._hash_to_schema.setdefault(typed_schema.fingerprint(), typed_schema)

def get_next_version(self, *, subject: Subject) -> ResolvedVersion:
return ResolvedVersion(max(self.subjects[subject].schemas) + 1)
def get_next_version(self, *, subject: Subject) -> Version:
return Versioner.V(max(self.subjects[subject].schemas) + 1)

def insert_schema_version(
self,
*,
subject: Subject,
schema_id: SchemaId,
version: ResolvedVersion,
version: Version,
deleted: bool,
schema: TypedSchema,
references: Sequence[Reference] | None,
Expand Down Expand Up @@ -217,19 +217,19 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False)
]

def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[ResolvedVersion, SchemaVersion]:
def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
if subject not in self.subjects:
return {}
if include_deleted:
return self.subjects[subject].schemas
with self.schema_lock_thread:
return {
version_id: schema_version
Versioner.V(version_id): schema_version
nosahama marked this conversation as resolved.
Show resolved Hide resolved
for version_id, schema_version in self.subjects[subject].schemas.items()
if schema_version.deleted is False
}

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
def delete_subject(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
if schema_version.version <= version:
Expand All @@ -241,7 +241,7 @@ def delete_subject_hard(self, *, subject: Subject) -> None:
del self.subjects[subject]
self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: ResolvedVersion) -> None:
def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
self.subjects[subject].schemas.pop(version, None)

Expand All @@ -263,15 +263,15 @@ def num_schema_versions(self) -> tuple[int, int]:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, schema_id: SchemaId) -> None:
def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
else:
self.referenced_by[(subject, version)] = Referents([schema_id])

def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Referents | None:
def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

Expand Down
36 changes: 33 additions & 3 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jsonschema import Draft7Validator
from jsonschema.exceptions import SchemaError
from karapace.dependency import Dependency
from karapace.errors import InvalidSchema
from karapace.errors import InvalidSchema, InvalidVersion, VersionNotFoundException
from karapace.protobuf.exception import (
Error as ProtobufError,
IllegalArgumentException,
Expand All @@ -23,7 +23,7 @@
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_references import Reference
from karapace.schema_type import SchemaType
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, SchemaId, Subject, Version, VersionTag
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Dict, Final, final, Mapping, Sequence

Expand Down Expand Up @@ -383,8 +383,38 @@ def parse(
@dataclass
class SchemaVersion:
subject: Subject
version: ResolvedVersion
version: Version
deleted: bool
schema_id: SchemaId
schema: TypedSchema
references: Sequence[Reference] | None


class Versioner:
    """Helpers for validating version tags and turning them into ``Version`` objects."""

    @classmethod
    def V(cls, tag: VersionTag) -> Version:
        """Validate ``tag`` and wrap its resolved integer value in a ``Version``.

        Raises:
            InvalidVersion: if ``tag`` does not pass ``validate_tag``.
        """
        cls.validate_tag(tag=tag)
        return Version(version=cls.resolve_tag(tag))

    @classmethod
    def validate_tag(cls, tag: VersionTag) -> None:
        """Check that ``tag`` resolves to an acceptable version number.

        A tag is rejected when it cannot be converted to an integer, when it
        resolves to ``0``, or when it resolves to a value below
        ``Version.MINUS_1_VERSION_TAG``.

        Raises:
            InvalidVersion: if ``tag`` is rejected for any of the reasons above.
        """
        try:
            version = cls.resolve_tag(tag=tag)
        except (TypeError, ValueError) as exc:
            # ``resolve_tag`` only calls ``int()`` on tags other than the
            # latest tag, so any conversion failure means the tag is invalid.
            # TypeError covers values ``int()`` rejects by type (e.g. None).
            raise InvalidVersion(f"Invalid version {tag}") from exc
        if version < Version.MINUS_1_VERSION_TAG or version == 0:
            raise InvalidVersion(f"Invalid version {tag}")

    @staticmethod
    def resolve_tag(tag: VersionTag) -> int:
        """Return the integer for ``tag``.

        ``Version.LATEST_VERSION_TAG`` maps to ``Version.MINUS_1_VERSION_TAG``;
        every other tag is converted with ``int()``, which raises
        ``ValueError``/``TypeError`` for values it cannot convert.
        """
        if tag == Version.LATEST_VERSION_TAG:
            return Version.MINUS_1_VERSION_TAG
        return int(tag)

    @staticmethod
    def from_schema_versions(schema_versions: Mapping[Version, SchemaVersion], version: Version) -> Version:
        """Resolve ``version`` against the versions present in ``schema_versions``.

        Returns the highest key of ``schema_versions`` when ``version.is_latest``
        is true, otherwise ``version`` itself if it is one of the keys.

        Raises:
            VersionNotFoundException: if ``schema_versions`` is empty or does
                not contain ``version``.
        """
        if not schema_versions:
            # max() on an empty mapping would raise a bare ValueError; report
            # the condition with the same exception used for a missing version.
            raise VersionNotFoundException()
        max_version = max(schema_versions)
        if version.is_latest:
            return max_version
        if version in schema_versions and version <= max_version:
            return version
        raise VersionNotFoundException()
4 changes: 2 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, SchemaId, Subject, Version
from karapace.utils import json_decode, JSONDecodeError
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand Down Expand Up @@ -602,7 +602,7 @@ def remove_referenced_by(
def get_referenced_by(
self,
subject: Subject,
version: ResolvedVersion,
version: Version,
) -> Referents | None:
return self.database.get_referenced_by(subject, version)

Expand Down
12 changes: 6 additions & 6 deletions karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version
from typing import cast, List, Mapping, NewType, TypeVar

Referents = NewType("Referents", List[SchemaId])
Expand Down Expand Up @@ -36,7 +36,7 @@ class LatestVersionReference:
name: str
subject: Subject

def resolve(self, version: ResolvedVersion) -> Reference:
def resolve(self, version: Version) -> Reference:
return Reference(
name=self.name,
subject=self.subject,
Expand All @@ -48,10 +48,10 @@ def resolve(self, version: ResolvedVersion) -> Reference:
class Reference:
name: str
subject: Subject
version: ResolvedVersion
version: Version

def __post_init__(self) -> None:
assert self.version != -1
assert self.version != Version.MINUS_1_VERSION_TAG

def __repr__(self) -> str:
return f"{{name='{self.name}', subject='{self.subject}', version={self.version}}}"
Expand All @@ -68,7 +68,7 @@ def from_dict(data: JsonObject) -> Reference:
return Reference(
name=str(data["name"]),
subject=Subject(str(data["subject"])),
version=ResolvedVersion(cast(int, data["version"])),
version=Version(cast(int, data["version"])),
nosahama marked this conversation as resolved.
Show resolved Hide resolved
)


Expand All @@ -88,6 +88,6 @@ def reference_from_mapping(
else Reference(
name=name,
subject=subject,
version=ResolvedVersion(version),
version=Version(version),
)
)
Loading
Loading