tests,schema_reader: kafka message handling error tests #941

Merged
merged 1 commit into from Sep 13, 2024
18 changes: 9 additions & 9 deletions karapace/protobuf/exception.py
@@ -12,14 +12,6 @@
from karapace.protobuf.schema import ProtobufSchema


-class IllegalStateException(Exception):
-    pass
-
-
-class IllegalArgumentException(Exception):
-    pass
-
-
class Error(Exception):
    """Base class for errors in this module."""

@@ -28,10 +20,18 @@ class ProtobufException(Error):
    """Generic Protobuf schema error."""


-class ProtobufTypeException(Error):
+class ProtobufTypeException(ProtobufException):
    """Generic Protobuf type error."""


+class IllegalStateException(ProtobufException):
+    pass
+
+
+class IllegalArgumentException(ProtobufException):
+    pass
+
+
class ProtobufUnresolvedDependencyException(ProtobufException):
    """a Protobuf schema has unresolved dependency"""
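The practical effect of the reparenting above is that IllegalStateException and IllegalArgumentException can now be caught through the shared ProtobufException base, whereas before they derived directly from Exception and escaped such handlers. A minimal sketch, assuming the post-merge module layout:

from karapace.protobuf.exception import IllegalArgumentException, IllegalStateException, ProtobufException

for error in (IllegalStateException("bad parser state"), IllegalArgumentException("bad argument")):
    try:
        raise error
    except ProtobufException as exc:
        # Both classes are ProtobufException subclasses after this change, so a
        # single handler on the base class covers them.
        print(f"caught {type(exc).__name__}: {exc}")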
8 changes: 4 additions & 4 deletions karapace/schema_reader.py
@@ -28,7 +28,7 @@
from karapace.config import Config
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.dependency import Dependency
-from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
+from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
@@ -399,7 +399,7 @@ def handle_messages(self) -> None:

            try:
                self.handle_msg(key, value)
-            except (InvalidSchema, TypeError) as exc:
+            except (InvalidSchema, InvalidVersion, TypeError) as exc:
                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
                continue
            finally:
@@ -486,8 +486,8 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None:

    def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
        if value is None:
-            LOG.warning("DELETE_SUBJECT record doesnt have a value, should have")
-            return
+            LOG.warning("DELETE_SUBJECT record does not have a value, should have")
+            raise ValueError("DELETE_SUBJECT record does not have a value, should have")

        subject = value["subject"]
        version = Version(value["version"])
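The handle_messages() change above hands InvalidSchema, InvalidVersion, and TypeError to the KafkaErrorHandler instead of letting them escape the consume loop. The new unit tests below rely on that handler's strict mode (the kafka_schema_reader_strict_mode setting) to surface corrupt records as CorruptKafkaRecordException while still emitting a warning log. A simplified stand-in for that contract, not the actual karapace handler:

import logging

LOG = logging.getLogger("karapace.schema_reader")


class CorruptKafkaRecordException(Exception):
    """Stand-in for karapace.errors.CorruptKafkaRecordException."""


def handle_error_sketch(error: Exception, strict_mode: bool) -> None:
    # The record problem is logged in both modes; with strict mode enabled the
    # error is re-raised as well, which is what the tests assert through
    # pytest.raises(CorruptKafkaRecordException).
    LOG.warning(str(error))
    if strict_mode:
        raise CorruptKafkaRecordException(str(error)) from error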
197 changes: 197 additions & 0 deletions tests/unit/test_schema_reader.py
@@ -10,6 +10,7 @@
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
from karapace.errors import CorruptKafkaRecordException, ShutdownException
from karapace.in_memory_database import InMemoryDatabase
from karapace.kafka.consumer import KafkaConsumer
from karapace.key_format import KeyFormatter
@@ -18,11 +19,15 @@
    KafkaSchemaReader,
    MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP,
    MAX_MESSAGES_TO_CONSUME_ON_STARTUP,
    MessageType,
    OFFSET_EMPTY,
    OFFSET_UNINITIALIZED,
)
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from tests.base_testcase import BaseTestCase
from tests.utils import schema_protobuf_invalid
from typing import Callable, List, Tuple
from unittest.mock import Mock

import confluent_kafka
@@ -318,3 +323,195 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None:
            assert log.name == "karapace.schema_reader"
            assert log.levelname == "WARNING"
            assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have"


@dataclass
class KafkaMessageHandlingErrorTestCase(BaseTestCase):
    key: bytes
    value: bytes
    schema_type: SchemaType
    message_type: MessageType
    expected_error: ShutdownException
    expected_log_message: str


@pytest.fixture(name="schema_reader_with_consumer_messages_factory")
def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[Tuple[List[Message]]], KafkaSchemaReader]:
def factory(consumer_messages: Tuple[List[Message]]) -> KafkaSchemaReader:
key_formatter_mock = Mock(spec=KeyFormatter)
consumer_mock = Mock(spec=KafkaConsumer)

consumer_mock.consume.side_effect = consumer_messages
# Return tuple (beginning, end), end offset is the next upcoming record offset
consumer_mock.get_watermark_offsets.return_value = (0, 4)

# Update the config to run the schema reader in strict mode so errors can be raised
config = DEFAULTS.copy()
config["kafka_schema_reader_strict_mode"] = True

offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
config=config,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)
schema_reader.consumer = consumer_mock
schema_reader.offset = 0
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP
return schema_reader

return factory


@pytest.fixture(name="message_factory")
def fixture_message_factory() -> Callable[[bytes, bytes, int], Message]:
def factory(key: bytes, value: bytes, offset: int = 1) -> Message:
message = Mock(spec=Message)
message.key.return_value = key
message.value.return_value = value
message.offset.return_value = offset
message.error.return_value = None
return message

return factory


@pytest.mark.parametrize(
    "test_case",
    [
        KafkaMessageHandlingErrorTestCase(
            test_name="Message key is not valid JSON",
            key=b'{subject1::::"test""version":1"magic":1}',
            value=b'{"value": "value does not matter at this stage, just correct JSON"}',
            schema_type=None,
            message_type=MessageType.schema,
            expected_error=CorruptKafkaRecordException,
            expected_log_message="Invalid JSON in msg.key() at offset 1",
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Keytype is missing from message key",
            key=b'{"subject":"test","version":1,"magic":1}',
            value=b'{"value": "value does not matter at this stage, just correct JSON"}',
            schema_type=None,
            message_type=MessageType.schema,
            expected_error=CorruptKafkaRecordException,
            expected_log_message=(
                "The message {'subject': 'test', 'version': 1, 'magic': 1}-"
                "{'value': 'value does not matter at this stage, just correct JSON'} "
                "has been discarded because doesn't contain the `keytype` key in the key"
            ),
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Keytype is invalid on message key",
            key=b'{"keytype":"NOT_A_VALID_KEY_TYPE","subject":"test","version":1,"magic":1}',
            value=b'{"value": "value does not matter at this stage, just correct JSON"}',
            schema_type=None,
            message_type=None,
            expected_error=CorruptKafkaRecordException,
            expected_log_message=(
                "The message {'keytype': 'NOT_A_VALID_KEY_TYPE', 'subject': 'test', 'version': 1, 'magic': 1}-"
                "{'value': 'value does not matter at this stage, just correct JSON'} "
                "has been discarded because the NOT_A_VALID_KEY_TYPE is not managed"
            ),
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Config message value is not valid JSON",
            key=b'{"keytype":"CONFIG","subject":null,"magic":0}',
            value=(b'no-valid-jason"compatibilityLevel": "BACKWARD""'),
            schema_type=None,
            message_type=MessageType.config,
            expected_error=CorruptKafkaRecordException,
            expected_log_message="Invalid JSON in msg.value() at offset 1",
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Config message value is not valid config setting",
            key=b'{"keytype":"CONFIG","subject":null,"magic":0}',
            value=b'{"not_the_key_name":"INVALID_CONFIG"}',
            schema_type=None,
            message_type=MessageType.config,
            expected_error=CorruptKafkaRecordException,
            expected_log_message=(
                "The message {'keytype': 'CONFIG', 'subject': None, 'magic': 0}-"
                "{'not_the_key_name': 'INVALID_CONFIG'} has been discarded because the CONFIG is not managed"
            ),
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Version in schema message value is not valid",
            key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
            value=(
                b'{"subject": "test", "version": "invalid-version", "id": 1, "deleted": false,'
                b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": '
                b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}'
            ),
            schema_type=SchemaType.AVRO,
            message_type=MessageType.schema,
            expected_error=CorruptKafkaRecordException,
            expected_log_message=(
                "The message {'keytype': 'SCHEMA', 'subject': 'test', 'version': 1, 'magic': 1}-"
                "{'subject': 'test', 'version': 'invalid-version', 'id': 1, 'deleted': False, 'schema': "
                '\'{"name": "test", "type": "record", "fields": [{"name": "test_field", "type": ["string", "int"]}]}\'} '
                "has been discarded because the SCHEMA is not managed"
            ),
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Message value is not valid JSON",
            key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
            value=(
                b'no-valid-json"version": 1, "id": 1, "deleted": false,'
                b'"schema": "{\\"name\\": \\"test\\", \\"type\\": \\"record\\", \\"fields\\": '
                b'[{\\"name\\": \\"test_field\\", \\"type\\": [\\"string\\", \\"int\\"]}]}"}'
            ),
            schema_type=SchemaType.AVRO,
            message_type=MessageType.schema,
            expected_error=CorruptKafkaRecordException,
            expected_log_message="Invalid JSON in msg.value() at offset 1",
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Delete subject message value is missing `subject` field",
            key=b'{"keytype":"DELETE_SUBJECT","subject":"test","version":1,"magic":1}',
            value=b'{"not-subject-key":"test","version":1}',
            schema_type=None,
            message_type=MessageType.delete_subject,
            expected_error=CorruptKafkaRecordException,
            expected_log_message=(
                "The message {'keytype': 'DELETE_SUBJECT', 'subject': 'test', 'version': 1, 'magic': 1}-"
                "{'not-subject-key': 'test', 'version': 1} has been discarded because the DELETE_SUBJECT is not managed"
            ),
        ),
        KafkaMessageHandlingErrorTestCase(
            test_name="Protobuf schema is invalid",
            key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}',
            value=(
                b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":'
                + json.dumps(schema_protobuf_invalid).encode()
                + b"}"
            ),
            schema_type=SchemaType.PROTOBUF,
            message_type=MessageType.schema,
            expected_error=CorruptKafkaRecordException,
            expected_log_message="Schema is not valid ProtoBuf definition",
        ),
    ],
)
def test_message_error_handling(
    caplog: LogCaptureFixture,
    test_case: KafkaMessageHandlingErrorTestCase,
    schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader],
    message_factory: Callable[[bytes, bytes, int], Message],
) -> None:
    message = message_factory(key=test_case.key, value=test_case.value)
    consumer_messages = ([message],)
    schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages)

    with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"):
        with pytest.raises(test_case.expected_error):
            schema_reader.handle_messages()

        assert schema_reader.offset == 1
        assert not schema_reader.ready
        for log in caplog.records:
            assert log.name == "karapace.schema_reader"
            assert log.levelname == "WARNING"
            assert log.message == test_case.expected_log_message
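To run only the new parametrized cases from a Python prompt, something along these lines should work, assuming the repository's test dependencies are installed (it mirrors the usual pytest CLI invocation):

import pytest

# -k selects the new test by name; -v lists each parametrized case individually.
pytest.main(["-v", "-k", "test_message_error_handling", "tests/unit/test_schema_reader.py"])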
16 changes: 16 additions & 0 deletions tests/utils.py
@@ -148,6 +148,22 @@
    {"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}},
]

schema_protobuf_invalid = """
|o3"
|
|opti -- om.codingharbour.protobuf";
|option java_outer_classname = "TestEnumOrder";
|
|message Message {
| int32
| speed =;
|}
|Enum
| HIGH = 0
| MIDDLE = ;
"""
schema_protobuf_invalid = trim_margin(schema_protobuf_invalid)

schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)}

second_schema_json = json.dumps(
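The schema_protobuf_invalid fixture above depends on trim_margin to strip the leading "|" markers before the deliberately broken schema reaches the parser. The helper is assumed to behave like Kotlin's trimMargin; an illustrative re-implementation rather than the project's own helper:

def trim_margin_sketch(text: str, margin: str = "|") -> str:
    # Remove each line's leading whitespace up to and including the margin
    # marker; lines without a marker are kept unchanged.
    lines = []
    for line in text.splitlines():
        stripped = line.lstrip()
        lines.append(stripped[len(margin):] if stripped.startswith(margin) else line)
    return "\n".join(lines)


print(trim_margin_sketch("  |message Message {\n  |    int32 speed =;\n  |}"))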