From 94b0ea5df8227f409097d6f61eec01b3ac66c2ba Mon Sep 17 00:00:00 2001 From: iscai-msft <43154838+iscai-msft@users.noreply.github.com> Date: Mon, 27 Apr 2020 12:08:28 -0400 Subject: [PATCH 1/3] fixed alternative document input samples (#11078) --- .../async_samples/sample_alternative_document_input_async.py | 2 +- .../samples/sample_alternative_document_input.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py index b70ba4461f5a..29f7589e70bd 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py @@ -44,7 +44,7 @@ async def alternative_document_input(self): "text": "L'hôtel n'était pas très confortable. L'éclairage était trop sombre."} ] async with text_analytics_client: - result = await text_analytics_client.analyze_sentiment(documents) + result = await text_analytics_client.detect_language(documents) for idx, doc in enumerate(result): if not doc.is_error: diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py index 581d08d48c45..339498f775fd 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py @@ -44,7 +44,7 @@ def alternative_document_input(self): "text": "L'hôtel n'était pas très confortable. L'éclairage était trop sombre."} ] - result = text_analytics_client.analyze_sentiment(documents) + result = text_analytics_client.detect_language(documents) for idx, doc in enumerate(result): if not doc.is_error: From ce8cfad2da10596ed69bd820d2e368bca3506df1 Mon Sep 17 00:00:00 2001 From: KieranBrantnerMagee Date: Mon, 27 Apr 2020 09:43:24 -0700 Subject: [PATCH 2/3] Add sync/async samples to demonstrate consuming from a number of sessions at one time. (#11001) * Add sync/async samples to demonstrate consuming from a number of sessions at one time. * Add informational message to session pool samples regarding the exit condition and how it manifests. --- .../session_pool_receive_async.py | 63 +++++++++++++++++ .../sync_samples/session_pool_receive.py | 68 +++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py create mode 100644 sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py new file mode 100644 index 000000000000..e442827b469f --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio +import uuid + +from azure.servicebus.aio import ServiceBusClient, AutoLockRenew +from azure.servicebus import NoActiveSession, Message + + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +# Note: This must be a session-enabled queue. +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +async def message_processing(servicebus_client, queue_name): + while True: + try: + async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver: + renewer = AutoLockRenew() + renewer.register(receiver.session, timeout=None) + await receiver.session.set_session_state("OPEN") + async for message in receiver: + print("Message: {}".format(message)) + print("Time to live: {}".format(message.header.time_to_live)) + print("Sequence number: {}".format(message.sequence_number)) + print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) + print("Partition ID: {}".format(message.partition_id)) + print("Partition Key: {}".format(message.partition_key)) + print("Locked until: {}".format(message.locked_until_utc)) + print("Lock Token: {}".format(message.lock_token)) + print("Enqueued time: {}".format(message.enqueued_time_utc)) + await message.complete() + if str(message) == 'shutdown': + await receiver.session.set_session_state("CLOSED") + break + renewer.shutdown() + except NoActiveSession: + print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.") + return + + +async def sample_session_send_receive_with_pool_async(connection_string, queue_name): + + concurrent_receivers = 5 + sessions = [str(uuid.uuid4()) for i in range(concurrent_receivers)] + client = ServiceBusClient.from_connection_string(connection_string) + + for session_id in sessions: + async with client.get_queue_sender(queue_name) as sender: + await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)]) + await sender.send(Message("shutdown", session_id=session_id)) + + receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)] + await asyncio.gather(*receive_sessions) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(sample_session_send_receive_with_pool_async(CONNECTION_STR, QUEUE_NAME)) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py new file mode 100644 index 000000000000..12c03b683f76 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import uuid +import concurrent + +from azure.servicebus import ServiceBusClient, Message, AutoLockRenew +from azure.servicebus import NoActiveSession + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +# Note: This must be a session-enabled queue. +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +def message_processing(sb_client, queue_name, messages): + while True: + try: + with sb_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver: + renewer = AutoLockRenew() + renewer.register(receiver.session, timeout=None) + receiver.session.set_session_state("OPEN") + for message in receiver: + messages.append(message) + print("Message: {}".format(message)) + print("Time to live: {}".format(message.header.time_to_live)) + print("Sequence number: {}".format(message.sequence_number)) + print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) + print("Partition ID: {}".format(message.partition_id)) + print("Partition Key: {}".format(message.partition_key)) + print("Locked until: {}".format(message.locked_until_utc)) + print("Lock Token: {}".format(message.lock_token)) + print("Enqueued time: {}".format(message.enqueued_time_utc)) + message.complete() + if str(message) == 'shutdown': + receiver.session.set_session_state("CLOSED") + renewer.shutdown() + except NoActiveSession: + print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.") + return + + +def sample_session_send_receive_with_pool(connection_string, queue_name): + + concurrent_receivers = 5 + sessions = [str(uuid.uuid4()) for i in range(2 * concurrent_receivers)] + with ServiceBusClient.from_connection_string(connection_string) as client: + + with client.get_queue_sender(queue_name) as sender: + for session_id in sessions: + for i in range(20): + message = Message("Sample message no. {}".format(i), session_id=session_id) + sender.send(message) + + all_messages = [] + futures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_receivers) as thread_pool: + for _ in range(concurrent_receivers): + futures.append(thread_pool.submit(message_processing, client, queue_name, all_messages)) + concurrent.futures.wait(futures) + + print("Received total {} messages across {} sessions.".format(len(all_messages), len(sessions))) + + +if __name__ == '__main__': + sample_session_send_receive_with_pool(CONNECTION_STR, QUEUE_NAME) From 91d96a8f20abf30af0234b7416f5329c36a54606 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Mon, 27 Apr 2020 10:09:00 -0700 Subject: [PATCH 3/3] [ServiceBus] Settle non-deferred message through receiver link (#10800) * settle non-deferred message through receiver link except dead_letter * revert dead-letter back to t1 as well * improve settlement and put is_deferred_letter into kwargs * add test * update according to comment * fix a bug in dead_letter through receiver_link --- .../azure/servicebus/_common/message.py | 103 ++++++++++++++---- .../azure/servicebus/_common/mgmt_handlers.py | 2 +- .../azure/servicebus/aio/_async_message.py | 64 +++++------ .../tests/async_tests/test_queues_async.py | 20 ++++ .../azure-servicebus/tests/test_queues.py | 20 ++++ 5 files changed, 154 insertions(+), 55 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index c9e4b027130a..926142970537 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -6,10 +6,12 @@ import datetime import uuid +import functools +import logging from typing import Optional, List, Union, Generator import uamqp -from uamqp import types +from uamqp import types, errors from .constants import ( _BATCH_MESSAGE_OVERHEAD_COST, @@ -34,7 +36,8 @@ MESSAGE_DEAD_LETTER, MESSAGE_ABANDON, MESSAGE_DEFER, - MESSAGE_RENEW_LOCK + MESSAGE_RENEW_LOCK, + DEADLETTERNAME ) from ..exceptions import ( MessageAlreadySettled, @@ -44,6 +47,8 @@ ) from .utils import utc_from_timestamp, utc_now +_LOGGER = logging.getLogger(__name__) + class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes """A Service Bus Message. @@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage): :dedent: 4 :caption: Checking the properties on a received message. """ - def __init__(self, message, mode=ReceiveSettleMode.PeekLock): + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): super(ReceivedMessage, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) + self._is_deferred_message = kwargs.get("is_deferred_message", False) self.auto_renew_error = None def _is_live(self, action): @@ -458,6 +464,69 @@ def _is_live(self, action): except AttributeError: pass + def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + try: + if not self._is_deferred_message: + try: + self._settle_via_receiver_link(settle_operation, dead_letter_details)() + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) + + def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None): + # pylint: disable=protected-access + if settle_operation == MESSAGE_COMPLETE: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_COMPLETE, + [self.lock_token], + ) + if settle_operation == MESSAGE_ABANDON: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_ABANDON, + [self.lock_token], + ) + if settle_operation == MESSAGE_DEAD_LETTER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEADLETTER, + [self.lock_token], + dead_letter_details=dead_letter_details + ) + if settle_operation == MESSAGE_DEFER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEFER, + [self.lock_token], + ) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + + def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None): + if settle_operation == MESSAGE_COMPLETE: + return functools.partial(self.message.accept) + if settle_operation == MESSAGE_ABANDON: + return functools.partial(self.message.modify, True, False) + if settle_operation == MESSAGE_DEAD_LETTER: + # note: message.reject() can not set reason and description properly due to the issue + # https://github.com/Azure/azure-uamqp-python/issues/155 + return functools.partial(self.message.reject, condition=DEADLETTERNAME) + if settle_operation == MESSAGE_DEFER: + return functools.partial(self.message.modify, True, True) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + @property def settled(self): # type: () -> bool @@ -535,11 +604,9 @@ def complete(self): :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ + # pylint: disable=protected-access self._is_live(MESSAGE_COMPLETE) - try: - self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_COMPLETE, e) + self._settle_message(MESSAGE_COMPLETE) self._settled = True def dead_letter(self, reason=None, description=None): @@ -560,17 +627,12 @@ def dead_letter(self, reason=None, description=None): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEAD_LETTER) + details = { MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "", MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""} - try: - self._receiver._settle_message( - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=details - ) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e) + + self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details) self._settled = True def abandon(self): @@ -585,11 +647,9 @@ def abandon(self): :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ + # pylint: disable=protected-access self._is_live(MESSAGE_ABANDON) - try: - self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_ABANDON, e) + self._settle_message(MESSAGE_ABANDON) self._settled = True def defer(self): @@ -606,10 +666,7 @@ def defer(self): :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live(MESSAGE_DEFER) - try: - self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEFER, e) + self._settle_message(MESSAGE_DEFER) self._settled = True def renew_lock(self): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py index b42d2cd955cb..48ebdfe156ec 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py @@ -69,7 +69,7 @@ def deferred_message_op( parsed = [] for m in message.get_data()[b'messages']: wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message'])) - parsed.append(message_type(wrapped, mode)) + parsed.append(message_type(wrapped, mode, is_deferred_message=True)) return parsed if status_code in [202, 204]: return [] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index 891188f1e9a3..9182b9457802 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -3,18 +3,13 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- +import logging from typing import Optional from .._common import message as sync_message from .._common.constants import ( - SETTLEMENT_ABANDON, - SETTLEMENT_COMPLETE, - SETTLEMENT_DEFER, - SETTLEMENT_DEADLETTER, ReceiveSettleMode, MGMT_RESPONSE_MESSAGE_EXPIRATION, - MGMT_REQUEST_DEAD_LETTER_REASON, - MGMT_REQUEST_DEAD_LETTER_DESCRIPTION, MESSAGE_COMPLETE, MESSAGE_DEAD_LETTER, MESSAGE_ABANDON, @@ -24,15 +19,41 @@ from .._common.utils import get_running_loop, utc_from_timestamp from ..exceptions import MessageSettleFailed +_LOGGER = logging.getLogger(__name__) + class ReceivedMessage(sync_message.ReceivedMessage): """A Service Bus Message received from service side. """ - def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None): + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs): self._loop = loop or get_running_loop() - super(ReceivedMessage, self).__init__(message=message, mode=mode) + super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs) + + async def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + try: + if not self._is_deferred_message: + try: + await self._loop.run_in_executor( + None, + self._settle_via_receiver_link(settle_operation, dead_letter_details) + ) + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + await self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) async def complete(self): # type: () -> None @@ -48,10 +69,7 @@ async def complete(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_COMPLETE) - try: - await self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_COMPLETE, e) + await self._settle_message(MESSAGE_COMPLETE) self._settled = True async def dead_letter(self, reason=None, description=None): @@ -71,17 +89,7 @@ async def dead_letter(self, reason=None, description=None): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEAD_LETTER) - details = { - MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "", - MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""} - try: - await self._receiver._settle_message( - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=details - ) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e) + await self._settle_message(MESSAGE_DEAD_LETTER) self._settled = True async def abandon(self): @@ -95,10 +103,7 @@ async def abandon(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_ABANDON) - try: - await self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_ABANDON, e) + await self._settle_message(MESSAGE_ABANDON) self._settled = True async def defer(self): @@ -112,10 +117,7 @@ async def defer(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEFER) - try: - await self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEFER, e) + await self._settle_message(MESSAGE_DEFER) self._settled = True async def renew_lock(self): diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index a4f8e5a3269a..df28865e22c4 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1076,3 +1076,23 @@ def test_queue_message_http_proxy_setting(self): receiver = sb_client.get_queue_receiver(queue_name="mock") assert receiver._config.http_proxy == http_proxy assert receiver._config.transport_type == TransportType.AmqpOverWebsocket + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False) as sb_client: + + async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + message = Message("Test") + await sender.send(message) + + async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + messages = await receiver.receive(max_wait_time=5) + await receiver._handler.message_handler.destroy_async() # destroy the underlying receiver link + assert len(messages) == 1 + await messages[0].complete() diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 95a6b61f11c5..125c2d662b88 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1202,3 +1202,23 @@ def test_queue_message_http_proxy_setting(self): receiver = sb_client.get_queue_receiver(queue_name="mock") assert receiver._config.http_proxy == http_proxy assert receiver._config.transport_type == TransportType.AmqpOverWebsocket + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False) as sb_client: + + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + message = Message("Test") + sender.send(message) + + with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + messages = receiver.receive(max_wait_time=5) + receiver._handler.message_handler.destroy() # destroy the underlying receiver link + assert len(messages) == 1 + messages[0].complete()