Skip to content

Commit

Permalink
[SB PyAMQP] Servicebus PyAMQP Working off of Anna's Branch (#24975)
Browse files Browse the repository at this point in the history
* Added pyamqp

* Added message compatibility tests

* Start rewiring messages for pyamqp

* Added message backcompat layer

* Successful message send

* Started receiver

* Successful message receive

* Message settlement

* Fix other settlement outcomes

* Make tests live

* message partition_key if it can't be decoded - output value

* removing references to __future___ annotations for now - not supported in 3.6

* comparing name of transport - not the object

* passing in a dummy frame for new formatting of SBMessageReceived

* adding in fake frame for message in queue tests

* uamqp_mesage -> uamqp_message

* state should be auth_state

* switching this back - _message is Message

* Improved typing

* Revert "Improved typing"

This reverts commit aeffcb2.

* Fix TransportType enum

* Fix import statement

* Fix application property encoding

* Skip queue iterator tests

* Fix mgmt op timeout

* Fixes to mgmt link

* Fix frame decode tests

* More mgmt fixes

* Some message fixes

* Fix session filters

* Message tests

* Skip more iterator tests

* Update to retry policy

* adding in support for websockets  is CE supported?

* fixing up pylint-still some issues

* some more pylint/TODOs

* pylint changes

* fixing pylint

* more pylint connection

* More test fixes

* Fix scheduling

* Fix retry test

* Fix error handling

* Sender refactor for timeout

* Fix link detach

* Fixed receiver control flow

* Update pyamqp async code

* Updated sb async

* Typing fix

* Some async fixes

* Skip async iter tests

* Workaround socket timeout

* Literal import

* More async test fixes

* Added keepalive

* Pylint cleanup

* fix mypy errors in _pyamqp

* fix mypy sb layer

* fix bug

* unused import

* lint

* fix failing tests

* ignore sb iterator receive samples

Co-authored-by: antisch <[email protected]>
Co-authored-by: swathipil <[email protected]>
  • Loading branch information
3 people authored Sep 29, 2022
1 parent 0020fc1 commit 5305372
Show file tree
Hide file tree
Showing 67 changed files with 13,932 additions and 932 deletions.
13 changes: 7 additions & 6 deletions scripts/devops_tasks/test_run_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@
},
"azure-servicebus": {
"failure_and_recovery.py": (10),
"receive_iterator_queue.py": (10),
"sample_code_servicebus.py": (30),
"session_pool_receive.py": (20),
"receive_iterator_queue_async.py": (10),
"sample_code_servicebus_async.py": (30),
"session_pool_receive_async.py": (20),
},
}

Expand Down Expand Up @@ -109,6 +103,13 @@
"mgmt_topic_async.py",
"proxy_async.py",
"receive_deferred_message_queue_async.py",
"send_and_receive_amqp_annotated_message_async.py",
"send_and_receive_amqp_annotated_message.py",
"sample_code_servicebus_async.py",
"receive_iterator_queue_async.py",
"session_pool_receive_async.py",
"receive_iterator_queue.py",
"sample_code_servicebus.py"
],
"azure-communication-chat": [
"chat_client_sample_async.py",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from uamqp import constants
from ._pyamqp import constants

from ._version import VERSION

Expand Down
48 changes: 20 additions & 28 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,17 @@
import threading
from datetime import timedelta
from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any, Callable, Union
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
from azure.core.pipeline.policies import RetryMode

try:
from urllib.parse import quote_plus, urlparse
from urllib.parse import urlparse
except ImportError:
from urllib import quote_plus # type: ignore
from urlparse import urlparse # type: ignore

import uamqp
from uamqp import utils, compat
from uamqp.message import MessageProperties

from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
from azure.core.pipeline.policies import RetryMode
from ._pyamqp.utils import generate_sas_token, amqp_string_value
from ._pyamqp.message import Message, Properties
from ._pyamqp.client import AMQPClientSync

from ._common._configuration import Configuration
from .exceptions import (
Expand Down Expand Up @@ -146,11 +144,7 @@ def _generate_sas_token(uri, policy, key, expiry=None):
expiry = timedelta(hours=1) # Default to 1 hour.

abs_expiry = int(time.time()) + expiry.seconds
encoded_uri = quote_plus(uri).encode("utf-8") # pylint: disable=no-member
encoded_policy = quote_plus(policy).encode("utf-8") # pylint: disable=no-member
encoded_key = key.encode("utf-8")

token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry)
token = generate_sas_token(uri, policy, key, abs_expiry).encode("UTF-8")
return AccessToken(token=token, expires_on=abs_expiry)

def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
Expand Down Expand Up @@ -266,7 +260,7 @@ def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs)
self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8]
self._config = Configuration(**kwargs)
self._running = False
self._handler = None # type: uamqp.AMQPClient
self._handler = cast(AMQPClientSync, None) # type: AMQPClientSync
self._auth_uri = None
self._properties = create_properties(self._config.user_agent)
self._shutdown = threading.Event()
Expand Down Expand Up @@ -457,7 +451,7 @@ def _mgmt_request_response(
timeout=None,
**kwargs
):
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> Message
"""
Execute an amqp management operation.
Expand All @@ -480,29 +474,27 @@ def _mgmt_request_response(
if keep_alive_associated_link:
try:
application_properties = {
ASSOCIATEDLINKPROPERTYNAME: self._handler.message_handler.name
ASSOCIATEDLINKPROPERTYNAME: self._handler._link.name # pylint: disable=protected-access
}
except AttributeError:
pass

mgmt_msg = uamqp.Message(
body=message,
properties=MessageProperties(
reply_to=self._mgmt_target, encoding=self._config.encoding, **kwargs
),
mgmt_msg = Message( # type: ignore # TODO: fix mypy error
value=message,
properties=Properties(reply_to=self._mgmt_target, **kwargs),
application_properties=application_properties,
)
try:
return self._handler.mgmt_request(
status, description, response = self._handler.mgmt_request(
mgmt_msg,
mgmt_operation,
op_type=MGMT_REQUEST_OP_TYPE_ENTITY_MGMT,
operation=amqp_string_value(mgmt_operation),
operation_type=amqp_string_value(MGMT_REQUEST_OP_TYPE_ENTITY_MGMT),
node=self._mgmt_target.encode(self._config.encoding),
timeout=timeout * 1000 if timeout else None,
callback=callback,
timeout=timeout, # TODO: check if this should be seconds * 1000 if timeout else None,
)
return callback(status, response, description)
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
if isinstance(exp, TimeoutError): #TODO: was compat.TimeoutException
raise OperationTimeoutError(error=exp)
raise

Expand All @@ -512,7 +504,7 @@ def _mgmt_request_response_with_retry(
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
return self._do_retryable_operation(
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
mgmt_operation=mgmt_operation.decode("UTF-8"),
message=message,
callback=callback,
timeout=timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from typing import Optional, Dict, Any
from urllib.parse import urlparse

from uamqp.constants import TransportType, DEFAULT_AMQP_WSS_PORT, DEFAULT_AMQPS_PORT
from azure.core.pipeline.policies import RetryMode
from .._pyamqp.constants import TransportType


DEFAULT_AMQPS_PORT = 1571
DEFAULT_AMQP_WSS_PORT = 443


class Configuration(object): # pylint:disable=too-many-instance-attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
# license information.
# -------------------------------------------------------------------------
from enum import Enum

from uamqp import constants, types
from azure.core import CaseInsensitiveEnumMeta
from .._pyamqp import constants

VENDOR = b"com.microsoft"
DATETIMEOFFSET_EPOCH = 621355968000000000
Expand Down Expand Up @@ -162,6 +161,7 @@
TRACE_PROPERTY_ENCODING = "ascii"


MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024 # Backcompat with uAMQP
MESSAGE_PROPERTY_MAX_LENGTH = 128
# .NET TimeSpan.MaxValue: 10675199.02:48:05.4775807
MAX_DURATION_VALUE = 922337203685477
Expand All @@ -180,8 +180,8 @@ class ServiceBusMessageState(int, Enum):

# To enable extensible string enums for the public facing parameter, and translate to the "real" uamqp constants.
ServiceBusToAMQPReceiveModeMap = {
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.PeekLock,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.ReceiveAndDelete,
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.Second,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.First,
}


Expand All @@ -194,17 +194,4 @@ class ServiceBusSubQueue(str, Enum, metaclass=CaseInsensitiveEnumMeta):
TRANSFER_DEAD_LETTER = "transferdeadletter"


ANNOTATION_SYMBOL_PARTITION_KEY = types.AMQPSymbol(_X_OPT_PARTITION_KEY)
ANNOTATION_SYMBOL_VIA_PARTITION_KEY = types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY)
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = types.AMQPSymbol(
_X_OPT_SCHEDULED_ENQUEUE_TIME
)

ANNOTATION_SYMBOL_KEY_MAP = {
_X_OPT_PARTITION_KEY: ANNOTATION_SYMBOL_PARTITION_KEY,
_X_OPT_VIA_PARTITION_KEY: ANNOTATION_SYMBOL_VIA_PARTITION_KEY,
_X_OPT_SCHEDULED_ENQUEUE_TIME: ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME,
}


NEXT_AVAILABLE_SESSION = ServiceBusSessionFilter.NEXT_AVAILABLE
Loading

0 comments on commit 5305372

Please sign in to comment.