Skip to content

Commit

Permalink
Run pyupgrade on everything. (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha committed Mar 18, 2024
1 parent deeccfa commit fcca556
Show file tree
Hide file tree
Showing 80 changed files with 290 additions and 456 deletions.
2 changes: 0 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

__title__ = 'kafka'
from kafka.version import __version__
__author__ = 'Dana Powers'
Expand Down
2 changes: 0 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
Expand Down
11 changes: 5 additions & 6 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from __future__ import absolute_import
from kafka.errors import IllegalArgumentError

# enum in stdlib as of py3.4
Expand Down Expand Up @@ -69,7 +68,7 @@ class ACLResourcePatternType(IntEnum):
PREFIXED = 4


class ACLFilter(object):
class ACLFilter:
"""Represents a filter to use with describing and deleting ACLs
The difference between this class and the ACL class is mainly that
Expand Down Expand Up @@ -161,7 +160,7 @@ def __init__(
permission_type,
resource_pattern
):
super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
super().__init__(principal, host, operation, permission_type, resource_pattern)
self.validate()

def validate(self):
Expand All @@ -173,7 +172,7 @@ def validate(self):
raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")


class ResourcePatternFilter(object):
class ResourcePatternFilter:
def __init__(
self,
resource_type,
Expand Down Expand Up @@ -232,13 +231,13 @@ def __init__(
resource_name,
pattern_type=ACLResourcePatternType.LITERAL
):
super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
super().__init__(resource_type, resource_name, pattern_type)
self.validate()

def validate(self):
if self.resource_type == ResourceType.ANY:
raise IllegalArgumentError("resource_type cannot be ANY")
if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
raise IllegalArgumentError(
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
)
10 changes: 4 additions & 6 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from collections import defaultdict
import copy
import logging
Expand Down Expand Up @@ -32,7 +30,7 @@
log = logging.getLogger(__name__)


class KafkaAdminClient(object):
class KafkaAdminClient:
"""A class for administering the Kafka cluster.
Warning:
Expand Down Expand Up @@ -194,7 +192,7 @@ def __init__(self, **configs):
log.debug("Starting KafkaAdminClient with configuration: %s", configs)
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
Expand Down Expand Up @@ -874,7 +872,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
))
else:
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")

self._wait_for_futures(futures)
return [f.value for f in futures]
Expand Down Expand Up @@ -1197,7 +1195,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
topics_partitions = list(topics_partitions_dict.items())
request = OffsetFetchRequest[version](group_id, topics_partitions)
else:
raise NotImplementedError(
Expand Down
4 changes: 1 addition & 3 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
Expand All @@ -15,7 +13,7 @@ class ConfigResourceType(IntEnum):
TOPIC = 2


class ConfigResource(object):
class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
Expand Down
5 changes: 1 addition & 4 deletions kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from __future__ import absolute_import


class NewPartitions(object):
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
Expand Down
4 changes: 1 addition & 3 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
Expand Down
22 changes: 8 additions & 14 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import, division

import collections
import copy
import logging
Expand Down Expand Up @@ -32,14 +30,10 @@
from kafka.vendor import socketpair
from kafka.version import __version__

if six.PY2:
ConnectionError = None


log = logging.getLogger('kafka.client')


class KafkaClient(object):
class KafkaClient:
"""
A network client for asynchronous request/response network I/O.
Expand Down Expand Up @@ -374,7 +368,7 @@ def _maybe_connect(self, node_id):

if conn is None:
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
assert broker, 'Broker id {} not in current metadata'.format(node_id)

log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
Expand Down Expand Up @@ -686,7 +680,7 @@ def _poll(self, timeout):
unexpected_data = key.fileobj.recv(1)
if unexpected_data: # anything other than a 0-byte read means protocol issues
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
except OSError:
pass
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
Expand All @@ -701,7 +695,7 @@ def _poll(self, timeout):
if conn not in processed and conn.connected() and conn._sock.pending():
self._pending_completion.extend(conn.recv())

for conn in six.itervalues(self._conns):
for conn in self._conns.values():
if conn.requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
conn, conn.config['request_timeout_ms'])
Expand Down Expand Up @@ -941,7 +935,7 @@ def wakeup(self):
except socket.timeout:
log.warning('Timeout to send to wakeup socket!')
raise Errors.KafkaTimeoutError()
except socket.error as e:
except OSError as e:
log.warning('Unable to send to wakeup socket!')
if self._raise_upon_socket_err_during_wakeup:
raise e
Expand All @@ -951,7 +945,7 @@ def _clear_wake_fd(self):
while True:
try:
self._wake_r.recv(1024)
except socket.error:
except OSError:
break

def _maybe_close_oldest_connection(self):
Expand Down Expand Up @@ -981,7 +975,7 @@ def bootstrap_connected(self):
OrderedDict = dict


class IdleConnectionManager(object):
class IdleConnectionManager:
def __init__(self, connections_max_idle_ms):
if connections_max_idle_ms > 0:
self.connections_max_idle = connections_max_idle_ms / 1000
Expand Down Expand Up @@ -1043,7 +1037,7 @@ def poll_expired_connection(self):
return None


class KafkaClientMetrics(object):
class KafkaClientMetrics:
def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
self.metric_group_name = metric_group_prefix + '-metrics'
Expand Down
12 changes: 5 additions & 7 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import collections
import copy
import logging
Expand All @@ -16,7 +14,7 @@
log = logging.getLogger(__name__)


class ClusterMetadata(object):
class ClusterMetadata:
"""
A class to manage kafka cluster metadata.
Expand Down Expand Up @@ -128,9 +126,9 @@ def available_partitions_for_topic(self, topic):
"""
if topic not in self._partitions:
return None
return set([partition for partition, metadata
in six.iteritems(self._partitions[topic])
if metadata.leader != -1])
return {partition for partition, metadata
in self._partitions[topic].items()
if metadata.leader != -1}

def leader_for_partition(self, partition):
"""Return node_id of leader, -1 unavailable, None if unknown."""
Expand Down Expand Up @@ -361,7 +359,7 @@ def add_group_coordinator(self, group, response):

# Use a coordinator-specific node id so that group requests
# get a dedicated connection
node_id = 'coordinator-{}'.format(response.coordinator_id)
node_id = f'coordinator-{response.coordinator_id}'
coordinator = BrokerMetadata(
node_id,
response.host,
Expand Down
6 changes: 0 additions & 6 deletions kafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

import gzip
import io
import platform
Expand Down Expand Up @@ -149,10 +147,6 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
# buffer... likely a python-snappy bug, so just use a slice copy
chunker = lambda payload, i, size: payload[i:size+i]

elif six.PY2:
# Sliced buffer avoids additional copies
# pylint: disable-msg=undefined-variable
chunker = lambda payload, i, size: buffer(payload, i, size)
else:
# snappy.compress does not like raw memoryviews, so we have to convert
# tobytes, which is a copy... oh well. it's the thought that counts.
Expand Down
Loading

0 comments on commit fcca556

Please sign in to comment.