Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection_timeout_ms and reset the timeout counter more often #132

Merged
merged 8 commits into from
Apr 3, 2024
4 changes: 4 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class KafkaClient:
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -145,6 +148,7 @@ class KafkaClient:
'bootstrap_servers': 'localhost',
'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'connection_timeout_ms': None,
'request_timeout_ms': 30000,
'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
Expand Down
24 changes: 19 additions & 5 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class BrokerConnection:
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
Expand Down Expand Up @@ -188,6 +191,7 @@ class BrokerConnection:
'client_id': 'kafka-python-' + __version__,
'node_id': 0,
'request_timeout_ms': 30000,
'connection_timeout_ms': None,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
Expand Down Expand Up @@ -231,6 +235,9 @@ def __init__(self, host, port, afi, **configs):
for key in self.config:
if key in configs:
self.config[key] = configs[key]

if self.config['connection_timeout_ms'] is None:
self.config['connection_timeout_ms'] = self.config['request_timeout_ms']

self.node_id = self.config.pop('node_id')

Expand Down Expand Up @@ -284,7 +291,10 @@ def __init__(self, host, port, afi, **configs):
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._sasl_auth_future = None
self.last_attempt = 0
self.last_activity = 0
# This value is not used for internal state, but it is left to allow backwards-compatability
# The variable last_activity is now used instead, but is updated more often may therefore break compatability with some hacks.
self.last_attempt= 0
self._gai = []
self._sensors = None
if self.config['metrics']:
Expand Down Expand Up @@ -362,6 +372,7 @@ def connect(self):
self.config['state_change_callback'](self.node_id, self._sock, self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
self.last_activity = time.time()

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
Expand Down Expand Up @@ -394,6 +405,7 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
Expand All @@ -419,6 +431,7 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
Expand All @@ -429,12 +442,13 @@ def connect(self):
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)
self.last_activity = time.time()

if self.state not in (ConnectionStates.CONNECTED,
ConnectionStates.DISCONNECTED):
# Connection timed out
request_timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_attempt:
request_timeout = self.config['connection_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_activity:
log.error('Connection attempt to %s timed out', self)
self.close(Errors.KafkaConnectionError('timeout'))
return self.state
Expand Down Expand Up @@ -595,7 +609,7 @@ def blacked_out(self):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
if time.time() < self.last_attempt + self._reconnect_backoff:
if time.time() < self.last_activity + self._reconnect_backoff:
return True
return False

Expand All @@ -606,7 +620,7 @@ def connection_delay(self):
the reconnect backoff time. When connecting or connected, returns a very
large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
time_waited = time.time() - (self.last_activity or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
else:
Expand Down
4 changes: 4 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ class KafkaProducer:
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
connection_timeout_ms (int): Connection timeout in milliseconds.
Default: None, which defaults it to the same value as
request_timeout_ms.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
receive_buffer_bytes (int): The size of the TCP receive buffer
Expand Down Expand Up @@ -300,6 +303,7 @@ class KafkaProducer:
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'connection_timeout_ms': None,
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down
88 changes: 80 additions & 8 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import socket

import pytest
import time

from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.protocol.api import RequestHeader
Expand Down Expand Up @@ -61,28 +62,99 @@ def test_connect_timeout(_socket, conn):
# Initial connect returns EINPROGRESS
# immediate inline connect returns EALREADY
# second explicit connect returns EALREADY
# third explicit connect returns EALREADY and times out via last_attempt
# third explicit connect returns EALREADY and times out via last_activity
_socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY]
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
conn.last_activity = 0
conn.last_attempt = 0
conn.connect()
assert conn.state is ConnectionStates.DISCONNECTED

def test_connect_timeout_slowconn(_socket, conn, mocker):
# Same as test_connect_timeout,
# but we make the connection run longer than the timeout in order to test that
# BrokerConnection resets the timer whenever things happen during the connection
# See https://github.com/dpkp/kafka-python/issues/2386
_socket.connect_ex.side_effect = [EINPROGRESS, EISCONN]

# 0.8 = we guarantee that when testing with three intervals of this we are past the timeout
time_between_connect = (conn.config['connection_timeout_ms']/1000) * 0.8
start = time.time()

# Use plaintext auth for simplicity
last_activity = conn.last_activity
last_attempt = conn.last_attempt
conn.config['security_protocol'] = 'SASL_PLAINTEXT'
conn.connect()
assert conn.state is ConnectionStates.CONNECTING
# Ensure the last_activity counter was updated
# Last_attempt should also be updated
assert conn.last_activity > last_activity
assert conn.last_attempt > last_attempt
last_attempt = conn.last_attempt
last_activity = conn.last_activity

# Simulate time being passed
# This shouldn't be enough time to time out the connection
conn._try_authenticate = mocker.Mock(side_effect=[False, False, True])
with mock.patch("time.time", return_value=start+time_between_connect):
# This should trigger authentication
# Note that an authentication attempt isn't actually made until now.
# We simulate that authentication does not succeed at this point
# This is technically incorrect, but it lets us see what happens
# to the state machine when the state doesn't change for two function calls
conn.connect()
assert conn.last_activity > last_activity
# Last attempt is kept as a legacy variable, should not update
assert conn.last_attempt == last_attempt
last_activity = conn.last_activity

assert conn.state is ConnectionStates.AUTHENTICATING


# This time around we should be way past timeout.
# Now we care about connect() not terminating the attempt,
# because connection state was progressed in the meantime.
with mock.patch("time.time", return_value=start+time_between_connect*2):
# Simulate this one not succeeding as well. This is so we can ensure things don't time out
conn.connect()

# No state change = no activity change
assert conn.last_activity == last_activity
assert conn.last_attempt == last_attempt

# If last_activity was not reset when the state transitioned to AUTHENTICATING,
# the connection state would be timed out now.
assert conn.state is ConnectionStates.AUTHENTICATING


# This time around, the connection should succeed.
with mock.patch("time.time", return_value=start+time_between_connect*3):
# This should finalize the connection
conn.connect()

assert conn.last_activity > last_activity
assert conn.last_attempt == last_attempt
last_activity = conn.last_activity

assert conn.state is ConnectionStates.CONNECTED



def test_blacked_out(conn):
with mock.patch("time.time", return_value=1000):
conn.last_attempt = 0
conn.last_activity = 0
assert conn.blacked_out() is False
conn.last_attempt = 1000
conn.last_activity = 1000
assert conn.blacked_out() is True


def test_connection_delay(conn):
with mock.patch("time.time", return_value=1000):
conn.last_attempt = 1000
conn.last_activity = 1000
assert conn.connection_delay() == conn.config['reconnect_backoff_ms']
conn.state = ConnectionStates.CONNECTING
assert conn.connection_delay() == float('inf')
Expand Down Expand Up @@ -286,7 +358,7 @@ def test_lookup_on_connect():
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.last_activity = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn._sock_afi == afi2
Expand All @@ -301,11 +373,10 @@ def test_relookup_on_failure():
assert conn.host == hostname
mock_return1 = []
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
last_attempt = conn.last_attempt
last_activity = conn.last_activity
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn.disconnected()
assert conn.last_attempt > last_attempt

afi2 = socket.AF_INET
sockaddr2 = ('127.0.0.2', 9092)
Expand All @@ -314,12 +385,13 @@ def test_relookup_on_failure():
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.last_activity = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM)
assert conn._sock_afi == afi2
assert conn._sock_addr == sockaddr2
conn.close()
assert conn.last_activity > last_activity


def test_requests_timed_out(conn):
Expand Down
Loading