Skip to content

Commit

Permalink
[ServiceBus] Settle non-deferred message through receiver link (#10800)
Browse files Browse the repository at this point in the history
* settle non-deferred message through receiver link except dead_letter

* revert dead-letter back to t1 as well

* improve settlement and put is_deferred_letter into kwargs

* add test

* update according to comment

* fix a bug in dead_letter through receiver_link
  • Loading branch information
yunhaoling authored Apr 27, 2020
1 parent ce8cfad commit 91d96a8
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 55 deletions.
103 changes: 80 additions & 23 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import datetime
import uuid
import functools
import logging
from typing import Optional, List, Union, Generator

import uamqp
from uamqp import types
from uamqp import types, errors

from .constants import (
_BATCH_MESSAGE_OVERHEAD_COST,
Expand All @@ -34,7 +36,8 @@
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
MESSAGE_DEFER,
MESSAGE_RENEW_LOCK
MESSAGE_RENEW_LOCK,
DEADLETTERNAME
)
from ..exceptions import (
MessageAlreadySettled,
Expand All @@ -44,6 +47,8 @@
)
from .utils import utc_from_timestamp, utc_now

_LOGGER = logging.getLogger(__name__)


class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes
"""A Service Bus Message.
Expand Down Expand Up @@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage):
:dedent: 4
:caption: Checking the properties on a received message.
"""
def __init__(self, message, mode=ReceiveSettleMode.PeekLock):
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
super(ReceivedMessage, self).__init__(message=message)
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None

def _is_live(self, action):
Expand All @@ -458,6 +464,69 @@ def _is_live(self, action):
except AttributeError:
pass

def _settle_message(
self,
settle_operation,
dead_letter_details=None
):
try:
if not self._is_deferred_message:
try:
self._settle_via_receiver_link(settle_operation, dead_letter_details)()
return
except RuntimeError as exception:
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
settle_operation,
exception
)
self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
# pylint: disable=protected-access
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_COMPLETE,
[self.lock_token],
)
if settle_operation == MESSAGE_ABANDON:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_ABANDON,
[self.lock_token],
)
if settle_operation == MESSAGE_DEAD_LETTER:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=dead_letter_details
)
if settle_operation == MESSAGE_DEFER:
return functools.partial(
self._receiver._settle_message,
SETTLEMENT_DEFER,
[self.lock_token],
)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None):
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(self.message.accept)
if settle_operation == MESSAGE_ABANDON:
return functools.partial(self.message.modify, True, False)
if settle_operation == MESSAGE_DEAD_LETTER:
# note: message.reject() can not set reason and description properly due to the issue
# https://github.com/Azure/azure-uamqp-python/issues/155
return functools.partial(self.message.reject, condition=DEADLETTERNAME)
if settle_operation == MESSAGE_DEFER:
return functools.partial(self.message.modify, True, True)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

@property
def settled(self):
# type: () -> bool
Expand Down Expand Up @@ -535,11 +604,9 @@ def complete(self):
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
try:
self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
self._settle_message(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, description=None):
Expand All @@ -560,17 +627,12 @@ def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)

details = {
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
try:
self._receiver._settle_message(
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=details
)
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)

self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details)
self._settled = True

def abandon(self):
Expand All @@ -585,11 +647,9 @@ def abandon(self):
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
try:
self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_ABANDON, e)
self._settle_message(MESSAGE_ABANDON)
self._settled = True

def defer(self):
Expand All @@ -606,10 +666,7 @@ def defer(self):
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
"""
self._is_live(MESSAGE_DEFER)
try:
self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEFER, e)
self._settle_message(MESSAGE_DEFER)
self._settled = True

def renew_lock(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def deferred_message_op(
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(message_type(wrapped, mode))
parsed.append(message_type(wrapped, mode, is_deferred_message=True))
return parsed
if status_code in [202, 204]:
return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import logging
from typing import Optional

from .._common import message as sync_message
from .._common.constants import (
SETTLEMENT_ABANDON,
SETTLEMENT_COMPLETE,
SETTLEMENT_DEFER,
SETTLEMENT_DEADLETTER,
ReceiveSettleMode,
MGMT_RESPONSE_MESSAGE_EXPIRATION,
MGMT_REQUEST_DEAD_LETTER_REASON,
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
MESSAGE_COMPLETE,
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
Expand All @@ -24,15 +19,41 @@
from .._common.utils import get_running_loop, utc_from_timestamp
from ..exceptions import MessageSettleFailed

_LOGGER = logging.getLogger(__name__)


class ReceivedMessage(sync_message.ReceivedMessage):
"""A Service Bus Message received from service side.
"""

def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None):
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs):
self._loop = loop or get_running_loop()
super(ReceivedMessage, self).__init__(message=message, mode=mode)
super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs)

async def _settle_message(
self,
settle_operation,
dead_letter_details=None
):
try:
if not self._is_deferred_message:
try:
await self._loop.run_in_executor(
None,
self._settle_via_receiver_link(settle_operation, dead_letter_details)
)
return
except RuntimeError as exception:
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
settle_operation,
exception
)
await self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

async def complete(self):
# type: () -> None
Expand All @@ -48,10 +69,7 @@ async def complete(self):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_COMPLETE)
try:
await self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token])
except Exception as e:
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
await self._settle_message(MESSAGE_COMPLETE)
self._settled = True

async def dead_letter(self, reason=None, description=None):
Expand All @@ -71,17 +89,7 @@ async def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEAD_LETTER)
details = {
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
try:
await self._receiver._settle_message(
SETTLEMENT_DEADLETTER,
[self.lock_token],
dead_letter_details=details
)
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)
await self._settle_message(MESSAGE_DEAD_LETTER)
self._settled = True

async def abandon(self):
Expand All @@ -95,10 +103,7 @@ async def abandon(self):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_ABANDON)
try:
await self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token])
except Exception as e:
raise MessageSettleFailed(MESSAGE_ABANDON, e)
await self._settle_message(MESSAGE_ABANDON)
self._settled = True

async def defer(self):
Expand All @@ -112,10 +117,7 @@ async def defer(self):
"""
# pylint: disable=protected-access
self._is_live(MESSAGE_DEFER)
try:
await self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token])
except Exception as e:
raise MessageSettleFailed(MESSAGE_DEFER, e)
await self._settle_message(MESSAGE_DEFER)
self._settled = True

async def renew_lock(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,3 +1076,23 @@ def test_queue_message_http_proxy_setting(self):
receiver = sb_client.get_queue_receiver(queue_name="mock")
assert receiver._config.http_proxy == http_proxy
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False) as sb_client:

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = Message("Test")
await sender.send(message)

async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
messages = await receiver.receive(max_wait_time=5)
await receiver._handler.message_handler.destroy_async() # destroy the underlying receiver link
assert len(messages) == 1
await messages[0].complete()
20 changes: 20 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,3 +1202,23 @@ def test_queue_message_http_proxy_setting(self):
receiver = sb_client.get_queue_receiver(queue_name="mock")
assert receiver._config.http_proxy == http_proxy
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string,
logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = Message("Test")
sender.send(message)

with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
messages = receiver.receive(max_wait_time=5)
receiver._handler.message_handler.destroy() # destroy the underlying receiver link
assert len(messages) == 1
messages[0].complete()

0 comments on commit 91d96a8

Please sign in to comment.