Skip to content

Commit

Permalink
Possibly clean up the receive code
Browse files Browse the repository at this point in the history
  • Loading branch information
c committed Aug 22, 2024
1 parent 9bee9fc commit 4d59f6e
Showing 1 changed file with 22 additions and 30 deletions.
52 changes: 22 additions & 30 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,50 +834,42 @@ def _recv(self):
"""Take all available bytes from socket, return list of any responses from parser"""
recvd = []
err = None

with self._lock:
if not self._can_send_recv():
log.warning('%s cannot recv: socket not connected', self)
return ()

while len(recvd) < self.config['sock_chunk_buffer_count']:
try:
data = self._sock.recv(self.config['sock_chunk_bytes'])
# We expect socket.recv to raise an exception if there are no
# bytes available to read from the socket in non-blocking mode.
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
err = Errors.KafkaConnectionError('socket disconnected')
break
else:
try:
# BlockingIOError is a PY2 catchall that should not be caught
while len(recvd) < self.config['sock_chunk_buffer_count']:
try:
data = self._sock.recv(self.config['sock_chunk_bytes'])
# We expect socket.recv to raise an exception if there are no
# bytes available to read from the socket in non-blocking mode.
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
raise Errors.KafkaConnectionError('socket disconnected')
recvd.append(data)
except (SSLWantReadError, SSLWantWriteError):
# End receive loop and process data
break

except (SSLWantReadError, SSLWantWriteError):
break
except (ConnectionError, TimeoutError) as e:
log.exception('%s: Error receiving network data'
' closing socket', self)
err = Errors.KafkaConnectionError(e)
break
except BlockingIOError:
break
# For PY2 this is a catchall and should be re-raised
raise

# Only process bytes if there was no connection exception
if err is None:
recvd_data = b''.join(recvd)

if self._sensors:
self._sensors.bytes_received.record(len(recvd_data))

# We need to keep the lock through protocol receipt
# so that we ensure that the processed byte order is the
# same as the received byte order
try:
return self._protocol.receive_bytes(recvd_data)
except Errors.KafkaProtocolError as e:
err = e
return self._protocol.receive_bytes(recvd_data)
except (Errors.KafkaConnectionError, Errors.KafkaProtocolError) as e:
err = e
except (ConnectionError, TimeoutError) as e:
log.warning('%s: Error receiving network data closing socket', self, exc_info=True)
err = Errors.KafkaConnectionError(e)

self.close(error=err)
return ()
Expand Down

0 comments on commit 4d59f6e

Please sign in to comment.