From 20f5057c3900d296821e73cfd902e071de12fd0a Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sat, 4 Dec 2021 12:46:53 +0300 Subject: [PATCH 1/3] Add log_start_offset to message protocol parsing Based on: https://github.com/dpkp/kafka-python/pull/2020/files --- aiokafka/producer/message_accumulator.py | 8 ++- aiokafka/producer/sender.py | 32 +++++++-- aiokafka/protocol/produce.py | 80 ++++++++++++++++++++- aiokafka/structs.py | 1 + stubs/kafka/producer/future.pyi | 2 +- stubs/kafka/producer/record_accumulator.pyi | 2 +- stubs/kafka/protocol/produce.pyi | 15 ++++ tests/test_sender.py | 4 +- 8 files changed, 130 insertions(+), 14 deletions(-) diff --git a/aiokafka/producer/message_accumulator.py b/aiokafka/producer/message_accumulator.py index e720a346..a20be471 100644 --- a/aiokafka/producer/message_accumulator.py +++ b/aiokafka/producer/message_accumulator.py @@ -143,7 +143,7 @@ def append(self, key, value, timestamp_ms, _create_future=create_future, self._msg_futures.append((future, metadata)) return future - def done(self, base_offset, timestamp=None, + def done(self, base_offset, timestamp=None, log_start_offset=None, _record_metadata_class=RecordMetadata): """Resolve all pending futures""" tp = self._tp @@ -157,7 +157,8 @@ def done(self, base_offset, timestamp=None, # Set main batch future if not self.future.done(): self.future.set_result(_record_metadata_class( - topic, partition, tp, base_offset, timestamp, timestamp_type)) + topic, partition, tp, base_offset, timestamp, timestamp_type, + log_start_offset)) # Set message futures for future, metadata in self._msg_futures: @@ -169,7 +170,8 @@ def done(self, base_offset, timestamp=None, timestamp = metadata.timestamp offset = base_offset + metadata.offset future.set_result(_record_metadata_class( - topic, partition, tp, offset, timestamp, timestamp_type)) + topic, partition, tp, offset, timestamp, timestamp_type, + log_start_offset)) def done_noack(self): """ Resolve all pending futures to None """ diff --git a/aiokafka/producer/sender.py b/aiokafka/producer/sender.py index f3355b41..bc6c8f4e 100644 --- a/aiokafka/producer/sender.py +++ b/aiokafka/producer/sender.py @@ -677,7 +677,15 @@ def create_request(self): (tp.partition, batch.get_data_buffer()) ) - if self._client.api_version >= (0, 11): + if self._client.api_version >= (2, 1): + version = 7 + elif self._client.api_version >= (2, 0): + version = 6 + elif self._client.api_version >= (1, 1): + version = 5 + elif self._client.api_version >= (1, 0): + version = 4 + elif self._client.api_version >= (0, 11): version = 3 elif self._client.api_version >= (0, 10): version = 2 @@ -737,12 +745,26 @@ async def do(self, node_id): def handle_response(self, response): for topic, partitions in response.topics: for partition_info in partitions: + global_error = None + log_start_offset = None if response.API_VERSION < 2: partition, error_code, offset = partition_info # Mimic CREATE_TIME to take user provided timestamp timestamp = -1 - else: + elif 2 <= response.API_VERSION <= 4: partition, error_code, offset, timestamp = partition_info + elif 5 <= response.API_VERSION <= 7: + ( + partition, error_code, offset, timestamp, + log_start_offset + ) = partition_info + else: + # the ignored parameter is record_error of type + # list[(batch_index: int, error_message: str)] + ( + partition, error_code, offset, timestamp, + log_start_offset, _, global_error + ) = partition_info tp = TopicPartition(topic, partition) error = Errors.for_code(error_code) batch = self._batches.get(tp) @@ -750,7 +772,7 @@ def 
handle_response(self, response): continue if error is Errors.NoError: - batch.done(offset, timestamp) + batch.done(offset, timestamp, log_start_offset) elif error is DuplicateSequenceNumber: # If we have received a duplicate sequence error, # it means that the sequence number has advanced @@ -761,7 +783,7 @@ def handle_response(self, response): # The only thing we can do is to return success to # the user and not return a valid offset and # timestamp. - batch.done(offset, timestamp) + batch.done(offset, timestamp, log_start_offset) elif not self._can_retry(error(), batch): if error is InvalidProducerEpoch: exc = ProducerFenced() @@ -773,7 +795,7 @@ def handle_response(self, response): else: log.warning( "Got error produce response on topic-partition" - " %s, retrying. Error: %s", tp, error) + " %s, retrying. Error: %s", tp, global_error or error) # Ok, we can retry this batch if getattr(error, "invalid_metadata", False): self._client.force_metadata_update() diff --git a/aiokafka/protocol/produce.py b/aiokafka/protocol/produce.py index eb94fcd7..3d3725ba 100644 --- a/aiokafka/protocol/produce.py +++ b/aiokafka/protocol/produce.py @@ -80,6 +80,51 @@ class ProduceResponse_v5(Response): ) +class ProduceResponse_v6(Response): + """ + The version number is bumped to indicate that on quota violation brokers + send out responses before throttling. + """ + API_KEY = 0 + API_VERSION = 6 + SCHEMA = ProduceResponse_v5.SCHEMA + + +class ProduceResponse_v7(Response): + """ + V7 bumped up to indicate ZStandard capability. (see KIP-110) + """ + API_KEY = 0 + API_VERSION = 7 + SCHEMA = ProduceResponse_v6.SCHEMA + + +class ProduceResponse_v8(Response): + """ + V8 bumped up to add two new fields record_errors offset list and error_message + (See KIP-467) + """ + API_KEY = 0 + API_VERSION = 8 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('offset', Int64), + ('timestamp', Int64), + ('log_start_offset', Int64)), + ('record_errors', (Array( + ('batch_index', Int32), + ('batch_index_error_message', String('utf-8')) + ))), + ('error_message', String('utf-8')) + ))), + ('throttle_time_ms', Int32) + ) + + class ProduceRequest(Request): API_KEY = 0 @@ -152,11 +197,42 @@ class ProduceRequest_v5(ProduceRequest): SCHEMA = ProduceRequest_v4.SCHEMA +class ProduceRequest_v6(ProduceRequest): + """ + The version number is bumped to indicate that on quota violation brokers + send out responses before throttling. + """ + API_VERSION = 6 + RESPONSE_TYPE = ProduceResponse_v6 + SCHEMA = ProduceRequest_v5.SCHEMA + + +class ProduceRequest_v7(ProduceRequest): + """ + V7 bumped up to indicate ZStandard capability. 
(see KIP-110) + """ + API_VERSION = 7 + RESPONSE_TYPE = ProduceResponse_v7 + SCHEMA = ProduceRequest_v6.SCHEMA + + +class ProduceRequest_v8(ProduceRequest): + """ + V8 bumped up to add two new fields record_errors offset list and error_message + to PartitionResponse (See KIP-467) + """ + API_VERSION = 8 + RESPONSE_TYPE = ProduceResponse_v8 + SCHEMA = ProduceRequest_v7.SCHEMA + + ProduceRequest = [ ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2, - ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5 + ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5, + ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8, ] ProduceResponse = [ ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2, - ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5 + ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5, + ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8, ] diff --git a/aiokafka/structs.py b/aiokafka/structs.py index ea651c75..566c875c 100644 --- a/aiokafka/structs.py +++ b/aiokafka/structs.py @@ -26,6 +26,7 @@ class RecordMetadata(NamedTuple): offset: int timestamp: Optional[int] # Timestamp in millis, None for older Brokers timestamp_type: int + log_start_offset: Optional[int] KT = TypeVar("KT") diff --git a/stubs/kafka/producer/future.pyi b/stubs/kafka/producer/future.pyi index 4e2ef7bb..48ed0c8e 100644 --- a/stubs/kafka/producer/future.pyi +++ b/stubs/kafka/producer/future.pyi @@ -14,4 +14,4 @@ class FutureRecordMetadata(Future): def __init__(self, produce_future: Any, relative_offset: Any, timestamp_ms: Any, checksum: Any, serialized_key_size: Any, serialized_value_size: Any, serialized_header_size: Any) -> None: ... def get(self, timeout: Optional[Any] = ...): ... -RecordMetadata = namedtuple('RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size']) +RecordMetadata = namedtuple('RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset', 'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size']) diff --git a/stubs/kafka/producer/record_accumulator.pyi b/stubs/kafka/producer/record_accumulator.pyi index 5678fa7a..6517dbe1 100644 --- a/stubs/kafka/producer/record_accumulator.pyi +++ b/stubs/kafka/producer/record_accumulator.pyi @@ -26,7 +26,7 @@ class ProducerBatch: @property def record_count(self): ... def try_append(self, timestamp_ms: Any, key: Any, value: Any, headers: Any): ... - def done(self, base_offset: Optional[Any] = ..., timestamp_ms: Optional[Any] = ..., exception: Optional[Any] = ...) -> None: ... + def done(self, base_offset: Optional[Any] = ..., timestamp_ms: Optional[Any] = ..., exception: Optional[Any] = ..., log_start_offset: Optional[int] = ..., global_error: Optional[str] = ...) -> None: ... def maybe_expire(self, request_timeout_ms: Any, retry_backoff_ms: Any, linger_ms: Any, is_full: Any): ... def in_retry(self): ... def set_retry(self) -> None: ... diff --git a/stubs/kafka/protocol/produce.pyi b/stubs/kafka/protocol/produce.pyi index 920586b9..4834399b 100644 --- a/stubs/kafka/protocol/produce.pyi +++ b/stubs/kafka/protocol/produce.pyi @@ -33,6 +33,21 @@ class ProduceResponse_v5(Response): API_VERSION: int = ... SCHEMA: Any = ... +class ProduceResponse_v6(Response): + API_KEY: int = ... + API_VERSION: int = ... + SCHEMA: Any = ... + +class ProduceResponse_v7(Response): + API_KEY: int = ... + API_VERSION: int = ... + SCHEMA: Any = ... 
+ +class ProduceResponse_v8(Response): + API_KEY: int = ... + API_VERSION: int = ... + SCHEMA: Any = ... + class ProduceRequest(Request, metaclass=abc.ABCMeta): API_KEY: int = ... def expect_response(self): ... diff --git a/tests/test_sender.py b/tests/test_sender.py index 2d436e05..d1f57dbf 100644 --- a/tests/test_sender.py +++ b/tests/test_sender.py @@ -779,7 +779,7 @@ def create_response(error_type): # Special case for DuplicateSequenceNumber resp = create_response(NoError) send_handler.handle_response(resp) - batch_mock.done.assert_called_with(100, 200) + batch_mock.done.assert_called_with(100, 200, None) self.assertEqual(send_handler._to_reenqueue, []) @run_until_complete @@ -805,7 +805,7 @@ def create_response(error_type): # Special case for DuplicateSequenceNumber resp = create_response(DuplicateSequenceNumber) send_handler.handle_response(resp) - batch_mock.done.assert_called_with(0, -1) + batch_mock.done.assert_called_with(0, -1, None) batch_mock.failure.assert_not_called() self.assertEqual(send_handler._to_reenqueue, []) From d06160bf13b2e7408a02c961a04a4403cff7cf23 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 28 Nov 2021 17:42:20 +0300 Subject: [PATCH 2/3] Add zstd compression support --- .github/workflows/publish.yml | 2 +- .github/workflows/tests.yml | 4 +- .travis.yml_bak | 1 + README.rst | 2 +- aiokafka/producer/producer.py | 23 +++++---- aiokafka/record/_crecords/consts.pxi | 3 +- aiokafka/record/_crecords/default_records.pyx | 11 ++++- aiokafka/record/default_records.py | 11 ++++- docs/index.rst | 8 +++ requirements-ci.txt | 1 + requirements-win-test.txt | 1 + setup.py | 3 ++ stubs/kafka/protocol/message.pyi | 1 + stubs/kafka/record/default_records.pyi | 1 + tests/record/test_default_records.py | 7 ++- tests/test_consumer.py | 49 +++++++++---------- 16 files changed, 82 insertions(+), 46 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index a6a2b9de..1bf22336 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -183,7 +183,7 @@ jobs: - name: Install system dependencies run: | sudo apt-get update - sudo apt-get install -y libsnappy-dev libkrb5-dev + sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev - name: Install python dependencies run: | pip install --upgrade pip setuptools wheel diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d8ea1d43..87ebecad 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -27,7 +27,7 @@ jobs: - name: Install system dependencies run: | sudo apt-get update - sudo apt-get install -y libsnappy-dev libkrb5-dev + sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev - name: Get pip cache dir id: pip-cache @@ -270,7 +270,7 @@ jobs: - name: Install system dependencies run: | sudo apt-get update - sudo apt-get install -y libsnappy-dev libkrb5-dev krb5-user + sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev krb5-user - name: Get pip cache dir id: pip-cache diff --git a/.travis.yml_bak b/.travis.yml_bak index 7f4099fd..1dbf2607 100644 --- a/.travis.yml_bak +++ b/.travis.yml_bak @@ -134,6 +134,7 @@ addons: apt: packages: - libsnappy-dev + - libzstd-dev - krb5-user install: diff --git a/README.rst b/README.rst index 84f25e7b..1dfc16f3 100644 --- a/README.rst +++ b/README.rst @@ -81,7 +81,7 @@ generate ssh keys for some tests. 
Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+):: - sudo apt-get install -y libsnappy-dev + sudo apt-get install -y libsnappy-dev libzstd-dev make setup Running tests with coverage:: diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 0f56c746..3d4ed430 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -5,11 +5,12 @@ import warnings from kafka.partitioner.default import DefaultPartitioner -from kafka.codec import has_gzip, has_snappy, has_lz4 +from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd from aiokafka.client import AIOKafkaClient from aiokafka.errors import ( MessageSizeTooLargeError, UnsupportedVersionError, IllegalOperation) +from aiokafka.record.default_records import DefaultRecordBatch from aiokafka.record.legacy_records import LegacyRecordBatchBuilder from aiokafka.structs import TopicPartition from aiokafka.util import ( @@ -86,10 +87,10 @@ class AIOKafkaProducer: If unset, defaults to *acks=1*. If ``enable_idempotence`` is ``True`` defaults to *acks=all* compression_type (str): The compression type for all data generated by - the producer. Valid values are 'gzip', 'snappy', 'lz4', or None. - Compression is of full batches of data, so the efficacy of batching - will also impact the compression ratio (more batching means better - compression). Default: None. + the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or + None. Compression is of full batches of data, so the efficacy of + batching will also impact the compression ratio (more batching + means better compression). Default: None. max_batch_size (int): Maximum size of buffered data per partition. After this amount `send` coroutine will block until batch is drained. 
@@ -167,9 +168,10 @@ class AIOKafkaProducer: _PRODUCER_CLIENT_ID_SEQUENCE = 0 _COMPRESSORS = { - 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP), - 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY), - 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4), + 'gzip': (has_gzip, DefaultRecordBatch.CODEC_GZIP), + 'snappy': (has_snappy, DefaultRecordBatch.CODEC_SNAPPY), + 'lz4': (has_lz4, DefaultRecordBatch.CODEC_LZ4), + 'zstd': (has_zstd, DefaultRecordBatch.CODEC_ZSTD), } _closed = None # Serves as an uninitialized flag for __del__ @@ -203,7 +205,7 @@ def __init__(self, *, loop=None, bootstrap_servers='localhost', if acks not in (0, 1, -1, 'all', _missing): raise ValueError("Invalid ACKS parameter") - if compression_type not in ('gzip', 'snappy', 'lz4', None): + if compression_type not in ('gzip', 'snappy', 'lz4', 'zstd', None): raise ValueError("Invalid compression type!") if compression_type: checker, compression_attrs = self._COMPRESSORS[compression_type] @@ -298,6 +300,9 @@ async def start(self): if self._compression_type == 'lz4': assert self.client.api_version >= (0, 8, 2), \ 'LZ4 Requires >= Kafka 0.8.2 Brokers' + elif self._compression_type == 'zstd': + assert self.client.api_version >= (2, 1, 0), \ + 'Zstd Requires >= Kafka 2.1.0 Brokers' if self._txn_manager is not None and self.client.api_version < (0, 11): raise UnsupportedVersionError( diff --git a/aiokafka/record/_crecords/consts.pxi b/aiokafka/record/_crecords/consts.pxi index 7fd44e8d..eb9c9cdb 100644 --- a/aiokafka/record/_crecords/consts.pxi +++ b/aiokafka/record/_crecords/consts.pxi @@ -5,6 +5,7 @@ DEF _ATTR_CODEC_NONE = 0x00 DEF _ATTR_CODEC_GZIP = 0x01 DEF _ATTR_CODEC_SNAPPY = 0x02 DEF _ATTR_CODEC_LZ4 = 0x03 +DEF _ATTR_CODEC_ZSTD = 0x04 DEF _TIMESTAMP_TYPE_MASK = 0x08 DEF _TRANSACTIONAL_MASK = 0x10 @@ -21,4 +22,4 @@ DEF _LEGACY_RECORD_FREELIST_SIZE = 100 DEF _DEFAULT_RECORD_METADATA_FREELIST_SIZE = 20 DEF _DEFAULT_RECORD_BATCH_FREELIST_SIZE = 100 -DEF _DEFAULT_RECORD_FREELIST_SIZE = 100 \ No newline at end of file +DEF _DEFAULT_RECORD_FREELIST_SIZE = 100 diff --git a/aiokafka/record/_crecords/default_records.pyx b/aiokafka/record/_crecords/default_records.pyx index 3dfd660a..73ec764a 100644 --- a/aiokafka/record/_crecords/default_records.pyx +++ b/aiokafka/record/_crecords/default_records.pyx @@ -57,8 +57,8 @@ from aiokafka.errors import CorruptRecordException, UnsupportedCodecError from kafka.codec import ( - gzip_encode, snappy_encode, lz4_encode, - gzip_decode, snappy_decode, lz4_decode + gzip_encode, snappy_encode, lz4_encode, zstd_encode, + gzip_decode, snappy_decode, lz4_decode, zstd_decode ) import kafka.codec as codecs @@ -116,6 +116,8 @@ cdef _assert_has_codec(char compression_type): checker, name = codecs.has_snappy, "snappy" elif compression_type == _ATTR_CODEC_LZ4: checker, name = codecs.has_lz4, "lz4" + elif compression_type == _ATTR_CODEC_ZSTD: + checker, name = codecs.has_zstd, "zstd" else: raise UnsupportedCodecError( f"Unknown compression codec {compression_type:#04x}") @@ -134,6 +136,7 @@ cdef class DefaultRecordBatch: CODEC_GZIP = _ATTR_CODEC_GZIP CODEC_SNAPPY = _ATTR_CODEC_SNAPPY CODEC_LZ4 = _ATTR_CODEC_LZ4 + CODEC_ZSTD = _ATTR_CODEC_ZSTD def __init__(self, object buffer): PyObject_GetBuffer(buffer, &self._buffer, PyBUF_SIMPLE) @@ -240,6 +243,8 @@ cdef class DefaultRecordBatch: uncompressed = snappy_decode(data.tobytes()) elif compression_type == _ATTR_CODEC_LZ4: uncompressed = lz4_decode(data.tobytes()) + elif compression_type == _ATTR_CODEC_ZSTD: + uncompressed = 
zstd_decode(data.tobytes())
 
         PyBuffer_Release(&self._buffer)
         PyObject_GetBuffer(uncompressed, &self._buffer, PyBUF_SIMPLE)
@@ -694,6 +699,8 @@ cdef class DefaultRecordBatchBuilder:
             compressed = snappy_encode(data)
         elif self._compression_type == _ATTR_CODEC_LZ4:
             compressed = lz4_encode(data)
+        elif self._compression_type == _ATTR_CODEC_ZSTD:
+            compressed = zstd_encode(data)
         size = (<Py_ssize_t> len(compressed)) + FIRST_RECORD_OFFSET
         # We will just write the result into the same memory space.
         PyByteArray_Resize(self._buffer, size)
diff --git a/aiokafka/record/default_records.py b/aiokafka/record/default_records.py
index dff10c14..e9ee69ee 100644
--- a/aiokafka/record/default_records.py
+++ b/aiokafka/record/default_records.py
@@ -61,8 +61,8 @@
 from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
 from aiokafka.util import NO_EXTENSIONS
 from kafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode,
-    gzip_decode, snappy_decode, lz4_decode
+    gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+    gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
 import kafka.codec as codecs
 
@@ -96,6 +96,7 @@ class DefaultRecordBase:
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     TRANSACTIONAL_MASK = 0x10
     CONTROL_MASK = 0x20
@@ -112,6 +113,8 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        elif compression_type == self.CODEC_ZSTD:
+            checker, name = codecs.has_zstd, "zstd"
         else:
             raise UnsupportedCodecError(
                 f"Unknown compression codec {compression_type:#04x}")
@@ -202,6 +205,8 @@ def _maybe_uncompress(self):
                 uncompressed = snappy_decode(data.tobytes())
             elif compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
+            elif compression_type == self.CODEC_ZSTD:
+                uncompressed = zstd_decode(data.tobytes())
             self._buffer = bytearray(uncompressed)
             self._pos = 0
         self._decompressed = True
@@ -531,6 +536,8 @@ def _maybe_compress(self):
             compressed = snappy_encode(data)
         elif self._compression_type == self.CODEC_LZ4:
             compressed = lz4_encode(data)
+        elif self._compression_type == self.CODEC_ZSTD:
+            compressed = zstd_encode(data)
         compressed_size = len(compressed)
         if len(data) <= compressed_size:
             # We did not get any benefit from compression, lets send
diff --git a/docs/index.rst b/docs/index.rst
index c82a78ce..d1344ae5 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -147,6 +147,14 @@ For **Windows** the easiest way is to fetch a precompiled wheel from
 http://www.lfd.uci.edu/~gohlke/pythonlibs/#python-snappy
 
 
+Optional zstd install
++++++++++++++++++++++
+
+To enable Zstandard compression/decompression, install zstandard:
+
+>>> pip3 install zstandard
+
+
 Optional GSSAPI install
 +++++++++++++++++++++++
 
diff --git a/requirements-ci.txt b/requirements-ci.txt
index 0aec3dd4..d26e808a 100644
--- a/requirements-ci.txt
+++ b/requirements-ci.txt
@@ -16,3 +16,4 @@ gssapi==1.7.2
 dataclasses==0.8; python_version<"3.7"
 async_generator==1.10; python_version<"3.7"
 async-timeout==4.0.1
+zstandard==0.16.0
diff --git a/requirements-win-test.txt b/requirements-win-test.txt
index 04ed905a..5ce84b0b 100644
--- a/requirements-win-test.txt
+++ b/requirements-win-test.txt
@@ -12,3 +12,4 @@ xxhash==2.0.2
 python-snappy==0.6.0
 dataclasses==0.8; python_version<"3.7"
 async_generator==1.10; python_version<"3.7"
+zstandard==0.16.0
diff --git a/setup.py b/setup.py
index 53f04ef0..95c80c20 100644
--- a/setup.py
+++ b/setup.py
@@ -116,7 +116,10 
@@ def read(f): extras_require = { "snappy": ["python-snappy>=0.5"], + "lz4": ["lz4"], # Old format (magic=0) requires xxhash + "zstd": ["zstandard"], } +extras_require["all"] = sum(extras_require.values(), []) def read_version(): diff --git a/stubs/kafka/protocol/message.pyi b/stubs/kafka/protocol/message.pyi index 848f7c35..ce67c64b 100644 --- a/stubs/kafka/protocol/message.pyi +++ b/stubs/kafka/protocol/message.pyi @@ -12,6 +12,7 @@ class Message(Struct): CODEC_GZIP: int = ... CODEC_SNAPPY: int = ... CODEC_LZ4: int = ... + CODEC_ZSTD: int = ... TIMESTAMP_TYPE_MASK: int = ... HEADER_SIZE: int = ... timestamp: Any = ... diff --git a/stubs/kafka/record/default_records.pyi b/stubs/kafka/record/default_records.pyi index e5709975..5431f888 100644 --- a/stubs/kafka/record/default_records.pyi +++ b/stubs/kafka/record/default_records.pyi @@ -14,6 +14,7 @@ class DefaultRecordBase: CODEC_GZIP: int = ... CODEC_SNAPPY: int = ... CODEC_LZ4: int = ... + CODEC_ZSTD: int = ... TIMESTAMP_TYPE_MASK: int = ... TRANSACTIONAL_MASK: int = ... CONTROL_MASK: int = ... diff --git a/tests/record/test_default_records.py b/tests/record/test_default_records.py index d1fb2ed6..f2d6d8cd 100644 --- a/tests/record/test_default_records.py +++ b/tests/record/test_default_records.py @@ -10,9 +10,11 @@ @pytest.mark.parametrize("compression_type,crc", [ (DefaultRecordBatch.CODEC_NONE, 3950153926), - (DefaultRecordBatch.CODEC_GZIP, None), # No idea why, but crc changes here + # Gzip header includes timestamp, so checksum varies + (DefaultRecordBatch.CODEC_GZIP, None), (DefaultRecordBatch.CODEC_SNAPPY, 2171068483), - (DefaultRecordBatch.CODEC_LZ4, 462121143) + (DefaultRecordBatch.CODEC_LZ4, 462121143), + (DefaultRecordBatch.CODEC_ZSTD, 1679657554), ]) def test_read_write_serde_v2(compression_type, crc): builder = DefaultRecordBatchBuilder( @@ -183,6 +185,7 @@ def test_default_batch_size_limit(): (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"), (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"), (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4"), + (DefaultRecordBatch.CODEC_ZSTD, "zstd", "has_zstd"), ]) def test_unavailable_codec(compression_type, name, checker_name): builder = DefaultRecordBatchBuilder( diff --git a/tests/test_consumer.py b/tests/test_consumer.py index e85d7c49..f37f0578 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -487,18 +487,17 @@ async def test_subscribe_errors(self): with self.assertRaises(TypeError): consumer.subscribe(topics=["some_topic"], listener=object()) - @run_until_complete - async def test_compress_decompress(self): - producer = AIOKafkaProducer( + # TODO Use `@pytest.mark.parametrize()` after moving to pytest-asyncio + async def _test_compress_decompress(self, compression_type): + async with AIOKafkaProducer( bootstrap_servers=self.hosts, - compression_type="gzip") - await producer.start() - await self.wait_topic(producer.client, self.topic) - msg1 = b'some-message' * 10 - msg2 = b'other-message' * 30 - await producer.send(self.topic, msg1, partition=1) - await producer.send(self.topic, msg2, partition=1) - await producer.stop() + compression_type=compression_type + ) as producer: + await self.wait_topic(producer.client, self.topic) + msg1 = b'some-message' * 10 + msg2 = b'other-message' * 30 + await (await producer.send(self.topic, msg1, partition=1)) + await (await producer.send(self.topic, msg2, partition=1)) consumer = await self.consumer_factory() rmsg1 = await consumer.getone() @@ -506,24 +505,22 @@ async def test_compress_decompress(self): rmsg2 = await 
consumer.getone() self.assertEqual(rmsg2.value, msg2) + @run_until_complete + async def test_compress_decompress_gzip(self): + await self._test_compress_decompress("gzip") + + @run_until_complete + async def test_compress_decompress_snappy(self): + await self._test_compress_decompress("snappy") + @run_until_complete async def test_compress_decompress_lz4(self): - producer = AIOKafkaProducer( - bootstrap_servers=self.hosts, - compression_type="lz4") - await producer.start() - await self.wait_topic(producer.client, self.topic) - msg1 = b'some-message' * 10 - msg2 = b'other-message' * 30 - await producer.send(self.topic, msg1, partition=1) - await producer.send(self.topic, msg2, partition=1) - await producer.stop() + await self._test_compress_decompress("lz4") - consumer = await self.consumer_factory() - rmsg1 = await consumer.getone() - self.assertEqual(rmsg1.value, msg1) - rmsg2 = await consumer.getone() - self.assertEqual(rmsg2.value, msg2) + @kafka_versions('>=2.1.0') + @run_until_complete + async def test_compress_decompress_zstd(self): + await self._test_compress_decompress("zstd") @run_until_complete async def test_consumer_seek_backward(self): From 1e52c2c93bf7a6f26e60ddffd98fb61e52ad2b66 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sat, 4 Dec 2021 13:56:46 +0300 Subject: [PATCH 3/3] Add changelog entry --- CHANGES/801.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGES/801.feature diff --git a/CHANGES/801.feature b/CHANGES/801.feature new file mode 100644 index 00000000..42c23c55 --- /dev/null +++ b/CHANGES/801.feature @@ -0,0 +1 @@ +Add Codec for ZStandard Compression (KIP-110)
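
To try the series end to end, here is a minimal sketch (not part of the patches themselves). It assumes a local broker at localhost:9092 running Kafka 2.1+ (required for zstd), the optional zstandard package installed (for example via the new extra: pip install aiokafka[zstd]), and a placeholder topic name:

    import asyncio

    from aiokafka import AIOKafkaProducer


    async def main():
        producer = AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            compression_type="zstd",  # codec added by patch 2
        )
        await producer.start()
        try:
            # RecordMetadata now carries log_start_offset (patch 1); it stays
            # None when the broker only speaks ProduceResponse v4 or older.
            md = await producer.send_and_wait("demo-topic", b"hello zstd")
            print(md.offset, md.timestamp, md.log_start_offset)
        finally:
            await producer.stop()


    asyncio.run(main())

With zstd configured, start() asserts that the broker is at least Kafka 2.1.0, per the check added in aiokafka/producer/producer.py, and the sender picks ProduceRequest v7 so the broker accepts the zstd-compressed batches.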