From eedf322e603298cb1c73dd1d01731491f11faa9a Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Wed, 12 Oct 2022 13:16:51 -0500 Subject: [PATCH 1/6] revert back to websocket-client --- .../eventhub/_pyamqp/aio/_transport_async.py | 146 +++++++----------- 1 file changed, 57 insertions(+), 89 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index 0fb9e0268b52..c370a4918dc4 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -149,6 +149,36 @@ async def send_frame(self, channel, frame, **kwargs): await self.write(data) # _LOGGER.info("OCH%d -> %r", channel, frame) +class AsyncTransport( + AsyncTransportMixin +): # pylint: disable=too-many-instance-attributes + """Common superclass for TCP and SSL transports.""" + + def __init__( + self, + host, + *, + port=AMQP_PORT, + connect_timeout=None, + ssl_opts=False, + socket_settings=None, + raise_on_initial_eintr=True, + **kwargs, # pylint: disable=unused-argument + ): + self.connected = False + self.sock = None + self.reader = None + self.writer = None + self.raise_on_initial_eintr = raise_on_initial_eintr + self._read_buffer = BytesIO() + self.host, self.port = to_host_port(host, port) + + self.connect_timeout = connect_timeout + self.socket_settings = socket_settings + self.loop = asyncio.get_running_loop() + self.socket_lock = asyncio.Lock() + self.sslopts = self._build_ssl_opts(ssl_opts) + def _build_ssl_opts(self, sslopts): if sslopts in [True, False, None, {}]: return sslopts @@ -191,37 +221,6 @@ def _build_ssl_context( ctx.check_hostname = check_hostname return ctx - -class AsyncTransport( - AsyncTransportMixin -): # pylint: disable=too-many-instance-attributes - """Common superclass for TCP and SSL transports.""" - - def __init__( - self, - host, - *, - port=AMQP_PORT, - connect_timeout=None, - ssl_opts=False, - socket_settings=None, - raise_on_initial_eintr=True, - **kwargs, # pylint: disable=unused-argument - ): - self.connected = False - self.sock = None - self.reader = None - self.writer = None - self.raise_on_initial_eintr = raise_on_initial_eintr - self._read_buffer = BytesIO() - self.host, self.port = to_host_port(host, port) - - self.connect_timeout = connect_timeout - self.socket_settings = socket_settings - self.loop = asyncio.get_running_loop() - self.socket_lock = asyncio.Lock() - self.sslopts = self._build_ssl_opts(ssl_opts) - async def connect(self): try: # are we already connected? @@ -425,87 +424,56 @@ async def negotiate(self): ) -class WebSocketTransportAsync( - AsyncTransportMixin -): # pylint: disable=too-many-instance-attributes - def __init__( - self, - host, - *, - port=WEBSOCKET_PORT, - connect_timeout=None, - ssl_opts=None, - **kwargs - ): +class WebSocketTransportAsync(AsyncTransportMixin): + def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, **kwargs): self._read_buffer = BytesIO() + self.loop = asyncio.get_running_loop() self.socket_lock = asyncio.Lock() - self.sslopts = self._build_ssl_opts(ssl_opts) if isinstance(ssl_opts, dict) else None + self.sslopts = ssl if isinstance(ssl, dict) else {} self._connect_timeout = connect_timeout or TIMEOUT_INTERVAL self._custom_endpoint = kwargs.get("custom_endpoint") - self.host, self.port = to_host_port(host, port) + self.host = host self.ws = None - self.session = None self._http_proxy = kwargs.get("http_proxy", None) - self.connected = False async def connect(self): - username, password = None, None - http_proxy_host, http_proxy_port = None, None - http_proxy_auth = None - + http_proxy_host, http_proxy_port, http_proxy_auth = None, None, None if self._http_proxy: http_proxy_host = self._http_proxy["proxy_hostname"] http_proxy_port = self._http_proxy["proxy_port"] - if http_proxy_host and http_proxy_port: - http_proxy_host = f"{http_proxy_host}:{http_proxy_port}" username = self._http_proxy.get("username", None) password = self._http_proxy.get("password", None) - - try: - from aiohttp import ClientSession - from urllib.parse import urlsplit - if username or password: - from aiohttp import BasicAuth - - http_proxy_auth = BasicAuth(login=username, password=password) - - self.session = ClientSession() - if self._custom_endpoint: - url = f"wss://{self._custom_endpoint}" - else: - url = f"wss://{self.host}" - parsed_url = urlsplit(url) - url = f"{parsed_url.scheme}://{parsed_url.netloc}:{self.port}{parsed_url.path}" + http_proxy_auth = (username, password) + try: + from websocket import create_connection - self.ws = await self.session.ws_connect( - url=url, + self.ws = create_connection( + url="wss://{}".format(self._custom_endpoint or self.host), + subprotocols=[AMQP_WS_SUBPROTOCOL], timeout=self._connect_timeout, - protocols=[AMQP_WS_SUBPROTOCOL], - autoclose=False, - proxy=http_proxy_host, - proxy_auth=http_proxy_auth, - ssl=self.sslopts, + skip_utf8_validation=True, + sslopt=self.sslopts, + http_proxy_host=http_proxy_host, + http_proxy_port=http_proxy_port, + http_proxy_auth=http_proxy_auth, ) - self.connected = True - except ImportError: - raise ValueError( - "Please install aiohttp library to use websocket transport." - ) + raise ValueError("Please install websocket-client library to use websocket transport.") - async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-argument + async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-arguments """Read exactly n bytes from the peer.""" + from websocket import WebSocketTimeoutException length = 0 view = buffer or memoryview(bytearray(n)) nbytes = self._read_buffer.readinto(view) length += nbytes n -= nbytes - try: while n: - data = await self.ws.receive_bytes() + data = await self.loop.run_in_executor(None, self.ws.recv) + if len(data) <= n: view[length : length + len(data)] = data n -= len(data) @@ -513,20 +481,20 @@ async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-argum view[length : length + n] = data[0:n] self._read_buffer = BytesIO(data[n:]) n = 0 + return view - except asyncio.TimeoutError: + except WebSocketTimeoutException: raise TimeoutError() async def close(self): """Do any preliminary work in shutting down the connection.""" - await self.ws.close() - await self.session.close() + await self.loop.run_in_executor(None, self.ws.close) self.connected = False async def write(self, s): - """Completely write a string (byte array) to the peer. + """Completely write a string to the peer. ABNF, OPCODE_BINARY = 0x2 See http://tools.ietf.org/html/rfc5234 http://tools.ietf.org/html/rfc6455#section-5.2 """ - await self.ws.send_bytes(s) + await self.loop.run_in_executor(None, self.ws.send_binary, s) \ No newline at end of file From c720cb731d6cde5c047071a1adf82c6461faf05b Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Wed, 12 Oct 2022 13:50:16 -0500 Subject: [PATCH 2/6] pylint --- .../azure/eventhub/_pyamqp/aio/_transport_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index c370a4918dc4..ce37bb75866e 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -424,7 +424,7 @@ async def negotiate(self): ) -class WebSocketTransportAsync(AsyncTransportMixin): +class WebSocketTransportAsync(AsyncTransportMixin): # pylint: disable=too-many-instance-attributes def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, **kwargs): self._read_buffer = BytesIO() self.loop = asyncio.get_running_loop() @@ -461,7 +461,7 @@ async def connect(self): except ImportError: raise ValueError("Please install websocket-client library to use websocket transport.") - async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-arguments + async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-argument """Read exactly n bytes from the peer.""" from websocket import WebSocketTimeoutException From 496a7653ad570dcd447d42f1adf8d915638cfa12 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Wed, 12 Oct 2022 15:03:13 -0500 Subject: [PATCH 3/6] pylint fixes --- .../eventhub/_pyamqp/aio/_transport_async.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index ce37bb75866e..d0caf115753c 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -425,16 +425,25 @@ async def negotiate(self): class WebSocketTransportAsync(AsyncTransportMixin): # pylint: disable=too-many-instance-attributes - def __init__(self, host, port=WEBSOCKET_PORT, connect_timeout=None, ssl=None, **kwargs): + def __init__( + self, + host, + *, + port=WEBSOCKET_PORT, + connect_timeout=None, + ssl_opts=None, + **kwargs + ): # pylint: disable=unused-argument self._read_buffer = BytesIO() self.loop = asyncio.get_running_loop() self.socket_lock = asyncio.Lock() - self.sslopts = ssl if isinstance(ssl, dict) else {} + self.sslopts = ssl_opts if isinstance(ssl_opts, dict) else {} self._connect_timeout = connect_timeout or TIMEOUT_INTERVAL self._custom_endpoint = kwargs.get("custom_endpoint") self.host = host self.ws = None self._http_proxy = kwargs.get("http_proxy", None) + self.connected = False async def connect(self): http_proxy_host, http_proxy_port, http_proxy_auth = None, None, None @@ -458,6 +467,7 @@ async def connect(self): http_proxy_port=http_proxy_port, http_proxy_auth=http_proxy_auth, ) + self.connected = True except ImportError: raise ValueError("Please install websocket-client library to use websocket transport.") @@ -497,4 +507,4 @@ async def write(self, s): See http://tools.ietf.org/html/rfc5234 http://tools.ietf.org/html/rfc6455#section-5.2 """ - await self.loop.run_in_executor(None, self.ws.send_binary, s) \ No newline at end of file + await self.loop.run_in_executor(None, self.ws.send_binary, s) From e8ee01cfc038318dd87dceb48acef4de7879b875 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Fri, 14 Oct 2022 08:41:58 -0500 Subject: [PATCH 4/6] added wait_close when closing writer --- .../azure/eventhub/_pyamqp/aio/_transport_async.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index d0caf115753c..e874cdfb1f53 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -378,7 +378,8 @@ async def _read( async def _write(self, s): """Write a string out to the SSL socket fully.""" - self.writer.write(s) + self.loop.run_in_executor(None, self.writer.write,s) + await self.writer.drain() async def close(self): if self.writer is not None: @@ -386,6 +387,7 @@ async def close(self): # see issue: https://github.com/encode/httpx/issues/914 self.writer.transport.abort() self.writer.close() + await self.writer.wait_closed() self.writer, self.reader = None, None self.sock = None self.connected = False From 2ae9aed58667897f009afa0e9bf52946aafe3eaa Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Fri, 14 Oct 2022 09:10:44 -0500 Subject: [PATCH 5/6] set back close to not async --- .../azure/eventhub/_pyamqp/aio/_transport_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index e874cdfb1f53..0e4c9d501130 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -500,7 +500,7 @@ async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-argum async def close(self): """Do any preliminary work in shutting down the connection.""" - await self.loop.run_in_executor(None, self.ws.close) + self.ws.close() self.connected = False async def write(self, s): From 58a86037c41e90ab0c9f10969407ccc39bea0fb6 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Fri, 14 Oct 2022 10:41:21 -0500 Subject: [PATCH 6/6] reset other changes --- .../azure/eventhub/_pyamqp/aio/_connection_async.py | 2 +- .../azure/eventhub/_pyamqp/aio/_transport_async.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index b3a18ea83e2b..adfd67fdbe15 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -212,7 +212,7 @@ async def _disconnect(self) -> None: if self.state == ConnectionState.END: return await self._set_state(ConnectionState.END) - await self._transport.close() + self._transport.close() def _can_read(self): # type: () -> bool diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index 0e4c9d501130..eaa311cd2058 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -381,13 +381,12 @@ async def _write(self, s): self.loop.run_in_executor(None, self.writer.write,s) await self.writer.drain() - async def close(self): + def close(self): if self.writer is not None: if self.sslopts: # see issue: https://github.com/encode/httpx/issues/914 self.writer.transport.abort() self.writer.close() - await self.writer.wait_closed() self.writer, self.reader = None, None self.sock = None self.connected = False @@ -498,7 +497,7 @@ async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-argum except WebSocketTimeoutException: raise TimeoutError() - async def close(self): + def close(self): """Do any preliminary work in shutting down the connection.""" self.ws.close() self.connected = False