Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Eventhub] Changes made from perf testing #27703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,35 +146,42 @@ def __repr__(self) -> str:
# pylint: disable=bare-except
try:
body_str = self.body_as_str()
except:
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message body read error: %r", e)
body_str = "<read-error>"
event_repr = f"body='{body_str}'"
try:
event_repr += f", properties={self.properties}"
except:
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message properties read error: %r", e)
event_repr += ", properties=<read-error>"
try:
event_repr += f", offset={self.offset}"
except:
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message offset read error: %r", e)
event_repr += ", offset=<read-error>"
try:
event_repr += f", sequence_number={self.sequence_number}"
except:
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message sequence number read error: %r", e)
event_repr += ", sequence_number=<read-error>"
try:
event_repr += f", partition_key={self.partition_key!r}"
except:
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message partition key read error: %r", e)
event_repr += ", partition_key=<read-error>"
try:
event_repr += f", enqueued_time={self.enqueued_time!r}"
except:
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message enqueued time read error: %r", e)
event_repr += ", enqueued_time=<read-error>"
return f"EventData({event_repr})"

def __str__(self) -> str:
try:
body_str = self.body_as_str()
except: # pylint: disable=bare-except
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message body read error: %r", e)
body_str = "<read-error>"
event_str = f"{{ body: '{body_str}'"
try:
Expand All @@ -187,8 +194,8 @@ def __str__(self) -> str:
event_str += f", partition_key={self.partition_key!r}"
if self.enqueued_time:
event_str += f", enqueued_time={self.enqueued_time!r}"
except: # pylint: disable=bare-except
pass
except Exception as e: # pylint: disable=broad-except
_LOGGER.debug("Message metadata read error: %r", e)
event_str += " }"
return event_str

Expand Down Expand Up @@ -416,9 +423,11 @@ def body_as_str(self, encoding: str = "UTF-8") -> str:
if self.body_type != AmqpMessageBodyType.DATA:
return self._decode_non_data_body_as_str(encoding=encoding)
return "".join(b.decode(encoding) for b in cast(Iterable[bytes], data))
except TypeError:
except UnicodeDecodeError as e:
raise TypeError(f"Message data is not compatible with string type: {e}")
except TypeError as e:
return str(data)
except: # pylint: disable=bare-except
except Exception: # pylint: disable=broad-except
pass
try:
return cast(bytes, data).decode(encoding)
Expand Down
140 changes: 85 additions & 55 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py

Large diffs are not rendered by default.

222 changes: 121 additions & 101 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,15 @@ def __init__(
read_timeout=None,
socket_settings=None,
raise_on_initial_eintr=True,
**kwargs # pylint: disable=unused-argument
**kwargs
):
self._quick_recv = None
self.connected = False
self.sock = None
self.raise_on_initial_eintr = raise_on_initial_eintr
self._read_buffer = BytesIO()
self.host, self.port = to_host_port(host, port)
self.network_trace_params = kwargs.get('network_trace_params')

self.connect_timeout = connect_timeout or TIMEOUT_INTERVAL
self.read_timeout = read_timeout or READ_TIMEOUT_INTERVAL
Expand All @@ -185,7 +186,8 @@ def connect(self):
# EINTR, EAGAIN, EWOULDBLOCK would signal that the banner
# has _not_ been sent
self.connected = True
except (OSError, IOError, SSLError):
except (OSError, IOError, SSLError) as e:
_LOGGER.info("Transport connection failed: %r", e, extra=self.network_trace_params)
# if not fully connected, close socket, and reraise error
if self.sock and not self.connected:
self.sock.close()
Expand Down Expand Up @@ -387,76 +389,91 @@ def _write(self, s):
raise NotImplementedError("Must be overriden in subclass")

def close(self):
if self.sock is not None:
self._shutdown_transport()
# Call shutdown first to make sure that pending messages
# reach the AMQP broker if the program exits after
# calling this method.
try:
self.sock.shutdown(socket.SHUT_RDWR)
except Exception as exc: # pylint: disable=broad-except
# TODO: shutdown could raise OSError, Transport endpoint is not connected if the endpoint is already
# disconnected. can we safely ignore the errors since the close operation is initiated by us.
_LOGGER.info("Transport endpoint is already disconnected: %r", exc)
self.sock.close()
self.sock = None
self.connected = False
with self.socket_lock:
if self.sock is not None:
self._shutdown_transport()
# Call shutdown first to make sure that pending messages
# reach the AMQP broker if the program exits after
# calling this method.
try:
self.sock.shutdown(socket.SHUT_RDWR)
except Exception as exc: # pylint: disable=broad-except
# TODO: shutdown could raise OSError, Transport endpoint is not connected if the endpoint is already
# disconnected. can we safely ignore the errors since the close operation is initiated by us.
_LOGGER.debug(
"Transport endpoint is already disconnected: %r",
exc,
extra=self.network_trace_params
)
self.sock.close()
self.sock = None
self.connected = False

def read(self, verify_frame_type=0):
read = self._read
read_frame_buffer = BytesIO()
try:
frame_header = memoryview(bytearray(8))
read_frame_buffer.write(read(8, buffer=frame_header, initial=True))

channel = struct.unpack(">H", frame_header[6:])[0]
size = frame_header[0:4]
if size == AMQP_FRAME: # Empty frame or AMQP header negotiation TODO
return frame_header, channel, None
size = struct.unpack(">I", size)[0]
offset = frame_header[4]
frame_type = frame_header[5]
if verify_frame_type is not None and frame_type != verify_frame_type:
_LOGGER.debug(
"Received invalid frame type: %r, expected: %r", frame_type, verify_frame_type
)
with self.socket_lock:
read = self._read
read_frame_buffer = BytesIO()
try:
frame_header = memoryview(bytearray(8))
read_frame_buffer.write(read(8, buffer=frame_header, initial=True))

channel = struct.unpack(">H", frame_header[6:])[0]
size = frame_header[0:4]
if size == AMQP_FRAME: # Empty frame or AMQP header negotiation TODO
return frame_header, channel, None
size = struct.unpack(">I", size)[0]
offset = frame_header[4]
frame_type = frame_header[5]
if verify_frame_type is not None and frame_type != verify_frame_type:
_LOGGER.debug(
"Received invalid frame type: %r, expected: %r",
frame_type,
verify_frame_type,
extra=self.network_trace_params
)
raise ValueError(
f"Received invalid frame type: {frame_type}, expected: {verify_frame_type}"
)

# >I is an unsigned int, but the argument to sock.recv is signed,
# so we know the size can be at most 2 * SIGNED_INT_MAX
payload_size = size - len(frame_header)
payload = memoryview(bytearray(payload_size))
if size > SIGNED_INT_MAX:
read_frame_buffer.write(read(SIGNED_INT_MAX, buffer=payload))
read_frame_buffer.write(
read(size - SIGNED_INT_MAX, buffer=payload[SIGNED_INT_MAX:])
)
else:
read_frame_buffer.write(read(payload_size, buffer=payload))
except (socket.timeout, TimeoutError):
read_frame_buffer.write(self._read_buffer.getvalue())
self._read_buffer = read_frame_buffer
self._read_buffer.seek(0)
raise
except (OSError, IOError, SSLError, socket.error) as exc:
# Don't disconnect for ssl read time outs
# http://bugs.python.org/issue10272
if isinstance(exc, SSLError) and "timed out" in str(exc):
raise socket.timeout()
if get_errno(exc) not in _UNAVAIL:
self.connected = False
raise
offset -= 2
# >I is an unsigned int, but the argument to sock.recv is signed,
# so we know the size can be at most 2 * SIGNED_INT_MAX
payload_size = size - len(frame_header)
payload = memoryview(bytearray(payload_size))
if size > SIGNED_INT_MAX:
read_frame_buffer.write(read(SIGNED_INT_MAX, buffer=payload))
read_frame_buffer.write(
read(size - SIGNED_INT_MAX, buffer=payload[SIGNED_INT_MAX:])
)
else:
read_frame_buffer.write(read(payload_size, buffer=payload))
except (socket.timeout, TimeoutError):
read_frame_buffer.write(self._read_buffer.getvalue())
self._read_buffer = read_frame_buffer
self._read_buffer.seek(0)
raise
except (OSError, IOError, SSLError, socket.error) as exc:
# Don't disconnect for ssl read time outs
# http://bugs.python.org/issue10272
if isinstance(exc, SSLError) and "timed out" in str(exc):
raise socket.timeout()
if get_errno(exc) not in _UNAVAIL:
self.connected = False
_LOGGER.debug("Transport read failed: %r", exc, extra=self.network_trace_params)
raise
offset -= 2
return frame_header, channel, payload[offset:]

def write(self, s):
try:
self._write(s)
except socket.timeout:
raise
except (OSError, IOError, socket.error) as exc:
if get_errno(exc) not in _UNAVAIL:
self.connected = False
raise
with self.socket_lock:
try:
self._write(s)
except socket.timeout:
raise
except (OSError, IOError, socket.error) as exc:
_LOGGER.debug("Transport write failed: %r", exc, extra=self.network_trace_params)
if get_errno(exc) not in _UNAVAIL:
self.connected = False
raise

def receive_frame(self, **kwargs):
try:
Expand Down Expand Up @@ -719,14 +736,9 @@ def connect(self):
# TODO: resolve pylance error when type: ignore is removed below, issue #22051
except (WebSocketTimeoutException, SSLError, WebSocketConnectionClosedException) as exc: # type: ignore
self.close()
if isinstance(exc, WebSocketTimeoutException):
message = f'Send timed out ({str(exc)})'
elif isinstance(exc, SSLError):
message = f'Send disconnected by SSL ({str(exc)})'
else:
message = f'Send disconnected ({str(exc)})'
raise ConnectionError(message)
except (OSError, IOError, SSLError):
raise ConnectionError("Websocket failed to establish connection: %r" % exc) from exc
except (OSError, IOError, SSLError) as e:
_LOGGER.info("Websocket connection failed: %r", e, extra=self.network_trace_params)
self.close()
raise
except ImportError:
Expand All @@ -737,35 +749,43 @@ def connect(self):
def _read(self, n, initial=False, buffer=None, _errnos=None): # pylint: disable=unused-argument
"""Read exactly n bytes from the peer."""
from websocket import WebSocketTimeoutException

length = 0
view = buffer or memoryview(bytearray(n))
nbytes = self._read_buffer.readinto(view)
length += nbytes
n -= nbytes
try:
while n:
data = self.ws.recv()
if len(data) <= n:
view[length : length + len(data)] = data
n -= len(data)
else:
view[length : length + n] = data[0:n]
self._read_buffer = BytesIO(data[n:])
n = 0
return view
except WebSocketTimeoutException as wte:
raise ConnectionError('Receive timed out (%s)' % wte)
length = 0
view = buffer or memoryview(bytearray(n))
nbytes = self._read_buffer.readinto(view)
length += nbytes
n -= nbytes
try:
while n:
data = self.ws.recv()
if len(data) <= n:
view[length : length + len(data)] = data
n -= len(data)
length += len(data)
else:
view[length : length + n] = data[0:n]
self._read_buffer = BytesIO(data[n:])
n = 0
return view
except AttributeError:
raise IOError("Websocket connection has already been closed.")
except WebSocketTimeoutException as wte:
raise TimeoutError('Websocket receive timed out (%s)' % wte)
except:
self._read_buffer = BytesIO(view[:length])
raise

def close(self):
if self.ws:
self._shutdown_transport()
self.ws = None
with self.socket_lock:
if self.ws:
self._shutdown_transport()
self.ws = None

def _shutdown_transport(self):
# TODO Sync and Async close functions named differently
"""Do any preliminary work in shutting down the connection."""
self.ws.close()
if self.ws:
self.ws.close()

def _write(self, s):
"""Completely write a string to the peer.
Expand All @@ -776,10 +796,10 @@ def _write(self, s):
from websocket import WebSocketConnectionClosedException, WebSocketTimeoutException
try:
self.ws.send_binary(s)
except AttributeError:
raise IOError("Websocket connection has already been closed.")
except WebSocketTimeoutException as e:
raise ConnectionError('Send timed out (%s)' % e)
except SSLError as e:
raise ConnectionError('Send disconnected by SSL (%s)' % e)
except WebSocketConnectionClosedException as e:
raise ConnectionError('Send disconnected (%s)' % e)
raise socket.timeout('Websocket send timed out (%s)' % e)
except (WebSocketConnectionClosedException, SSLError) as e:
raise ConnectionError('Websocket disconnected: %r' % e)

Loading