Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into feature/text_analytics_v3.0

* 'master' of https://github.com/Azure/azure-sdk-for-python:
  [ServiceBus] Settle non-deferred message through receiver link (#10800)
  Add sync/async samples to demonstrate consuming from a number of sessions at one time. (#11001)
  fixed alternative document input samples (#11078)
  • Loading branch information
iscai-msft committed Apr 27, 2020
2 parents 343f256 + 91d96a8 commit 136277c
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 57 deletions.
103 changes: 80 additions & 23 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import datetime
import uuid
import functools
import logging
from typing import Optional, List, Union, Generator

import uamqp
from uamqp import types
from uamqp import types, errors

from .constants import (
_BATCH_MESSAGE_OVERHEAD_COST,
Expand All @@ -34,7 +36,8 @@
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
MESSAGE_DEFER,
MESSAGE_RENEW_LOCK
MESSAGE_RENEW_LOCK,
DEADLETTERNAME
)
from ..exceptions import (
MessageAlreadySettled,
Expand All @@ -44,6 +47,8 @@
)
from .utils import utc_from_timestamp, utc_now

_LOGGER = logging.getLogger(__name__)


class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes
"""A Service Bus Message.
Expand Down Expand Up @@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage):
:dedent: 4
:caption: Checking the properties on a received message.
"""
def __init__(self, message, mode=ReceiveSettleMode.PeekLock):
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
super(ReceivedMessage, self).__init__(message=message)
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None

def _is_live(self, action):
Expand All @@ -458,6 +464,69 @@ def _is_live(self, action):
except AttributeError:
pass

def _settle_message(
self,
settle_operation,
dead_letter_details=None
):
try:
if not self._is_deferred_message:
try:
self._settle_via_receiver_link(settle_operation, dead_letter_details)()
return
except RuntimeError as exception:
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
settle_operation,
exception
)
self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
# pylint: disable=protected-access
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_COMPLETE,
[self.lock_token],
)
if settle_operation == MESSAGE_ABANDON:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_ABANDON,
[self.lock_token],
)
if settle_operation == MESSAGE_DEAD_LETTER:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=dead_letter_details
)
if settle_operation == MESSAGE_DEFER:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_DEFER,
[self.lock_token],
)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None):
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(self.message.accept)
if settle_operation == MESSAGE_ABANDON:
return functools.partial(self.message.modify, True, False)
if settle_operation == MESSAGE_DEAD_LETTER:
# note: message.reject() can not set reason and description properly due to the issue
# https://github.com/Azure/azure-uamqp-python/issues/155
return functools.partial(self.message.reject, condition=DEADLETTERNAME)
if settle_operation == MESSAGE_DEFER:
return functools.partial(self.message.modify, True, True)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

@property
def settled(self):
# type: () -> bool
Expand Down Expand Up @@ -535,11 +604,9 @@ def complete(self):
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
try:
self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
self._settle_message(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, description=None):
Expand All @@ -560,17 +627,12 @@ def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)

details = {
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
try:
self._receiver._settle_message(
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=details
)
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)

self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details)
self._settled = True

def abandon(self):
Expand All @@ -585,11 +647,9 @@ def abandon(self):
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
try:
self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_ABANDON, e)
self._settle_message(MESSAGE_ABANDON)
self._settled = True

def defer(self):
Expand All @@ -606,10 +666,7 @@ def defer(self):
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
self._is_live(MESSAGE_DEFER)
try:
self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEFER, e)
self._settle_message(MESSAGE_DEFER)
self._settled = True

def renew_lock(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def deferred_message_op(
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(message_type(wrapped, mode))
parsed.append(message_type(wrapped, mode, is_deferred_message=True))
return parsed
if status_code in [202, 204]:
return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import logging
from typing import Optional

from .._common import message as sync_message
from .._common.constants import (
SETTLEMENT_ABANDON,
SETTLEMENT_COMPLETE,
SETTLEMENT_DEFER,
SETTLEMENT_DEADLETTER,
ReceiveSettleMode,
MGMT_RESPONSE_MESSAGE_EXPIRATION,
MGMT_REQUEST_DEAD_LETTER_REASON,
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
MESSAGE_COMPLETE,
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
Expand All @@ -24,15 +19,41 @@
from .._common.utils import get_running_loop, utc_from_timestamp
from ..exceptions import MessageSettleFailed

_LOGGER = logging.getLogger(__name__)


class ReceivedMessage(sync_message.ReceivedMessage):
"""A Service Bus Message received from service side.
"""

def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None):
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs):
self._loop = loop or get_running_loop()
super(ReceivedMessage, self).__init__(message=message, mode=mode)
super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs)

async def _settle_message(
self,
settle_operation,
dead_letter_details=None
):
try:
if not self._is_deferred_message:
try:
await self._loop.run_in_executor(
None,
self._settle_via_receiver_link(settle_operation, dead_letter_details)
)
return
except RuntimeError as exception:
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
settle_operation,
exception
)
await self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

async def complete(self):
# type: () -> None
Expand All @@ -48,10 +69,7 @@ async def complete(self):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
try:
await self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token])
except Exception as e:
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
await self._settle_message(MESSAGE_COMPLETE)
self._settled = True

async def dead_letter(self, reason=None, description=None):
Expand All @@ -71,17 +89,7 @@ async def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
details = {
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
try:
await self._receiver._settle_message(
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=details
)
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)
await self._settle_message(MESSAGE_DEAD_LETTER)
self._settled = True

async def abandon(self):
Expand All @@ -95,10 +103,7 @@ async def abandon(self):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
try:
await self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token])
except Exception as e:
raise MessageSettleFailed(MESSAGE_ABANDON, e)
await self._settle_message(MESSAGE_ABANDON)
self._settled = True

async def defer(self):
Expand All @@ -112,10 +117,7 @@ async def defer(self):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEFER)
try:
await self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token])
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEFER, e)
await self._settle_message(MESSAGE_DEFER)
self._settled = True

async def renew_lock(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio
import uuid

from azure.servicebus.aio import ServiceBusClient, AutoLockRenew
from azure.servicebus import NoActiveSession, Message


CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

async def message_processing(servicebus_client, queue_name):
while True:
try:
async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
await receiver.session.set_session_state("OPEN")
async for message in receiver:
print("Message: {}".format(message))
print("Time to live: {}".format(message.header.time_to_live))
print("Sequence number: {}".format(message.sequence_number))
print("Enqueue Sequence numger: {}".format(message.enqueue_sequence_number))
print("Partition ID: {}".format(message.partition_id))
print("Partition Key: {}".format(message.partition_key))
print("Locked until: {}".format(message.locked_until_utc))
print("Lock Token: {}".format(message.lock_token))
print("Enqueued time: {}".format(message.enqueued_time_utc))
await message.complete()
if str(message) == 'shutdown':
await receiver.session.set_session_state("CLOSED")
break
renewer.shutdown()
except NoActiveSession:
print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.")
return


async def sample_session_send_receive_with_pool_async(connection_string, queue_name):

concurrent_receivers = 5
sessions = [str(uuid.uuid4()) for i in range(concurrent_receivers)]
client = ServiceBusClient.from_connection_string(connection_string)

for session_id in sessions:
async with client.get_queue_sender(queue_name) as sender:
await asyncio.gather(*[sender.send(Message("Sample message no. {}".format(i), session_id=session_id)) for i in range(20)])
await sender.send(Message("shutdown", session_id=session_id))

receive_sessions = [message_processing(client, queue_name) for _ in range(concurrent_receivers)]
await asyncio.gather(*receive_sessions)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(sample_session_send_receive_with_pool_async(CONNECTION_STR, QUEUE_NAME))
Loading

0 comments on commit 136277c

Please sign in to comment.