Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rely on socket selector to detect completed connection attempts #1909

Merged
merged 2 commits into from
Sep 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ def _conn_state_change(self, node_id, sock, conn):
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
self._selector.register(sock, selectors.EVENT_WRITE)
self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
self._selector.modify(sock, selectors.EVENT_WRITE)
self._selector.modify(sock, selectors.EVENT_WRITE, conn)

if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
Expand Down Expand Up @@ -623,7 +623,11 @@ def _poll(self, timeout):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
elif not (events & selectors.EVENT_READ):
if events & selectors.EVENT_WRITE:
conn = key.data
if conn.connecting():
conn.connect()
if not (events & selectors.EVENT_READ):
continue
conn = key.data
processed.add(conn)
Expand Down
10 changes: 5 additions & 5 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,16 +769,16 @@ def connection_delay(self):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting, returns 0 to allow
non-blocking connect to finish. When connected, returns a very large
number to handle slow/stalled connections.
the reconnect backoff time. When connecting or connected, returns a very
large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
elif self.connecting():
return 0
else:
# When connecting or connected, we should be able to delay
# indefinitely since other events (connection or data acked) will
# cause a wakeup once data can be sent.
return float('inf')

def connected(self):
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def run_once(self):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
self._client.poll(poll_timeout_ms)
self._client.poll(timeout_ms=poll_timeout_ms)

def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
Expand Down
2 changes: 1 addition & 1 deletion test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_conn_state_change(mocker, cli, conn):
sock = conn._sock
cli._conn_state_change(node_id, sock, conn)
assert node_id in cli._connecting
sel.register.assert_called_with(sock, selectors.EVENT_WRITE)
sel.register.assert_called_with(sock, selectors.EVENT_WRITE, conn)

conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, sock, conn)
Expand Down
2 changes: 1 addition & 1 deletion test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_connection_delay(conn):
conn.last_attempt = 1000
assert conn.connection_delay() == conn.config['reconnect_backoff_ms']
conn.state = ConnectionStates.CONNECTING
assert conn.connection_delay() == 0
assert conn.connection_delay() == float('inf')
conn.state = ConnectionStates.CONNECTED
assert conn.connection_delay() == float('inf')

Expand Down