Skip to content

Commit

Permalink
Improved typing
Browse files Browse the repository at this point in the history
  • Loading branch information
annatisch committed Jul 12, 2022
1 parent 91de046 commit aeffcb2
Show file tree
Hide file tree
Showing 11 changed files with 1,136 additions and 794 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def decode_frame(data):
for i in range(count):
buffer, fields[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:])
if frame_type == 20:
# This is a transfer frame - add the remaining bytes in the buffer as the payload.
fields.append(buffer)
return frame_type, fields

Expand Down
587 changes: 430 additions & 157 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_encode.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
from collections import namedtuple
from enum import Enum
import struct

Expand Down Expand Up @@ -64,8 +63,6 @@

DEFAULT_LINK_CREDIT = 10000

FIELD = namedtuple('field', 'name, type, mandatory, default, multiple')

STRING_FILTER = b"apache.org:selector-filter:string"

DEFAULT_AUTH_TIMEOUT = 60
Expand Down
173 changes: 85 additions & 88 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@
# - the behavior of Messages which have been transferred on the Link, but have not yet reached a
# terminal state at the receiver, when the source is destroyed.

from collections import namedtuple
from enum import IntEnum, Enum
from typing import AnyStr, Dict, List, Optional, Tuple

from .types import AMQPTypes, FieldDefinition, ObjDefinition
from .constants import FIELD
from .performatives import _CAN_ADD_DOCSTRING
from .outcomes import SETTLEMENT_TYPES
from .types import (
AMQPTypes,
FieldDefinition,
ObjDefinition,
FIELD,
Performative,
AMQP_STRUCTURED_TYPES
)


class TerminusDurability(object):
class TerminusDurability(IntEnum):
"""Durability policy for a terminus.
<type name="terminus-durability" class="restricted" source="uint">
Expand All @@ -33,15 +40,15 @@ class TerminusDurability(object):
Determines which state of the terminus is held durably.
"""
#: No Terminus state is retained durably
NoDurability = 0
NoDurability: int = 0
#: Only the existence and configuration of the Terminus is retained durably.
Configuration = 1
Configuration: int = 1
#: In addition to the existence and configuration of the Terminus, the unsettled state for durable
#: messages is retained durably.
UnsettledState = 2
UnsettledState: int = 2


class ExpiryPolicy(object):
class ExpiryPolicy(bytes, Enum):
"""Expiry policy for a terminus.
<type name="terminus-expiry-policy" class="restricted" source="symbol">
Expand All @@ -57,16 +64,16 @@ class ExpiryPolicy(object):
re-met, the expiry timer restarts from its originally configured timeout value.
"""
#: The expiry timer starts when Terminus is detached.
LinkDetach = b"link-detach"
LinkDetach: bytes = b"link-detach"
#: The expiry timer starts when the most recently associated session is ended.
SessionEnd = b"session-end"
SessionEnd: bytes = b"session-end"
#: The expiry timer starts when most recently associated connection is closed.
ConnectionClose = b"connection-close"
ConnectionClose: bytes = b"connection-close"
#: The Terminus never expires.
Never = b"never"
Never: bytes = b"never"


class DistributionMode(object):
class DistributionMode(bytes, Enum):
"""Link distribution policy.
<type name="std-dist-mode" class="restricted" source="symbol" provides="distribution-mode">
Expand All @@ -78,87 +85,57 @@ class DistributionMode(object):
"""
#: Once successfully transferred over the link, the message will no longer be available
#: to other links from the same node.
Move = b'move'
Move: bytes = b'move'
#: Once successfully transferred over the link, the message is still available for other
#: links from the same node.
Copy = b'copy'
Copy: bytes = b'copy'


class LifeTimePolicy(object):
class LifeTimePolicy(IntEnum):
#: Lifetime of dynamic node scoped to lifetime of link which caused creation.
#: A node dynamically created with this lifetime policy will be deleted at the point that the link
#: which caused its creation ceases to exist.
DeleteOnClose = 0x0000002b
DeleteOnClose: int = 0x0000002b
#: Lifetime of dynamic node scoped to existence of links to the node.
#: A node dynamically created with this lifetime policy will be deleted at the point that there remain
#: no links for which the node is either the source or target.
DeleteOnNoLinks = 0x0000002c
DeleteOnNoLinks: int = 0x0000002c
#: Lifetime of dynamic node scoped to existence of messages on the node.
#: A node dynamically created with this lifetime policy will be deleted at the point that the link which
#: caused its creation no longer exists and there remain no messages at the node.
DeleteOnNoMessages = 0x0000002d
DeleteOnNoMessages: int = 0x0000002d
#: Lifetime of node scoped to existence of messages on or links to the node.
#: A node dynamically created with this lifetime policy will be deleted at the point that the there are no
#: links which have this node as their source or target, and there remain no messages at the node.
DeleteOnNoLinksOrMessages = 0x0000002e
DeleteOnNoLinksOrMessages: int = 0x0000002e


class SupportedOutcomes(object):
class SupportedOutcomes(bytes, Enum):
#: Indicates successful processing at the receiver.
accepted = b"amqp:accepted:list"
accepted: bytes = b"amqp:accepted:list"
#: Indicates an invalid and unprocessable message.
rejected = b"amqp:rejected:list"
rejected: bytes = b"amqp:rejected:list"
#: Indicates that the message was not (and will not be) processed.
released = b"amqp:released:list"
released: bytes = b"amqp:released:list"
#: Indicates that the message was modified, but not processed.
modified = b"amqp:modified:list"
modified: bytes = b"amqp:modified:list"


class ApacheFilters(object):
class ApacheFilters(bytes, Enum):
#: Exact match on subject - analogous to legacy AMQP direct exchange bindings.
legacy_amqp_direct_binding = b"apache.org:legacy-amqp-direct-binding:string"
legacy_amqp_direct_binding: bytes = b"apache.org:legacy-amqp-direct-binding:string"
#: Pattern match on subject - analogous to legacy AMQP topic exchange bindings.
legacy_amqp_topic_binding = b"apache.org:legacy-amqp-topic-binding:string"
legacy_amqp_topic_binding: bytes = b"apache.org:legacy-amqp-topic-binding:string"
#: Matching on message headers - analogous to legacy AMQP headers exchange bindings.
legacy_amqp_headers_binding = b"apache.org:legacy-amqp-headers-binding:map"
legacy_amqp_headers_binding: bytes = b"apache.org:legacy-amqp-headers-binding:map"
#: Filter out messages sent from the same connection as the link is currently associated with.
no_local_filter = b"apache.org:no-local-filter:list"
no_local_filter: bytes = b"apache.org:no-local-filter:list"
#: SQL-based filtering syntax.
selector_filter = b"apache.org:selector-filter:string"
selector_filter: bytes = b"apache.org:selector-filter:string"


Source = namedtuple(
'source',
[
'address',
'durable',
'expiry_policy',
'timeout',
'dynamic',
'dynamic_node_properties',
'distribution_mode',
'filters',
'default_outcome',
'outcomes',
'capabilities'
])
Source.__new__.__defaults__ = (None,) * len(Source._fields)
Source._code = 0x00000028
Source._definition = (
FIELD("address", AMQPTypes.string, False, None, False),
FIELD("durable", AMQPTypes.uint, False, "none", False),
FIELD("expiry_policy", AMQPTypes.symbol, False, ExpiryPolicy.SessionEnd, False),
FIELD("timeout", AMQPTypes.uint, False, 0, False),
FIELD("dynamic", AMQPTypes.boolean, False, False, False),
FIELD("dynamic_node_properties", FieldDefinition.node_properties, False, None, False),
FIELD("distribution_mode", AMQPTypes.symbol, False, None, False),
FIELD("filters", FieldDefinition.filter_set, False, None, False),
FIELD("default_outcome", ObjDefinition.delivery_state, False, None, False),
FIELD("outcomes", AMQPTypes.symbol, False, None, True),
FIELD("capabilities", AMQPTypes.symbol, False, None, True))
if _CAN_ADD_DOCSTRING:
Source.__doc__ = """
For containers which do not implement address resolution (and do not admit spontaneous link
class Source(Performative):
"""For containers which do not implement address resolution (and do not admit spontaneous link
attachment from their partners) but are instead only used as producers of messages, it is unnecessary to provide
spurious detail on the source. For this purpose it is possible to use a "minimal" source in which all the
fields are left unset.
Expand Down Expand Up @@ -214,32 +191,35 @@ class ApacheFilters(object):
:param list(bytes) capabilities: The extension capabilities the sender supports/desires.
See http://www.amqp.org/specification/1.0/source-capabilities.
"""
_code: int = 0x00000028
_definition: List[Optional[FIELD]] = [
FIELD(AMQPTypes.string, False),
FIELD(AMQPTypes.uint, False),
FIELD(AMQPTypes.symbol, False),
FIELD(AMQPTypes.uint, False),
FIELD(AMQPTypes.boolean, False),
FIELD(FieldDefinition.node_properties, False),
FIELD(AMQPTypes.symbol, False),
FIELD(FieldDefinition.filter_set, False),
FIELD(ObjDefinition.delivery_state, False),
FIELD(AMQPTypes.symbol, True),
FIELD(AMQPTypes.symbol, True)
]
address: Optional[str] = None
durable: int = TerminusDurability.NoDurability
expiry_policy: bytes = ExpiryPolicy.SessionEnd
timeout: int = 0
dynamic: bool = False
dynamic_node_properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None
distribution_mode: Optional[bytes] = None
filters: Optional[Dict[AnyStr, Optional[Tuple[AnyStr, AMQP_STRUCTURED_TYPES]]]] = None
default_outcome: Optional[SETTLEMENT_TYPES] = None
outcomes: Optional[List[AnyStr]] = None
capabilities: Optional[List[AnyStr]] = None


Target = namedtuple(
'target',
[
'address',
'durable',
'expiry_policy',
'timeout',
'dynamic',
'dynamic_node_properties',
'capabilities'
])
Target._code = 0x00000029
Target.__new__.__defaults__ = (None,) * len(Target._fields)
Target._definition = (
FIELD("address", AMQPTypes.string, False, None, False),
FIELD("durable", AMQPTypes.uint, False, "none", False),
FIELD("expiry_policy", AMQPTypes.symbol, False, ExpiryPolicy.SessionEnd, False),
FIELD("timeout", AMQPTypes.uint, False, 0, False),
FIELD("dynamic", AMQPTypes.boolean, False, False, False),
FIELD("dynamic_node_properties", FieldDefinition.node_properties, False, None, False),
FIELD("capabilities", AMQPTypes.symbol, False, None, True))
if _CAN_ADD_DOCSTRING:
Target.__doc__ = """
For containers which do not implement address resolution (and do not admit spontaneous link attachment
class Target(Performative):
"""For containers which do not implement address resolution (and do not admit spontaneous link attachment
from their partners) but are instead only used as consumers of messages, it is unnecessary to provide spurious
detail on the source. For this purpose it is possible to use a 'minimal' target in which all the
fields are left unset.
Expand Down Expand Up @@ -275,3 +255,20 @@ class ApacheFilters(object):
:param list(bytes) capabilities: The extension capabilities the sender supports/desires.
See http://www.amqp.org/specification/1.0/source-capabilities.
"""
_code: int = 0x00000029
_definition: List[Optional[FIELD]] = [
FIELD(AMQPTypes.string, False),
FIELD(AMQPTypes.uint, False),
FIELD(AMQPTypes.symbol, False),
FIELD(AMQPTypes.uint, False),
FIELD(AMQPTypes.boolean, False),
FIELD(FieldDefinition.node_properties, False),
FIELD(AMQPTypes.symbol, True)
]
address: Optional[str] = None
durable: int = TerminusDurability.NoDurability
expiry_policy: bytes = ExpiryPolicy.SessionEnd
timeout: int = 0
dynamic: bool = False
dynamic_node_properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None
capabilities: Optional[List[AnyStr]] = None
22 changes: 12 additions & 10 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#--------------------------------------------------------------------------

from enum import Enum
from collections import namedtuple
from typing import AnyStr, Dict, List, Optional

from .constants import SECURE_PORT, FIELD
from .types import AMQPTypes, FieldDefinition
from .types import AMQP_STRUCTURED_TYPES, AMQPTypes, FieldDefinition, Performative


class ErrorCondition(bytes, Enum):
Expand Down Expand Up @@ -181,14 +181,16 @@ def get_backoff_time(self, settings, error):
return min(settings['max_backoff'], backoff_value)


AMQPError = namedtuple('error', ['condition', 'description', 'info'])
AMQPError.__new__.__defaults__ = (None,) * len(AMQPError._fields)
AMQPError._code = 0x0000001d
AMQPError._definition = (
FIELD('condition', AMQPTypes.symbol, True, None, False),
FIELD('description', AMQPTypes.string, False, None, False),
FIELD('info', FieldDefinition.fields, False, None, False),
)
class AMQPError(Performative):
_code: int = 0x0000001d
_definition: List[FIELD] = [
FIELD(AMQPTypes.symbol, False),
FIELD(AMQPTypes.string, False),
FIELD(FieldDefinition.fields, False),
]
condition: AnyStr
description: Optional[AnyStr] = None
into: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None


class AMQPException(Exception):
Expand Down
Loading

0 comments on commit aeffcb2

Please sign in to comment.