From e76232106e4129d5ab2e279f546791446260b6b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 19:49:57 -0500 Subject: [PATCH 01/11] Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161) * Test Kafka 0.8.2.2 using Python 3.11 in the meantime * Override PYTHON_LATEST conditionally in python-package.yml * Update python-package.yml * add python annotation to kafka version test matrix * Update python-package.yml * try python 3.10 --- .github/workflows/python-package.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index ef49b4e58..d39eeccf8 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -110,7 +110,7 @@ jobs: KAFKA_VERSION: ${{ env.KAFKA_LATEST }} test-kafka: - name: Tests for Kafka ${{ matrix.kafka-version }} + name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }}) needs: - build-sdist runs-on: ubuntu-latest @@ -127,10 +127,17 @@ jobs: - "2.4.0" - "2.5.0" - "2.6.0" + python-version: ['3.12'] experimental: [false] include: - kafka-version: '0.8.2.2' experimental: true + python-version: "3.12" + - kafka-version: '0.8.2.2' + experimental: false + python-version: "3.10" + env: + PYTHON_LATEST: ${{ matrix.python-version }} continue-on-error: ${{ matrix.experimental }} steps: - name: Checkout the source code @@ -145,7 +152,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: ${{ env.PYTHON_LATEST }} + python-version: ${{ matrix.python-version }} cache: pip cache-dependency-path: | requirements-dev.txt From 00750aaa2eb7fe568e15d995d0bca12700bf94b9 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:09:44 -0500 Subject: [PATCH 02/11] Remove support for EOL'ed versions of Python (#160) * Remove support for EOL'ed versions of Python * Update setup.py --- setup.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 75d2b8bcb..dd4e5de90 100644 --- a/setup.py +++ b/setup.py @@ -32,6 +32,7 @@ def run(cls): setup( name="kafka-python-ng", + python_requires=">=3.8", use_scm_version=True, setup_requires=["setuptools_scm"], tests_require=test_require, @@ -60,13 +61,6 @@ def run(cls): "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.4", - "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", From 5bd1323d3ed4d44081592d6518435fc89724318a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:21:06 -0500 Subject: [PATCH 03/11] Stop testing Python 3.13 in python-package.yml (#162) Too many MRs to review... so little time. 
--- .github/workflows/python-package.yml | 3 --- 1 file changed, 3 deletions(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index d39eeccf8..df0e4e489 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -68,9 +68,6 @@ jobs: - "3.12" - "pypy3.9" experimental: [ false ] - include: - - python-version: "~3.13.0-0" - experimental: true steps: - name: Checkout the source code uses: actions/checkout@v4

From cda8f81ae16ee087cd039a4277be13f2004d4d09 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 20:44:41 -0500 Subject: [PATCH 04/11] Avoid 100% CPU usage while socket is closed (#156)

After stopping and restarting the Kafka service, kafka-python may use 100% CPU in a busy-retry loop while the socket is closed. This fixes the issue by unregistering the socket if its fd is negative.

Co-authored-by: Orange Kao --- kafka/client_async.py | 3 +++ 1 file changed, 3 insertions(+)

diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..530a1f441 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -637,6 +637,9 @@ def _poll(self, timeout): self._sensors.select_time.record((end_select - start_select) * 1000000000) for key, events in ready: + if key.fileobj.fileno() < 0: + self._selector.unregister(key.fileobj) + if key.fileobj is self._wake_r: self._clear_wake_fd() continue

From c02df0899f3b6a1e71f05fd91ff506b1b0c4c3eb Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 8 Mar 2024 22:45:43 -0500 Subject: [PATCH 05/11] Fix DescribeConfigsResponse_v1 config_source (#150)

Co-authored-by: Ryar Nyah --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index f9d61e5cd..41b4a9576 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -719,7 +719,7 @@ class DescribeConfigsResponse_v1(Response): ('config_names', String('utf-8')), ('config_value', String('utf-8')), ('read_only', Boolean), - ('is_default', Boolean), + ('config_source', Int8), ('is_sensitive', Boolean), ('config_synonyms', Array( ('config_name', String('utf-8')),

From 65eacfb2e6900d789ad501c6d5c81fcbc420ed16 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:15:36 -0500 Subject: [PATCH 06/11] Fix base class of DescribeClientQuotasResponse_v0 (#144)

Co-authored-by: Denis Otkidach --- kafka/protocol/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 41b4a9576..0bb1a7acc 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -925,7 +925,7 @@ class DeleteGroupsRequest_v1(Request): ] -class DescribeClientQuotasResponse_v0(Request): +class DescribeClientQuotasResponse_v0(Response): API_KEY = 48 API_VERSION = 0 SCHEMA = Schema(

From e0ebe5dd3191778b35967b3a0dd22ca6e091a9b7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:16:53 -0500 Subject: [PATCH 07/11] Update license_file to license_files (#131)

The former has been deprecated since setuptools 56.

Co-authored-by: micwoj92 <45581170+micwoj92@users.noreply.github.com> --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg index 5c6311daf..76daa0897 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -2,4 +2,4 @@ universal=1 [metadata] -license_file = LICENSE +license_files = LICENSE

From 26bb3eb6534cc1aff0a2a7751de7f7dedfdd4cd5 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sat, 9 Mar 2024 23:22:03 
-0500 Subject: [PATCH 08/11] Update some RST documentation syntax (#130)

* docs: Update syntax in README.rst

* docs: Update code block syntax in docs/index.rst

---------

Co-authored-by: HalfSweet <60973476+HalfSweet@users.noreply.github.com> --- README.rst | 165 +++++++++++++++++++++++++++++++------------------ docs/index.rst | 114 +++++++++++++++++++++------------- 2 files changed, 174 insertions(+), 105 deletions(-)

diff --git a/README.rst b/README.rst index 8a5c71b38..b7acfc8a2 100644
--- a/README.rst
+++ b/README.rst
@@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. -See <https://kafka-python.readthedocs.io/en/master/compatibility.html> + +See https://kafka-python.readthedocs.io/en/master/compatibility.html + for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code-block:: bash + + $ pip install kafka-python-ng + KafkaConsumer ************* KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. -See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html + for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code-block:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code-block:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... assert isinstance(msg.value, dict) + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) ->>> # Access record headers. The returned value is a list of tuples ->>> # with str, bytes for key and value ->>> for msg in consumer: -... print (msg.headers) +.. code-block:: python ->>> # Get consumer metrics ->>> metrics = consumer.metrics() + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) +.. 
code-block:: python + + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) + +.. code-block:: python + + # Access record headers. The returned value is a list of tuples + # with str, bytes for key and value + for msg in consumer: + print (msg.headers) + +.. code-block:: python + + # Get consumer metrics + metrics = consumer.metrics() KafkaProducer ============= KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. -See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> + +See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html + for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code-block:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code-block:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code-block:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code-block:: python ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() +.. code-block:: python ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) +.. code-block:: python ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... producer.send('foobar', b'msg %d' % i) +.. code-block:: python ->>> # Include record headers. The format is list of tuples with string key ->>> # and bytes value. ->>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) ->>> # Get producer performance metrics ->>> metrics = producer.metrics() +.. code-block:: python + + # Include record headers. 
The format is list of tuples with string key + # and bytes value. + producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')]) + +.. code-block:: python + + # Get producer performance metrics + metrics = producer.metrics() Thread safety ^^^^^^^^^^^^^ kafka-python-ng supports the following compression formats: - Zstandard (zstd) gzip is supported natively, the others require installing additional libraries. -See <https://kafka-python.readthedocs.io/en/master/install.html> for more information. + +See https://kafka-python.readthedocs.io/en/master/install.html for more information. + Optimized CRC32 Validation ^^^^^^^^^^^^^^^^^^^^^^^^^^ Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure python implementation for compatibility. To improve performance for high-throughput -applications, kafka-python-ng will use `crc32c` for optimized native code if installed. -See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions. +applications, kafka-python will use `crc32c` for optimized native code if installed. +See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions. + See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.

diff --git a/docs/index.rst b/docs/index.rst index 92b998d92..779ad997b 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -31,7 +31,11 @@ failures. See `Compatibility <compatibility.html>`_ for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. ->>> pip install kafka-python-ng + +.. code:: bash + + pip install kafka-python-ng + KafkaConsumer @@ -47,28 +51,36 @@ See `KafkaConsumer <apidoc/KafkaConsumer.html>`_ for API and configuration detai The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic') ->>> for msg in consumer: -... print (msg) +.. code:: python + + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic') + for msg in consumer: + print (msg) + +.. code:: python + + # join a consumer group for dynamic partition assignment and offset commits + from kafka import KafkaConsumer + consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') + for msg in consumer: + print (msg) + +.. code:: python ->>> # join a consumer group for dynamic partition assignment and offset commits ->>> from kafka import KafkaConsumer ->>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') ->>> for msg in consumer: -... print (msg) + # manually assign the partition list for the consumer + from kafka import TopicPartition + consumer = KafkaConsumer(bootstrap_servers='localhost:1234') + consumer.assign([TopicPartition('foobar', 2)]) + msg = next(consumer) ->>> # manually assign the partition list for the consumer ->>> from kafka import TopicPartition ->>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') ->>> consumer.assign([TopicPartition('foobar', 2)]) ->>> msg = next(consumer) +.. code:: python ->>> # Deserialize msgpack-encoded values ->>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) ->>> consumer.subscribe(['msgpackfoo']) ->>> for msg in consumer: -... 
assert isinstance(msg.value, dict) + # Deserialize msgpack-encoded values + consumer = KafkaConsumer(value_deserializer=msgpack.loads) + consumer.subscribe(['msgpackfoo']) + for msg in consumer: + assert isinstance(msg.value, dict) KafkaProducer @@ -78,36 +90,50 @@ KafkaProducer The class is intended to operate as similarly as possible to the official java client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details. ->>> from kafka import KafkaProducer ->>> producer = KafkaProducer(bootstrap_servers='localhost:1234') ->>> for _ in range(100): -... producer.send('foobar', b'some_message_bytes') +.. code:: python + + from kafka import KafkaProducer + producer = KafkaProducer(bootstrap_servers='localhost:1234') + for _ in range(100): + producer.send('foobar', b'some_message_bytes') + +.. code:: python + + # Block until a single message is sent (or timeout) + future = producer.send('foobar', b'another_message') + result = future.get(timeout=60) + +.. code:: python + + # Block until all pending messages are at least put on the network + # NOTE: This does not guarantee delivery or success! It is really + # only useful if you configure internal batching using linger_ms + producer.flush() + +.. code:: python + + # Use a key for hashed-partitioning + producer.send('foobar', key=b'foo', value=b'bar') ->>> # Block until a single message is sent (or timeout) ->>> future = producer.send('foobar', b'another_message') ->>> result = future.get(timeout=60) +.. code:: python ->>> # Block until all pending messages are at least put on the network ->>> # NOTE: This does not guarantee delivery or success! It is really ->>> # only useful if you configure internal batching using linger_ms ->>> producer.flush() + # Serialize json messages + import json + producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('fizzbuzz', {'foo': 'bar'}) ->>> # Use a key for hashed-partitioning ->>> producer.send('foobar', key=b'foo', value=b'bar') +.. code:: python ->>> # Serialize json messages ->>> import json ->>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) ->>> producer.send('fizzbuzz', {'foo': 'bar'}) + # Serialize string keys + producer = KafkaProducer(key_serializer=str.encode) + producer.send('flipflap', key='ping', value=b'1234') ->>> # Serialize string keys ->>> producer = KafkaProducer(key_serializer=str.encode) ->>> producer.send('flipflap', key='ping', value=b'1234') +.. code:: python ->>> # Compress messages ->>> producer = KafkaProducer(compression_type='gzip') ->>> for i in range(1000): -... 
producer.send('foobar', b'msg %d' % i) + # Compress messages + producer = KafkaProducer(compression_type='gzip') + for i in range(1000): + producer.send('foobar', b'msg %d' % i) Thread safety From 88763da301f72759911ec4c0e4dc8b4e9f83e124 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:14:12 -0500 Subject: [PATCH 09/11] Fix crc32c's __main__ for Python 3 (#142) * Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt --- kafka/record/_crc32c.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/record/_crc32c.py b/kafka/record/_crc32c.py index 9b51ad8a9..6642b5bbe 100644 --- a/kafka/record/_crc32c.py +++ b/kafka/record/_crc32c.py @@ -139,7 +139,5 @@ def crc(data): if __name__ == "__main__": import sys - # TODO remove the pylint disable once pylint fixes - # https://github.com/PyCQA/pylint/issues/2571 - data = sys.stdin.read() # pylint: disable=assignment-from-no-return + data = sys.stdin.buffer.read() # pylint: disable=assignment-from-no-return print(hex(crc(data))) From b1a4c53cfc426118cca491a552861e9b07592629 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 00:59:32 -0500 Subject: [PATCH 10/11] Strip trailing dot off hostname. (#133) Co-authored-by: Dave Voutila --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1efb8a0a1..1f3bc2006 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -496,7 +496,7 @@ def _wrap_ssl(self): try: self._sock = self._ssl_context.wrap_socket( self._sock, - server_hostname=self.host, + server_hostname=self.host.rstrip("."), do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) From 18eaa2d29e7ba3d0f261e620397712b5d67c3a94 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Sun, 10 Mar 2024 01:47:35 -0500 Subject: [PATCH 11/11] Handle OSError to properly recycle SSL connection, fix infinite loop (#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1f3bc2006..80f17009c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -510,7 +510,7 @@ def _try_handshake(self): # old ssl in python2.6 will swallow all SSLErrors here... except (SSLWantReadError, SSLWantWriteError): pass - except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): + except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError, ssl.SSLError, OSError) as e: log.warning('SSL connection closed by server during handshake.') self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user
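
Two of the fixes above are easier to see in isolation. First, the busy-loop guard from PATCH 04 can be reproduced with the standard library's `selectors` module. The sketch below is illustrative only: `prune_closed` is a hypothetical helper, not kafka-python code — the library applies the same `fileno() < 0` check inline inside `KafkaClient._poll`:

.. code-block:: python

    import selectors
    import socket

    def prune_closed(selector):
        # A closed socket reports fileno() == -1. If its registration is
        # left in the selector, the poll loop can wake immediately on every
        # call and spin at 100% CPU; dropping the stale key breaks the loop.
        for key in list(selector.get_map().values()):
            if key.fileobj.fileno() < 0:
                selector.unregister(key.fileobj)

    if __name__ == "__main__":
        sel = selectors.DefaultSelector()
        left, right = socket.socketpair()
        sel.register(left, selectors.EVENT_READ)
        left.close()                 # fd becomes -1; the key is now stale
        prune_closed(sel)            # stale registration is removed
        assert len(sel.get_map()) == 0
        right.close()
        sel.close()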
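
Second, the handshake hardening in PATCH 11 works because `ssl.SSLError` (and with it `SSLEOFError` and `SSLZeroReturnError`) has been a subclass of `OSError` since Python 3.3, so one broad handler only needs to separate the two "try again" signals from everything fatal. A minimal sketch of that shape, assuming a non-blocking TLS-wrapped socket — `recycle` is a hypothetical callback standing in for the library's `conn.close(KafkaConnectionError(...))`:

.. code-block:: python

    import ssl

    def try_handshake(tls_sock, recycle):
        # Returns True when the TLS handshake has completed, False when it
        # should simply be retried on a later poll cycle.
        try:
            tls_sock.do_handshake()
            return True
        except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
            # Handshake still in flight on a non-blocking socket: not an
            # error, just poll and try again.
            return False
        except OSError:
            # Covers ConnectionError, TimeoutError, ssl.SSLError and
            # friends: tear the connection down so it can be re-established
            # cleanly instead of retrying the same dead socket forever.
            recycle("SSL connection closed by server during handshake")
            return False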