Skip to content

Commit

Permalink
refactor: use new strongly typed Version
Browse files Browse the repository at this point in the history
  • Loading branch information
nosahama committed Jun 4, 2024
1 parent 6570e75 commit 9c02e37
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 196 deletions.
4 changes: 2 additions & 2 deletions karapace/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from karapace.schema_references import Reference
from karapace.typing import JsonData, Subject
from karapace.typing import JsonData, Subject, Version
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand All @@ -26,7 +26,7 @@ def __init__(
self,
name: str,
subject: Subject,
version: int,
version: Version,
target_schema: ValidatedTypedSchema,
) -> None:
self.name = name
Expand Down
13 changes: 10 additions & 3 deletions karapace/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.schema_references import Referents

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from karapace.schema_references import Referents
from karapace.typing import Version


class VersionNotFoundException(Exception):
Expand Down Expand Up @@ -54,10 +61,10 @@ class SubjectNotSoftDeletedException(Exception):


class ReferenceExistsException(Exception):
def __init__(self, referenced_by: Referents, version: int) -> None:
def __init__(self, referenced_by: Referents, version: Version) -> None:
super().__init__()
self.version = version
self.referenced_by = referenced_by
self.version = version


class SubjectSoftDeletedException(Exception):
Expand Down
26 changes: 13 additions & 13 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from __future__ import annotations

from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_models import SchemaVersion, TypedSchema, Versioner
from karapace.schema_references import Reference, Referents
from karapace.typing import SchemaId, Subject
from karapace.typing import SchemaId, Subject, Version
from threading import Lock, RLock
from typing import Iterable, Sequence

Expand All @@ -20,7 +20,7 @@

@dataclass
class SubjectData:
schemas: dict[int, SchemaVersion] = field(default_factory=dict)
schemas: dict[Version, SchemaVersion] = field(default_factory=dict)
compatibility: str | None = None


Expand All @@ -31,7 +31,7 @@ def __init__(self) -> None:
self.subjects: dict[Subject, SubjectData] = {}
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, int], Referents] = {}
self.referenced_by: dict[tuple[Subject, Version], Referents] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produced multiple times to the same or
Expand Down Expand Up @@ -100,15 +100,15 @@ def _delete_subject_from_schema_id_on_subject(self, *, subject: Subject) -> None
def _get_from_hash_cache(self, *, typed_schema: TypedSchema) -> TypedSchema:
return self._hash_to_schema.setdefault(typed_schema.fingerprint(), typed_schema)

def get_next_version(self, *, subject: Subject) -> int:
return max(self.subjects[subject].schemas) + 1
def get_next_version(self, *, subject: Subject) -> Version:
return Versioner.V(max(self.subjects[subject].schemas) + 1)

def insert_schema_version(
self,
*,
subject: Subject,
schema_id: SchemaId,
version: int,
version: Version,
deleted: bool,
schema: TypedSchema,
references: Sequence[Reference] | None,
Expand Down Expand Up @@ -217,19 +217,19 @@ def find_subjects(self, *, include_deleted: bool) -> list[Subject]:
subject for subject in self.subjects if self.find_subject_schemas(subject=subject, include_deleted=False)
]

def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[int, SchemaVersion]:
def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> dict[Version, SchemaVersion]:
if subject not in self.subjects:
return {}
if include_deleted:
return self.subjects[subject].schemas
with self.schema_lock_thread:
return {
version_id: schema_version
Versioner.V(version_id): schema_version
for version_id, schema_version in self.subjects[subject].schemas.items()
if schema_version.deleted is False
}

def delete_subject(self, *, subject: Subject, version: int) -> None:
def delete_subject(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
if schema_version.version <= version:
Expand All @@ -241,7 +241,7 @@ def delete_subject_hard(self, *, subject: Subject) -> None:
del self.subjects[subject]
self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: int) -> None:
def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
self.subjects[subject].schemas.pop(version, None)

Expand All @@ -263,15 +263,15 @@ def num_schema_versions(self) -> tuple[int, int]:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: int, schema_id: SchemaId) -> None:
def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
else:
self.referenced_by[(subject, version)] = Referents([schema_id])

def get_referenced_by(self, subject: Subject, version: int) -> Referents | None:
def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

Expand Down
36 changes: 33 additions & 3 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jsonschema import Draft7Validator
from jsonschema.exceptions import SchemaError
from karapace.dependency import Dependency
from karapace.errors import InvalidSchema
from karapace.errors import InvalidSchema, InvalidVersion, VersionNotFoundException
from karapace.protobuf.exception import (
Error as ProtobufError,
IllegalArgumentException,
Expand All @@ -23,7 +23,7 @@
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_references import Reference
from karapace.schema_type import SchemaType
from karapace.typing import JsonObject, SchemaId, Subject
from karapace.typing import JsonObject, SchemaId, Subject, Version, VersionTag
from karapace.utils import assert_never, json_decode, json_encode, JSONDecodeError
from typing import Any, cast, Dict, Final, final, Mapping, Sequence

Expand Down Expand Up @@ -383,8 +383,38 @@ def parse(
@dataclass
class SchemaVersion:
subject: Subject
version: int
version: Version
deleted: bool
schema_id: SchemaId
schema: TypedSchema
references: Sequence[Reference] | None


class Versioner:
    """Helpers for validating raw version tags and building ``Version`` objects."""

    @classmethod
    def V(cls, tag: VersionTag) -> Version:
        """Validate ``tag`` and wrap its resolved integer in a ``Version``.

        Raises:
            InvalidVersion: if ``validate_tag`` rejects the tag.
        """
        cls.validate_tag(tag=tag)
        return Version(version=cls.resolve_tag(tag))

    @classmethod
    def validate_tag(cls, tag: VersionTag) -> None:
        """Check that ``tag`` resolves to an acceptable version number.

        A tag is rejected when it cannot be converted to an integer, when it
        resolves to zero, or when it resolves below
        ``Version.MINUS_1_VERSION_TAG``.

        Raises:
            InvalidVersion: if the tag is rejected.
        """
        # Only the conversion can fail, so it is the only statement in the
        # ``try``. ``resolve_tag`` never calls ``int()`` for the latest tag,
        # so any conversion error here means the tag is genuinely malformed.
        # ``TypeError`` covers non-string, non-numeric input such as ``None``.
        try:
            version = cls.resolve_tag(tag=tag)
        except (TypeError, ValueError) as exc:
            raise InvalidVersion(f"Invalid version {tag}") from exc
        if version < Version.MINUS_1_VERSION_TAG or version == 0:
            raise InvalidVersion(f"Invalid version {tag}")

    @staticmethod
    def resolve_tag(tag: VersionTag) -> int:
        """Convert a raw tag to its integer form.

        ``Version.LATEST_VERSION_TAG`` maps to ``Version.MINUS_1_VERSION_TAG``;
        every other tag is passed through ``int()``, whose conversion errors
        propagate to the caller.
        """
        if tag == Version.LATEST_VERSION_TAG:
            return Version.MINUS_1_VERSION_TAG
        return int(tag)

    @staticmethod
    def from_schema_versions(schema_versions: Mapping[Version, SchemaVersion], version: Version) -> Version:
        """Resolve ``version`` against the versions present in ``schema_versions``.

        Returns:
            The highest key of ``schema_versions`` when ``version.is_latest`` is
            true, otherwise ``version`` itself when it is a key of the mapping.

        Raises:
            VersionNotFoundException: if the mapping is empty or does not
                contain ``version``.
        """
        # ``max()`` raises a bare ValueError on an empty mapping; report the
        # condition with the domain exception instead.
        if not schema_versions:
            raise VersionNotFoundException()
        if version.is_latest:
            return max(schema_versions)
        # A key of the mapping can never exceed the mapping's maximum, so the
        # membership test alone is sufficient.
        if version in schema_versions:
            return version
        raise VersionNotFoundException()
4 changes: 2 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, Subject
from karapace.typing import JsonObject, SchemaId, Subject, Version
from karapace.utils import json_decode, JSONDecodeError
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand Down Expand Up @@ -602,7 +602,7 @@ def remove_referenced_by(
def get_referenced_by(
self,
subject: Subject,
version: int,
version: Version,
) -> Referents | None:
return self.database.get_referenced_by(subject, version)

Expand Down
12 changes: 6 additions & 6 deletions karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData, JsonObject, SchemaId, Subject
from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version
from typing import cast, List, Mapping, NewType, TypeVar

Referents = NewType("Referents", List[SchemaId])
Expand Down Expand Up @@ -36,7 +36,7 @@ class LatestVersionReference:
name: str
subject: Subject

def resolve(self, version: int) -> Reference:
def resolve(self, version: Version) -> Reference:
return Reference(
name=self.name,
subject=self.subject,
Expand All @@ -48,10 +48,10 @@ def resolve(self, version: int) -> Reference:
class Reference:
name: str
subject: Subject
version: int
version: Version

def __post_init__(self) -> None:
assert self.version != -1
assert self.version != Version.MINUS_1_VERSION_TAG

def __repr__(self) -> str:
return f"{{name='{self.name}', subject='{self.subject}', version={self.version}}}"
Expand All @@ -68,7 +68,7 @@ def from_dict(data: JsonObject) -> Reference:
return Reference(
name=str(data["name"]),
subject=Subject(str(data["subject"])),
version=int(cast(int, data["version"])),
version=Version(cast(int, data["version"])),
)


Expand All @@ -88,6 +88,6 @@ def reference_from_mapping(
else Reference(
name=name,
subject=subject,
version=int(version),
version=Version(version),
)
)
28 changes: 14 additions & 14 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from karapace.master_coordinator import MasterCoordinator
from karapace.messaging import KarapaceProducer
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Version
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, Mode, SchemaId, Subject
from karapace.typing import JsonObject, Mode, SchemaId, Subject, Version
from typing import Sequence

import asyncio
Expand Down Expand Up @@ -127,7 +127,7 @@ def schemas_get(self, schema_id: SchemaId, *, fetch_max_id: bool = False) -> Typ

return schema

async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[int]:
async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[Version]:
async with self.schema_lock:
schema_versions = self.subject_get(subject, include_deleted=True)

Expand Down Expand Up @@ -173,7 +173,7 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[
try:
schema_versions_live = self.subject_get(subject, include_deleted=False)
except SchemasNotFoundException:
latest_version_id = int(0)
latest_version_id = Versioner.V(Version.MINUS_1_VERSION_TAG)
version_list = []
else:
version_list = list(schema_versions_live)
Expand All @@ -187,7 +187,7 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[

return version_list

async def subject_version_delete_local(self, subject: Subject, version: Version, permanent: bool) -> int:
async def subject_version_delete_local(self, subject: Subject, version: Version, permanent: bool) -> Version:
async with self.schema_lock:
schema_versions = self.subject_get(subject, include_deleted=True)
if not permanent and version.is_latest:
Expand All @@ -196,7 +196,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version,
for version_id, schema_version in schema_versions.items()
if schema_version.deleted is False
}
resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions)
resolved_version = Versioner.from_schema_versions(schema_versions=schema_versions, version=version)
schema_version = schema_versions.get(resolved_version, None)

if not schema_version:
Expand Down Expand Up @@ -224,7 +224,7 @@ async def subject_version_delete_local(self, subject: Subject, version: Version,
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
return resolved_version

def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[int, SchemaVersion]:
def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[Version, SchemaVersion]:
subject_found = self.database.find_subject(subject=subject)
if not subject_found:
raise SubjectNotFoundException()
Expand All @@ -238,7 +238,7 @@ def subject_version_get(self, subject: Subject, version: Version, *, include_del
schema_versions = self.subject_get(subject, include_deleted=include_deleted)
if not schema_versions:
raise SubjectNotFoundException()
resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions)
resolved_version = Versioner.from_schema_versions(schema_versions=schema_versions, version=version)
schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None)

if not schema_data:
Expand Down Expand Up @@ -269,7 +269,7 @@ async def subject_version_referencedby_get(
schema_versions = self.subject_get(subject, include_deleted=include_deleted)
if not schema_versions:
raise SubjectNotFoundException()
resolved_version = version.resolve_from_schema_versions(schema_versions=schema_versions)
resolved_version = Versioner.from_schema_versions(schema_versions=schema_versions, version=version)
schema_data: SchemaVersion | None = schema_versions.get(resolved_version, None)
if not schema_data:
raise VersionNotFoundException()
Expand Down Expand Up @@ -311,7 +311,7 @@ async def write_new_schema_local(

all_schema_versions = self.database.find_subject_schemas(subject=subject, include_deleted=True)
if not all_schema_versions:
version = int(1)
version = Version(1)
schema_id = self.database.get_schema_id(new_schema)
LOG.debug(
"Registering new subject: %r, id: %r with version: %r with schema %r, schema_id: %r",
Expand Down Expand Up @@ -400,8 +400,8 @@ async def write_new_schema_local(

def get_subject_versions_for_schema(
self, schema_id: SchemaId, *, include_deleted: bool = False
) -> list[dict[str, Subject | int]]:
subject_versions: list[dict[str, Subject | int]] = []
) -> list[dict[str, Subject | Version]]:
subject_versions: list[dict[str, Subject | Version]] = []
schema_versions = self.database.find_schema_versions_by_schema_id(
schema_id=schema_id,
include_deleted=include_deleted,
Expand All @@ -423,7 +423,7 @@ def send_schema_message(
subject: Subject,
schema: TypedSchema | None,
schema_id: int,
version: int,
version: Version,
deleted: bool,
references: Sequence[Reference] | None,
) -> None:
Expand Down Expand Up @@ -459,7 +459,7 @@ def resolve_references(
) -> tuple[Sequence[Reference], dict[str, Dependency]] | tuple[None, None]:
return self.schema_reader.resolve_references(references) if references else (None, None)

def send_delete_subject_message(self, subject: Subject, version: int) -> None:
def send_delete_subject_message(self, subject: Subject, version: Version) -> None:
key = {"subject": subject, "magic": 0, "keytype": "DELETE_SUBJECT"}
value = {"subject": subject, "version": version}
self.producer.send_message(key=key, value=value)
Loading

0 comments on commit 9c02e37

Please sign in to comment.