Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SB PyAMQP] Servicebus PyAMQP Working off of Anna's Branch #24975

Merged
merged 65 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
460f93a
Added pyamqp
annatisch Jun 17, 2022
f06f20a
Added message compatibility tests
annatisch Jun 18, 2022
283734c
Start rewiring messages for pyamqp
annatisch Jun 18, 2022
478bc2d
Added message backcompat layer
annatisch Jun 18, 2022
499d136
Successful message send
annatisch Jun 19, 2022
3d7bf31
Started receiver
annatisch Jun 19, 2022
6570dae
Successful message receive
annatisch Jun 19, 2022
47298c2
Message settlement
annatisch Jun 19, 2022
26875c1
Fix other settlement outcomes
annatisch Jun 20, 2022
343b70b
Make tests live
annatisch Jun 20, 2022
4927fea
message partition_key if it can't be decoded - output value
l0lawrence Jun 24, 2022
f9bdf15
removing references to __future___ annotations for now - not supporte…
l0lawrence Jun 24, 2022
78d338a
comparing name of transport - not the object
l0lawrence Jun 24, 2022
bc342b8
passing in a dummy frame for new formatting of SBMessageReceived
l0lawrence Jun 24, 2022
3c6b949
adding in fake frame for message in queue tests
l0lawrence Jun 24, 2022
4703a34
uamqp_mesage -> uamqp_message
l0lawrence Jun 27, 2022
7c0ed84
state should be auth_state
l0lawrence Jun 29, 2022
91de046
switching this back - _message is Message
l0lawrence Jul 5, 2022
aeffcb2
Improved typing
annatisch Jul 12, 2022
e3cb80e
Revert "Improved typing"
annatisch Jul 18, 2022
5f0d7e9
Fix TransportType enum
annatisch Jul 19, 2022
c0c472a
Fix import statement
annatisch Jul 19, 2022
c69464e
Fix application property encoding
annatisch Jul 19, 2022
995e83d
Skip queue iterator tests
annatisch Jul 19, 2022
6c7a464
Fix mgmt op timeout
annatisch Jul 19, 2022
c857288
Fixes to mgmt link
annatisch Jul 19, 2022
6f80901
Fix frame decode tests
annatisch Jul 20, 2022
335f4fd
More mgmt fixes
annatisch Jul 20, 2022
47962fe
Some message fixes
annatisch Jul 20, 2022
92ddce1
Fix session filters
annatisch Jul 20, 2022
6ad0e4d
Message tests
annatisch Jul 20, 2022
48ad8ce
Skip more iterator tests
annatisch Jul 20, 2022
8a37bc1
Update to retry policy
annatisch Jul 21, 2022
f77bf1d
adding in support for websockets is CE supported?
l0lawrence Jul 21, 2022
8143648
fixing up pylint-still some issues
l0lawrence Jul 21, 2022
542cd8f
some more pylint/TODOs
l0lawrence Jul 21, 2022
5852eb3
pylint changes
l0lawrence Jul 25, 2022
dc1805a
fixing pylint
l0lawrence Jul 25, 2022
6440d64
more pylint connection
l0lawrence Jul 25, 2022
5e5f19d
More test fixes
annatisch Jul 29, 2022
fdcf57b
Merge branch 'sb_pyamqp_anna' of https://github.com/l0lawrence/azure-…
annatisch Jul 29, 2022
0ae2b96
Fix scheduling
annatisch Jul 31, 2022
ff13c60
Fix retry test
annatisch Aug 1, 2022
992541e
Fix error handling
annatisch Aug 1, 2022
903b286
Sender refactor for timeout
annatisch Aug 2, 2022
68a10ed
Fix link detach
annatisch Aug 2, 2022
e066b7c
Fixed receiver control flow
annatisch Aug 3, 2022
8b179e0
Update pyamqp async code
annatisch Aug 4, 2022
83f9d78
Updated sb async
annatisch Aug 4, 2022
16b8610
Typing fix
annatisch Aug 4, 2022
a187fda
Some async fixes
annatisch Aug 9, 2022
543fb5d
Skip async iter tests
annatisch Aug 9, 2022
f4c4e13
Workaround socket timeout
annatisch Aug 14, 2022
b9c4035
Literal import
annatisch Aug 15, 2022
75202db
More async test fixes
annatisch Aug 15, 2022
3bc336f
Merge branch 'feature/eventhub/pyproto' into sb_pyamqp_anna
annatisch Sep 14, 2022
bc5b47d
Added keepalive
annatisch Sep 16, 2022
1a111fd
Pylint cleanup
annatisch Sep 18, 2022
c5242a1
fix mypy errors in _pyamqp
swathipil Sep 28, 2022
994b164
fix mypy sb layer
swathipil Sep 28, 2022
cc8e73a
fix bug
swathipil Sep 28, 2022
8bbf4ba
unused import
swathipil Sep 28, 2022
1e641b5
lint
swathipil Sep 28, 2022
3b373e9
fix failing tests
swathipil Sep 29, 2022
9f13064
ignore sb iterator receive samples
swathipil Sep 29, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions scripts/devops_tasks/test_run_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@
},
"azure-servicebus": {
"failure_and_recovery.py": (10),
"receive_iterator_queue.py": (10),
"sample_code_servicebus.py": (30),
"session_pool_receive.py": (20),
"receive_iterator_queue_async.py": (10),
"sample_code_servicebus_async.py": (30),
"session_pool_receive_async.py": (20),
},
}

Expand Down Expand Up @@ -109,6 +103,13 @@
"mgmt_topic_async.py",
"proxy_async.py",
"receive_deferred_message_queue_async.py",
"send_and_receive_amqp_annotated_message_async.py",
"send_and_receive_amqp_annotated_message.py",
"sample_code_servicebus_async.py",
"receive_iterator_queue_async.py",
"session_pool_receive_async.py",
"receive_iterator_queue.py",
"sample_code_servicebus.py"
],
"azure-communication-chat": [
"chat_client_sample_async.py",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from uamqp import constants
from ._pyamqp import constants

from ._version import VERSION

Expand Down
48 changes: 20 additions & 28 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,17 @@
import threading
from datetime import timedelta
from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any, Callable, Union
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
from azure.core.pipeline.policies import RetryMode

try:
from urllib.parse import quote_plus, urlparse
from urllib.parse import urlparse
except ImportError:
from urllib import quote_plus # type: ignore
from urlparse import urlparse # type: ignore

import uamqp
from uamqp import utils, compat
from uamqp.message import MessageProperties

from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
from azure.core.pipeline.policies import RetryMode
from ._pyamqp.utils import generate_sas_token, amqp_string_value
from ._pyamqp.message import Message, Properties
from ._pyamqp.client import AMQPClientSync

from ._common._configuration import Configuration
from .exceptions import (
Expand Down Expand Up @@ -146,11 +144,7 @@ def _generate_sas_token(uri, policy, key, expiry=None):
expiry = timedelta(hours=1) # Default to 1 hour.

abs_expiry = int(time.time()) + expiry.seconds
encoded_uri = quote_plus(uri).encode("utf-8") # pylint: disable=no-member
encoded_policy = quote_plus(policy).encode("utf-8") # pylint: disable=no-member
encoded_key = key.encode("utf-8")

token = utils.create_sas_token(encoded_policy, encoded_key, encoded_uri, expiry)
token = generate_sas_token(uri, policy, key, abs_expiry).encode("UTF-8")
return AccessToken(token=token, expires_on=abs_expiry)

def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
Expand Down Expand Up @@ -266,7 +260,7 @@ def __init__(self, fully_qualified_namespace, entity_name, credential, **kwargs)
self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8]
self._config = Configuration(**kwargs)
self._running = False
self._handler = None # type: uamqp.AMQPClient
self._handler = cast(AMQPClientSync, None) # type: AMQPClientSync
self._auth_uri = None
self._properties = create_properties(self._config.user_agent)
self._shutdown = threading.Event()
Expand Down Expand Up @@ -457,7 +451,7 @@ def _mgmt_request_response(
timeout=None,
**kwargs
):
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> Message
"""
Execute an amqp management operation.

Expand All @@ -480,29 +474,27 @@ def _mgmt_request_response(
if keep_alive_associated_link:
try:
application_properties = {
ASSOCIATEDLINKPROPERTYNAME: self._handler.message_handler.name
ASSOCIATEDLINKPROPERTYNAME: self._handler._link.name # pylint: disable=protected-access
}
except AttributeError:
pass

mgmt_msg = uamqp.Message(
body=message,
properties=MessageProperties(
reply_to=self._mgmt_target, encoding=self._config.encoding, **kwargs
),
mgmt_msg = Message( # type: ignore # TODO: fix mypy error
value=message,
properties=Properties(reply_to=self._mgmt_target, **kwargs),
application_properties=application_properties,
)
try:
return self._handler.mgmt_request(
status, description, response = self._handler.mgmt_request(
mgmt_msg,
mgmt_operation,
op_type=MGMT_REQUEST_OP_TYPE_ENTITY_MGMT,
operation=amqp_string_value(mgmt_operation),
operation_type=amqp_string_value(MGMT_REQUEST_OP_TYPE_ENTITY_MGMT),
node=self._mgmt_target.encode(self._config.encoding),
timeout=timeout * 1000 if timeout else None,
callback=callback,
timeout=timeout, # TODO: check if this should be seconds * 1000 if timeout else None,
)
return callback(status, response, description)
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
if isinstance(exp, TimeoutError): #TODO: was compat.TimeoutException
raise OperationTimeoutError(error=exp)
raise

Expand All @@ -512,7 +504,7 @@ def _mgmt_request_response_with_retry(
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
return self._do_retryable_operation(
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
mgmt_operation=mgmt_operation.decode("UTF-8"),
message=message,
callback=callback,
timeout=timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from typing import Optional, Dict, Any
from urllib.parse import urlparse

from uamqp.constants import TransportType, DEFAULT_AMQP_WSS_PORT, DEFAULT_AMQPS_PORT
from azure.core.pipeline.policies import RetryMode
from .._pyamqp.constants import TransportType


DEFAULT_AMQPS_PORT = 1571
DEFAULT_AMQP_WSS_PORT = 443


class Configuration(object): # pylint:disable=too-many-instance-attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
# license information.
# -------------------------------------------------------------------------
from enum import Enum

from uamqp import constants, types
from azure.core import CaseInsensitiveEnumMeta
from .._pyamqp import constants

VENDOR = b"com.microsoft"
DATETIMEOFFSET_EPOCH = 621355968000000000
Expand Down Expand Up @@ -162,6 +161,7 @@
TRACE_PROPERTY_ENCODING = "ascii"


MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024 # Backcompat with uAMQP
MESSAGE_PROPERTY_MAX_LENGTH = 128
# .NET TimeSpan.MaxValue: 10675199.02:48:05.4775807
MAX_DURATION_VALUE = 922337203685477
Expand All @@ -180,8 +180,8 @@ class ServiceBusMessageState(int, Enum):

# To enable extensible string enums for the public facing parameter, and translate to the "real" uamqp constants.
ServiceBusToAMQPReceiveModeMap = {
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.PeekLock,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.ReceiveAndDelete,
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.Second,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.First,
}


Expand All @@ -194,17 +194,4 @@ class ServiceBusSubQueue(str, Enum, metaclass=CaseInsensitiveEnumMeta):
TRANSFER_DEAD_LETTER = "transferdeadletter"


ANNOTATION_SYMBOL_PARTITION_KEY = types.AMQPSymbol(_X_OPT_PARTITION_KEY)
ANNOTATION_SYMBOL_VIA_PARTITION_KEY = types.AMQPSymbol(_X_OPT_VIA_PARTITION_KEY)
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME = types.AMQPSymbol(
_X_OPT_SCHEDULED_ENQUEUE_TIME
)

ANNOTATION_SYMBOL_KEY_MAP = {
_X_OPT_PARTITION_KEY: ANNOTATION_SYMBOL_PARTITION_KEY,
_X_OPT_VIA_PARTITION_KEY: ANNOTATION_SYMBOL_VIA_PARTITION_KEY,
_X_OPT_SCHEDULED_ENQUEUE_TIME: ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME,
}


NEXT_AVAILABLE_SESSION = ServiceBusSessionFilter.NEXT_AVAILABLE
Loading