Skip to content

Commit

Permalink
Fix no-payload commit in consumer
Browse files Browse the repository at this point in the history
The confluent-kafka `Consumer.commit` method can be called with no
parameters, then the current partition assignment's offsets are used
(see the documentation at
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.commit).

The previous behaviour was erroneous, raising a `TypeError`, thus an
HTTP 500 when in the REST proxy's consumer manager a commit is performed
with an empty payload.
  • Loading branch information
Mátyás Kuti committed Mar 5, 2024
1 parent 4ae29e6 commit 6ffb30a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
5 changes: 4 additions & 1 deletion karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ def commit( # type: ignore[override]
if message is not None:
return super().commit(message=message, asynchronous=False)

return super().commit(offsets=offsets, asynchronous=False)
if offsets is not None:
return super().commit(offsets=offsets, asynchronous=False)

return super().commit(asynchronous=False)
except KafkaException as exc:
raise_from_kafkaexception(exc)

Expand Down
25 changes: 25 additions & 0 deletions tests/integration/kafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,31 @@ def test_commit_offsets(
assert committed_partition.partition == 0
assert committed_partition.offset == 2

def test_commit_offsets_empty(
self,
producer: KafkaProducer,
consumer: KafkaConsumer,
new_topic: NewTopic,
) -> None:
consumer.subscribe([new_topic.topic])
first_fut = producer.send(new_topic.topic)
second_fut = producer.send(new_topic.topic)
producer.flush()
first_fut.result()
second_fut.result()
consumer.poll(timeout=POLL_TIMEOUT_S)
consumer.poll(timeout=POLL_TIMEOUT_S)

[topic_partition] = consumer.commit(offsets=None, message=None) # default parameters
[committed_partition] = consumer.committed([TopicPartition(new_topic.topic, partition=0)])

assert topic_partition.topic == new_topic.topic
assert topic_partition.partition == 0
assert topic_partition.offset == 2
assert committed_partition.topic == new_topic.topic
assert committed_partition.partition == 0
assert committed_partition.offset == 2

def test_commit_raises_for_unknown_partition(
self,
consumer: KafkaConsumer,
Expand Down

0 comments on commit 6ffb30a

Please sign in to comment.