diff --git a/sdk/keyvault/azure-keyvault-secrets/README.md b/sdk/keyvault/azure-keyvault-secrets/README.md index 763d271d75d0..59dbca16ec28 100644 --- a/sdk/keyvault/azure-keyvault-secrets/README.md +++ b/sdk/keyvault/azure-keyvault-secrets/README.md @@ -415,6 +415,7 @@ additional questions or comments. [recover_purge_sample]: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/keyvault/azure-keyvault-secrets/samples/recover_purge_operations.py [recover_purge_async_sample]: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/keyvault/azure-keyvault-secrets/samples/recover_purge_operations_async.py [keyvault_docs]: https://docs.microsoft.com/en-us/azure/key-vault/ +[pip]: https://pypi.org/project/pip/ [pypi_package_secrets]: https://pypi.org/project/azure-keyvault-secrets/ [reference_docs]: https://aka.ms/azsdk-python-keyvault-secrets-ref [secret_client_src]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index 9f155ee1967a..f0b0938ec8e3 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -7,7 +7,7 @@ publish/subscribe capabilities, and the ability to easily scale as your needs gr Use the Service Bus client library for Python to communicate between applications and services and implement asynchronous messaging patterns. -* Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings +* Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings. * Send and receive messages within your Service Bus channels. * Utilize message locks, sessions, and dead letter functionality to implement complex messaging patterns. @@ -29,7 +29,7 @@ pip install azure-servicebus --pre To use this package, you must have: * Azure subscription - [Create a free account][azure_sub] * Azure Service Bus - [Namespace and management credentials][service_bus_namespace] -* Python 2.7, 3.5, 3.6, 3.7 or 3.8 - [Install Python][python] +* Python 2.7, 3.5 or later - [Install Python][python] If you need an Azure service bus namespace, you can create it via the [Azure Portal][azure_namespace_creation]. @@ -43,19 +43,17 @@ az servicebus namespace create --resource-group --name NAMESPACE_NAME= -export SERVICE_BUS_CONN_STR=$(az servicebus namespace authorization-rule keys list --resource-group $RES_GROUP --namespace-name $NAMESPACE_NAME --query RootManageSharedAccessKey --output tsv) +export SERVICE_BUS_CONN_STR=$(az servicebus namespace authorization-rule keys list --resource-group $RES_GROUP --namespace-name $NAMESPACE_NAME --name RootManageSharedAccessKey --query primaryConnectionString --output tsv) ``` -#### Create client - Once you've populated the `SERVICE_BUS_CONN_STR` environment variable, you can create the `ServiceBusClient`. ```Python @@ -68,6 +66,28 @@ with ServiceBusClient.from_connection_string(connstr) as client: ... ``` +#### Create client using the azure-identity library: + +```python +import os +from azure.servicebus import ServiceBusClient +from azure.identity import DefaultAzureCredential + +credential = DefaultAzureCredential() + +FULLY_QUALIFIED_NAMESPACE = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] +with ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential): + ... +``` + +- This constructor takes the fully qualified namespace of your Service Bus instance and a credential that implements the +[TokenCredential][token_credential_interface] +protocol. There are implementations of the `TokenCredential` protocol available in the +[azure-identity package][pypi_azure_identity]. The fully qualified namespace is of the format ``. +- When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such as the +Azure Service Bus Data Owner role. For more information about using Azure Active Directory authorization with Service Bus, +please refer to [the associated documentation][servicebus_aad_authentication]. + Note: client can be initialized without a context manager, but must be manually closed via client.close() to not leak resources. ## Key concepts @@ -88,7 +108,7 @@ To interact with these resources, one should be familiar with the following SDK * [Sender](./azure/servicebus/_servicebus_sender.py): To send messages to a Queue or Topic, one would use the corresponding `get_queue_sender` or `get_topic_sender` method off of a `ServiceBusClient` instance as seen [here](./samples/sync_samples/send_queue.py). -* [Receiver](./azure/servicebus/_servicebus_receiver.py): To receive messages from a Queue or Subscription, one would use the corrosponding `get_queue_receiver` or `get_subscription_receiver` method off of a `ServiceBusClient` instance as seen [here](./samples/sync_samples/receive_queue.py). +* [Receiver](./azure/servicebus/_servicebus_receiver.py): To receive messages from a Queue or Subscription, one would use the corresponding `get_queue_receiver` or `get_subscription_receiver` method off of a `ServiceBusClient` instance as seen [here](./samples/sync_samples/receive_queue.py). * [Message](./azure/servicebus/_common/message.py): When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload and control how the message is "settled" (completed, dead-lettered, etc); these functions are only available on a received message. @@ -96,14 +116,14 @@ To interact with these resources, one should be familiar with the following SDK The following sections provide several code snippets covering some of the most common Service Bus tasks, including: -* [Send a message to a queue](#send-to-a-queue) -* [Receive a message from a queue](#receive-from-a-queue) -* [Defer a message on receipt](#defer-a-message) +* [Send a message to a queue](#send-a-message-to-a-queue) +* [Receive a message from a queue](#receive-a-message-from-a-queue) +* [Defer a message on receipt](#defer-a-message-on-receipt) To perform management tasks such as creating and deleting queues/topics/subscriptions, please utilize the azure-mgmt-servicebus library, available [here][servicebus_management_repository]. -### Send to a queue +### Send a message to a queue This example sends a message to a queue that is assumed to already exist, created via the Azure portal or az commands. @@ -121,7 +141,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: sender.send(message) ``` -### Receive from a queue +### Receive a message from a queue To receive from a queue, you can either perform a one-off receive via "receiver.receive()" or receive persistently as follows: @@ -139,7 +159,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: msg.complete() ``` -### Defer a message +### Defer a message on receipt When receiving from a queue, you have multiple actions you can take on the messages you receive. Where the prior example completes a message, permanently removing it from the queue and marking as complete, this example demonstrates how to defer the message, sending it back to the queue @@ -150,6 +170,7 @@ from azure.servicebus import ServiceBusClient import os connstr = os.environ['SERVICE_BUS_CONN_STR'] +queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] with ServiceBusClient.from_connection_string(connstr) as client: with client.get_queue_receiver(queue_name) as receiver: @@ -225,4 +246,8 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio [topic_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#topics [subscription_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions [azure_namespace_creation]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal -[servicebus_management_repository]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-mgmt-servicebus \ No newline at end of file +[servicebus_management_repository]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-mgmt-servicebus +[get_servicebus_conn_str]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string +[servicebus_aad_authentication]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-authentication-and-authorization +[token_credential_interface]: ../../core/azure-core/azure/core/credentials.py +[pypi_azure_identity]: https://pypi.org/project/azure-identity/ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index c9e4b027130a..926142970537 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -6,10 +6,12 @@ import datetime import uuid +import functools +import logging from typing import Optional, List, Union, Generator import uamqp -from uamqp import types +from uamqp import types, errors from .constants import ( _BATCH_MESSAGE_OVERHEAD_COST, @@ -34,7 +36,8 @@ MESSAGE_DEAD_LETTER, MESSAGE_ABANDON, MESSAGE_DEFER, - MESSAGE_RENEW_LOCK + MESSAGE_RENEW_LOCK, + DEADLETTERNAME ) from ..exceptions import ( MessageAlreadySettled, @@ -44,6 +47,8 @@ ) from .utils import utc_from_timestamp, utc_now +_LOGGER = logging.getLogger(__name__) + class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes """A Service Bus Message. @@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage): :dedent: 4 :caption: Checking the properties on a received message. """ - def __init__(self, message, mode=ReceiveSettleMode.PeekLock): + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): super(ReceivedMessage, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) + self._is_deferred_message = kwargs.get("is_deferred_message", False) self.auto_renew_error = None def _is_live(self, action): @@ -458,6 +464,69 @@ def _is_live(self, action): except AttributeError: pass + def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + try: + if not self._is_deferred_message: + try: + self._settle_via_receiver_link(settle_operation, dead_letter_details)() + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) + + def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None): + # pylint: disable=protected-access + if settle_operation == MESSAGE_COMPLETE: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_COMPLETE, + [self.lock_token], + ) + if settle_operation == MESSAGE_ABANDON: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_ABANDON, + [self.lock_token], + ) + if settle_operation == MESSAGE_DEAD_LETTER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEADLETTER, + [self.lock_token], + dead_letter_details=dead_letter_details + ) + if settle_operation == MESSAGE_DEFER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEFER, + [self.lock_token], + ) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + + def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None): + if settle_operation == MESSAGE_COMPLETE: + return functools.partial(self.message.accept) + if settle_operation == MESSAGE_ABANDON: + return functools.partial(self.message.modify, True, False) + if settle_operation == MESSAGE_DEAD_LETTER: + # note: message.reject() can not set reason and description properly due to the issue + # https://github.com/Azure/azure-uamqp-python/issues/155 + return functools.partial(self.message.reject, condition=DEADLETTERNAME) + if settle_operation == MESSAGE_DEFER: + return functools.partial(self.message.modify, True, True) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + @property def settled(self): # type: () -> bool @@ -535,11 +604,9 @@ def complete(self): :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ + # pylint: disable=protected-access self._is_live(MESSAGE_COMPLETE) - try: - self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_COMPLETE, e) + self._settle_message(MESSAGE_COMPLETE) self._settled = True def dead_letter(self, reason=None, description=None): @@ -560,17 +627,12 @@ def dead_letter(self, reason=None, description=None): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEAD_LETTER) + details = { MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "", MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""} - try: - self._receiver._settle_message( - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=details - ) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e) + + self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details) self._settled = True def abandon(self): @@ -585,11 +647,9 @@ def abandon(self): :raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired. :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ + # pylint: disable=protected-access self._is_live(MESSAGE_ABANDON) - try: - self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_ABANDON, e) + self._settle_message(MESSAGE_ABANDON) self._settled = True def defer(self): @@ -606,10 +666,7 @@ def defer(self): :raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails. """ self._is_live(MESSAGE_DEFER) - try: - self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEFER, e) + self._settle_message(MESSAGE_DEFER) self._settled = True def renew_lock(self): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py index b42d2cd955cb..48ebdfe156ec 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py @@ -69,7 +69,7 @@ def deferred_message_op( parsed = [] for m in message.get_data()[b'messages']: wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message'])) - parsed.append(message_type(wrapped, mode)) + parsed.append(message_type(wrapped, mode, is_deferred_message=True)) return parsed if status_code in [202, 204]: return [] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index 891188f1e9a3..9182b9457802 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -3,18 +3,13 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- +import logging from typing import Optional from .._common import message as sync_message from .._common.constants import ( - SETTLEMENT_ABANDON, - SETTLEMENT_COMPLETE, - SETTLEMENT_DEFER, - SETTLEMENT_DEADLETTER, ReceiveSettleMode, MGMT_RESPONSE_MESSAGE_EXPIRATION, - MGMT_REQUEST_DEAD_LETTER_REASON, - MGMT_REQUEST_DEAD_LETTER_DESCRIPTION, MESSAGE_COMPLETE, MESSAGE_DEAD_LETTER, MESSAGE_ABANDON, @@ -24,15 +19,41 @@ from .._common.utils import get_running_loop, utc_from_timestamp from ..exceptions import MessageSettleFailed +_LOGGER = logging.getLogger(__name__) + class ReceivedMessage(sync_message.ReceivedMessage): """A Service Bus Message received from service side. """ - def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None): + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs): self._loop = loop or get_running_loop() - super(ReceivedMessage, self).__init__(message=message, mode=mode) + super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs) + + async def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + try: + if not self._is_deferred_message: + try: + await self._loop.run_in_executor( + None, + self._settle_via_receiver_link(settle_operation, dead_letter_details) + ) + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + await self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) async def complete(self): # type: () -> None @@ -48,10 +69,7 @@ async def complete(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_COMPLETE) - try: - await self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_COMPLETE, e) + await self._settle_message(MESSAGE_COMPLETE) self._settled = True async def dead_letter(self, reason=None, description=None): @@ -71,17 +89,7 @@ async def dead_letter(self, reason=None, description=None): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEAD_LETTER) - details = { - MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "", - MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""} - try: - await self._receiver._settle_message( - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=details - ) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e) + await self._settle_message(MESSAGE_DEAD_LETTER) self._settled = True async def abandon(self): @@ -95,10 +103,7 @@ async def abandon(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_ABANDON) - try: - await self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_ABANDON, e) + await self._settle_message(MESSAGE_ABANDON) self._settled = True async def defer(self): @@ -112,10 +117,7 @@ async def defer(self): """ # pylint: disable=protected-access self._is_live(MESSAGE_DEFER) - try: - await self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) - except Exception as e: - raise MessageSettleFailed(MESSAGE_DEFER, e) + await self._settle_message(MESSAGE_DEFER) self._settled = True async def renew_lock(self): diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index 30caeab27de8..90657b76e374 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -35,7 +35,7 @@ def example_create_servicebus_client_async(): # [START create_sb_client_async] import os from azure.servicebus.aio import ServiceBusClient, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_CONNECTION_STR'] + fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] servicebus_client = ServiceBusClient( @@ -65,7 +65,7 @@ async def example_create_servicebus_sender_async(): # [START create_servicebus_sender_async] import os from azure.servicebus.aio import ServiceBusSender, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_CONNECTION_STR'] + fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] @@ -119,7 +119,7 @@ async def example_create_servicebus_receiver_async(): # [START create_servicebus_receiver_async] import os from azure.servicebus.aio import ServiceBusReceiver, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_CONNECTION_STR'] + fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py new file mode 100644 index 000000000000..e442827b469f --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/session_pool_receive_async.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio +import uuid + +from azure.servicebus.aio import ServiceBusClient, AutoLockRenew +from azure.servicebus import NoActiveSession, Message + + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +# Note: This must be a session-enabled queue. +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +async def message_processing(servicebus_client, queue_name): + while True: + try: + async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver: + renewer = AutoLockRenew() + renewer.register(receiver.session, timeout=None) + await receiver.session.set_session_state("OPEN") + async for message in receiver: + print("Message: {}".format(message)) + print("Time to live: {}".format(message.header.time_to_live)) + print("Sequence number: {}".format(message.sequence_number)) + print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) + print("Partition ID: {}".format(message.partition_id)) + print("Partition Key: {}".format(message.partition_key)) + print("Locked until: {}".format(message.locked_until_utc)) + print("Lock Token: {}".format(message.lock_token)) + print("Enqueued time: {}".format(message.enqueued_time_utc)) + await message.complete() + if str(message) == 'shutdown': + await receiver.session.set_session_state("CLOSED") + break + renewer.shutdown() + except NoActiveSession: + print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.") + return + + +async def sample_session_send_receive_with_pool_async(connection_string, queue_name): + + concurrent_receivers = 5 + sessions = [str(uuid.uuid4()) for i in range(concurrent_receivers)] + client = ServiceBusClient.from_connection_string(connection_string) + + for session_id in sessions: + async with client.get_queue_sender(queue_name) as sender: + await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)]) + await sender.send(Message("shutdown", session_id=session_id)) + + receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)] + await asyncio.gather(*receive_sessions) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(sample_session_send_receive_with_pool_async(CONNECTION_STR, QUEUE_NAME)) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index adb73303ddad..55d689e4bf5a 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -32,7 +32,7 @@ def example_create_servicebus_client_sync(): # [START create_sb_client_sync] import os from azure.servicebus import ServiceBusClient, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_CONNECTION_STR'] + fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] servicebus_client = ServiceBusClient( @@ -62,7 +62,7 @@ def example_create_servicebus_sender_sync(): # [START create_servicebus_sender_sync] import os from azure.servicebus import ServiceBusSender, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_CONNECTION_STR'] + fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] @@ -116,7 +116,7 @@ def example_create_servicebus_receiver_sync(): # [START create_servicebus_receiver_sync] import os from azure.servicebus import ServiceBusReceiver, ServiceBusSharedKeyCredential - fully_qualified_namespace = os.environ['SERVICE_BUS_CONNECTION_STR'] + fully_qualified_namespace = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE'] shared_access_policy = os.environ['SERVICE_BUS_SAS_POLICY'] shared_access_key = os.environ['SERVICE_BUS_SAS_KEY'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py new file mode 100644 index 000000000000..12c03b683f76 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/session_pool_receive.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import uuid +import concurrent + +from azure.servicebus import ServiceBusClient, Message, AutoLockRenew +from azure.servicebus import NoActiveSession + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +# Note: This must be a session-enabled queue. +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + +def message_processing(sb_client, queue_name, messages): + while True: + try: + with sb_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver: + renewer = AutoLockRenew() + renewer.register(receiver.session, timeout=None) + receiver.session.set_session_state("OPEN") + for message in receiver: + messages.append(message) + print("Message: {}".format(message)) + print("Time to live: {}".format(message.header.time_to_live)) + print("Sequence number: {}".format(message.sequence_number)) + print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number)) + print("Partition ID: {}".format(message.partition_id)) + print("Partition Key: {}".format(message.partition_key)) + print("Locked until: {}".format(message.locked_until_utc)) + print("Lock Token: {}".format(message.lock_token)) + print("Enqueued time: {}".format(message.enqueued_time_utc)) + message.complete() + if str(message) == 'shutdown': + receiver.session.set_session_state("CLOSED") + renewer.shutdown() + except NoActiveSession: + print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.") + return + + +def sample_session_send_receive_with_pool(connection_string, queue_name): + + concurrent_receivers = 5 + sessions = [str(uuid.uuid4()) for i in range(2 * concurrent_receivers)] + with ServiceBusClient.from_connection_string(connection_string) as client: + + with client.get_queue_sender(queue_name) as sender: + for session_id in sessions: + for i in range(20): + message = Message("Sample message no. {}".format(i), session_id=session_id) + sender.send(message) + + all_messages = [] + futures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_receivers) as thread_pool: + for _ in range(concurrent_receivers): + futures.append(thread_pool.submit(message_processing, client, queue_name, all_messages)) + concurrent.futures.wait(futures) + + print("Received total {} messages across {} sessions.".format(len(all_messages), len(sessions))) + + +if __name__ == '__main__': + sample_session_send_receive_with_pool(CONNECTION_STR, QUEUE_NAME) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index a4f8e5a3269a..df28865e22c4 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1076,3 +1076,23 @@ def test_queue_message_http_proxy_setting(self): receiver = sb_client.get_queue_receiver(queue_name="mock") assert receiver._config.http_proxy == http_proxy assert receiver._config.transport_type == TransportType.AmqpOverWebsocket + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False) as sb_client: + + async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + message = Message("Test") + await sender.send(message) + + async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + messages = await receiver.receive(max_wait_time=5) + await receiver._handler.message_handler.destroy_async() # destroy the underlying receiver link + assert len(messages) == 1 + await messages[0].complete() diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 95a6b61f11c5..125c2d662b88 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1202,3 +1202,23 @@ def test_queue_message_http_proxy_setting(self): receiver = sb_client.get_queue_receiver(queue_name="mock") assert receiver._config.http_proxy == http_proxy assert receiver._config.transport_type == TransportType.AmqpOverWebsocket + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, + logging_enable=False) as sb_client: + + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + message = Message("Test") + sender.send(message) + + with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + messages = receiver.receive(max_wait_time=5) + receiver._handler.message_handler.destroy() # destroy the underlying receiver link + assert len(messages) == 1 + messages[0].complete() diff --git a/sdk/textanalytics/azure-ai-textanalytics/CHANGELOG.md b/sdk/textanalytics/azure-ai-textanalytics/CHANGELOG.md index fc8ec1aa36e8..f8c48186f667 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/CHANGELOG.md +++ b/sdk/textanalytics/azure-ai-textanalytics/CHANGELOG.md @@ -5,6 +5,9 @@ **New features** - Added `text` property to `SentenceSentiment` +**Breaking changes** +- `score` attribute of `DetectedLanguage` has been renamed to `confidence_score` + ## 1.0.0b4 (2020-04-07) **Breaking changes** diff --git a/sdk/textanalytics/azure-ai-textanalytics/README.md b/sdk/textanalytics/azure-ai-textanalytics/README.md index 499cef748ce6..b39795504fb3 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/README.md +++ b/sdk/textanalytics/azure-ai-textanalytics/README.md @@ -343,7 +343,7 @@ result = [doc for doc in response if not doc.is_error] for doc in result: print("Language detected: {}".format(doc.primary_language.name)) print("ISO6391 name: {}".format(doc.primary_language.iso6391_name)) - print("Confidence score: {}\n".format(doc.primary_language.score)) + print("Confidence score: {}\n".format(doc.primary_language.confidence_score)) ``` The returned response is a heterogeneous list of result and error objects: list[[DetectLanguageResult][detect_language_result], [DocumentError][document_error]] diff --git a/sdk/textanalytics/azure-ai-textanalytics/azure/ai/textanalytics/_models.py b/sdk/textanalytics/azure-ai-textanalytics/azure/ai/textanalytics/_models.py index 11dfc1b6deed..2560d01d3bf1 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/azure/ai/textanalytics/_models.py +++ b/sdk/textanalytics/azure-ai-textanalytics/azure/ai/textanalytics/_models.py @@ -86,7 +86,7 @@ def _from_generated(cls, language): ) def __repr__(self): - return "DetectedLanguage(name={}, iso6391_name={}, score={})" \ + return "DetectedLanguage(name={}, iso6391_name={}, confidence_score={})" \ .format(self.name, self.iso6391_name, self.confidence_score)[:1024] diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py index b70ba4461f5a..c7bbc7d10523 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_alternative_document_input_async.py @@ -44,14 +44,14 @@ async def alternative_document_input(self): "text": "L'hôtel n'était pas très confortable. L'éclairage était trop sombre."} ] async with text_analytics_client: - result = await text_analytics_client.analyze_sentiment(documents) + result = await text_analytics_client.detect_language(documents) for idx, doc in enumerate(result): if not doc.is_error: print("Document text: {}".format(documents[idx])) print("Language detected: {}".format(doc.primary_language.name)) print("ISO6391 name: {}".format(doc.primary_language.iso6391_name)) - print("Confidence score: {}\n".format(doc.primary_language.score)) + print("Confidence score: {}\n".format(doc.primary_language.confidence_score)) if doc.is_error: print(doc.id, doc.error) diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py index fd924674df98..f0ac344c75de 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py @@ -52,7 +52,7 @@ async def authentication_with_api_key_credential_async(self): result = await text_analytics_client.detect_language(doc) print("Language detected: {}".format(result[0].primary_language.name)) - print("Confidence score: {}".format(result[0].primary_language.score)) + print("Confidence score: {}".format(result[0].primary_language.confidence_score)) async def authentication_with_azure_active_directory_async(self): """DefaultAzureCredential will use the values from these environment @@ -74,7 +74,7 @@ async def authentication_with_azure_active_directory_async(self): result = await text_analytics_client.detect_language(doc) print("Language detected: {}".format(result[0].primary_language.name)) - print("Confidence score: {}".format(result[0].primary_language.score)) + print("Confidence score: {}".format(result[0].primary_language.confidence_score)) async def main(): diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_detect_language_async.py b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_detect_language_async.py index 979080570941..8ccf20c9c89f 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_detect_language_async.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_detect_language_async.py @@ -50,7 +50,7 @@ async def detect_language_async(self): print("Document text: {}".format(documents[idx])) print("Language detected: {}".format(doc.primary_language.name)) print("ISO6391 name: {}".format(doc.primary_language.iso6391_name)) - print("Confidence score: {}\n".format(doc.primary_language.score)) + print("Confidence score: {}\n".format(doc.primary_language.confidence_score)) if doc.is_error: print(doc.id, doc.error) # [END batch_detect_language_async] diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py index 581d08d48c45..81ebc6d70bba 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_alternative_document_input.py @@ -44,14 +44,14 @@ def alternative_document_input(self): "text": "L'hôtel n'était pas très confortable. L'éclairage était trop sombre."} ] - result = text_analytics_client.analyze_sentiment(documents) + result = text_analytics_client.detect_language(documents) for idx, doc in enumerate(result): if not doc.is_error: print("Document text: {}".format(documents[idx])) print("Language detected: {}".format(doc.primary_language.name)) print("ISO6391 name: {}".format(doc.primary_language.iso6391_name)) - print("Confidence score: {}\n".format(doc.primary_language.score)) + print("Confidence score: {}\n".format(doc.primary_language.confidence_score)) if doc.is_error: print(doc.id, doc.error) diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_authentication.py b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_authentication.py index 06dc30aed696..0348b40ee163 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_authentication.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_authentication.py @@ -50,7 +50,7 @@ def authentication_with_api_key_credential(self): result = text_analytics_client.detect_language(doc) print("Language detected: {}".format(result[0].primary_language.name)) - print("Confidence score: {}".format(result[0].primary_language.score)) + print("Confidence score: {}".format(result[0].primary_language.confidence_score)) def authentication_with_azure_active_directory(self): """DefaultAzureCredential will use the values from these environment @@ -71,7 +71,7 @@ def authentication_with_azure_active_directory(self): result = text_analytics_client.detect_language(doc) print("Language detected: {}".format(result[0].primary_language.name)) - print("Confidence score: {}".format(result[0].primary_language.score)) + print("Confidence score: {}".format(result[0].primary_language.confidence_score)) if __name__ == '__main__': diff --git a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_detect_language.py b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_detect_language.py index a1aee02c5dcf..f7ea2e00ea93 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/samples/sample_detect_language.py +++ b/sdk/textanalytics/azure-ai-textanalytics/samples/sample_detect_language.py @@ -49,7 +49,7 @@ def detect_language(self): print("Document text: {}".format(documents[idx])) print("Language detected: {}".format(doc.primary_language.name)) print("ISO6391 name: {}".format(doc.primary_language.iso6391_name)) - print("Confidence score: {}\n".format(doc.primary_language.score)) + print("Confidence score: {}\n".format(doc.primary_language.confidence_score)) if doc.is_error: print(doc.id, doc.error) # [END batch_detect_language] diff --git a/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language.py b/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language.py index 38deedaf76e4..cb8dd8320714 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language.py +++ b/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language.py @@ -53,7 +53,7 @@ def test_all_successful_passing_dict(self, client): for doc in response: self.assertIsNotNone(doc.id) self.assertIsNotNone(doc.statistics) - self.assertIsNotNone(doc.primary_language.score) + self.assertIsNotNone(doc.primary_language.confidence_score) @GlobalTextAnalyticsAccountPreparer() @TextAnalyticsClientPreparer() @@ -77,7 +77,7 @@ def test_all_successful_passing_text_document_input(self, client): self.assertEqual(response[3].primary_language.iso6391_name, "de") for doc in response: - self.assertIsNotNone(doc.primary_language.score) + self.assertIsNotNone(doc.primary_language.confidence_score) @GlobalTextAnalyticsAccountPreparer() @TextAnalyticsClientPreparer() diff --git a/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language_async.py b/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language_async.py index 8fc0946a8d6c..fce974e63502 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language_async.py +++ b/sdk/textanalytics/azure-ai-textanalytics/tests/test_detect_language_async.py @@ -66,7 +66,7 @@ async def test_all_successful_passing_dict(self, client): for doc in response: self.assertIsNotNone(doc.id) self.assertIsNotNone(doc.statistics) - self.assertIsNotNone(doc.primary_language.score) + self.assertIsNotNone(doc.primary_language.confidence_score) @GlobalTextAnalyticsAccountPreparer() @TextAnalyticsClientPreparer() @@ -90,7 +90,7 @@ async def test_all_successful_passing_text_document_input(self, client): self.assertEqual(response[3].primary_language.iso6391_name, "de") for doc in response: - self.assertIsNotNone(doc.primary_language.score) + self.assertIsNotNone(doc.primary_language.confidence_score) @GlobalTextAnalyticsAccountPreparer() @TextAnalyticsClientPreparer() diff --git a/sdk/textanalytics/azure-ai-textanalytics/tests/test_text_analytics.py b/sdk/textanalytics/azure-ai-textanalytics/tests/test_text_analytics.py index f51b9acf6a54..db854b7795fe 100644 --- a/sdk/textanalytics/azure-ai-textanalytics/tests/test_text_analytics.py +++ b/sdk/textanalytics/azure-ai-textanalytics/tests/test_text_analytics.py @@ -27,7 +27,7 @@ def test_detect_language(self, resource_group, location, text_analytics_account, self.assertEqual(response[0].primary_language.name, "English") def test_repr(self): - detected_language = _models.DetectedLanguage(name="English", iso6391_name="en", score=1.0) + detected_language = _models.DetectedLanguage(name="English", iso6391_name="en", confidence_score=1.0) categorized_entity = _models.CategorizedEntity(text="Bill Gates", category="Person", subcategory="Age", grapheme_offset=0, grapheme_length=8, confidence_score=0.899) @@ -109,7 +109,7 @@ def test_repr(self): transaction_count=4 ) - self.assertEqual("DetectedLanguage(name=English, iso6391_name=en, score=1.0)", repr(detected_language)) + self.assertEqual("DetectedLanguage(name=English, iso6391_name=en, confidence_score=1.0)", repr(detected_language)) self.assertEqual("CategorizedEntity(text=Bill Gates, category=Person, subcategory=Age, grapheme_offset=0, " "grapheme_length=8, confidence_score=0.899)", repr(categorized_entity)) @@ -120,7 +120,7 @@ def test_repr(self): "statistics=TextDocumentStatistics(grapheme_count=14, transaction_count=18), " "is_error=False)", repr(recognize_entities_result)) self.assertEqual("DetectLanguageResult(id=1, primary_language=DetectedLanguage(name=English, " - "iso6391_name=en, score=1.0), statistics=TextDocumentStatistics(grapheme_count=14, " + "iso6391_name=en, confidence_score=1.0), statistics=TextDocumentStatistics(grapheme_count=14, " "transaction_count=18), is_error=False)", repr(detect_language_result)) self.assertEqual("TextAnalyticsError(code=invalidRequest, message=The request is invalid, target=request)", repr(text_analytics_error))