diff --git a/kafka/conn.py b/kafka/conn.py index b9ef0e2d9..e48c8ae36 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -834,50 +834,42 @@ def _recv(self): """Take all available bytes from socket, return list of any responses from parser""" recvd = [] err = None + with self._lock: if not self._can_send_recv(): log.warning('%s cannot recv: socket not connected', self) return () - while len(recvd) < self.config['sock_chunk_buffer_count']: - try: - data = self._sock.recv(self.config['sock_chunk_bytes']) - # We expect socket.recv to raise an exception if there are no - # bytes available to read from the socket in non-blocking mode. - # but if the socket is disconnected, we will get empty data - # without an exception raised - if not data: - log.error('%s: socket disconnected', self) - err = Errors.KafkaConnectionError('socket disconnected') - break - else: + try: + # BlockingIOError is a PY2 catchall that should not be caught + while len(recvd) < self.config['sock_chunk_buffer_count']: + try: + data = self._sock.recv(self.config['sock_chunk_bytes']) + # We expect socket.recv to raise an exception if there are no + # bytes available to read from the socket in non-blocking mode. + # but if the socket is disconnected, we will get empty data + # without an exception raised + if not data: + raise Errors.KafkaConnectionError('socket disconnected') recvd.append(data) + except (SSLWantReadError, SSLWantWriteError): + # End receive loop and process data + break - except (SSLWantReadError, SSLWantWriteError): - break - except (ConnectionError, TimeoutError) as e: - log.exception('%s: Error receiving network data' - ' closing socket', self) - err = Errors.KafkaConnectionError(e) - break - except BlockingIOError: - break - # For PY2 this is a catchall and should be re-raised - raise - - # Only process bytes if there was no connection exception - if err is None: recvd_data = b''.join(recvd) + if self._sensors: self._sensors.bytes_received.record(len(recvd_data)) # We need to keep the lock through protocol receipt # so that we ensure that the processed byte order is the # same as the received byte order - try: - return self._protocol.receive_bytes(recvd_data) - except Errors.KafkaProtocolError as e: - err = e + return self._protocol.receive_bytes(recvd_data) + except (Errors.KafkaConnectionError, Errors.KafkaProtocolError) as e: + err = e + except (ConnectionError, TimeoutError) as e: + log.warning('%s: Error receiving network data closing socket', self, exc_info=True) + err = Errors.KafkaConnectionError(e) self.close(error=err) return ()