Skip to content

Commit

Permalink
Remove all vendoring
Browse files Browse the repository at this point in the history
Now that the codebase has been modernised by using pyupgrade, we can
also remove all backported vendor modules, and all uses of them.
  • Loading branch information
s-t-e-v-e-n-k committed Mar 19, 2024
1 parent fcca556 commit d505f5e
Show file tree
Hide file tree
Showing 43 changed files with 90 additions and 2,733 deletions.
3 changes: 0 additions & 3 deletions .covrc

This file was deleted.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ test-local: build-integration
cov-local: build-integration
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
--cov-config=.covrc --cov-report html $(FLAGS) kafka test
--cov-report html $(FLAGS) kafka test
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
Expand Down
2 changes: 0 additions & 2 deletions benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import threading
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaConsumer, KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

Expand Down
2 changes: 0 additions & 2 deletions benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import threading
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

Expand Down
25 changes: 9 additions & 16 deletions benchmarks/varint_speed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/usr/bin/env python
from __future__ import print_function
import pyperf
from kafka.vendor import six


test_data = [
Expand Down Expand Up @@ -67,6 +65,10 @@
BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))


def int2byte(i):
return bytes((i),)


def _assert_valid_enc(enc_func):
for encoded, decoded in test_data:
assert enc_func(decoded) == encoded, decoded
Expand Down Expand Up @@ -116,7 +118,7 @@ def encode_varint_1(num):
_assert_valid_enc(encode_varint_1)


def encode_varint_2(value, int2byte=six.int2byte):
def encode_varint_2(value, int2byte=int2byte):
value = (value << 1) ^ (value >> 63)

bits = value & 0x7f
Expand Down Expand Up @@ -151,7 +153,7 @@ def encode_varint_3(value, buf):
assert res == encoded


def encode_varint_4(value, int2byte=six.int2byte):
def encode_varint_4(value, int2byte=int2byte):
value = (value << 1) ^ (value >> 63)

if value <= 0x7f: # 1 byte
Expand Down Expand Up @@ -301,22 +303,13 @@ def size_of_varint_2(value):
_assert_valid_size(size_of_varint_2)


if six.PY3:
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
Raises:
IndexError: if position is out of bounds
"""
return memview[pos]
else:
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
Raises:
IndexError: if position is out of bounds
"""
return ord(memview[pos])
return memview[pos]


def decode_varint_1(buffer, pos=0):
Expand Down
9 changes: 2 additions & 7 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
from kafka.errors import IllegalArgumentError
from enum import IntEnum

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from kafka.errors import IllegalArgumentError


class ResourceType(IntEnum):
Expand Down
1 change: 0 additions & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import socket

from . import ConfigResourceType
from kafka.vendor import six

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
Expand Down
7 changes: 1 addition & 6 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from enum import IntEnum


class ConfigResourceType(IntEnum):
Expand Down
13 changes: 1 addition & 12 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@
import copy
import logging
import random
import selectors
import socket
import threading
import time
import weakref

# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor import selectors34 as selectors

from kafka.vendor import six

from kafka.cluster import ClusterMetadata
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from kafka import errors as Errors
Expand All @@ -25,9 +17,6 @@
from kafka.metrics.stats.rate import TimeUnit
from kafka.protocol.metadata import MetadataRequest
from kafka.util import Dict, WeakMethod
# Although this looks unused, it actually monkey-patches socket.socketpair()
# and should be left in as long as we're using socket.socketpair() in this file
from kafka.vendor import socketpair
from kafka.version import __version__

log = logging.getLogger('kafka.client')
Expand Down
2 changes: 0 additions & 2 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import threading
import time

from kafka.vendor import six

from kafka import errors as Errors
from kafka.conn import collect_hosts
from kafka.future import Future
Expand Down
3 changes: 0 additions & 3 deletions kafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import platform
import struct

from kafka.vendor import six
from kafka.vendor.six.moves import range

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
Expand Down
14 changes: 1 addition & 13 deletions kafka/conn.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
import copy
import errno
import logging
import selectors
from random import shuffle, uniform

# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor import selectors34 as selectors

import socket
import threading
import time

from kafka.vendor import six

from kafka import sasl
import kafka.errors as Errors
from kafka.future import Future
Expand Down Expand Up @@ -565,8 +557,6 @@ def _send_bytes(self, data):
except (SSLWantReadError, SSLWantWriteError):
break
except (ConnectionError, TimeoutError) as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
break
raise
except BlockingIOError:
break
Expand Down Expand Up @@ -863,8 +853,6 @@ def _recv(self):
except (SSLWantReadError, SSLWantWriteError):
break
except (ConnectionError, TimeoutError) as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
break
log.exception('%s: Error receiving network data'
' closing socket', self)
err = Errors.KafkaConnectionError(e)
Expand Down
2 changes: 0 additions & 2 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import sys
import time

from kafka.vendor import six

import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
Expand Down
2 changes: 0 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

from kafka.errors import KafkaConfigurationError, UnsupportedVersionError

from kafka.vendor import six

from kafka.client_async import KafkaClient, selectors
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
Expand Down
2 changes: 0 additions & 2 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import logging
import re

from kafka.vendor import six

from kafka.errors import IllegalStateError
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata
Expand Down
8 changes: 2 additions & 6 deletions kafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from __future__ import absolute_import

import collections
import logging

from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment

Expand Down Expand Up @@ -34,14 +30,14 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
@classmethod
def assign(cls, cluster, member_metadata):
consumers_per_topic = collections.defaultdict(list)
for member, metadata in six.iteritems(member_metadata):
for member, metadata in member_metadata.items():
for topic in metadata.subscription:
consumers_per_topic[topic].append(member)

# construct {member_id: {topic: [partition, ...]}}
assignment = collections.defaultdict(dict)

for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
for topic, consumers_for_topic in consumers_per_topic.items():
partitions = cluster.partitions_for_topic(topic)
if partitions is None:
log.warning('No partition metadata for topic %s', topic)
Expand Down
6 changes: 1 addition & 5 deletions kafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from __future__ import absolute_import

import collections
import itertools
import logging

from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.structs import TopicPartition
Expand Down Expand Up @@ -51,7 +47,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
@classmethod
def assign(cls, cluster, member_metadata):
all_topics = set()
for metadata in six.itervalues(member_metadata):
for metadata in member_metadata.values():
all_topics.update(metadata.subscription)

all_topic_partitions = []
Expand Down
4 changes: 1 addition & 3 deletions kafka/coordinator/assignors/sticky/partition_movements.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.vendor import six

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -74,7 +72,7 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))

def are_sticky(self):
for topic, movements in six.iteritems(self.partition_movements_by_topic):
for topic, movements in self.partition_movements_by_topic.items():
movement_pairs = set(movements.keys())
if self._has_cycles(movement_pairs):
log.error(
Expand Down
Loading

0 comments on commit d505f5e

Please sign in to comment.