Skip to content

Commit

Permalink
Fix offset resolution on end of topic
Browse files Browse the repository at this point in the history
  • Loading branch information
aandres committed Jul 1, 2024
1 parent 65296cc commit 4bc573c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
2 changes: 1 addition & 1 deletion beavers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ def _resolve_offset_for_time(
)
return {
confluent_kafka.TopicPartition(topic=tp.topic, partition=tp.partition): (
tp.offset,
tp.offset if tp.offset > 0 else (watermarks[tp][1] - 1),
watermarks[tp][1] - 1,
)
for tp in offset_for_time
Expand Down
20 changes: 20 additions & 0 deletions tests/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
_get_previous_start_of_day,
_poll_all,
_ProducerManager,
_resolve_offset_for_time,
_resolve_topic_offsets,
_resolve_topics_offsets,
_RuntimeSinkTopic,
Expand Down Expand Up @@ -1364,3 +1365,22 @@ def test_runtime_sink_topic():
def test_coverage():
with mock.patch("confluent_kafka.Consumer", autospec=True):
KafkaDriver.create(Dag(), {}, {}, {}, {}, 1_000)


def test_resolve_offset_for_time_minus_one():
tp1 = TopicPartition("topic-1", 0)
consumer = MockConsumer()
timestamp = pd.to_datetime("2022-01-01", utc=True)
millis = timestamp.value // 1_000_000

# If the watermark is before the given timestamp, kafka returns -1
# In this case we need to go to the end of the topic
consumer._offsets_for_time[(tp1, millis)] = TopicPartition("topic-1", 0, -1)
assert _resolve_offset_for_time(
timestamp, consumer, {tp1: (40, 80)}, timeout=1.0
) == {tp1: (79, 79)}

consumer._offsets_for_time[(tp1, millis)] = TopicPartition("topic-1", 0, 70)
assert _resolve_offset_for_time(
timestamp, consumer, {tp1: (40, 80)}, timeout=1.0
) == {tp1: (70, 79)}

0 comments on commit 4bc573c

Please sign in to comment.