Commit
Merge branch 'main' into avro_references2
libretto committed Sep 23, 2024
2 parents 9b6e5f1 + 42c7174 commit 653561e
Showing 16 changed files with 604 additions and 126 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/tests.yml
@@ -39,11 +39,11 @@ jobs:
       - run: make unit-tests
         env:
           COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
-          PYTEST_ARGS: "--cov=karapace --cov-append"
+          PYTEST_ARGS: "--cov=karapace --cov-append --numprocesses 4"
       - run: make integration-tests
         env:
           COVERAGE_FILE: ".coverage.${{ matrix.python-version }}"
-          PYTEST_ARGS: "--cov=karapace --cov-append --random-order"
+          PYTEST_ARGS: "--cov=karapace --cov-append --random-order --numprocesses 4"

       - name: Archive logs
         uses: actions/upload-artifact@v4
@@ -56,6 +56,7 @@ jobs:
         with:
           name: "coverage-${{ matrix.python-version }}"
           path: ".coverage.${{ matrix.python-version }}"
+          include-hidden-files: true

   coverage:
     name: Coverage report
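The only functional change in this workflow is the new `--numprocesses 4` flag, which comes from the pytest-xdist plugin and spreads the suite across four worker processes. A minimal sketch of an equivalent invocation from Python, assuming pytest, pytest-cov, and pytest-xdist are installed (the `tests/unit` path is illustrative, not from the diff):

```python
# Sketch: run a test suite on 4 workers via pytest-xdist, mirroring PYTEST_ARGS above.
import sys

import pytest

if __name__ == "__main__":
    exit_code = pytest.main(
        [
            "tests/unit",
            "--cov=karapace",       # collect coverage for the karapace package
            "--cov-append",         # append to the existing .coverage data file
            "--numprocesses", "4",  # pytest-xdist: distribute tests over 4 workers
        ]
    )
    sys.exit(exit_code)
```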
1 change: 1 addition & 0 deletions karapace/kafka_utils.py
@@ -40,6 +40,7 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaConsumer]:
         sasl_plain_username=config["sasl_plain_username"],
         sasl_plain_password=config["sasl_plain_password"],
         auto_offset_reset="earliest",
+        session_timeout_ms=config["session_timeout_ms"],
         metadata_max_age_ms=config["metadata_max_age_ms"],
     )
     try:
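This forwards Karapace's `session_timeout_ms` setting into the consumer instead of relying on the client-library default. A minimal sketch of the pattern, using a kafka-python-style `KafkaConsumer` purely for illustration (the real helper also wires SASL credentials and `metadata_max_age_ms`, as the hunk shows):

```python
# Sketch, not the karapace implementation: forward a configurable
# session.timeout.ms from the service config into the consumer constructor.
from contextlib import contextmanager
from typing import Iterator

from kafka import KafkaConsumer  # assumed client library, for illustration only


@contextmanager
def consumer_from_config(config: dict, topic: str) -> Iterator[KafkaConsumer]:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=config["bootstrap_uri"],
        auto_offset_reset="earliest",
        session_timeout_ms=config["session_timeout_ms"],  # the new knob
    )
    try:
        yield consumer
    finally:
        consumer.close()
```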
18 changes: 9 additions & 9 deletions karapace/protobuf/exception.py
@@ -12,14 +12,6 @@
 from karapace.protobuf.schema import ProtobufSchema


-class IllegalStateException(Exception):
-    pass
-
-
-class IllegalArgumentException(Exception):
-    pass
-
-
 class Error(Exception):
     """Base class for errors in this module."""

@@ -28,10 +20,18 @@ class ProtobufException(Error):
     """Generic Protobuf schema error."""


-class ProtobufTypeException(Error):
+class ProtobufTypeException(ProtobufException):
     """Generic Protobuf type error."""


+class IllegalStateException(ProtobufException):
+    pass
+
+
+class IllegalArgumentException(ProtobufException):
+    pass
+
+
 class ProtobufUnresolvedDependencyException(ProtobufException):
     """a Protobuf schema has unresolved dependency"""

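With this reshuffle, `IllegalStateException`, `IllegalArgumentException`, and `ProtobufTypeException` all descend from `ProtobufException`, so a single handler now covers every Protobuf failure mode. A self-contained sketch of the effect (classes re-declared here for illustration):

```python
# Sketch: the reshaped hierarchy lets one except clause catch all
# Protobuf-related errors. Classes are re-declared here for illustration.
class Error(Exception):
    """Base class for errors in this module."""

class ProtobufException(Error):
    """Generic Protobuf schema error."""

class ProtobufTypeException(ProtobufException):
    """Generic Protobuf type error."""

class IllegalStateException(ProtobufException):
    pass

def parse_schema(raw: str) -> None:
    raise IllegalStateException(f"cannot parse: {raw!r}")

try:
    parse_schema("syntax = ???")
except ProtobufException as exc:
    # Before this change, IllegalStateException(Exception) escaped this handler.
    print(f"schema rejected: {exc}")
```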
8 changes: 4 additions & 4 deletions karapace/schema_reader.py
@@ -28,7 +28,7 @@
 from karapace.config import Config
 from karapace.coordinator.master_coordinator import MasterCoordinator
 from karapace.dependency import Dependency
-from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
+from karapace.errors import InvalidReferences, InvalidSchema, InvalidVersion, ShutdownException
 from karapace.in_memory_database import InMemoryDatabase
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.common import translate_from_kafkaerror
@@ -399,7 +399,7 @@ def handle_messages(self) -> None:

             try:
                 self.handle_msg(key, value)
-            except (InvalidSchema, TypeError) as exc:
+            except (InvalidSchema, InvalidVersion, TypeError) as exc:
                 self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
                 continue
             finally:
@@ -486,8 +486,8 @@ def _handle_msg_config(self, key: dict, value: dict | None) -> None:

     def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
         if value is None:
-            LOG.warning("DELETE_SUBJECT record doesnt have a value, should have")
-            return
+            LOG.warning("DELETE_SUBJECT record does not have a value, should have")
+            raise ValueError("DELETE_SUBJECT record does not have a value, should have")

         subject = value["subject"]
         version = Version(value["version"])
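The reader now treats an invalid version the same as an invalid schema: the record is reported to the Kafka error handler and consumption continues, while a missing `DELETE_SUBJECT` value escalates to a `ValueError`. A condensed sketch of that loop, with stand-in exception and handler types (not karapace's own classes):

```python
# Condensed sketch of the handle_messages() error path shown above.
# The exception classes and handler are stand-ins for karapace's own.
class InvalidSchema(Exception): ...
class InvalidVersion(Exception): ...

class KafkaErrorHandler:
    def handle_error(self, location: str, error: Exception) -> None:
        # In karapace this may swallow or re-raise, depending on configuration.
        print(f"[{location}] skipping record: {error!r}")

def handle_messages(records, handle_msg, error_handler: KafkaErrorHandler) -> None:
    for key, value in records:
        try:
            handle_msg(key, value)
        except (InvalidSchema, InvalidVersion, TypeError) as exc:
            # Recoverable per-record failures: report and keep consuming.
            error_handler.handle_error(location="SCHEMA_READER", error=exc)
            continue

# Usage example: the bad record is reported, the loop keeps going.
def handle_msg(key: bytes, value: bytes) -> None:
    if value == b"bad":
        raise InvalidSchema("unparseable schema record")

handle_messages([(b"k1", b"good"), (b"k2", b"bad")], handle_msg, KafkaErrorHandler())
```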
2 changes: 1 addition & 1 deletion requirements/requirements-dev.txt
@@ -75,7 +75,7 @@ flask==3.0.3
     #   flask-cors
     #   flask-login
     #   locust
-flask-cors==4.0.1
+flask-cors==4.0.2
     # via locust
 flask-login==0.6.3
     # via locust
109 changes: 109 additions & 0 deletions tests/integration/backup/test_session_timeout.py
@@ -0,0 +1,109 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from aiokafka.errors import NoBrokersAvailable
from confluent_kafka.admin import NewTopic
from karapace.backup.api import BackupVersion, create_backup
from karapace.config import Config, DEFAULTS, set_config_defaults
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka_utils import kafka_producer_from_config
from pathlib import Path
from tests.integration.conftest import create_kafka_server
from tests.integration.utils.config import KafkaDescription
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive

import pytest

SESSION_TIMEOUT_MS = 65000
GROUP_MIN_SESSION_TIMEOUT_MS = 60000
GROUP_MAX_SESSION_TIMEOUT_MS = 70000
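# A consumer's session.timeout.ms must fall within the broker's
# [group.min.session.timeout.ms, group.max.session.timeout.ms] window, so
# 65000 is accepted here (60000 <= 65000 <= 70000), while typical client
# defaults well below 60000 would be rejected by this broker.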


# use a dedicated kafka server with specific values for
# group.min.session.timeout.ms and group.max.session.timeout.ms
@pytest.fixture(scope="function", name="kafka_server_session_timeout")
def fixture_kafka_server(
    kafka_description: KafkaDescription,
    port_range: PortRangeInclusive,
    tmp_path_factory: pytest.TempPathFactory,
):
    # use custom data and log dir to avoid conflict with other kafka servers
    session_datadir = tmp_path_factory.mktemp("kafka_server_min_data")
    session_logdir = tmp_path_factory.mktemp("kafka_server_min_log")
    kafka_config_extra = {
        "group.min.session.timeout.ms": GROUP_MIN_SESSION_TIMEOUT_MS,
        "group.max.session.timeout.ms": GROUP_MAX_SESSION_TIMEOUT_MS,
    }
    yield from create_kafka_server(
        session_datadir,
        session_logdir,
        kafka_description,
        port_range,
        kafka_config_extra,
    )


def test_producer_with_custom_kafka_properties_does_not_fail(
    kafka_server_session_timeout: KafkaServers,
    new_topic: NewTopic,
    tmp_path: Path,
) -> None:
    """
    This test checks whether the custom properties are accepted by Kafka.
    We know from the implementation of the consumer startup code that if
    `group.min.session.timeout.ms` > `session.timeout.ms`, the consumer
    raises an exception during startup.
    This test ensures that `session.timeout.ms` can be injected into the
    Kafka config so that the exception isn't raised.
    """
    config = set_config_defaults(
        Config(bootstrap_uri=kafka_server_session_timeout.bootstrap_servers, session_timeout_ms=SESSION_TIMEOUT_MS)
    )

    admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
    admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)

    with kafka_producer_from_config(config) as producer:
        producer.send(
            new_topic.topic,
            key=b"foo",
            value=b"bar",
            partition=0,
            headers=[
                ("some-header", b"some header value"),
                ("other-header", b"some other header value"),
            ],
            timestamp=1683474657,
        )
        producer.flush()

    # The timeout is only validated once a consumer joins the group, which
    # happens inside create_backup; without performing the backup the
    # exception isn't raised.
    create_backup(
        config=config,
        backup_location=tmp_path / "backup",
        topic_name=new_topic.topic,
        version=BackupVersion.V3,
        replication_factor=1,
    )


def test_producer_with_custom_kafka_properties_fail(
    kafka_server_session_timeout: KafkaServers,
    new_topic: NewTopic,
) -> None:
    """
    This test checks whether the custom properties are accepted by Kafka.
    We know from the implementation of the consumer startup code that if
    `group.min.session.timeout.ms` > `session.timeout.ms`, the consumer
    raises an exception during startup.
    This test ensures that startup fails with NoBrokersAvailable when the
    default configuration is used, since it injects no custom
    `session.timeout.ms` and points at the default bootstrap servers.
    """
    admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
    admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)

    with pytest.raises(NoBrokersAvailable):
        with kafka_producer_from_config(DEFAULTS) as producer:
            _ = producer
