From 0816ea3bde7ec0b667b3d6b62935ebc2d7228adf Mon Sep 17 00:00:00 2001 From: aandres Date: Wed, 3 Jul 2024 15:34:25 +0100 Subject: [PATCH 1/2] Add log message for resolved offsets --- beavers/kafka.py | 10 +++++++++- scripts/README.md | 3 ++- tests/test_kafka.py | 43 +++++++++++++++++++++++++++++++------------ 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/beavers/kafka.py b/beavers/kafka.py index f9dcf1e..101f6c4 100644 --- a/beavers/kafka.py +++ b/beavers/kafka.py @@ -275,6 +275,15 @@ def create( consumer = confluent_kafka.Consumer(consumer_config) cutoff = pd.Timestamp.utcnow() offsets = _resolve_topics_offsets(consumer, source_topics, cutoff, timeout) + for tp, (start, end) in offsets.items(): + logger.debug( + "Replay offsets: %s:%s %d -> %d = %d", + tp.topic, + tp.partition, + start, + end, + max(0, end - start), + ) consumer.assign( [ confluent_kafka.TopicPartition( @@ -610,7 +619,6 @@ def _resolve_topic_offsets( ) for p in topic_meta_data.partitions.values() } - if source_topic.offset_policy == OffsetPolicy.LATEST: return {tp: (end, end - 1) for tp, (start, end) in watermarks.items()} elif source_topic.offset_policy == OffsetPolicy.EARLIEST: diff --git a/scripts/README.md b/scripts/README.md index 4df62d8..6dff812 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -18,6 +18,7 @@ kafka-console-producer --topic right --bootstrap-server=localhost:9092 kafka-console-consumer \ --topic=both \ --bootstrap-server=localhost:9092 \ - --property print.key=true + --property print.key=true \ + --from-beginning python -m scripts.kafka_test_bench --batch-size=2 ``` diff --git a/tests/test_kafka.py b/tests/test_kafka.py index c6e0a5d..3d6b886 100644 --- a/tests/test_kafka.py +++ b/tests/test_kafka.py @@ -1,11 +1,12 @@ """ Unit tests for tradewell.beavers.kafka """ + import dataclasses import io import logging import queue -from typing import AnyStr, Callable, Optional, Tuple, Union +from typing import AnyStr, Callable, Optional, Sequence, Tuple, Union import confluent_kafka import mock @@ -25,6 +26,7 @@ _ConsumerManager, _get_message_ns, _get_previous_start_of_day, + _PartitionInfo, _poll_all, _ProducerManager, _resolve_offset_for_time, @@ -940,14 +942,10 @@ def test_poll_all(): class PassThroughKafkaMessageDeserializer: messages: list[confluent_kafka.Message] = dataclasses.field(default_factory=list) - def append_message(self, message: confluent_kafka.Message): - self.messages.append(message) - - def flush(self) -> list[confluent_kafka.Message]: - """Convert queued messages to data""" - results = self.messages.copy() - self.messages.clear() - return results + def __call__( + self, messages: Sequence[confluent_kafka.Message] + ) -> Sequence[confluent_kafka.Message]: + return messages def test_from_xxx(): @@ -1336,10 +1334,31 @@ def test_producer_manager_create(): assert producer_manager._producer is not None -def test_consumer_manager_create(): - with mock.patch("confluent_kafka.Consumer", autospec=True): - consumer_manager = _ConsumerManager.create({}, [], 500, timeout=None) +def test_consumer_manager_create_with_topics(): + with mock.patch( + "confluent_kafka.Consumer", + new=lambda *_: MockConsumer( + {"topic-1": topic_metadata("topic-1", [partition_metadata(0)])} + ), + ): + consumer_manager = _ConsumerManager.create( + {}, + [ + SourceTopic( + "topic-1", + PassThroughKafkaMessageDeserializer(), + OffsetPolicy.LATEST, + ) + ], + 500, + timeout=None, + ) assert consumer_manager._consumer is not None + assert consumer_manager._partition_info == { + TopicPartition("topic-1", 0): _PartitionInfo( + current_offset=0, live_offset=-1, timestamp_ns=0, paused=False + ) + } def test_consumer_manager_create_partition_eof(): From 13e4e6c1f2ceb5b3ba6d83fcecab536e92eb5fec Mon Sep 17 00:00:00 2001 From: aandres Date: Wed, 3 Jul 2024 16:00:13 +0100 Subject: [PATCH 2/2] Add change logs --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b183f74..fdadfc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Add constructor to mock consumer ([370d5d6](https://github.com/tradewelltech/beavers/commit/370d5d68eb60662a110026ab7844fc3d9c6bf59b) by aandres). +- Add log message for resolved offsets ([0816ea3](https://github.com/tradewelltech/beavers/commit/0816ea3bde7ec0b667b3d6b62935ebc2d7228adf) by aandres). ### Fixed