Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add better autolockrenew on-failure handling capabilities. #12307

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
- Removed property `settled` on `PeekMessage`.
- Removed property `expired` on `ReceivedMessage`.

* Add `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion)

**Breaking Changes**

* `AutoLockRenew.sleep_time` and `AutoLockRenew.renew_period` have been made internal as `_sleep_time` and `_renew_period` respectively, as it is not expected a user will have to interact with them.
* `AutoLockRenew.shutdown` is now `AutoLockRenew.close` to normalize with other equivelent behaviors.

## 7.0.0b4 (2020-07-06)

Expand All @@ -35,8 +41,8 @@

**BugFixes**

* Fixed bug where sync AutoLockRenew does not shutdown itself timely.
* Fixed bug where async AutoLockRenew does not support context manager.
* Fixed bug where sync `AutoLockRenew` does not shutdown itself timely.
* Fixed bug where async `AutoLockRenew` does not support context manager.

**Breaking Changes**

Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

# Can also be called via "with AutoLockRenew() as renewer" to automate shutdown.
# Can also be called via "with AutoLockRenew() as renewer" to automate closing.
renewer = AutoLockRenew()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver:
Expand All @@ -390,7 +390,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
renewer.register(msg, timeout=60)
# Do your application logic here
msg.complete()
renewer.shutdown()
renewer.close()
```

If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._base_handler import ServiceBusSharedKeyCredential
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage
from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE
from ._common.utils import AutoLockRenew
from ._common.auto_lock_renewer import AutoLockRenew

TransportType = constants.TransportType

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import datetime
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING

from .._servicebus_session import ServiceBusSession
from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from .utils import renewable_start_time, utc_now

if TYPE_CHECKING:
from typing import Callable, Union, Optional, Awaitable
from .message import ReceivedMessage
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage],
Optional[Exception]], None]

_log = logging.getLogger(__name__)

class AutoLockRenew(object):
"""Auto renew locks for messages and sessions using a background thread pool.

:param executor: A user-specified thread pool. This cannot be combined with
setting `max_workers`.
:type executor: ~concurrent.futures.ThreadPoolExecutor
:param max_workers: Specify the maximum workers in the thread pool. If not
specified the number used will be derived from the core count of the environment.
This cannot be combined with `executor`.
:type max_workers: int

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_message_sync]
:end-before: [END auto_lock_renew_message_sync]
:language: python
:dedent: 4
:caption: Automatically renew a message lock

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_session_sync]
:end-before: [END auto_lock_renew_session_sync]
:language: python
:dedent: 4
:caption: Automatically renew a session lock

"""

def __init__(self, executor=None, max_workers=None):
self._executor = executor or ThreadPoolExecutor(max_workers=max_workers)
self._shutdown = threading.Event()
self._sleep_time = 1
self._renew_period = 10

def __enter__(self):
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
return self

def __exit__(self, *args):
self.close()

def _renewable(self, renewable):
# pylint: disable=protected-access
if self._shutdown.is_set():
return False
if hasattr(renewable, '_settled') and renewable._settled:
return False
if not renewable._receiver._running:
return False
if renewable._lock_expired:
return False
return True

def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None):
# pylint: disable=protected-access
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
error = None
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
try:
while self._renewable(renewable):
if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout):
_log.debug("Reached auto lock renew timeout - letting lock expire.")
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period):
_log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period)
renewable.renew_lock()
time.sleep(self._sleep_time)
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
error = e
renewable.auto_renew_error = e
clean_shutdown = not renewable._lock_expired
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
"Failed to auto-renew lock",
inner_exception=e)
renewable.auto_renew_error = error
finally:
if on_lock_renew_failure and not clean_shutdown:
on_lock_renew_failure(renewable, error)

def register(self, renewable, timeout=300, on_lock_renew_failure=None):
"""Register a renewable entity for automatic lock renewal.

:param renewable: A locked entity that needs to be renewed.
:type renewable: ~azure.servicebus.ReceivedMessage or
~azure.servicebus.ServiceBusSession
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
:param Optional[LockRenewFailureCallback] on_lock_renew_failure:
A callback may be specified to be called when the lock is lost on the renewable that is being registered.
Default value is None (no callback).
"""
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
starttime = renewable_start_time(renewable)
self._executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure)

def close(self, wait=True):
"""Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads.

:param wait: Whether to block until thread pool has shutdown. Default is `True`.
:type wait: bool
"""
self._shutdown.set()
self._executor.shutdown(wait=wait)
103 changes: 1 addition & 102 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@
import sys
import datetime
import logging
import threading
import time
import functools
import platform
from typing import Optional, Dict
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor

from uamqp import authentication, types

from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from ..exceptions import ServiceBusError
from .._version import VERSION
from .constants import (
JWT_TOKEN_SCOPE,
Expand Down Expand Up @@ -180,101 +177,3 @@ def generate_dead_letter_entity_name(
)

return entity_name


class AutoLockRenew(object):
"""Auto renew locks for messages and sessions using a background thread pool.

:param executor: A user-specified thread pool. This cannot be combined with
setting `max_workers`.
:type executor: ~concurrent.futures.ThreadPoolExecutor
:param max_workers: Specify the maximum workers in the thread pool. If not
specified the number used will be derived from the core count of the environment.
This cannot be combined with `executor`.
:type max_workers: int

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_message_sync]
:end-before: [END auto_lock_renew_message_sync]
:language: python
:dedent: 4
:caption: Automatically renew a message lock

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
:start-after: [START auto_lock_renew_session_sync]
:end-before: [END auto_lock_renew_session_sync]
:language: python
:dedent: 4
:caption: Automatically renew a session lock

"""

def __init__(self, executor=None, max_workers=None):
self.executor = executor or ThreadPoolExecutor(max_workers=max_workers)
self._shutdown = threading.Event()
self.sleep_time = 1
self.renew_period = 10

def __enter__(self):
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
return self

def __exit__(self, *args):
self.shutdown()

def _renewable(self, renewable):
if self._shutdown.is_set():
return False
if hasattr(renewable, '_settled') and renewable._settled: # pylint: disable=protected-access
return False
if renewable._lock_expired: # pylint: disable=protected-access
return False
return True

def _auto_lock_renew(self, renewable, starttime, timeout):
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
try:
while self._renewable(renewable):
if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout):
_log.debug("Reached auto lock renew timeout - letting lock expire.")
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period):
_log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period)
renewable.renew_lock()
time.sleep(self.sleep_time)
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
"Failed to auto-renew lock",
inner_exception=e)
renewable.auto_renew_error = error

def register(self, renewable, timeout=300):
"""Register a renewable entity for automatic lock renewal.

:param renewable: A locked entity that needs to be renewed.
:type renewable: ~azure.servicebus.ReceivedMessage or
~azure.servicebus.Session
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
"""
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
starttime = renewable_start_time(renewable)
self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout)

def shutdown(self, wait=True):
"""Shutdown the thread pool to clean up any remaining lock renewal threads.

:param wait: Whether to block until thread pool has shutdown. Default is `True`.
:type wait: bool
"""
self._shutdown.set()
self.executor.shutdown(wait=wait)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ._servicebus_session_receiver_async import ServiceBusSessionReceiver
from ._servicebus_session_async import ServiceBusSession
from ._servicebus_client_async import ServiceBusClient
from ._async_utils import AutoLockRenew
from ._async_auto_lock_renewer import AutoLockRenew

__all__ = [
'ReceivedMessage',
Expand Down
Loading