Skip to content

Commit

Permalink
Merge pull request #941 from Aiven-Open/nosahama/avro-message-handlin…
Browse files Browse the repository at this point in the history
…g-error-tests

tests,schema_reader: kafka message handling error tests
  • Loading branch information
eliax1996 authored Sep 13, 2024
2 parents 800f3cb + e3899bd commit 661a7a4
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 13 deletions.
18 changes: 9 additions & 9 deletions karapace/protobuf/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@
from karapace.protobuf.schema import ProtobufSchema


class IllegalStateException(Exception):
pass


class IllegalArgumentException(Exception):
pass


class Error(Exception):
"""Base class for errors in this module."""

Expand All @@ -28,10 +20,18 @@ class ProtobufException(Error):
"""Generic Protobuf schema error."""


class ProtobufTypeException(Error):
class ProtobufTypeException(ProtobufException):
"""Generic Protobuf type error."""


class IllegalStateException(ProtobufException):
pass


class IllegalArgumentException(ProtobufException):
pass


class ProtobufUnresolvedDependencyException(ProtobufException):
"""a Protobuf schema has unresolved dependency"""

Expand Down
8 changes: 4 additions & 4 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from karapace.config import Config
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
Expand Down Expand Up @@ -399,7 +399,7 @@ def handle_messages(self) -> None:

try:
self.handle_msg(key, value)
except (InvalidSchema, TypeError) as exc:
except (InvalidSchema, InvalidVersion, TypeError) as exc:
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue
finally:
Expand Down Expand Up @@ -486,8 +486,8 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None:

def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument
if value is None:
LOG.warning("DELETE_SUBJECT record doesnt have a value, should have")
return
LOG.warning("DELETE_SUBJECT record does not have a value, should have")
raise ValueError("DELETE_SUBJECT record does not have a value, should have")

subject = value["subject"]
version = Version(value["version"])
Expand Down
197 changes: 197 additions & 0 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.errors import CorruptKafkaRecordException, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
Expand All @@ -18,11 +19,15 @@
KafkaSchemaReader,
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP,
MAX_MESSAGES_TO_CONSUME_ON_STARTUP,
MessageType,
OFFSET_EMPTY,
OFFSET_UNINITIALIZED,
)
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from tests.base_testcase import BaseTestCase
from tests.utils import schema_protobuf_invalid
from typing import Callable, List, Tuple
from unittest.mock import Mock

import confluent_kafka
Expand Down Expand Up @@ -318,3 +323,195 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None:
assert log.name == "karapace.schema_reader"
assert log.levelname == "WARNING"
assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have"


@dataclass
class KafkaMessageHandlingErrorTestCase(BaseTestCase):
key: bytes
value: bytes
schema_type: SchemaType
message_type: MessageType
expected_error: ShutdownException
expected_log_message: str


@pytest.fixture(name="schema_reader_with_consumer_messages_factory")
def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[Tuple[List[Message]]], KafkaSchemaReader]:
def factory(consumer_messages: Tuple[List[Message]]) -> KafkaSchemaReader:
key_formatter_mock = Mock(spec=KeyFormatter)
consumer_mock = Mock(spec=KafkaConsumer)

consumer_mock.consume.side_effect = consumer_messages
# Return tuple (beginning, end), end offset is the next upcoming record offset
consumer_mock.get_watermark_offsets.return_value = (0, 4)

# Update the config to run the schema reader in strict mode so errors can be raised
config = DEFAULTS.copy()
config["kafka_schema_reader_strict_mode"] = True

offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
config=config,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)
schema_reader.consumer = consumer_mock
schema_reader.offset = 0
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP
return schema_reader

return factory


@pytest.fixture(name="message_factory")
def fixture_message_factory() -> Callable[[bytes, bytes, int], Message]:
def factory(key: bytes, value: bytes, offset: int = 1) -> Message:
message = Mock(spec=Message)
message.key.return_value = key
message.value.return_value = value
message.offset.return_value = offset
message.error.return_value = None
return message

return factory


@pytest.mark.parametrize(
"test_case",
[
KafkaMessageHandlingErrorTestCase(
test_name="Message key is not valid JSON",
key=b'{subject1::::"test""version":1"magic":1}',
value=b'{"value": "value does not matter at this stage, just correct JSON"}',
schema_type=None,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Invalid JSON in msg.key() at offset 1",
),
KafkaMessageHandlingErrorTestCase(
test_name="Keytype is missing from message key",
key=b'{"subject":"test","version":1,"magic":1}',
value=b'{"value": "value does not matter at this stage, just correct JSON"}',
schema_type=None,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message=(
"The message {'subject': 'test', 'version': 1, 'magic': 1}-"
"{'value': 'value does not matter at this stage, just correct JSON'} "
"has been discarded because doesn't contain the `keytype` key in the key"
),
),
KafkaMessageHandlingErrorTestCase(
test_name="Keytype is invalid on message key",
key=b'{"keytype":"NOT_A_VALID_KEY_TYPE","subject":"test","version":1,"magic":1}',
value=b'{"value": "value does not matter at this stage, just correct JSON"}',
schema_type=None,
message_type=None,
expected_error=CorruptKafkaRecordException,
expected_log_message=(
"The message {'keytype': 'NOT_A_VALID_KEY_TYPE', 'subject': 'test', 'version': 1, 'magic': 1}-"
"{'value': 'value does not matter at this stage, just correct JSON'} "
"has been discarded because the NOT_A_VALID_KEY_TYPE is not managed"
),
),
KafkaMessageHandlingErrorTestCase(
test_name="Config message value is not valid JSON",
key=b'{"keytype":"CONFIG","subject":null,"magic":0}',
value=(b'no-valid-jason"compatibilityLevel": "BACKWARD""'),
schema_type=None,
message_type=MessageType.config,
expected_error=CorruptKafkaRecordException,
expected_log_message="Invalid JSON in msg.value() at offset 1",
),
KafkaMessageHandlingErrorTestCase(
test_name="Config message value is not valid config setting",
key=b'{"keytype":"CONFIG","subject":null,"magic":0}',
value=b'{"not_the_key_name":"INVALID_CONFIG"}',
schema_type=None,
message_type=MessageType.config,
expected_error=CorruptKafkaRecordException,
expected_log_message=(
"The message {'keytype': 'CONFIG', 'subject': None, 'magic': 0}-"
"{'not_the_key_name': 'INVALID_CONFIG'} has been discarded because the CONFIG is not managed"
),
),
KafkaMessageHandlingErrorTestCase(
test_name="Version in schema message value is not valid",
key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
value=(
b'{"subject": "test", "version": "invalid-version", "id": 1, "deleted": false,'
b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": '
b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}'
),
schema_type=SchemaType.AVRO,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message=(
"The message {'keytype': 'SCHEMA', 'subject': 'test', 'version': 1, 'magic': 1}-"
"{'subject': 'test', 'version': 'invalid-version', 'id': 1, 'deleted': False, 'schema': "
'\'{"name": "test", "type": "record", "fields": [{"name": "test_field", "type": ["string", "int"]}]}\'} '
"has been discarded because the SCHEMA is not managed"
),
),
KafkaMessageHandlingErrorTestCase(
test_name="Message value is not valid JSON",
key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
value=(
b'no-valid-json"version": 1, "id": 1, "deleted": false,'
b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": '
b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}'
),
schema_type=SchemaType.AVRO,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Invalid JSON in msg.value() at offset 1",
),
KafkaMessageHandlingErrorTestCase(
test_name="Delete subject message value is missing `subject` field",
key=b'{"keytype":"DELETE_SUBJECT","subject":"test","version":1,"magic":1}',
value=b'{"not-subject-key":"test","version":1}',
schema_type=None,
message_type=MessageType.delete_subject,
expected_error=CorruptKafkaRecordException,
expected_log_message=(
"The message {'keytype': 'DELETE_SUBJECT', 'subject': 'test', 'version': 1, 'magic': 1}-"
"{'not-subject-key': 'test', 'version': 1} has been discarded because the DELETE_SUBJECT is not managed"
),
),
KafkaMessageHandlingErrorTestCase(
test_name="Protobuf schema is invalid",
key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
value=(
b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":'
+ json.dumps(schema_protobuf_invalid).encode()
+ b"}"
),
schema_type=SchemaType.PROTOBUF,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Schema is not valid ProtoBuf definition",
),
],
)
def test_message_error_handling(
caplog: LogCaptureFixture,
test_case: KafkaMessageHandlingErrorTestCase,
schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader],
message_factory: Callable[[bytes, bytes, int], Message],
) -> None:
message = message_factory(key=test_case.key, value=test_case.value)
consumer_messages = ([message],)
schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages)

with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"):
with pytest.raises(test_case.expected_error):
schema_reader.handle_messages()

assert schema_reader.offset == 1
assert not schema_reader.ready
for log in caplog.records:
assert log.name == "karapace.schema_reader"
assert log.levelname == "WARNING"
assert log.message == test_case.expected_log_message
16 changes: 16 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@
{"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}},
]

schema_protobuf_invalid = """
|o3"
|
|opti -- om.codingharbour.protobuf";
|option java_outer_classname = "TestEnumOrder";
|
|message Message {
| int32
| speed =;
|}
|Enum
| HIGH = 0
| MIDDLE = ;
"""
schema_protobuf_invalid = trim_margin(schema_protobuf_invalid)

schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)}

second_schema_json = json.dumps(
Expand Down

0 comments on commit 661a7a4

Please sign in to comment.