
Add Codec for ZStandard Compression #801

Merged
3 commits merged into aio-libs:master from zstd on Dec 6, 2021

Conversation

@ods (Collaborator) commented Dec 4, 2021

Changes

KIP-110

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.
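
Following the checklist above, creating a news fragment for this PR might look like the sketch below (the filename uses this PR's id, 801, and the .feature type, since the change adds a new codec; the fragment text is illustrative):

```shell
# Illustrative: add a news fragment named <issue_id>.<type> to the CHANGES
# folder, per the checklist convention. This PR is #801 and is a feature.
mkdir -p CHANGES
echo "Add codec for zstd compression (KIP-110)." > CHANGES/801.feature
cat CHANGES/801.feature
```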

@codecov bot commented Dec 4, 2021

Codecov Report

Merging #801 (1e52c2c) into master (67f55c0) will decrease coverage by 0.03%.
The diff coverage is 94.54%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #801      +/-   ##
==========================================
- Coverage   97.92%   97.89%   -0.04%     
==========================================
  Files          31       31              
  Lines        5309     5358      +49     
==========================================
+ Hits         5199     5245      +46     
- Misses        110      113       +3     
Flag Coverage Δ
cext 88.54% <83.63%> (-0.11%) ⬇️
integration 97.85% <94.54%> (-0.04%) ⬇️
purepy 97.42% <94.54%> (-0.02%) ⬇️
unit 52.09% <63.63%> (+0.14%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
aiokafka/producer/sender.py 98.68% <82.35%> (-0.64%) ⬇️
aiokafka/producer/message_accumulator.py 98.90% <100.00%> (ø)
aiokafka/producer/producer.py 98.53% <100.00%> (+0.02%) ⬆️
aiokafka/protocol/produce.py 96.34% <100.00%> (+1.51%) ⬆️
aiokafka/record/default_records.py 96.08% <100.00%> (+0.07%) ⬆️
aiokafka/structs.py 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 67f55c0...1e52c2c. Read the comment docs.

@tvoinarovskyi (Member) left a comment


Looks great!

@ods ods merged commit 5370a4c into aio-libs:master Dec 6, 2021
@ods ods deleted the zstd branch December 6, 2021 14:12
@jigar23 commented Jun 29, 2022

Can we please get a release out with this change? Thanks!

@qdamian qdamian mentioned this pull request Jun 29, 2022
@ods (Collaborator, Author) commented Jul 1, 2022

Sure, we're working on it

@DeLaboreMercurio commented

We are currently running into a scenario where aiokafka fails to consume messages when ZSTD is set as broker-level compression instead of at the client level. Are there any plans to address this, @ods? Thanks!

@ods (Collaborator, Author) commented Jul 4, 2022

@DeLaboreMercurio The new release is on the way.

@Rohit-Singh3 commented

I'm getting the error below when trying to consume a ZSTD-compressed record from Kafka.

raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Error log:

Traceback (most recent call last):
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in <module>
    consume_from_topic(topic_to_consume)
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
    for message in consumer:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in __iter__
    self._maybe_uncompress()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
    raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1

My code for consuming from a Kafka topic:

from kafka import KafkaConsumer

def consume_from_topic(topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id='zstd-11-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
    )
    try:
        for message in consumer:
            v = message.value
            k = message.key.decode("utf-8")
            log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v)
            print(log)
    except KeyboardInterrupt:
        consumer.close()

if __name__ == "__main__":
    topic_to_consume = "Integrate-Package-Zstd-ESP.info"
    consume_from_topic(topic_to_consume)
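
Note that the traceback above is raised by kafka-python (the `kafka` package), not by aiokafka. In both clients, though, the zstd codec is delegated to the third-party `zstandard` package, and UnsupportedCodecError indicates that import failed. A minimal sketch of the availability check and a compression round-trip, assuming `pip install zstandard` is the fix:

```python
# Sketch of how the client detects zstd support: the UnsupportedCodecError
# above is raised when this import fails. Installing the `zstandard` package
# (`pip install zstandard`) makes the codec available.
try:
    import zstandard

    HAS_ZSTD = True
except ImportError:
    zstandard = None
    HAS_ZSTD = False

if HAS_ZSTD:
    # Round-trip a payload to confirm the codec library works.
    payload = b'{"event": "example"}' * 100
    compressed = zstandard.ZstdCompressor().compress(payload)
    assert zstandard.ZstdDecompressor().decompress(compressed) == payload

print("zstd available:", HAS_ZSTD)
```

For aiokafka itself, installing the optional extra (`pip install aiokafka[zstd]`) should pull in the same dependency.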
