Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiong committed Apr 10, 2024
1 parent e686d6b commit f76365c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
7 changes: 5 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def _can_connect(self, node_id):
conn = self._conns[node_id]
return conn.disconnected() and not conn.blacked_out()

def _conn_state_change(self, node_id, sock, conn, ssl_upgraded = False):
def _conn_state_change(self, node_id, sock, conn):
with self._lock:
if conn.connecting():
# SSL connections can enter this state 2x (second during Handshake)
Expand All @@ -266,7 +266,10 @@ def _conn_state_change(self, node_id, sock, conn, ssl_upgraded = False):
try:
self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
if ssl_upgraded:
# SSL detaches the original socket, and transfers the
# underlying file descriptor to a new SSLSocket. We should
# explicitly unregister the original socket.
if conn.state == ConnectionStates.HANDSHAKE:
self._selector.unregister(sock)
self._selector.register(sock, selectors.EVENT_WRITE, conn)
else:
Expand Down
2 changes: 1 addition & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def connect(self):
# _wrap_ssl can alter the connection state -- disconnects on failure
self._wrap_ssl()
self.state = ConnectionStates.HANDSHAKE
self.config['state_change_callback'](self.node_id, self._sock, self, True)
self.config['state_change_callback'](self.node_id, self._sock, self)

elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.debug('%s: initiating SASL authentication', self)
Expand Down

0 comments on commit f76365c

Please sign in to comment.