From 0d011183402b1ce50bfb7ea11f8dc7cc217b1e93 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Fri, 30 Sep 2022 11:26:44 -0500 Subject: [PATCH 1/3] remove unnecessary pops --- .../azure-eventhub/azure/eventhub/_pyamqp/_connection.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py index 8a61136443b7..53d2fb64b37b 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py @@ -506,8 +506,6 @@ def _incoming_end(self, channel, frame): description="Invalid channel number received" )) return - self._incoming_endpoints.pop(channel) - self._outgoing_endpoints.pop(channel) def _process_incoming_frame(self, channel, frame): # pylint:disable=too-many-return-statements # type: (int, Optional[Union[bytes, Tuple[int, Tuple[Any, ...]]]]) -> bool From 0eb4d332b31d9e3e93f417c5777731d8da913122 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Fri, 30 Sep 2022 11:26:59 -0500 Subject: [PATCH 2/3] fix var name + remove unnecessary pop --- .../azure/eventhub/_pyamqp/aio/_connection_async.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py index cc84b642fba4..afe38c696b03 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py @@ -470,8 +470,8 @@ async def _incoming_end(self, channel, frame): """ try: await self._incoming_endpoints[channel]._incoming_end(frame) # pylint:disable=protected-access - self.incoming_endpoints.pop(channel) - self.outgoing_endpoints.pop(channel) + self._incoming_endpoints.pop(channel) + self._outgoing_endpoints.pop(channel) except KeyError: #close the connection await self.close( @@ -480,8 +480,6 @@ async def _incoming_end(self, channel, frame): description="Invalid channel number received" )) return - self._incoming_endpoints.pop(channel) - self._outgoing_endpoints.pop(channel) async def _process_incoming_frame(self, channel, frame): # pylint:disable=too-many-return-statements # type: (int, Optional[Union[bytes, Tuple[int, Tuple[Any, ...]]]]) -> bool From ff022839bf5c7c89540e394c32b2c149f847e760 Mon Sep 17 00:00:00 2001 From: Kashif Khan Date: Fri, 30 Sep 2022 17:23:43 -0500 Subject: [PATCH 3/3] fix --- .../azure/eventhub/_pyamqp/_transport.py | 45 ++++++++++--------- .../eventhub/_pyamqp/aio/_transport_async.py | 45 ++++++++++--------- 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py index d65a5cfdd9b3..f86cb192ddb9 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py @@ -285,6 +285,28 @@ def _connect(self, host, port, timeout): try: entries = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, SOL_TCP) entries_num = len(entries) + # now that we have address(es) for the hostname, connect to broker + for i, res in enumerate(entries): + af, socktype, proto, _, sa = res + try: + self.sock = socket.socket(af, socktype, proto) + try: + set_cloexec(self.sock, True) + except NotImplementedError: + pass + self.sock.settimeout(timeout) + self.sock.connect(sa) + except socket.error as ex: + e = ex + if self.sock is not None: + self.sock.close() + self.sock = None + # we may have depleted all our options + if i + 1 >= entries_num and n + 1 >= addr_types_num: + raise + else: + # hurray, we established connection + return except socket.gaierror: # we may have depleted all our options if n + 1 >= addr_types_num: @@ -294,28 +316,7 @@ def _connect(self, host, port, timeout): raise e if e is not None else socket.error("failed to resolve broker hostname") continue # pragma: no cover - # now that we have address(es) for the hostname, connect to broker - for i, res in enumerate(entries): - af, socktype, proto, _, sa = res - try: - self.sock = socket.socket(af, socktype, proto) - try: - set_cloexec(self.sock, True) - except NotImplementedError: - pass - self.sock.settimeout(timeout) - self.sock.connect(sa) - except socket.error as ex: - e = ex - if self.sock is not None: - self.sock.close() - self.sock = None - # we may have depleted all our options - if i + 1 >= entries_num and n + 1 >= addr_types_num: - raise - else: - # hurray, we established connection - return + def _init_socket(self, socket_settings, read_timeout, write_timeout): self.sock.settimeout(None) # set socket back to blocking mode diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py index cd4e8d49514a..a5e421c52075 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_transport_async.py @@ -257,6 +257,28 @@ async def _connect(self, host, port, timeout): try: entries = await self.loop.getaddrinfo(host, port, family=family, type=socket.SOCK_STREAM, proto=SOL_TCP) entries_num = len(entries) + # now that we have address(es) for the hostname, connect to broker + for i, res in enumerate(entries): + af, socktype, proto, _, sa = res + try: + self.sock = socket.socket(af, socktype, proto) + try: + set_cloexec(self.sock, True) + except NotImplementedError: + pass + self.sock.settimeout(timeout) + await self.loop.sock_connect(self.sock, sa) + except socket.error as ex: + e = ex + if self.sock is not None: + self.sock.close() + self.sock = None + # we may have depleted all our options + if i + 1 >= entries_num and n + 1 >= addr_types_num: + raise + else: + # hurray, we established connection + return except socket.gaierror: # we may have depleted all our options if n + 1 >= addr_types_num: @@ -266,28 +288,7 @@ async def _connect(self, host, port, timeout): raise e if e is not None else socket.error("failed to resolve broker hostname") continue # pragma: no cover - # now that we have address(es) for the hostname, connect to broker - for i, res in enumerate(entries): - af, socktype, proto, _, sa = res - try: - self.sock = socket.socket(af, socktype, proto) - try: - set_cloexec(self.sock, True) - except NotImplementedError: - pass - self.sock.settimeout(timeout) - await self.loop.sock_connect(self.sock, sa) - except socket.error as ex: - e = ex - if self.sock is not None: - self.sock.close() - self.sock = None - # we may have depleted all our options - if i + 1 >= entries_num and n + 1 >= addr_types_num: - raise - else: - # hurray, we established connection - return + def _init_socket(self, socket_settings, read_timeout, write_timeout): self.sock.settimeout(None) # set socket back to blocking mode