Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into fix_test_errors_dl_recognize_entities

* 'master' of https://github.com/Azure/azure-sdk-for-python:
  [ServiceBus] Settle non-deferred message through receiver link (Azure#10800)
  Add sync/async samples to demonstrate consuming from a number of sessions at one time. (Azure#11001)
  fixed alternative document input samples (Azure#11078)
  Fix pip link in azure-keyvault-secrets readme (Azure#11056)
  [ServiceBus] Update for readme and sample (Azure#11047)
  • Loading branch information
iscai-msft committed Apr 27, 2020
2 parents a9a9a63 + 91d96a8 commit 637bc8e
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 78 deletions.
1 change: 1 addition & 0 deletions sdk/keyvault/azure-keyvault-secrets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ additional questions or comments.
[recover_purge_sample]: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/keyvault/azure-keyvault-secrets/samples/recover_purge_operations.py
[recover_purge_async_sample]: https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/keyvault/azure-keyvault-secrets/samples/recover_purge_operations_async.py
[keyvault_docs]: https://docs.microsoft.com/en-us/azure/key-vault/
[pip]: https://pypi.org/project/pip/
[pypi_package_secrets]: https://pypi.org/project/azure-keyvault-secrets/
[reference_docs]: https://aka.ms/azsdk-python-keyvault-secrets-ref
[secret_client_src]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-secrets/azure/keyvault/secrets
Expand Down
55 changes: 40 additions & 15 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish/subscribe capabilities, and the ability to easily scale as your needs gr

Use the Service Bus client library for Python to communicate between applications and services and implement asynchronous messaging patterns.

* Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings
* Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings.
* Send and receive messages within your Service Bus channels.
* Utilize message locks, sessions, and dead letter functionality to implement complex messaging patterns.

Expand All @@ -29,7 +29,7 @@ pip install azure-servicebus --pre
To use this package, you must have:
* Azure subscription - [Create a free account][azure_sub]
* Azure Service Bus - [Namespace and management credentials][service_bus_namespace]
* Python 2.7, 3.5, 3.6, 3.7 or 3.8 - [Install Python][python]
* Python 2.7, 3.5 or later - [Install Python][python]


If you need an Azure service bus namespace, you can create it via the [Azure Portal][azure_namespace_creation].
Expand All @@ -43,19 +43,17 @@ az servicebus namespace create --resource-group <resource-group-name> --name <se

Interaction with Service Bus starts with an instance of the `ServiceBusClient` class. You either need a **connection string with SAS key**, or a **namespace** and one of its **account keys** to instantiate the client object.

#### Get credentials
#### Create client from connection string

Use the [Azure CLI][azure_cli] snippet below to populate an environment variable with the service bus connection string (you can also find these values in the [Azure portal][azure_portal]. The snippet is formatted for the Bash shell.
- Get credentials: Use the [Azure CLI][azure_cli] snippet below to populate an environment variable with the service bus connection string (you can also find these values in the [Azure Portal][azure_portal] by following the step-by-step guide to [Get a service bus connection string][get_servicebus_conn_str]). The snippet is formatted for the Bash shell.

```Bash
RES_GROUP=<resource-group-name>
NAMESPACE_NAME=<servicebus-namespace-name>

export SERVICE_BUS_CONN_STR=$(az servicebus namespace authorization-rule keys list --resource-group $RES_GROUP --namespace-name $NAMESPACE_NAME --query RootManageSharedAccessKey --output tsv)
export SERVICE_BUS_CONN_STR=$(az servicebus namespace authorization-rule keys list --resource-group $RES_GROUP --namespace-name $NAMESPACE_NAME --name RootManageSharedAccessKey --query primaryConnectionString --output tsv)
```

#### Create client

Once you've populated the `SERVICE_BUS_CONN_STR` environment variable, you can create the `ServiceBusClient`.

```Python
Expand All @@ -68,6 +66,28 @@ with ServiceBusClient.from_connection_string(connstr) as client:
...
```

#### Create client using the azure-identity library:

```python
import os
from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()

FULLY_QUALIFIED_NAMESPACE = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
with ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential):
...
```

- This constructor takes the fully qualified namespace of your Service Bus instance and a credential that implements the
[TokenCredential][token_credential_interface]
protocol. There are implementations of the `TokenCredential` protocol available in the
[azure-identity package][pypi_azure_identity]. The fully qualified namespace is of the format `<yournamespace.servicebus.windows.net>`.
- When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such as the
Azure Service Bus Data Owner role. For more information about using Azure Active Directory authorization with Service Bus,
please refer to [the associated documentation][servicebus_aad_authentication].

Note: client can be initialized without a context manager, but must be manually closed via client.close() to not leak resources.

## Key concepts
Expand All @@ -88,22 +108,22 @@ To interact with these resources, one should be familiar with the following SDK

* [Sender](./azure/servicebus/_servicebus_sender.py): To send messages to a Queue or Topic, one would use the corresponding `get_queue_sender` or `get_topic_sender` method off of a `ServiceBusClient` instance as seen [here](./samples/sync_samples/send_queue.py).

* [Receiver](./azure/servicebus/_servicebus_receiver.py): To receive messages from a Queue or Subscription, one would use the corrosponding `get_queue_receiver` or `get_subscription_receiver` method off of a `ServiceBusClient` instance as seen [here](./samples/sync_samples/receive_queue.py).
* [Receiver](./azure/servicebus/_servicebus_receiver.py): To receive messages from a Queue or Subscription, one would use the corresponding `get_queue_receiver` or `get_subscription_receiver` method off of a `ServiceBusClient` instance as seen [here](./samples/sync_samples/receive_queue.py).

* [Message](./azure/servicebus/_common/message.py): When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload and control how the message is "settled" (completed, dead-lettered, etc); these functions are only available on a received message.

## Examples

The following sections provide several code snippets covering some of the most common Service Bus tasks, including:

* [Send a message to a queue](#send-to-a-queue)
* [Receive a message from a queue](#receive-from-a-queue)
* [Defer a message on receipt](#defer-a-message)
* [Send a message to a queue](#send-a-message-to-a-queue)
* [Receive a message from a queue](#receive-a-message-from-a-queue)
* [Defer a message on receipt](#defer-a-message-on-receipt)

To perform management tasks such as creating and deleting queues/topics/subscriptions, please utilize the azure-mgmt-servicebus library, available [here][servicebus_management_repository].


### Send to a queue
### Send a message to a queue

This example sends a message to a queue that is assumed to already exist, created via the Azure portal or az commands.

Expand All @@ -121,7 +141,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
sender.send(message)
```

### Receive from a queue
### Receive a message from a queue

To receive from a queue, you can either perform a one-off receive via "receiver.receive()" or receive persistently as follows:

Expand All @@ -139,7 +159,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
msg.complete()
```

### Defer a message
### Defer a message on receipt

When receiving from a queue, you have multiple actions you can take on the messages you receive. Where the prior example completes a message,
permanently removing it from the queue and marking as complete, this example demonstrates how to defer the message, sending it back to the queue
Expand All @@ -150,6 +170,7 @@ from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
Expand Down Expand Up @@ -225,4 +246,8 @@ contact [[email protected]](mailto:[email protected]) with any additio
[topic_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#topics
[subscription_concept]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions#topics-and-subscriptions
[azure_namespace_creation]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
[servicebus_management_repository]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-mgmt-servicebus
[servicebus_management_repository]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-mgmt-servicebus
[get_servicebus_conn_str]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string
[servicebus_aad_authentication]: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-authentication-and-authorization
[token_credential_interface]: ../../core/azure-core/azure/core/credentials.py
[pypi_azure_identity]: https://pypi.org/project/azure-identity/
103 changes: 80 additions & 23 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import datetime
import uuid
import functools
import logging
from typing import Optional, List, Union, Generator

import uamqp
from uamqp import types
from uamqp import types, errors

from .constants import (
_BATCH_MESSAGE_OVERHEAD_COST,
Expand All @@ -34,7 +36,8 @@
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
MESSAGE_DEFER,
MESSAGE_RENEW_LOCK
MESSAGE_RENEW_LOCK,
DEADLETTERNAME
)
from ..exceptions import (
MessageAlreadySettled,
Expand All @@ -44,6 +47,8 @@
)
from .utils import utc_from_timestamp, utc_now

_LOGGER = logging.getLogger(__name__)


class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes
"""A Service Bus Message.
Expand Down Expand Up @@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage):
:dedent: 4
:caption: Checking the properties on a received message.
"""
def __init__(self, message, mode=ReceiveSettleMode.PeekLock):
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
super(ReceivedMessage, self).__init__(message=message)
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None

def _is_live(self, action):
Expand All @@ -458,6 +464,69 @@ def _is_live(self, action):
except AttributeError:
pass

def _settle_message(
self,
settle_operation,
dead_letter_details=None
):
try:
if not self._is_deferred_message:
try:
self._settle_via_receiver_link(settle_operation, dead_letter_details)()
return
except RuntimeError as exception:
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
settle_operation,
exception
)
self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
# pylint: disable=protected-access
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_COMPLETE,
[self.lock_token],
)
if settle_operation == MESSAGE_ABANDON:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_ABANDON,
[self.lock_token],
)
if settle_operation == MESSAGE_DEAD_LETTER:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=dead_letter_details
)
if settle_operation == MESSAGE_DEFER:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_DEFER,
[self.lock_token],
)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None):
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(self.message.accept)
if settle_operation == MESSAGE_ABANDON:
return functools.partial(self.message.modify, True, False)
if settle_operation == MESSAGE_DEAD_LETTER:
# note: message.reject() can not set reason and description properly due to the issue
# https://github.com/Azure/azure-uamqp-python/issues/155
return functools.partial(self.message.reject, condition=DEADLETTERNAME)
if settle_operation == MESSAGE_DEFER:
return functools.partial(self.message.modify, True, True)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

@property
def settled(self):
# type: () -> bool
Expand Down Expand Up @@ -535,11 +604,9 @@ def complete(self):
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
try:
self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
self._settle_message(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, description=None):
Expand All @@ -560,17 +627,12 @@ def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)

details = {
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
try:
self._receiver._settle_message(
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=details
)
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)

self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details)
self._settled = True

def abandon(self):
Expand All @@ -585,11 +647,9 @@ def abandon(self):
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
try:
self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_ABANDON, e)
self._settle_message(MESSAGE_ABANDON)
self._settled = True

def defer(self):
Expand All @@ -606,10 +666,7 @@ def defer(self):
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
self._is_live(MESSAGE_DEFER)
try:
self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEFER, e)
self._settle_message(MESSAGE_DEFER)
self._settled = True

def renew_lock(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def deferred_message_op(
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(message_type(wrapped, mode))
parsed.append(message_type(wrapped, mode, is_deferred_message=True))
return parsed
if status_code in [202, 204]:
return []
Expand Down
Loading

0 comments on commit 637bc8e

Please sign in to comment.