Consumer fails when consuming messages compressed with ZSTD at the broker level #843

Open
DeLaboreMercurio opened this issue Jul 4, 2022 · 14 comments

Comments

@DeLaboreMercurio

Describe the bug
When consuming messages from a broker where ZSTD compression has been set at the broker level, the Consumer fails with an UnknownError. This does not happen when ZSTD is set at the producer level instead. The behaviour is observed both in the latest PyPI release and in the latest version from the master branch of this repository.

Expected behaviour
The Consumer should be able to consume ZSTD-compressed messages when compression is set at the broker level, just as it does when it's set at the producer level.

Environment (please complete the following information):

  • aiokafka version: 0.7.2
  • kafka-python version: 2.0.2

Reproducible example
When setting up the broker, add the following to the server.properties file:

compression.type=zstd

By contrast, if compression.type is omitted from the server.properties file and you instead create a producer with zstd compression, it works fine.

@ods
Collaborator

ods commented Jul 5, 2022

Have you tried with code from master branch? It should have been fixed in #801

@SebasYanik

Yes, it's happening with the latest version from the master branch. I just reconfirmed it. Here's the full log output.

2022-07-05 10:32:21,989 my_fancy_service.aio_kafka INFO     Starting Kafka consumer for topic test.
2022-07-05 10:32:21,989 aiokafka     DEBUG    Attempting to bootstrap via node at localhost:9092
2022-07-05 10:32:21,991 aiokafka.conn DEBUG    <AIOKafkaConnection host=localhost port=9092> Request 1: MetadataRequest_v0(topics=[])
2022-07-05 10:32:21,995 aiokafka.conn DEBUG    <AIOKafkaConnection host=localhost port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.0.126', port=9092)], topics=[(error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, 
partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:21,996 aiokafka.cluster DEBUG    Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2022-07-05 10:32:21,996 aiokafka.conn DEBUG    Closing connection at localhost:9092
2022-07-05 10:32:21,996 aiokafka     DEBUG    Received cluster metadata: ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2022-07-05 10:32:21,996 aiokafka     DEBUG    Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:21,997 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-05 10:32:21,998 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), 
(api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-05 10:32:21,999 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 2: MetadataRequest_v0(topics=[])
2022-07-05 10:32:22,001 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.0.126', port=9092)], topics=[(error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, 
partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:22,002 aiokafka.conn DEBUG    Closing connection at 192.168.0.126:9092
2022-07-05 10:32:22,002 aiokafka.consumer.group_coordinator INFO     Metadata for topic has changed from {} to {'test': 1}. 
2022-07-05 10:32:22,003 aiokafka.consumer.fetcher DEBUG    Updating fetch positions for partitions [TopicPartition(topic='test', partition=0)]
2022-07-05 10:32:22,003 aiokafka     DEBUG    Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:22,004 aiokafka.consumer.fetcher DEBUG    No committed offset found for TopicPartition(topic='test', partition=0)
2022-07-05 10:32:22,004 aiokafka.consumer.fetcher DEBUG    Resetting offset for partition TopicPartition(topic='test', partition=0) using latest strategy.
2022-07-05 10:32:22,004 aiokafka     DEBUG    Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:22,005 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-05 10:32:22,006 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), 
(api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-05 10:32:22,006 aiokafka     DEBUG    Sending metadata request MetadataRequest_v1(topics=['test']) to node 0
2022-07-05 10:32:22,006 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 2: MetadataRequest_v1(topics=['test'])
2022-07-05 10:32:22,007 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 3: OffsetRequest_v1(replica_id=-1, topics=[(topic='test', partitions=[(partition=0, timestamp=-1)])])
2022-07-05 10:32:22,007 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 2: MetadataResponse_v1(brokers=[(node_id=0, host='192.168.0.126', port=9092, rack=None)], controller_id=0, topics=[(error_code=0, topic='test', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:22,007 aiokafka.cluster DEBUG    Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-05 10:32:22,013 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 3: OffsetResponse_v1(topics=[(topic='test', partitions=[(partition=0, error_code=0, timestamp=-1, offset=323)])])
2022-07-05 10:32:22,013 aiokafka.consumer.fetcher DEBUG    Handling ListOffsetResponse response for TopicPartition(topic='test', partition=0). Fetched offset 323, timestamp -1
2022-07-05 10:32:22,013 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:22,013 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 4: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:32,018 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 4: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:32,018 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:32:32,019 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:32,020 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 5: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:42,023 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 5: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:42,023 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:32:42,024 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:42,024 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 6: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:52,027 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 6: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:52,027 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:32:52,028 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:52,029 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:33:02,033 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 7: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:33:02,033 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:33:02,034 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:33:02,034 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 8: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
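
The repeated error_code=76 in the FetchResponse lines above is what the fetcher surfaces as UnknownError. In the Kafka protocol's error table, code 76 is UNSUPPORTED_COMPRESSION_TYPE. A minimal sketch for pulling the code out of such a log line and naming it follows; the helper name and the deliberately partial lookup table are illustrative and not part of aiokafka:

```python
import re

# Partial lookup table: only the codes relevant to this thread are listed.
# 76 is UNSUPPORTED_COMPRESSION_TYPE in the Kafka protocol error table.
KAFKA_ERROR_NAMES = {
    0: "NONE",
    76: "UNSUPPORTED_COMPRESSION_TYPE",
}

_ERROR_CODE_RE = re.compile(r"error_code=(-?\d+)")


def fetch_error_names(log_line):
    """Return the names of all non-zero error codes found in a log line."""
    codes = [int(m) for m in _ERROR_CODE_RE.findall(log_line)]
    # Codes missing from the partial table are reported by number.
    return [KAFKA_ERROR_NAMES.get(c, "UNLISTED_%d" % c) for c in codes if c != 0]


# Body of "Response 4" from the log above.
line = (
    "Response 4: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', "
    "partitions=[(partition=0, error_code=76, highwater_offset=-1, "
    "last_stable_offset=-1, aborted_transactions=[], message_set=None)])])"
)
print(fetch_error_names(line))
```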

@DeLaboreMercurio
Author

Do we have an update on this?

@bigEvilBanana

Hey guys! I'm just joining the club with the same issue. Any updates?

@theultimate1

Is anyone else working on this? I am trying to integrate this into faust, but if it's not supported here, I will first work on adding support here and then integrate it into faust later.

@theultimate1

Hey, so the zstd error comes from a fetch request versioning problem. kafka-python, and by extension this package, sends fetch request version 4, while ZSTD support requires at least version 10. Moving to that version means adding fetch sessions, removed_topics, log_start_offset and current_leader_epoch to the fetch request. I already tested whether the issue is resolved when upgrading the version with default values for those parameters, and it is. I plan to add these features to kafka-python and open a pull request ASAP. In aiokafka's case, however, I wanted to check whether fetch sessions will interfere with the asynchronous nature of this module. This KIP has information on fetch sessions. I plan to check for myself anyway, but it would be much more efficient if someone could do the check for me.
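
A rough sketch of what "upgrading with default values" could look like, using plain dicts rather than the real request classes. Field names follow the Kafka protocol's Fetch request, where the "removed topics" list is called forgotten_topics_data; the helper name and the dict layout are hypothetical. Sending session_id=0 with session_epoch=-1 asks for a full fetch without a session, so under this assumption the client does not need to track any fetch-session state.

```python
import copy


def upgrade_fetch_fields_v4_to_v10(v4_fields):
    """Return a copy of v4-style fetch fields with the extra v10 fields set
    to neutral defaults. Hypothetical helper, for illustration only."""
    v10 = copy.deepcopy(v4_fields)

    # Fetch-session fields (added to the protocol in Fetch v7).
    v10["session_id"] = 0        # 0 means "no session"
    v10["session_epoch"] = -1    # -1 requests a full, sessionless fetch
    v10["forgotten_topics_data"] = []

    for _topic, partitions in v10["topics"]:
        for partition in partitions:
            # Per-partition fields added in Fetch v9 and v5 respectively.
            partition.setdefault("current_leader_epoch", -1)
            partition.setdefault("log_start_offset", -1)
    return v10


# Values taken from the FetchRequest_v4 shown in the log earlier in the thread.
v4 = {
    "replica_id": -1,
    "max_wait_time": 10000,
    "min_bytes": 1,
    "max_bytes": 52428800,
    "isolation_level": 0,
    "topics": [("test", [{"partition": 0, "offset": 323, "max_bytes": 1048576}])],
}
v10 = upgrade_fetch_fields_v4_to_v10(v4)
print(v10["topics"][0][1][0])
```

Whether sessionless fetches are acceptable long term is a separate design question; they give up the bandwidth savings that fetch sessions were introduced for.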

@Rohit-Singh3

I'm getting the error below when trying to consume a ZSTD-compressed record from Kafka.

raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

error log:

Traceback (most recent call last):
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in <module>
    consume_from_topic(topic_to_consume)
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
    for message in consumer:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in __iter__
    self._maybe_uncompress()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
    raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1

My Code for consuming a kafka topic:

from kafka import KafkaConsumer


def consume_from_topic(topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id='zstd-11-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True
    )
    try:
        for message in consumer:
            v = message.value
            # Keys are optional in Kafka, so guard against None before decoding.
            k = message.key.decode("utf-8") if message.key is not None else None
            log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v)
            print(log)

    except KeyboardInterrupt:
        consumer.close()


if __name__ == "__main__":
    topic_to_consume = "Integrate-Package-Zstd-ESP.info"
    consume_from_topic(topic_to_consume)

@ods
Collaborator

ods commented Aug 6, 2023

getting below error when trying to consume from a ZSTD compressed record from kafka.

raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Did you specify the extra when installing aiokafka? Either aiokafka[zstd] or aiokafka[all] should be used to support the ZStandard codec.
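
A quick way to check whether a zstd binding is importable in the current environment. This is a sketch that assumes the codec is backed by the `zstandard` package, which is what kafka-python 2.0.2 tries to import; other releases may depend on a different package, so treat the module name as an assumption:

```python
import importlib.util


def has_module(name):
    """Return True if a top-level module `name` can be found for import."""
    return importlib.util.find_spec(name) is not None


# "zstandard" is an assumed module name; adjust it for your installed version.
if has_module("zstandard"):
    print("zstandard is importable")
else:
    print("zstandard not found; try: pip install 'aiokafka[zstd]'")
```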

@bcwalrus

bcwalrus commented Oct 9, 2023

@theultimate1 I hit the same problem. Your approach to update Fetch to v10 is spot on. Wondering if you could find time to make a PR soon? Many thanks!

@theultimate1

theultimate1 commented Oct 9, 2023

I will try to get some actual work done on this. Been busy lol. This is definitely on my todo list.

@davidvanwyk

@theultimate1 - if I can lend a hand let me know (but you'll need to brief me on what needs doing/what you've discovered). This is currently a slight blocker for ideal operation on my side.

@theultimate1

Yup, I am cleaning up right now. I will put in a PR this week.

@vsel

vsel commented Jul 18, 2024

@theultimate1 Hi! Was your PR merged?

@trim21

trim21 commented Sep 25, 2024

if client.api_version >= (0, 11):
    req_version = 4
elif client.api_version >= (0, 10, 1):
    req_version = 3
elif client.api_version >= (0, 10):
    req_version = 2
else:
    req_version = 1
self._fetch_request_class = FetchRequest[req_version]

The selection above tops out at fetch request v4. At least fetch request v10 needs to be supported to consume zstd messages.

wbarnha/kafka-python-ng#78
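
One possible shape for a version check that accounts for zstd, as a sketch only. It picks the highest Fetch version both sides support, using the broker's advertised maximum for api_key=1 (13 in the ApiVersionResponse logged earlier in this thread), and reports whether the result is high enough for zstd. The function name and the client_max parameter are hypothetical; this is not aiokafka's actual code.

```python
# Minimum Fetch request version for zstd, as stated in this thread.
ZSTD_MIN_FETCH_VERSION = 10


def pick_fetch_version(broker_max, client_max):
    """Return (version, zstd_ok) for the highest mutually supported version.

    broker_max: max Fetch version the broker advertises (api_key=1).
    client_max: highest Fetch version the client library implements.
    """
    version = min(broker_max, client_max)
    return version, version >= ZSTD_MIN_FETCH_VERSION


# A client capped at v4 against a broker that offers up to v13.
print(pick_fetch_version(broker_max=13, client_max=4))
# The same broker with a client that implements up to v11.
print(pick_fetch_version(broker_max=13, client_max=11))
```

Negotiating from the broker's advertised range, rather than hard-coding a version per client.api_version, keeps the check valid when newer request versions are added to the client later.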


10 participants