Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use monkeytype to create some semblance of typing #173

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class AbstractPartitionAssignor(object):
partition counts which are always needed in assignors).
"""

@abc.abstractproperty
@abc.abstractmethod
def name(self):
""".name should be a string identifying the assignor"""
pass
Expand Down
1 change: 0 additions & 1 deletion kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.cluster import ClusterMetadata
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
Expand Down
9 changes: 5 additions & 4 deletions kafka/errors.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import inspect
import sys
from typing import Any


class KafkaError(RuntimeError):
retriable = False
# whether metadata should be refreshed on error
invalid_metadata = False

def __str__(self):
def __str__(self) -> str:
if not self.args:
return self.__class__.__name__
return '{}: {}'.format(self.__class__.__name__,
Expand Down Expand Up @@ -65,7 +66,7 @@ class IncompatibleBrokerVersion(KafkaError):


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(
"""Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
Expand All @@ -92,7 +93,7 @@ class BrokerResponseError(KafkaError):
message = None
description = None

def __str__(self):
def __str__(self) -> str:
"""Add errno to standard KafkaError str"""
return '[Error {}] {}'.format(
self.errno,
Expand Down Expand Up @@ -509,7 +510,7 @@ def _iter_broker_errors():
kafka_errors = {x.errno: x for x in _iter_broker_errors()}


def for_code(error_code):
def for_code(error_code: int) -> Any:
return kafka_errors.get(error_code, UnknownError)


Expand Down
14 changes: 7 additions & 7 deletions kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ class Request(Struct):

FLEXIBLE_VERSION = False

@abc.abstractproperty
@abc.abstractmethod
def API_KEY(self):
"""Integer identifier for api request"""
pass

@abc.abstractproperty
@abc.abstractmethod
def API_VERSION(self):
"""Integer of api request version"""
pass

@abc.abstractproperty
@abc.abstractmethod
def SCHEMA(self):
"""An instance of Schema() representing the request structure"""
pass

@abc.abstractproperty
@abc.abstractmethod
def RESPONSE_TYPE(self):
"""The Response class associated with the api request"""
pass
Expand All @@ -93,17 +93,17 @@ def parse_response_header(self, read_buffer):
class Response(Struct):
__metaclass__ = abc.ABCMeta

@abc.abstractproperty
@abc.abstractmethod
def API_KEY(self):
"""Integer identifier for api request/response"""
pass

@abc.abstractproperty
@abc.abstractmethod
def API_VERSION(self):
"""Integer of api request/response version"""
pass

@abc.abstractproperty
@abc.abstractmethod
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass
Expand Down
13 changes: 7 additions & 6 deletions kafka/protocol/struct.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from io import BytesIO
from typing import List, Union

from kafka.protocol.abstract import AbstractType
from kafka.protocol.types import Schema
Expand All @@ -9,7 +10,7 @@
class Struct(AbstractType):
SCHEMA = Schema()

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
if len(args) == len(self.SCHEMA.fields):
for i, name in enumerate(self.SCHEMA.names):
self.__dict__[name] = args[i]
Expand All @@ -36,23 +37,23 @@ def encode(cls, item): # pylint: disable=E0202
bits.append(field.encode(item[i]))
return b''.join(bits)

def _encode_self(self):
def _encode_self(self) -> bytes:
return self.SCHEMA.encode(
[self.__dict__[name] for name in self.SCHEMA.names]
)

@classmethod
def decode(cls, data):
def decode(cls, data: Union[BytesIO, bytes]) -> Union['ConsumerProtocolMemberAssignment', 'ConsumerProtocolMemberMetadata', 'FetchResponse_v0', 'StickyAssignorUserDataV1']:
if isinstance(data, bytes):
data = BytesIO(data)
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])

def get_item(self, name):
def get_item(self, name: str) -> Union[int, List[List[Union[int, str, bool, List[List[Union[int, List[int]]]]]]], str, List[List[Union[int, str]]]]:
if name not in self.SCHEMA.names:
raise KeyError("%s is not in the schema" % name)
return self.__dict__[name]

def __repr__(self):
def __repr__(self) -> str:
key_vals = []
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
key_vals.append(f'{name}={field.repr(self.__dict__[name])}')
Expand All @@ -61,7 +62,7 @@ def __repr__(self):
def __hash__(self):
return hash(self.encode())

def __eq__(self, other):
def __eq__(self, other: Union['ConsumerProtocolMemberAssignment', 'ConsumerProtocolMemberMetadata', 'MetadataRequest_v0', 'Message']) -> bool:
if self.SCHEMA != other.SCHEMA:
return False
for attr in self.SCHEMA.names:
Expand Down
6 changes: 3 additions & 3 deletions kafka/record/_crc32c.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
_MASK = 0xFFFFFFFF


def crc_update(crc, data):
def crc_update(crc: int, data: bytes) -> int:
"""Update CRC-32C checksum with data.
Args:
crc: 32-bit checksum to update as long.
Expand All @@ -116,7 +116,7 @@ def crc_update(crc, data):
return crc ^ _MASK


def crc_finalize(crc):
def crc_finalize(crc: int) -> int:
"""Finalize CRC-32C checksum.
This function should be called as last step of crc calculation.
Args:
Expand All @@ -127,7 +127,7 @@ def crc_finalize(crc):
return crc & _MASK


def crc(data):
def crc(data: bytes) -> int:
"""Compute CRC-32C checksum of the data.
Args:
data: byte array, string or iterable over bytes.
Expand Down
14 changes: 7 additions & 7 deletions kafka/record/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ class ABCRecord:
__metaclass__ = abc.ABCMeta
__slots__ = ()

@abc.abstractproperty
@abc.abstractmethod
def offset(self):
""" Absolute offset of record
"""

@abc.abstractproperty
@abc.abstractmethod
def timestamp(self):
""" Epoch milliseconds
"""

@abc.abstractproperty
@abc.abstractmethod
def timestamp_type(self):
""" CREATE_TIME(0) or APPEND_TIME(1)
"""

@abc.abstractproperty
@abc.abstractmethod
def key(self):
""" Bytes key or None
"""

@abc.abstractproperty
@abc.abstractmethod
def value(self):
""" Bytes value or None
"""

@abc.abstractproperty
@abc.abstractmethod
def checksum(self):
""" Prior to v2 format CRC was contained in every message. This will
be the checksum for v0 and v1 and None for v2 and above.
"""

@abc.abstractproperty
@abc.abstractmethod
def headers(self):
""" If supported by version list of key-value tuples, or empty list if
not supported by format.
Expand Down
Loading
Loading