Skip to content

Commit

Permalink
Add better autolockrenew on-failure handling capabilities. (#12307)
Browse files Browse the repository at this point in the history
* autolockrenewer can now take a callback that fires when for any non-user-defined reason (e.g. not due to settlement or shutdown) a lock is lost on an auto-lock-renewed session or message.  Adds tests as well and changelog notes.
* add a test for receiver shutdown halting autorenewal (and corrosponding mocks)
* Add proper typing and documentation to aio code.
* Rename autolockrenew shutdown to close to normalize method name with other comparable instances. Adjust tests/docs/guides/etc.
* Add changelog entry for the on lock renew callback.
* make unused ivars truly internal (loop, executor) within autolockrenew; make tests be more precise by explicitly clearing results list between trials.
* increase idle_timeout for receiveanddelete test to avoid flakiness.
  • Loading branch information
KieranBrantnerMagee authored Jul 23, 2020
1 parent fb25c20 commit 5cf31c0
Show file tree
Hide file tree
Showing 18 changed files with 650 additions and 238 deletions.
10 changes: 8 additions & 2 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
- Removed property `settled` on `PeekMessage`.
- Removed property `expired` on `ReceivedMessage`.

* Add `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion)

**Breaking Changes**

* `AutoLockRenew.sleep_time` and `AutoLockRenew.renew_period` have been made internal as `_sleep_time` and `_renew_period` respectively, as it is not expected a user will have to interact with them.
* `AutoLockRenew.shutdown` is now `AutoLockRenew.close` to normalize with other equivelent behaviors.

## 7.0.0b4 (2020-07-06)

Expand All @@ -35,8 +41,8 @@

**BugFixes**

* Fixed bug where sync AutoLockRenew does not shutdown itself timely.
* Fixed bug where async AutoLockRenew does not support context manager.
* Fixed bug where sync `AutoLockRenew` does not shutdown itself timely.
* Fixed bug where async `AutoLockRenew` does not support context manager.

**Breaking Changes**

Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

# Can also be called via "with AutoLockRenew() as renewer" to automate shutdown.
# Can also be called via "with AutoLockRenew() as renewer" to automate closing.
renewer = AutoLockRenew()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver:
Expand All @@ -390,7 +390,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
renewer.register(msg, timeout=60)
# Do your application logic here
msg.complete()
renewer.shutdown()
renewer.close()
```

If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._base_handler import ServiceBusSharedKeyCredential
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage
from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE
from ._common.utils import AutoLockRenew
from ._common.auto_lock_renewer import AutoLockRenew

TransportType = constants.TransportType

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import datetime
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING

from .._servicebus_session import ServiceBusSession
from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from .utils import renewable_start_time, utc_now

if TYPE_CHECKING:
from typing import Callable, Union, Optional, Awaitable
from .message import ReceivedMessage
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage],
Optional[Exception]], None]

_log = logging.getLogger(__name__)

class AutoLockRenew(object):
"""Auto renew locks for messages and sessions using a background thread pool.
:param executor: A user-specified thread pool. This cannot be combined with
setting `max_workers`.
:type executor: ~concurrent.futures.ThreadPoolExecutor
:param max_workers: Specify the maximum workers in the thread pool. If not
specified the number used will be derived from the core count of the environment.
This cannot be combined with `executor`.
:type max_workers: int
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_message_sync]
:end-before: [END auto_lock_renew_message_sync]
:language: python
:dedent: 4
:caption: Automatically renew a message lock
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_session_sync]
:end-before: [END auto_lock_renew_session_sync]
:language: python
:dedent: 4
:caption: Automatically renew a session lock
"""

def __init__(self, executor=None, max_workers=None):
self._executor = executor or ThreadPoolExecutor(max_workers=max_workers)
self._shutdown = threading.Event()
self._sleep_time = 1
self._renew_period = 10

def __enter__(self):
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
return self

def __exit__(self, *args):
self.close()

def _renewable(self, renewable):
# pylint: disable=protected-access
if self._shutdown.is_set():
return False
if hasattr(renewable, '_settled') and renewable._settled:
return False
if not renewable._receiver._running:
return False
if renewable._lock_expired:
return False
return True

def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None):
# pylint: disable=protected-access
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
error = None
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
try:
while self._renewable(renewable):
if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout):
_log.debug("Reached auto lock renew timeout - letting lock expire.")
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period):
_log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period)
renewable.renew_lock()
time.sleep(self._sleep_time)
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
error = e
renewable.auto_renew_error = e
clean_shutdown = not renewable._lock_expired
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
"Failed to auto-renew lock",
inner_exception=e)
renewable.auto_renew_error = error
finally:
if on_lock_renew_failure and not clean_shutdown:
on_lock_renew_failure(renewable, error)

def register(self, renewable, timeout=300, on_lock_renew_failure=None):
"""Register a renewable entity for automatic lock renewal.
:param renewable: A locked entity that needs to be renewed.
:type renewable: ~azure.servicebus.ReceivedMessage or
~azure.servicebus.ServiceBusSession
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
:param Optional[LockRenewFailureCallback] on_lock_renew_failure:
A callback may be specified to be called when the lock is lost on the renewable that is being registered.
Default value is None (no callback).
"""
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
starttime = renewable_start_time(renewable)
self._executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure)

def close(self, wait=True):
"""Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads.
:param wait: Whether to block until thread pool has shutdown. Default is `True`.
:type wait: bool
"""
self._shutdown.set()
self._executor.shutdown(wait=wait)
103 changes: 1 addition & 102 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@
import sys
import datetime
import logging
import threading
import time
import functools
import platform
from typing import Optional, Dict
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor

from uamqp import authentication, types

from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from ..exceptions import ServiceBusError
from .._version import VERSION
from .constants import (
JWT_TOKEN_SCOPE,
Expand Down Expand Up @@ -180,101 +177,3 @@ def generate_dead_letter_entity_name(
)

return entity_name


class AutoLockRenew(object):
"""Auto renew locks for messages and sessions using a background thread pool.
:param executor: A user-specified thread pool. This cannot be combined with
setting `max_workers`.
:type executor: ~concurrent.futures.ThreadPoolExecutor
:param max_workers: Specify the maximum workers in the thread pool. If not
specified the number used will be derived from the core count of the environment.
This cannot be combined with `executor`.
:type max_workers: int
.. admonition:: Example:
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_message_sync]
:end-before: [END auto_lock_renew_message_sync]
:language: python
:dedent: 4
:caption: Automatically renew a message lock
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_session_sync]
:end-before: [END auto_lock_renew_session_sync]
:language: python
:dedent: 4
:caption: Automatically renew a session lock
"""

def __init__(self, executor=None, max_workers=None):
self.executor = executor or ThreadPoolExecutor(max_workers=max_workers)
self._shutdown = threading.Event()
self.sleep_time = 1
self.renew_period = 10

def __enter__(self):
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
return self

def __exit__(self, *args):
self.shutdown()

def _renewable(self, renewable):
if self._shutdown.is_set():
return False
if hasattr(renewable, '_settled') and renewable._settled: # pylint: disable=protected-access
return False
if renewable._lock_expired: # pylint: disable=protected-access
return False
return True

def _auto_lock_renew(self, renewable, starttime, timeout):
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
try:
while self._renewable(renewable):
if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout):
_log.debug("Reached auto lock renew timeout - letting lock expire.")
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period):
_log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period)
renewable.renew_lock()
time.sleep(self.sleep_time)
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
"Failed to auto-renew lock",
inner_exception=e)
renewable.auto_renew_error = error

def register(self, renewable, timeout=300):
"""Register a renewable entity for automatic lock renewal.
:param renewable: A locked entity that needs to be renewed.
:type renewable: ~azure.servicebus.ReceivedMessage or
~azure.servicebus.Session
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
"""
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
starttime = renewable_start_time(renewable)
self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout)

def shutdown(self, wait=True):
"""Shutdown the thread pool to clean up any remaining lock renewal threads.
:param wait: Whether to block until thread pool has shutdown. Default is `True`.
:type wait: bool
"""
self._shutdown.set()
self.executor.shutdown(wait=wait)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ._servicebus_session_receiver_async import ServiceBusSessionReceiver
from ._servicebus_session_async import ServiceBusSession
from ._servicebus_client_async import ServiceBusClient
from ._async_utils import AutoLockRenew
from ._async_auto_lock_renewer import AutoLockRenew

__all__ = [
'ReceivedMessage',
Expand Down
Loading

0 comments on commit 5cf31c0

Please sign in to comment.