Skip to content

Commit

Permalink
Fix asynch fetch polling without delay
Browse files Browse the repository at this point in the history
First fetch min bytes negative values was misconfiguration even for
kafka-python earlier, but worsened with aiokafka.  Add extra
configuration for aiokafka timeout settings tuning.

Fixes #488
  • Loading branch information
tvainika committed Nov 11, 2022
1 parent 74c39e6 commit 0b66b15
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- ``0``
- Disconnect idle consumers after timeout seconds if not used. Inactivity leads to consumer leaving consumer group and consumer state. 0 (default) means no auto-disconnect.
* - ``fetch_min_bytes``
- ``-1``
- Rest proxy consumers minimum bytes to be fetched per request. ``-1`` means no limit
- ``1``
- Rest proxy consumers minimum bytes to be fetched per request.
* - ``group_id``
- ``schema-registry``
- The Kafka group name used for selecting a master service to coordinate the storing of Schemas.
Expand Down
2 changes: 1 addition & 1 deletion karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"consumer_request_timeout_ms": 11000,
"consumer_request_max_bytes": 67108864,
"consumer_idle_disconnect_timeout": 0,
"fetch_min_bytes": -1,
"fetch_min_bytes": 1,
"group_id": "schema-registry",
"host": "127.0.0.1",
"port": 8081,
Expand Down
5 changes: 4 additions & 1 deletion karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,11 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
sasl_plain_username=self.config["sasl_plain_username"],
sasl_plain_password=self.config["sasl_plain_password"],
group_id=group_name,
fetch_min_bytes=fetch_min_bytes,
fetch_min_bytes=max(1, fetch_min_bytes), # Discard earlier negative values
fetch_max_bytes=self.config["consumer_request_max_bytes"],
fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms", 500), # Copy aiokafka default 500 ms
# This will cause delay if subscription is changed.
consumer_timeout_ms=self.config.get("consumer_timeout_ms", 200), # Copy aiokafka default 200 ms
request_timeout_ms=request_timeout_ms,
enable_auto_commit=request_data["auto.commit.enable"],
auto_offset_reset=request_data["auto.offset.reset"],
Expand Down

0 comments on commit 0b66b15

Please sign in to comment.