From aeffcb20e5669030e35797848d87bea5086ab87d Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 12 Jul 2022 19:53:15 +1200 Subject: [PATCH] Improved typing --- .../azure/servicebus/_pyamqp/_decode.py | 1 + .../azure/servicebus/_pyamqp/_encode.py | 587 +++++++++++++----- .../azure/servicebus/_pyamqp/constants.py | 3 - .../azure/servicebus/_pyamqp/endpoints.py | 173 +++--- .../azure/servicebus/_pyamqp/error.py | 22 +- .../azure/servicebus/_pyamqp/message.py | 192 +++--- .../azure/servicebus/_pyamqp/outcomes.py | 78 ++- .../azure/servicebus/_pyamqp/performatives.py | 527 ++++++++-------- .../azure/servicebus/_pyamqp/receiver.py | 113 +++- .../azure/servicebus/_pyamqp/types.py | 95 +-- .../azure/servicebus/_pyamqp/utils.py | 139 +++-- 11 files changed, 1136 insertions(+), 794 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py index 53915069be81..db78796197fc 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_decode.py @@ -297,6 +297,7 @@ def decode_frame(data): for i in range(count): buffer, fields[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) if frame_type == 20: + # This is a transfer frame - add the remaining bytes in the buffer as the payload. fields.append(buffer) return frame_type, fields diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_encode.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_encode.py index be24f39a08de..55f987e7cc2c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_encode.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/_encode.py @@ -3,16 +3,26 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- +# pylint: disable=unused-argument import calendar import struct import uuid from datetime import datetime -from typing import Iterable, Union, Tuple, Dict # pylint: disable=unused-import - -import six - -from .types import TYPE, VALUE, AMQPTypes, FieldDefinition, ObjDefinition, ConstructorBytes +from typing import AnyStr, Optional, Union, Tuple, Dict, Literal, List, TypeVar, Type, overload + +from .types import ( + TYPE, + VALUE, + AMQPTypes, + FieldDefinition, + ObjDefinition, + AMQP_STRUCTURED_TYPES, + AMQP_PRIMATIVE_TYPES, + AMQPDefinedType, + AMQPFieldType, + NullDefinedType +) from .message import Header, Properties, Message from . import performatives from . import outcomes @@ -20,313 +30,500 @@ from . import error +ENCODABLE_PRIMATIVE_TYPES = Union[AMQPDefinedType[AMQPTypes, AMQP_PRIMATIVE_TYPES], AMQP_PRIMATIVE_TYPES] +ENCODABLE_TYPES = Union[AMQPDefinedType[AMQPTypes, AMQP_STRUCTURED_TYPES], AMQP_STRUCTURED_TYPES] +ENCODABLE_T = TypeVar('ENCODABLE_T', ENCODABLE_TYPES) +ENCODABLE_P = TypeVar('ENCODABLE_P', ENCODABLE_PRIMATIVE_TYPES) + _FRAME_OFFSET = b"\x02" _FRAME_TYPE = b'\x00' - - -def _construct(byte, construct): - # type: (bytes, bool) -> bytes +_CONSTRUCTOR_NULL = b'\x40' +_CONSTRUCTOR_BOOL = b'\x56' +_CONSTRUCTOR_BOOL_TRUE = b'\x41' +_CONSTRUCTOR_BOOL_FALSE = b'\x42' +_CONSTRUCTOR_UBYTE = b'\x50' +_CONSTRUCTOR_BYTE = b'\x51' +_CONSTRUCTOR_USHORT = b'\x60' +_CONSTRUCTOR_SHORT = b'\x61' +_CONSTRUCTOR_UINT_0 = b'\x43' +_CONSTRUCTOR_UINT_SMALL = b'\x52' +_CONSTRUCTOR_INT_SMALL = b'\x54' +_CONSTRUCTOR_UINT_LARGE = b'\x70' +_CONSTRUCTOR_INT_LARGE = b'\x71' +_CONSTRUCTOR_ULONG_0 = b'\x44' +_CONSTRUCTOR_ULONG_SMALL = b'\x53' +_CONSTRUCTOR_LONG_SMALL = b'\x55' +_CONSTRUCTOR_ULONG_LARGE = b'\x80' +_CONSTRUCTOR_LONG_LARGE = b'\x81' +_CONSTRUCTOR_FLOAT = b'\x72' +_CONSTRUCTOR_DOUBLE = b'\x82' +_CONSTRUCTOR_TIMESTAMP = b'\x83' +_CONSTRUCTOR_UUID = b'\x98' +_CONSTRUCTOR_BINARY_SMALL = b'\xA0' +_CONSTRUCTOR_BINARY_LARGE = b'\xB0' +_CONSTRUCTOR_STRING_SMALL = b'\xA1' +_CONSTRUCTOR_STRING_LARGE = b'\xB1' +_CONSTRUCTOR_SYMBOL_SMALL = b'\xA3' +_CONSTRUCTOR_SYMBOL_LARGE = b'\xB3' +_CONSTRUCTOR_LIST_0 = b'\x45' +_CONSTRUCTOR_LIST_SMALL = b'\xC0' +_CONSTRUCTOR_LIST_LARGE = b'\xD0' +_CONSTRUCTOR_MAP_SMALL = b'\xC1' +_CONSTRUCTOR_MAP_LARGE = b'\xD1' +_CONSTRUCTOR_ARRAY_SMALL = b'\xE0' +_CONSTRUCTOR_ARRAY_LARGE = b'\xF0' +_CONSTRUCTOR_DESCRIPTOR = b'\x00' + + +def _construct(byte: bytes, construct: bool) -> bytes: + """Add the constructor byte if required.""" return byte if construct else b'' -def encode_null(output, *args, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, Any, Any) -> None - """ +def encode_null(output: bytearray, _: Literal[None], **kwargs) -> None: + """Encode a null value. + encoding code="0x40" category="fixed" width="0" label="the null value" + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. """ - output.extend(ConstructorBytes.null) + output.extend(_CONSTRUCTOR_NULL) -def encode_boolean(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, bool, bool, Any) -> None - """ +def encode_boolean(output: bytearray, value: bool, *, with_constructor: bool = True, **kwargs) -> None: + """Encode a boolean value. Optionally this will include a constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param bool value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ value = bool(value) if with_constructor: - output.extend(_construct(ConstructorBytes.bool, with_constructor)) + output.extend(_CONSTRUCTOR_BOOL) output.extend(b'\x01' if value else b'\x00') - return + else: + output.extend(_CONSTRUCTOR_BOOL_TRUE if value else _CONSTRUCTOR_BOOL_FALSE) - output.extend(ConstructorBytes.bool_true if value else ConstructorBytes.bool_false) +def encode_ubyte( + output: bytearray, + value: Union[int, bytes], + *, + with_constructor: bool = True, + **kwargs + ) -> None: + """Encode an unsigned byte value. Optionally this will include the constructor byte. -def encode_ubyte(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, Union[int, bytes], bool, Any) -> None - """ + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param Union[int, bytes] value: The data to encode. Must be 0-255. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ try: value = int(value) except ValueError: value = ord(value) try: - output.extend(_construct(ConstructorBytes.ubyte, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_UBYTE, with_constructor)) output.extend(struct.pack('>B', abs(value))) except struct.error: raise ValueError("Unsigned byte value must be 0-255") -def encode_ushort(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, int, bool, Any) -> None - """ +def encode_ushort(output: bytearray, value: int, *, with_constructor: bool = True, **kwargs) -> None: + """Encode an unsigned short value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. Must be 0-65535. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ value = int(value) try: - output.extend(_construct(ConstructorBytes.ushort, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_USHORT, with_constructor)) output.extend(struct.pack('>H', abs(value))) except struct.error: - raise ValueError("Unsigned byte value must be 0-65535") + raise ValueError("Unsigned short value must be 0-65535") -def encode_uint(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, int, bool, bool) -> None - """ +def encode_uint( + output: bytearray, + value: int, + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode an unsigned int value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ value = int(value) if value == 0: - output.extend(ConstructorBytes.uint_0) + output.extend(_CONSTRUCTOR_UINT_0) return try: if use_smallest and value <= 255: - output.extend(_construct(ConstructorBytes.uint_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_UINT_SMALL, with_constructor)) output.extend(struct.pack('>B', abs(value))) return - output.extend(_construct(ConstructorBytes.uint_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_UINT_LARGE, with_constructor)) output.extend(struct.pack('>I', abs(value))) except struct.error: raise ValueError("Value supplied for unsigned int invalid: {}".format(value)) -def encode_ulong(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, int, bool, bool) -> None - """ +def encode_ulong( + output: bytearray, + value: int, + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode an unsigned long value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 8 bytes. The default is to + use the smallest width possible. """ - try: - value = long(value) - except NameError: - value = int(value) + value = int(value) if value == 0: - output.extend(ConstructorBytes.ulong_0) + output.extend(_CONSTRUCTOR_ULONG_0) return try: if use_smallest and value <= 255: - output.extend(_construct(ConstructorBytes.ulong_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_ULONG_SMALL, with_constructor)) output.extend(struct.pack('>B', abs(value))) return - output.extend(_construct(ConstructorBytes.ulong_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_ULONG_LARGE, with_constructor)) output.extend(struct.pack('>Q', abs(value))) except struct.error: raise ValueError("Value supplied for unsigned long invalid: {}".format(value)) -def encode_byte(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, int, bool, Any) -> None - """ +def encode_byte(output: bytearray, value: int, *, with_constructor: bool = True, **kwargs) -> None: + """Encode a byte value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. Must be -128-127. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ value = int(value) try: - output.extend(_construct(ConstructorBytes.byte, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_BYTE, with_constructor)) output.extend(struct.pack('>b', value)) except struct.error: raise ValueError("Byte value must be -128-127") -def encode_short(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, int, bool, Any) -> None - """ +def encode_short(output: bytearray, value: int, *, with_constructor: bool = True, **kwargs) -> None: + """Encode a short value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. Must be -32768-32767. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ value = int(value) try: - output.extend(_construct(ConstructorBytes.short, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_SHORT, with_constructor)) output.extend(struct.pack('>h', value)) except struct.error: raise ValueError("Short value must be -32768-32767") -def encode_int(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, int, bool, bool) -> None - """ +def encode_int( + output: bytearray, + value: int, + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode an int value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ value = int(value) try: if use_smallest and (-128 <= value <= 127): - output.extend(_construct(ConstructorBytes.int_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_INT_SMALL, with_constructor)) output.extend(struct.pack('>b', value)) return - output.extend(_construct(ConstructorBytes.int_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_INT_LARGE, with_constructor)) output.extend(struct.pack('>i', value)) except struct.error: raise ValueError("Value supplied for int invalid: {}".format(value)) -def encode_long(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, int, bool, bool) -> None - """ +def encode_long( + output: bytearray, + value: int, + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode a long value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param int value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 8 bytes. The default is to + use the smallest width possible. """ - try: - value = long(value) - except NameError: - value = int(value) + value = int(value) try: if use_smallest and (-128 <= value <= 127): - output.extend(_construct(ConstructorBytes.long_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_LONG_SMALL, with_constructor)) output.extend(struct.pack('>b', value)) return - output.extend(_construct(ConstructorBytes.long_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_LONG_LARGE, with_constructor)) output.extend(struct.pack('>q', value)) except struct.error: raise ValueError("Value supplied for long invalid: {}".format(value)) -def encode_float(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, float, bool, Any) -> None - """ + +def encode_float(output: bytearray, value: float, *, with_constructor: bool = True, **kwargs) -> None: + """Encode a float value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param float value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ value = float(value) - output.extend(_construct(ConstructorBytes.float, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_FLOAT, with_constructor)) output.extend(struct.pack('>f', value)) -def encode_double(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, float, bool, Any) -> None - """ +def encode_double(output: bytearray, value: float, *, with_constructor: bool = True, **kwargs) -> None: + """Encode a double value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param float value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ value = float(value) - output.extend(_construct(ConstructorBytes.double, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_DOUBLE, with_constructor)) output.extend(struct.pack('>d', value)) -def encode_timestamp(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, Union[int, datetime], bool, Any) -> None - """ +def encode_timestamp( + output: bytearray, + value: Union[int, datetime], + *, + with_constructor: bool = True, + **kwargs + ) -> None: + """Encode a timestamp value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param Union[int, ~datetime.datetime] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ if isinstance(value, datetime): value = (calendar.timegm(value.utctimetuple()) * 1000) + (value.microsecond/1000) value = int(value) - output.extend(_construct(ConstructorBytes.timestamp, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_TIMESTAMP, with_constructor)) output.extend(struct.pack('>q', value)) -def encode_uuid(output, value, with_constructor=True, **kwargs): # pylint: disable=unused-argument - # type: (bytearray, Union[uuid.UUID, str, bytes], bool, Any) -> None - """ +def encode_uuid( + output: bytearray, + value: Union[str, bytes, uuid.UUID], + *, + with_constructor: bool = True, + **kwargs + ) -> None: + """Encode a UUID value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param Union[str, bytes, ~uuid.UUID] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. """ - if isinstance(value, six.text_type): + if isinstance(value, str): value = uuid.UUID(value).bytes elif isinstance(value, uuid.UUID): value = value.bytes - elif isinstance(value, six.binary_type): + elif isinstance(value, bytes): value = uuid.UUID(bytes=value).bytes else: raise TypeError("Invalid UUID type: {}".format(type(value))) - output.extend(_construct(ConstructorBytes.uuid, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_UUID, with_constructor)) output.extend(value) -def encode_binary(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, Union[bytes, bytearray], bool, bool) -> None - """ +def encode_binary( + output: bytearray, + value: Union[bytes, bytearray], + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode a binary value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param Union[bytes, bytearray] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ length = len(value) if use_smallest and length <= 255: - output.extend(_construct(ConstructorBytes.binary_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_BINARY_SMALL, with_constructor)) output.extend(struct.pack('>B', length)) output.extend(value) return try: - output.extend(_construct(ConstructorBytes.binary_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_BINARY_LARGE, with_constructor)) output.extend(struct.pack('>L', length)) output.extend(value) except struct.error: raise ValueError("Binary data to long to encode") -def encode_string(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, Union[bytes, str], bool, bool) -> None - """ +def encode_string( + output: bytearray, + value: Union[bytes, str], + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode a string value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param Union[bytes, str] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ - if isinstance(value, six.text_type): + if isinstance(value, str): value = value.encode('utf-8') length = len(value) if use_smallest and length <= 255: - output.extend(_construct(ConstructorBytes.string_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_STRING_SMALL, with_constructor)) output.extend(struct.pack('>B', length)) output.extend(value) return try: - output.extend(_construct(ConstructorBytes.string_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_STRING_LARGE, with_constructor)) output.extend(struct.pack('>L', length)) output.extend(value) except struct.error: raise ValueError("String value too long to encode.") -def encode_symbol(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, Union[bytes, str], bool, bool) -> None - """ +def encode_symbol( + output: bytearray, + value: Union[bytes, str], + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode a symbol value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param Union[bytes, str] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ - if isinstance(value, six.text_type): + if isinstance(value, str): value = value.encode('utf-8') length = len(value) if use_smallest and length <= 255: - output.extend(_construct(ConstructorBytes.symbol_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_SYMBOL_SMALL, with_constructor)) output.extend(struct.pack('>B', length)) output.extend(value) return try: - output.extend(_construct(ConstructorBytes.symbol_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_SYMBOL_LARGE, with_constructor)) output.extend(struct.pack('>L', length)) output.extend(value) except struct.error: raise ValueError("Symbol value too long to encode.") -def encode_list(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, Iterable[Any], bool, bool) -> None - """ +def encode_list( + output: bytearray, + value: List[ENCODABLE_TYPES], + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode a list value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param List[ENCODABLE_TYPES] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ count = len(value) if use_smallest and count == 0: - output.extend(ConstructorBytes.list_0) + output.extend(_CONSTRUCTOR_LIST_0) return encoded_size = 0 encoded_values = bytearray() @@ -334,12 +531,12 @@ def encode_list(output, value, with_constructor=True, use_smallest=True): encode_value(encoded_values, item, with_constructor=True) encoded_size += len(encoded_values) if use_smallest and count <= 255 and encoded_size < 255: - output.extend(_construct(ConstructorBytes.list_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_LIST_SMALL, with_constructor)) output.extend(struct.pack('>B', encoded_size + 1)) output.extend(struct.pack('>B', count)) else: try: - output.extend(_construct(ConstructorBytes.list_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_LIST_LARGE, with_constructor)) output.extend(struct.pack('>L', encoded_size + 4)) output.extend(struct.pack('>L', count)) except struct.error: @@ -347,13 +544,26 @@ def encode_list(output, value, with_constructor=True, use_smallest=True): output.extend(encoded_values) -def encode_map(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, Union[Dict[Any, Any], Iterable[Tuple[Any, Any]]], bool, bool) -> None - """ +def encode_map( + output: bytearray, + value: Union[Dict[ENCODABLE_TYPES, ENCODABLE_TYPES], List[Tuple[ENCODABLE_TYPES, ENCODABLE_TYPES]]], + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode a map value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param value: The data to encode. + :paramtype value: Union[Dict[ENCODABLE_TYPES, ENCODABLE_TYPES], List[Tuple[ENCODABLE_TYPES, ENCODABLE_TYPES]]] + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ count = len(value) * 2 encoded_size = 0 @@ -367,21 +577,29 @@ def encode_map(output, value, with_constructor=True, use_smallest=True): encode_value(encoded_values, data, with_constructor=True) encoded_size = len(encoded_values) if use_smallest and count <= 255 and encoded_size < 255: - output.extend(_construct(ConstructorBytes.map_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_MAP_SMALL, with_constructor)) output.extend(struct.pack('>B', encoded_size + 1)) output.extend(struct.pack('>B', count)) else: try: - output.extend(_construct(ConstructorBytes.map_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_MAP_LARGE, with_constructor)) output.extend(struct.pack('>L', encoded_size + 4)) output.extend(struct.pack('>L', count)) except struct.error: raise ValueError("Map is too large or too long to be encoded.") output.extend(encoded_values) - return -def _check_element_type(item, element_type): +def _check_element_type(item: ENCODABLE_T, element_type: Optional[Type[ENCODABLE_T]]) -> Type[ENCODABLE_T]: + """Validate the an item in the array is consistent with the other array items. + + This method will be called on every item in the array. For the first item, it + will determine the type, and that will be used to validate all subsequent items. + + :param item: An item in the array. + :param element_type: The class type of previous items in the array to validate. + :returns: The classtype of the array item. + """ if not element_type: try: return item['TYPE'] @@ -396,19 +614,31 @@ def _check_element_type(item, element_type): return element_type -def encode_array(output, value, with_constructor=True, use_smallest=True): - # type: (bytearray, Iterable[Any], bool, bool) -> None - """ +def encode_array( + output: bytearray, + value: List[ENCODABLE_TYPES], + *, + with_constructor: bool = True, + use_smallest: bool = True + ) -> None: + """Encode an array value. Optionally this will include the constructor byte. + + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param List[ENCODABLE_TYPES] value: The data to encode. + :keyword bool with_constructor: Whether to include the constructor byte. Default is True. + :keyword bool use_smallest: Whether to encode a value with 1 bytes or 4 bytes. The default is to + use the smallest width possible. """ count = len(value) encoded_size = 0 encoded_values = bytearray() - first_item = True - element_type = None + first_item = True # Only the first item in an array has a constructor byte. + element_type = None # Arrays must be homogeneous, so we enforce consistent content type. for item in value: element_type = _check_element_type(item, element_type) encode_value(encoded_values, item, with_constructor=first_item, use_smallest=False) @@ -418,12 +648,12 @@ def encode_array(output, value, with_constructor=True, use_smallest=True): break encoded_size += len(encoded_values) if use_smallest and count <= 255 and encoded_size < 255: - output.extend(_construct(ConstructorBytes.array_small, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_ARRAY_SMALL, with_constructor)) output.extend(struct.pack('>B', encoded_size + 1)) output.extend(struct.pack('>B', count)) else: try: - output.extend(_construct(ConstructorBytes.array_large, with_constructor)) + output.extend(_construct(_CONSTRUCTOR_ARRAY_LARGE, with_constructor)) output.extend(struct.pack('>L', encoded_size + 4)) output.extend(struct.pack('>L', count)) except struct.error: @@ -431,15 +661,26 @@ def encode_array(output, value, with_constructor=True, use_smallest=True): output.extend(encoded_values) -def encode_described(output, value, _=None, **kwargs): - # type: (bytearray, Tuple(Any, Any), bool, Any) -> None - output.extend(ConstructorBytes.descriptor) +def encode_described( + output: bytearray, + value: Tuple[ENCODABLE_TYPES, ENCODABLE_TYPES], + **kwargs + ) -> None: + """Encode a described value. + + :param bytearray output: The bytes encoded so far. The newly encoded value will be appended. + :param value: The data to encode. This is a tuple of two values, the descriptor (usually symbol + or ulong) and the described. + :paramtype value: Tuple[ENCODABLE_TYPES, ENCODABLE_TYPES] + """ + output.extend(_CONSTRUCTOR_DESCRIPTOR) encode_value(output, value[0], **kwargs) encode_value(output, value[1], **kwargs) -def encode_fields(value): - # type: (Optional[Dict[str, Any]]) -> Dict[str, Any] +def encode_fields( + value: Optional[Dict[AnyStr, ENCODABLE_T]] + ) -> Union[NullDefinedType, AMQPFieldType[ENCODABLE_T]]: """A mapping from field name to value. The fields type is a map where the keys are restricted to be of type symbol (this excludes the possibility @@ -447,19 +688,25 @@ def encode_fields(value): entries or the set of allowed keys. + + :param value: The optional dictionary to be encoded as fields. Keys must be string or + bytes. If empty or None, a null value will be encoded. + :paramtype value: Optional[Dict[Union[str, bytes], ENCODABLE_TYPES]] + :returns: An encoded mapping of symbols to AMQP types. """ if not value: return {TYPE: AMQPTypes.null, VALUE: None} fields = {TYPE: AMQPTypes.map, VALUE:[]} for key, data in value.items(): - if isinstance(key, six.text_type): + if isinstance(key, str): key = key.encode('utf-8') fields[VALUE].append(({TYPE: AMQPTypes.symbol, VALUE: key}, data)) return fields -def encode_annotations(value): - # type: (Optional[Dict[str, Any]]) -> Dict[str, Any] +def encode_annotations( + value: Optional[Dict[Union[int, AnyStr], ENCODABLE_T]] + ): """The annotations type is a map where the keys are restricted to be of type symbol or of type ulong. All ulong keys, and all symbolic keys except those beginning with "x-" are reserved. @@ -468,6 +715,11 @@ def encode_annotations(value): amqp-error. + + :param value: The optional dictionary to be encoded as annotations. Keys must be int, string or + bytes. If empty or None, a null value will be encoded. + :paramtype value: Optional[Dict[Union[int, str, bytes], ENCODABLE_TYPES]] + :returns: An encoded mapping of symbols or ulong to AMQP types. """ if not value: return {TYPE: AMQPTypes.null, VALUE: None} @@ -484,8 +736,9 @@ def encode_annotations(value): return fields -def encode_application_properties(value): - # type: (Optional[Dict[str, Any]]) -> Dict[str, Any] +def encode_application_properties( + value: Optional[Dict[Union[str, bytes], ENCODABLE_P]] + ): """The application-properties section is a part of the bare message used for structured application data. @@ -495,6 +748,11 @@ def encode_application_properties(value): Intermediaries may use the data within this structure for the purposes of filtering or routing. The keys of this map are restricted to be of type string (which excludes the possibility of a null key) and the values are restricted to be of simple types only, that is (excluding map, list, and array types). + + :param value: The optional dictionary to be encoded as fields. Keys must be string or + bytes. Values must be AMQP primitive types. If empty or None, a null value will be encoded. + :paramtype value: Optional[Dict[Union[str, bytes], ENCODABLE_TYPES]] + :returns: An encoded mapping of strings to AMQP primitive types. """ if not value: return {TYPE: AMQPTypes.null, VALUE: None} @@ -503,28 +761,46 @@ def encode_application_properties(value): fields[VALUE].append(({TYPE: AMQPTypes.string, VALUE: key}, data)) return fields +@overload +def encode_message_id(value: str) -> AMQPDefinedType[Literal[AMQPTypes.string], str]: + ... +@overload +def encode_message_id(value: bytes) -> AMQPDefinedType[Literal[AMQPTypes.binary], bytes]: + ... +@overload +def encode_message_id(value: uuid.uuid.UUID) -> AMQPDefinedType[Literal[AMQPTypes.uuid], uuid.uuid.UUID]: + ... +@overload +def encode_message_id(value: int) -> AMQPDefinedType[Literal[AMQPTypes.ulong], int]: + ... +def encode_message_id( + value: Union[str, bytes, uuid.UUID, int] + ) -> AMQPDefinedType[AMQPTypes, Union[str, bytes, uuid.UUID, int]]: + """Encode a message ID value. -def encode_message_id(value): - # type: (Any) -> Dict[str, Union[int, uuid.UUID, bytes, str]] - """ + + :param value: The Message ID value. This must be a string, bytes, UUID or int. Note that + in this case string and bytes will be encoded differently - as string and binary respectively. + :returns: An encoded mapping according to the input primitive type. """ if isinstance(value, int): return {TYPE: AMQPTypes.ulong, VALUE: value} elif isinstance(value, uuid.UUID): return {TYPE: AMQPTypes.uuid, VALUE: value} - elif isinstance(value, six.binary_type): + elif isinstance(value, bytes): return {TYPE: AMQPTypes.binary, VALUE: value} - elif isinstance(value, six.text_type): + elif isinstance(value, str): return {TYPE: AMQPTypes.string, VALUE: value} raise TypeError("Unsupported Message ID type.") -def encode_node_properties(value): - # type: (Optional[Dict[str, Any]]) -> Dict[str, Any] +def encode_node_properties( + value: Optional[Dict[AnyStr, ENCODABLE_T]] + ) -> Union[NullDefinedType, AMQPFieldType[ENCODABLE_T]]: """Properties of a node. @@ -559,7 +835,6 @@ def encode_node_properties(value): def encode_filter_set(value): - # type: (Optional[Dict[str, Any]]) -> Dict[str, Any] """A set of predicates to filter the Messages admitted onto the Link. @@ -580,7 +855,7 @@ def encode_filter_set(value): if data is None: described_filter = {TYPE: AMQPTypes.null, VALUE: None} else: - if isinstance(name, six.text_type): + if isinstance(name, str): name = name.encode('utf-8') descriptor, filter_value = data described_filter = { @@ -594,24 +869,21 @@ def encode_filter_set(value): return fields -def encode_unknown(output, value, **kwargs): - # type: (bytearray, Optional[Any], Any) -> None - """ - Dynamic encoding according to the type of `value`. - """ +def encode_unknown(output: bytearray, value: AMQP_STRUCTURED_TYPES, **kwargs) -> None: + """Dynamic encoding according to the type of `value`.""" if value is None: encode_null(output, **kwargs) elif isinstance(value, bool): encode_boolean(output, value, **kwargs) - elif isinstance(value, six.string_types): + elif isinstance(value, str): encode_string(output, value, **kwargs) elif isinstance(value, uuid.UUID): encode_uuid(output, value, **kwargs) - elif isinstance(value, (bytearray, six.binary_type)): + elif isinstance(value, (bytearray, bytes)): encode_binary(output, value, **kwargs) elif isinstance(value, float): encode_double(output, value, **kwargs) - elif isinstance(value, six.integer_types): + elif isinstance(value, int): encode_int(output, value, **kwargs) elif isinstance(value, datetime): encode_timestamp(output, value, **kwargs) @@ -660,16 +932,15 @@ def encode_unknown(output, value, **kwargs): } -def encode_value(output, value, **kwargs): - # type: (bytearray, Any, Any) -> None +def encode_value(output: bytearray, value: ENCODABLE_TYPES, **kwargs) -> None: + """Encode a value.""" try: _ENCODE_MAP[value[TYPE]](output, value[VALUE], **kwargs) except (KeyError, TypeError): encode_unknown(output, value, **kwargs) -def describe_performative(performative): - # type: (Performative) -> Tuple(bytes, bytes) +def describe_performative(performative: performatives.Performative): body = [] for index, value in enumerate(performative): field = performative._definition[index] @@ -699,9 +970,8 @@ def describe_performative(performative): } -def encode_payload(output, payload): - # type: (bytearray, Message) -> bytes - +def encode_payload(output: bytearray, payload: Message) -> bytearray: + """Encode a Message as payload bytes.""" if payload[0]: # header # TODO: Header and Properties encoding can be optimized to # 1. not encoding trailing None fields @@ -787,8 +1057,11 @@ def encode_payload(output, payload): return output -def encode_frame(frame, frame_type=_FRAME_TYPE): - # type: (Performative) -> Tuple(bytes, bytes) +def encode_frame( + frame: performatives.Performative, + frame_type: bytes = _FRAME_TYPE + ) -> Tuple[bytes, Optional[bytes]]: + """Encode a frame.""" # TODO: allow passing type specific bytes manually, e.g. Empty Frame needs padding if frame is None: size = 8 diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/constants.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/constants.py index 2fab3c76de7e..80f6bd53d389 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/constants.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/constants.py @@ -3,7 +3,6 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- -from collections import namedtuple from enum import Enum import struct @@ -64,8 +63,6 @@ DEFAULT_LINK_CREDIT = 10000 -FIELD = namedtuple('field', 'name, type, mandatory, default, multiple') - STRING_FILTER = b"apache.org:selector-filter:string" DEFAULT_AUTH_TIMEOUT = 60 diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/endpoints.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/endpoints.py index c68cc05c3d6f..4d84ac7f755e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/endpoints.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/endpoints.py @@ -14,14 +14,21 @@ # - the behavior of Messages which have been transferred on the Link, but have not yet reached a # terminal state at the receiver, when the source is destroyed. -from collections import namedtuple +from enum import IntEnum, Enum +from typing import AnyStr, Dict, List, Optional, Tuple -from .types import AMQPTypes, FieldDefinition, ObjDefinition -from .constants import FIELD -from .performatives import _CAN_ADD_DOCSTRING +from .outcomes import SETTLEMENT_TYPES +from .types import ( + AMQPTypes, + FieldDefinition, + ObjDefinition, + FIELD, + Performative, + AMQP_STRUCTURED_TYPES +) -class TerminusDurability(object): +class TerminusDurability(IntEnum): """Durability policy for a terminus. @@ -33,15 +40,15 @@ class TerminusDurability(object): Determines which state of the terminus is held durably. """ #: No Terminus state is retained durably - NoDurability = 0 + NoDurability: int = 0 #: Only the existence and configuration of the Terminus is retained durably. - Configuration = 1 + Configuration: int = 1 #: In addition to the existence and configuration of the Terminus, the unsettled state for durable #: messages is retained durably. - UnsettledState = 2 + UnsettledState: int = 2 -class ExpiryPolicy(object): +class ExpiryPolicy(bytes, Enum): """Expiry policy for a terminus. @@ -57,16 +64,16 @@ class ExpiryPolicy(object): re-met, the expiry timer restarts from its originally configured timeout value. """ #: The expiry timer starts when Terminus is detached. - LinkDetach = b"link-detach" + LinkDetach: bytes = b"link-detach" #: The expiry timer starts when the most recently associated session is ended. - SessionEnd = b"session-end" + SessionEnd: bytes = b"session-end" #: The expiry timer starts when most recently associated connection is closed. - ConnectionClose = b"connection-close" + ConnectionClose: bytes = b"connection-close" #: The Terminus never expires. - Never = b"never" + Never: bytes = b"never" -class DistributionMode(object): +class DistributionMode(bytes, Enum): """Link distribution policy. @@ -78,87 +85,57 @@ class DistributionMode(object): """ #: Once successfully transferred over the link, the message will no longer be available #: to other links from the same node. - Move = b'move' + Move: bytes = b'move' #: Once successfully transferred over the link, the message is still available for other #: links from the same node. - Copy = b'copy' + Copy: bytes = b'copy' -class LifeTimePolicy(object): +class LifeTimePolicy(IntEnum): #: Lifetime of dynamic node scoped to lifetime of link which caused creation. #: A node dynamically created with this lifetime policy will be deleted at the point that the link #: which caused its creation ceases to exist. - DeleteOnClose = 0x0000002b + DeleteOnClose: int = 0x0000002b #: Lifetime of dynamic node scoped to existence of links to the node. #: A node dynamically created with this lifetime policy will be deleted at the point that there remain #: no links for which the node is either the source or target. - DeleteOnNoLinks = 0x0000002c + DeleteOnNoLinks: int = 0x0000002c #: Lifetime of dynamic node scoped to existence of messages on the node. #: A node dynamically created with this lifetime policy will be deleted at the point that the link which #: caused its creation no longer exists and there remain no messages at the node. - DeleteOnNoMessages = 0x0000002d + DeleteOnNoMessages: int = 0x0000002d #: Lifetime of node scoped to existence of messages on or links to the node. #: A node dynamically created with this lifetime policy will be deleted at the point that the there are no #: links which have this node as their source or target, and there remain no messages at the node. - DeleteOnNoLinksOrMessages = 0x0000002e + DeleteOnNoLinksOrMessages: int = 0x0000002e -class SupportedOutcomes(object): +class SupportedOutcomes(bytes, Enum): #: Indicates successful processing at the receiver. - accepted = b"amqp:accepted:list" + accepted: bytes = b"amqp:accepted:list" #: Indicates an invalid and unprocessable message. - rejected = b"amqp:rejected:list" + rejected: bytes = b"amqp:rejected:list" #: Indicates that the message was not (and will not be) processed. - released = b"amqp:released:list" + released: bytes = b"amqp:released:list" #: Indicates that the message was modified, but not processed. - modified = b"amqp:modified:list" + modified: bytes = b"amqp:modified:list" -class ApacheFilters(object): +class ApacheFilters(bytes, Enum): #: Exact match on subject - analogous to legacy AMQP direct exchange bindings. - legacy_amqp_direct_binding = b"apache.org:legacy-amqp-direct-binding:string" + legacy_amqp_direct_binding: bytes = b"apache.org:legacy-amqp-direct-binding:string" #: Pattern match on subject - analogous to legacy AMQP topic exchange bindings. - legacy_amqp_topic_binding = b"apache.org:legacy-amqp-topic-binding:string" + legacy_amqp_topic_binding: bytes = b"apache.org:legacy-amqp-topic-binding:string" #: Matching on message headers - analogous to legacy AMQP headers exchange bindings. - legacy_amqp_headers_binding = b"apache.org:legacy-amqp-headers-binding:map" + legacy_amqp_headers_binding: bytes = b"apache.org:legacy-amqp-headers-binding:map" #: Filter out messages sent from the same connection as the link is currently associated with. - no_local_filter = b"apache.org:no-local-filter:list" + no_local_filter: bytes = b"apache.org:no-local-filter:list" #: SQL-based filtering syntax. - selector_filter = b"apache.org:selector-filter:string" + selector_filter: bytes = b"apache.org:selector-filter:string" -Source = namedtuple( - 'source', - [ - 'address', - 'durable', - 'expiry_policy', - 'timeout', - 'dynamic', - 'dynamic_node_properties', - 'distribution_mode', - 'filters', - 'default_outcome', - 'outcomes', - 'capabilities' - ]) -Source.__new__.__defaults__ = (None,) * len(Source._fields) -Source._code = 0x00000028 -Source._definition = ( - FIELD("address", AMQPTypes.string, False, None, False), - FIELD("durable", AMQPTypes.uint, False, "none", False), - FIELD("expiry_policy", AMQPTypes.symbol, False, ExpiryPolicy.SessionEnd, False), - FIELD("timeout", AMQPTypes.uint, False, 0, False), - FIELD("dynamic", AMQPTypes.boolean, False, False, False), - FIELD("dynamic_node_properties", FieldDefinition.node_properties, False, None, False), - FIELD("distribution_mode", AMQPTypes.symbol, False, None, False), - FIELD("filters", FieldDefinition.filter_set, False, None, False), - FIELD("default_outcome", ObjDefinition.delivery_state, False, None, False), - FIELD("outcomes", AMQPTypes.symbol, False, None, True), - FIELD("capabilities", AMQPTypes.symbol, False, None, True)) -if _CAN_ADD_DOCSTRING: - Source.__doc__ = """ - For containers which do not implement address resolution (and do not admit spontaneous link +class Source(Performative): + """For containers which do not implement address resolution (and do not admit spontaneous link attachment from their partners) but are instead only used as producers of messages, it is unnecessary to provide spurious detail on the source. For this purpose it is possible to use a "minimal" source in which all the fields are left unset. @@ -214,32 +191,35 @@ class ApacheFilters(object): :param list(bytes) capabilities: The extension capabilities the sender supports/desires. See http://www.amqp.org/specification/1.0/source-capabilities. """ + _code: int = 0x00000028 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.symbol, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(FieldDefinition.node_properties, False), + FIELD(AMQPTypes.symbol, False), + FIELD(FieldDefinition.filter_set, False), + FIELD(ObjDefinition.delivery_state, False), + FIELD(AMQPTypes.symbol, True), + FIELD(AMQPTypes.symbol, True) + ] + address: Optional[str] = None + durable: int = TerminusDurability.NoDurability + expiry_policy: bytes = ExpiryPolicy.SessionEnd + timeout: int = 0 + dynamic: bool = False + dynamic_node_properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + distribution_mode: Optional[bytes] = None + filters: Optional[Dict[AnyStr, Optional[Tuple[AnyStr, AMQP_STRUCTURED_TYPES]]]] = None + default_outcome: Optional[SETTLEMENT_TYPES] = None + outcomes: Optional[List[AnyStr]] = None + capabilities: Optional[List[AnyStr]] = None -Target = namedtuple( - 'target', - [ - 'address', - 'durable', - 'expiry_policy', - 'timeout', - 'dynamic', - 'dynamic_node_properties', - 'capabilities' - ]) -Target._code = 0x00000029 -Target.__new__.__defaults__ = (None,) * len(Target._fields) -Target._definition = ( - FIELD("address", AMQPTypes.string, False, None, False), - FIELD("durable", AMQPTypes.uint, False, "none", False), - FIELD("expiry_policy", AMQPTypes.symbol, False, ExpiryPolicy.SessionEnd, False), - FIELD("timeout", AMQPTypes.uint, False, 0, False), - FIELD("dynamic", AMQPTypes.boolean, False, False, False), - FIELD("dynamic_node_properties", FieldDefinition.node_properties, False, None, False), - FIELD("capabilities", AMQPTypes.symbol, False, None, True)) -if _CAN_ADD_DOCSTRING: - Target.__doc__ = """ - For containers which do not implement address resolution (and do not admit spontaneous link attachment +class Target(Performative): + """For containers which do not implement address resolution (and do not admit spontaneous link attachment from their partners) but are instead only used as consumers of messages, it is unnecessary to provide spurious detail on the source. For this purpose it is possible to use a 'minimal' target in which all the fields are left unset. @@ -275,3 +255,20 @@ class ApacheFilters(object): :param list(bytes) capabilities: The extension capabilities the sender supports/desires. See http://www.amqp.org/specification/1.0/source-capabilities. """ + _code: int = 0x00000029 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.symbol, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(FieldDefinition.node_properties, False), + FIELD(AMQPTypes.symbol, True) + ] + address: Optional[str] = None + durable: int = TerminusDurability.NoDurability + expiry_policy: bytes = ExpiryPolicy.SessionEnd + timeout: int = 0 + dynamic: bool = False + dynamic_node_properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + capabilities: Optional[List[AnyStr]] = None diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/error.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/error.py index fc2b8cbfe5dc..248c2d6830ba 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/error.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/error.py @@ -5,10 +5,10 @@ #-------------------------------------------------------------------------- from enum import Enum -from collections import namedtuple +from typing import AnyStr, Dict, List, Optional from .constants import SECURE_PORT, FIELD -from .types import AMQPTypes, FieldDefinition +from .types import AMQP_STRUCTURED_TYPES, AMQPTypes, FieldDefinition, Performative class ErrorCondition(bytes, Enum): @@ -181,14 +181,16 @@ def get_backoff_time(self, settings, error): return min(settings['max_backoff'], backoff_value) -AMQPError = namedtuple('error', ['condition', 'description', 'info']) -AMQPError.__new__.__defaults__ = (None,) * len(AMQPError._fields) -AMQPError._code = 0x0000001d -AMQPError._definition = ( - FIELD('condition', AMQPTypes.symbol, True, None, False), - FIELD('description', AMQPTypes.string, False, None, False), - FIELD('info', FieldDefinition.fields, False, None, False), -) +class AMQPError(Performative): + _code: int = 0x0000001d + _definition: List[FIELD] = [ + FIELD(AMQPTypes.symbol, False), + FIELD(AMQPTypes.string, False), + FIELD(FieldDefinition.fields, False), + ] + condition: AnyStr + description: Optional[AnyStr] = None + into: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None class AMQPException(Exception): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/message.py index a2ef0087fd94..c660f51904eb 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/message.py @@ -4,33 +4,18 @@ # license information. #-------------------------------------------------------------------------- -from collections import namedtuple +from uuid import UUID +from datetime import datetime +from typing import AnyStr, Dict, List, NamedTuple, Optional, Union -from .types import AMQPTypes, FieldDefinition +from .types import AMQPTypes, FieldDefinition, AMQP_STRUCTURED_TYPES, AMQP_PRIMATIVE_TYPES from .constants import FIELD, MessageDeliveryState from .performatives import _CAN_ADD_DOCSTRING +from .error import AMQPError -Header = namedtuple( - 'header', - [ - 'durable', - 'priority', - 'ttl', - 'first_acquirer', - 'delivery_count' - ]) -Header._code = 0x00000070 -Header.__new__.__defaults__ = (None,) * len(Header._fields) -Header._definition = ( - FIELD("durable", AMQPTypes.boolean, False, None, False), - FIELD("priority", AMQPTypes.ubyte, False, None, False), - FIELD("ttl", AMQPTypes.uint, False, None, False), - FIELD("first_acquirer", AMQPTypes.boolean, False, None, False), - FIELD("delivery_count", AMQPTypes.uint, False, None, False)) -if _CAN_ADD_DOCSTRING: - Header.__doc__ = """ - Transport headers for a Message. +class Header(NamedTuple): + """Transport headers for a Message. The header section carries standard delivery details about the transfer of a Message through the AMQP network. If the header section is omitted the receiver MUST assume the appropriate default values for @@ -72,44 +57,23 @@ be taken as an indication that the delivery may be a duplicate. On first delivery, the value is zero. It is incremented upon an outcome being settled at the sender, according to rules defined for each outcome. """ - - -Properties = namedtuple( - 'properties', - [ - 'message_id', - 'user_id', - 'to', - 'subject', - 'reply_to', - 'correlation_id', - 'content_type', - 'content_encoding', - 'absolute_expiry_time', - 'creation_time', - 'group_id', - 'group_sequence', - 'reply_to_group_id' - ]) -Properties._code = 0x00000073 -Properties.__new__.__defaults__ = (None,) * len(Properties._fields) -Properties._definition = ( - FIELD("message_id", FieldDefinition.message_id, False, None, False), - FIELD("user_id", AMQPTypes.binary, False, None, False), - FIELD("to", AMQPTypes.string, False, None, False), - FIELD("subject", AMQPTypes.string, False, None, False), - FIELD("reply_to", AMQPTypes.string, False, None, False), - FIELD("correlation_id", FieldDefinition.message_id, False, None, False), - FIELD("content_type", AMQPTypes.symbol, False, None, False), - FIELD("content_encoding", AMQPTypes.symbol, False, None, False), - FIELD("absolute_expiry_time", AMQPTypes.timestamp, False, None, False), - FIELD("creation_time", AMQPTypes.timestamp, False, None, False), - FIELD("group_id", AMQPTypes.string, False, None, False), - FIELD("group_sequence", AMQPTypes.uint, False, None, False), - FIELD("reply_to_group_id", AMQPTypes.string, False, None, False)) -if _CAN_ADD_DOCSTRING: - Properties.__doc__ = """ - Immutable properties of the Message. + _code: int = 0x00000070 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.ubyte, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.uint, False) + ] + durable: Optional[bool] = None + priority: Optional[int] = None + ttl: Optional[int] = None + first_acquirer: Optional[bool] = None + delivery_count: Optional[int] = None + + +class Properties(NamedTuple): + """Immutable properties of the Message. The properties section is used for a defined set of standard properties of the message. The properties section is part of the bare message and thus must, if retransmitted by an intermediary, remain completely @@ -120,18 +84,20 @@ The Message producer is usually responsible for setting the message-id in such a way that it is assured to be globally unique. A broker MAY discard a Message as a duplicate if the value of the message-id matches that of a previously received Message sent to the same Node. + :paramtype message_id: str or bytes or int or ~uuid.UUID :param bytes user_id: Creating user id. The identity of the user responsible for producing the Message. The client sets this value, and it MAY be authenticated by intermediaries. - :param to: The address of the Node the Message is destined for. + :param str to: The address of the Node the Message is destined for. The to field identifies the Node that is the intended destination of the Message. On any given transfer this may not be the Node at the receiving end of the Link. :param str subject: The subject of the message. A common field for summary information about the Message content and purpose. - :param reply_to: The Node to send replies to. + :param str reply_to: The Node to send replies to. The address of the Node to send replies to. :param correlation_id: Application correlation identifier. This is a client-specific id that may be used to mark or identify Messages between clients. + :paramtype correlation_id: str or bytes or int or ~uuid.UUID :param bytes content_type: MIME content type. The RFC-2046 MIME type for the Message's application-data section (body). As per RFC-2046 this may contain a charset parameter defining the character encoding used: e.g. 'text/plain; charset="utf-8"'. @@ -151,9 +117,9 @@ encoding, except as to remain compatible with messages originally sent with other protocols, e.g. HTTP or SMTP. Implementations SHOULD NOT specify multiple content encoding values except as to be compatible with messages originally sent with other protocols, e.g. HTTP or SMTP. - :param datetime absolute_expiry_time: The time when this message is considered expired. + :param ~datetime.datetime absolute_expiry_time: The time when this message is considered expired. An absolute time when this message is considered to be expired. - :param datetime creation_time: The time when this message was created. + :param ~datetime.datetime creation_time: The time when this message was created. An absolute time when this message was created. :param str group_id: The group this message belongs to. Identifies the group the message belongs to. @@ -162,38 +128,42 @@ :param str reply_to_group_id: The group the reply message belongs to. This is a client-specific id that is used so that client can send replies to this message to a specific group. """ - -# TODO: should be a class, namedtuple or dataclass, immutability vs performance, need to collect performance data -Message = namedtuple( - 'message', - [ - 'header', - 'delivery_annotations', - 'message_annotations', - 'properties', - 'application_properties', - 'data', - 'sequence', - 'value', - 'footer', - ]) -Message.__new__.__defaults__ = (None,) * len(Message._fields) -Message._code = 0 -Message._definition = ( - (0x00000070, FIELD("header", Header, False, None, False)), - (0x00000071, FIELD("delivery_annotations", FieldDefinition.annotations, False, None, False)), - (0x00000072, FIELD("message_annotations", FieldDefinition.annotations, False, None, False)), - (0x00000073, FIELD("properties", Properties, False, None, False)), - (0x00000074, FIELD("application_properties", AMQPTypes.map, False, None, False)), - (0x00000075, FIELD("data", AMQPTypes.binary, False, None, True)), - (0x00000076, FIELD("sequence", AMQPTypes.list, False, None, False)), - (0x00000077, FIELD("value", None, False, None, False)), - (0x00000078, FIELD("footer", FieldDefinition.annotations, False, None, False))) -if _CAN_ADD_DOCSTRING: - Message.__doc__ = """ - An annotated message consists of the bare message plus sections for annotation at the head and tail + _code: int = 0x00000073 + _definition: List[Optional[FIELD]] = [ + FIELD(FieldDefinition.message_id, False), + FIELD(AMQPTypes.binary, False), + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.string, False), + FIELD(FieldDefinition.message_id, False), + FIELD(AMQPTypes.symbol, False), + FIELD(AMQPTypes.symbol, False), + FIELD(AMQPTypes.timestamp, False), + FIELD(AMQPTypes.timestamp, False), + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.string, False) + ] + message_id: Optional[Union[str, bytes, int, UUID]] = None + user_id: Optional[bytes] = None + to: Optional[str] = None + subject: Optional[str] = None + reply_to: Optional[str] = None + correlation_id: Optional[Union[str, bytes, int, UUID]] = None + content_type: Optional[bytes] = None + content_encoding: Optional[bytes] = None + absolute_expiry_time: Optional[datetime] = None + creation_time: Optional[datetime] = None + group_id: Optional[str] = None + group_sequence: Optional[int] = None + reply_to_group_id: Optional[str] = None + + +class Message(NamedTuple): + """An annotated message. + + Consists of the bare message plus sections for annotation at the head and tail of the bare message. - There are two classes of annotations: annotations that travel with the message indefinitely, and annotations that are consumed by the next node. The exact structure of a message, together with its encoding, is defined by the message format. This document @@ -209,7 +179,7 @@ or a single amqp-value section. - Zero or one footer. - :param ~uamqp.message.Header header: Transport headers for a Message. + :param ~pyamqp.Header header: Transport headers for a Message. The header section carries standard delivery details about the transfer of a Message through the AMQP network. If the header section is omitted the receiver MUST assume the appropriate default values for the fields within the header unless other target or node specific defaults have otherwise been set. @@ -233,7 +203,7 @@ filtered on. A registry of defined annotations and their meanings can be found here: http://www.amqp.org/specification/1.0/message-annotations. If the message-annotations section is omitted, it is equivalent to a message-annotations section containing an empty map of annotations. - :param ~uamqp.message.Properties: Immutable properties of the Message. + :param ~pyamqp.Properties: Immutable properties of the Message. The properties section is used for a defined set of standard properties of the message. The properties section is part of the bare message and thus must, if retransmitted by an intermediary, remain completely unaltered. @@ -242,7 +212,7 @@ of filtering or routing. The keys of this map are restricted to be of type string (which excludes the possibility of a null key) and the values are restricted to be of simple types only (that is excluding map, list, and array types). - :param list(bytes) data_body: A data section contains opaque binary data. + :param List[bytes] data_body: A data section contains opaque binary data. :param list sequence_body: A sequence section contains an arbitrary number of structured data elements. :param value_body: An amqp-value section contains a single AMQP value. :param dict footer: Transport footers for a Message. @@ -251,17 +221,33 @@ signatures and encryption details). A registry of defined footers and their meanings can be found here: http://www.amqp.org/specification/1.0/footer. """ + # TODO: should be a class, namedtuple or dataclass, immutability vs performance, need to collect performance data + _code: int = 0 + header: Optional[Header] = None + delivery_annotations: Optional[Dict[Union[int, AnyStr], AMQP_STRUCTURED_TYPES]] = None + message_annotations: Optional[Dict[Union[int, AnyStr], AMQP_STRUCTURED_TYPES]] = None + properties: Optional[Properties] = None + application_properties: Optional[Dict[Union[str, bytes], AMQP_PRIMATIVE_TYPES]] = None + data: Optional[List[bytes]] = None + sequence: Optional[List[AMQP_STRUCTURED_TYPES]] = None + value: Optional[AMQP_STRUCTURED_TYPES] = None + footer: Optional[Dict[Union[int, AnyStr], AMQP_STRUCTURED_TYPES]] = None class BatchMessage(Message): - _code = 0x80013700 + _code: int = 0x80013700 class _MessageDelivery: - def __init__(self, message, state=MessageDeliveryState.WaitingToBeSent, expiry=None): + def __init__( + self, + message: Message, + state: MessageDeliveryState = MessageDeliveryState.WaitingToBeSent, + expiry: Optional[datetime] = None + ): self.message = message self.state = state self.expiry = expiry - self.reason = None - self.delivery = None - self.error = None + self.reason: Optional[bytes] = None + self.delivery: Optional[bool] = None + self.error: Optional[AMQPError] = None diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/outcomes.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/outcomes.py index 0dcf41cd54c2..277a0ecd7796 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/outcomes.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/outcomes.py @@ -25,21 +25,14 @@ # - received: indicates partial message data seen by the receiver as well as the starting point for a # resumed transfer -from collections import namedtuple +from typing import AnyStr, Dict, List, Optional, Union -from .types import AMQPTypes, FieldDefinition, ObjDefinition -from .constants import FIELD -from .performatives import _CAN_ADD_DOCSTRING +from .types import AMQPTypes, FieldDefinition, ObjDefinition, FIELD, Performative, AMQP_STRUCTURED_TYPES +from .error import AMQPError -Received = namedtuple('received', ['section_number', 'section_offset']) -Received._code = 0x00000023 -Received._definition = ( - FIELD("section_number", AMQPTypes.uint, True, None, False), - FIELD("section_offset", AMQPTypes.ulong, True, None, False)) -if _CAN_ADD_DOCSTRING: - Received.__doc__ = """ - At the target the received state indicates the furthest point in the payload of the message +class Received(Performative): + """At the target the received state indicates the furthest point in the payload of the message which the target will not need to have resent if the link is resumed. At the source the received state represents the earliest point in the payload which the Sender is able to resume transferring at in the case of link resumption. When resuming a delivery, if this state is set on the first transfer performative it indicates @@ -62,14 +55,17 @@ Received(section-number=X+1, section-offset=0). The state Received(sectionnumber=0, section-offset=0) indicates that no message data at all has been transferred. """ + _code: int = 0x00000023 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.ulong, False) + ] + section_number: int + section_offset: int -Accepted = namedtuple('accepted', []) -Accepted._code = 0x00000024 -Accepted._definition = () -if _CAN_ADD_DOCSTRING: - Accepted.__doc__ = """ - The accepted outcome. +class Accepted(Performative): + """The accepted outcome. At the source the accepted state means that the message has been retired from the node, and transfer of payload data will not be able to be resumed if the link becomes suspended. A delivery may become accepted at @@ -80,15 +76,11 @@ to transition the delivery to the accepted state at the source. The accepted outcome does not increment the delivery-count in the header of the accepted Message. """ + _code: int = 0x00000024 -Rejected = namedtuple('rejected', ['error']) -Rejected.__new__.__defaults__ = (None,) * len(Rejected._fields) -Rejected._code = 0x00000025 -Rejected._definition = (FIELD("error", ObjDefinition.error, False, None, False),) -if _CAN_ADD_DOCSTRING: - Rejected.__doc__ = """ - The rejected outcome. +class Rejected(Performative): + """The rejected outcome. At the target, the rejected outcome is used to indicate that an incoming Message is invalid and therefore unprocessable. The rejected outcome when applied to a Message will cause the delivery-count to be incremented @@ -100,14 +92,13 @@ The value supplied in this field will be placed in the delivery-annotations of the rejected Message associated with the symbolic key "rejected". """ + _code: int = 0x00000025 + _definition: List[Optional[FIELD]] = [FIELD(ObjDefinition.error, False)] + error: Optional[AMQPError] = None -Released = namedtuple('released', []) -Released._code = 0x00000026 -Released._definition = () -if _CAN_ADD_DOCSTRING: - Released.__doc__ = """ - The released outcome. +class Released(Performative): + """The released outcome. At the source the released outcome means that the message is no longer acquired by the receiver, and has been made available for (re-)delivery to the same or other targets receiving from the node. The message is unchanged @@ -121,18 +112,11 @@ At the target, the released outcome is used to indicate that a given transfer was not and will not be acted upon. """ + _code: int = 0x00000026 -Modified = namedtuple('modified', ['delivery_failed', 'undeliverable_here', 'message_annotations']) -Modified.__new__.__defaults__ = (None,) * len(Modified._fields) -Modified._code = 0x00000027 -Modified._definition = ( - FIELD('delivery_failed', AMQPTypes.boolean, False, None, False), - FIELD('undeliverable_here', AMQPTypes.boolean, False, None, False), - FIELD('message_annotations', FieldDefinition.fields, False, None, False)) -if _CAN_ADD_DOCSTRING: - Modified.__doc__ = """ - The modified outcome. +class Modified(Performative): + """The modified outcome. At the source the modified outcome means that the message is no longer acquired by the receiver, and has been made available for (re-)delivery to the same or other targets receiving from the node. The message has been @@ -157,3 +141,15 @@ entry in this field, the value in this field associated with that key replaces the one in the existing headers; where the existing message-annotations has no such value, the value in this map is added. """ + _code: int = 0x00000027 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.boolean, False), + FIELD(FieldDefinition.fields, False) + ] + delivery_failed: Optional[bool] = None + undeliverable_here: Optional[bool] = None + message_annotations: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + + +SETTLEMENT_TYPES = Union[Received, Released, Accepted, Modified, Rejected] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/performatives.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/performatives.py index 8b27295faedf..191a33eedf7d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/performatives.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/performatives.py @@ -3,45 +3,23 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- +from typing import Dict, List, Optional, AnyStr -from collections import namedtuple -import sys - -from .types import AMQPTypes, FieldDefinition, ObjDefinition -from .constants import FIELD - -_CAN_ADD_DOCSTRING = sys.version_info.major >= 3 - - -OpenFrame = namedtuple( - 'open', - [ - 'container_id', - 'hostname', - 'max_frame_size', - 'channel_max', - 'idle_timeout', - 'outgoing_locales', - 'incoming_locales', - 'offered_capabilities', - 'desired_capabilities', - 'properties' - ]) -OpenFrame._code = 0x00000010 # pylint:disable=protected-access -OpenFrame._definition = ( # pylint:disable=protected-access - FIELD("container_id", AMQPTypes.string, True, None, False), - FIELD("hostname", AMQPTypes.string, False, None, False), - FIELD("max_frame_size", AMQPTypes.uint, False, 4294967295, False), - FIELD("channel_max", AMQPTypes.ushort, False, 65535, False), - FIELD("idle_timeout", AMQPTypes.uint, False, None, False), - FIELD("outgoing_locales", AMQPTypes.symbol, False, None, True), - FIELD("incoming_locales", AMQPTypes.symbol, False, None, True), - FIELD("offered_capabilities", AMQPTypes.symbol, False, None, True), - FIELD("desired_capabilities", AMQPTypes.symbol, False, None, True), - FIELD("properties", FieldDefinition.fields, False, None, False)) -if _CAN_ADD_DOCSTRING: - OpenFrame.__doc__ = """ - OPEN performative. Negotiate Connection parameters. +from .outcomes import SETTLEMENT_TYPES +from .error import AMQPError +from .endpoints import Source, Target +from .types import ( + Performative, + AMQPTypes, + FieldDefinition, + ObjDefinition, + AMQP_STRUCTURED_TYPES, + FIELD +) + + +class OpenFrame(Performative): + """OPEN performative. Negotiate Connection parameters. The first frame sent on a connection in either direction MUST contain an Open body. (Note that theConnection header which is sent first on the Connection is *not* a frame.) @@ -73,60 +51,60 @@ an error explaining why (eg, because it is too small). If the value is not set, then the sender does not have an idle time-out. However, senders doing this should be aware that implementations MAY choose to use an internal default to efficiently manage a peer's resources. - :param list(str) outgoing_locales: Locales available for outgoing text. + :param List[AnyStr] outgoing_locales: Locales available for outgoing text. A list of the locales that the peer supports for sending informational text. This includes Connection, Session and Link error descriptions. A peer MUST support at least the en-US locale. Since this value is always supported, it need not be supplied in the outgoing-locales. A null value or an empty list implies that only en-US is supported. - :param list(str) incoming_locales: Desired locales for incoming text in decreasing level of preference. + :param List[AnyStr] incoming_locales: Desired locales for incoming text in decreasing level of preference. A list of locales that the sending peer permits for incoming informational text. This list is ordered in decreasing level of preference. The receiving partner will chose the first (most preferred) incoming locale from those which it supports. If none of the requested locales are supported, en-US will be chosen. Note that en-US need not be supplied in this list as it is always the fallback. A peer may determine which of the permitted incoming locales is chosen by examining the partner's supported locales asspecified in the outgoing_locales field. A null value or an empty list implies that only en-US is supported. - :param list(str) offered_capabilities: The extension capabilities the sender supports. + :param List[AnyStr] offered_capabilities: The extension capabilities the sender supports. If the receiver of the offered-capabilities requires an extension capability which is not present in the offered-capability list then it MUST close the connection. A list of commonly defined connection capabilities and their meanings can be found here: http://www.amqp.org/specification/1.0/connection-capabilities. - :param list(str) required_capabilities: The extension capabilities the sender may use if the receiver supports + :param List[AnyStr] required_capabilities: The extension capabilities the sender may use if the receiver supports them. The desired-capability list defines which extension capabilities the sender MAY use if the receiver offers them (i.e. they are in the offered-capabilities list received by the sender of the desired-capabilities). If the receiver of the desired-capabilities offers extension capabilities which are not present in the desired-capability list it received, then it can be sure those (undesired) capabilities will not be used on the Connection. - :param dict properties: Connection properties. + :param Dict[AnyStr, AMQP_STRUCTURED_TYPES] properties: Connection properties. The properties map contains a set of fields intended to indicate information about the connection and its container. A list of commonly defined connection properties and their meanings can be found here: http://www.amqp.org/specification/1.0/connection-properties. """ - - -BeginFrame = namedtuple( - 'begin', - [ - 'remote_channel', - 'next_outgoing_id', - 'incoming_window', - 'outgoing_window', - 'handle_max', - 'offered_capabilities', - 'desired_capabilities', - 'properties' - ]) -BeginFrame._code = 0x00000011 # pylint:disable=protected-access -BeginFrame._definition = ( # pylint:disable=protected-access - FIELD("remote_channel", AMQPTypes.ushort, False, None, False), - FIELD("next_outgoing_id", AMQPTypes.uint, True, None, False), - FIELD("incoming_window", AMQPTypes.uint, True, None, False), - FIELD("outgoing_window", AMQPTypes.uint, True, None, False), - FIELD("handle_max", AMQPTypes.uint, False, 4294967295, False), - FIELD("offered_capabilities", AMQPTypes.symbol, False, None, True), - FIELD("desired_capabilities", AMQPTypes.symbol, False, None, True), - FIELD("properties", FieldDefinition.fields, False, None, False)) -if _CAN_ADD_DOCSTRING: - BeginFrame.__doc__ = """ - BEGIN performative. Begin a Session on a channel. + _code: int = 0x00000010 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.ushort, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.symbol, True), + FIELD(AMQPTypes.symbol, True), + FIELD(AMQPTypes.symbol, True), + FIELD(AMQPTypes.symbol, True), + FIELD(FieldDefinition.fields, False) + ] + container_id: AnyStr + hostname: Optional[AnyStr] = None + max_frame_size: int = 4294967295 + channel_max: int = 65535 + idle_timeout: Optional[int] = None + outgoing_locales: Optional[List[AnyStr]] = None + incoming_locales: Optional[List[AnyStr]] = None + offered_capabilities: Optional[List[AnyStr]] = None + desired_capabilities: Optional[List[AnyStr]] = None + properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + + +class BeginFrame(Performative): + """BEGIN performative. Begin a Session on a channel. Indicate that a Session has begun on the channel. @@ -150,55 +128,39 @@ The handle-max value is the highest handle value that may be used on the Session. A peer MUST NOT attempt to attach a Link using a handle value outside the range that its partner can handle. A peer that receives a handle outside the supported range MUST close the Connection with the framing-error error-code. - :param list(str) offered_capabilities: The extension capabilities the sender supports. + :param List[AnyStr] offered_capabilities: The extension capabilities the sender supports. A list of commonly defined session capabilities and their meanings can be found here: http://www.amqp.org/specification/1.0/session-capabilities. - :param list(str) desired_capabilities: The extension capabilities the sender may use if the receiver + :param List[AnyStr] desired_capabilities: The extension capabilities the sender may use if the receiver supports them. - :param dict properties: Session properties. + :param Dict[AnyStr, AMQP_STRUCTURED_TYPES] properties: Session properties. The properties map contains a set of fields intended to indicate information about the session and its container. A list of commonly defined session properties and their meanings can be found here: http://www.amqp.org/specification/1.0/session-properties. """ - - -AttachFrame = namedtuple( - 'attach', - [ - 'name', - 'handle', - 'role', - 'send_settle_mode', - 'rcv_settle_mode', - 'source', - 'target', - 'unsettled', - 'incomplete_unsettled', - 'initial_delivery_count', - 'max_message_size', - 'offered_capabilities', - 'desired_capabilities', - 'properties' - ]) -AttachFrame._code = 0x00000012 # pylint:disable=protected-access -AttachFrame._definition = ( # pylint:disable=protected-access - FIELD("name", AMQPTypes.string, True, None, False), - FIELD("handle", AMQPTypes.uint, True, None, False), - FIELD("role", AMQPTypes.boolean, True, None, False), - FIELD("send_settle_mode", AMQPTypes.ubyte, False, 2, False), - FIELD("rcv_settle_mode", AMQPTypes.ubyte, False, 0, False), - FIELD("source", ObjDefinition.source, False, None, False), - FIELD("target", ObjDefinition.target, False, None, False), - FIELD("unsettled", AMQPTypes.map, False, None, False), - FIELD("incomplete_unsettled", AMQPTypes.boolean, False, False, False), - FIELD("initial_delivery_count", AMQPTypes.uint, False, None, False), - FIELD("max_message_size", AMQPTypes.ulong, False, None, False), - FIELD("offered_capabilities", AMQPTypes.symbol, False, None, True), - FIELD("desired_capabilities", AMQPTypes.symbol, False, None, True), - FIELD("properties", FieldDefinition.fields, False, None, False)) -if _CAN_ADD_DOCSTRING: - AttachFrame.__doc__ = """ - ATTACH performative. Attach a Link to a Session. + _code = 0x00000011 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.ushort, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.symbol, True), + FIELD(AMQPTypes.symbol, True), + FIELD(FieldDefinition.fields, False) + ] + remote_channel: Optional[int] + next_outgoing_id: int + incoming_window: int + outgoing_window: int + handle_max: int = 4294967295 + offered_capabilities: Optional[List[AnyStr]] = None + desired_capabilities: Optional[List[AnyStr]] = None + properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + + +class AttachFrame(Performative): + """ATTACH performative. Attach a Link to a Session. The attach frame indicates that a Link Endpoint has been attached to the Session. The opening flag is used to indicate that the Link Endpoint is newly created. @@ -221,13 +183,13 @@ Determines the settlement policy for unsettled deliveries received at the Receiver. When set at the Sender this indicates the desired value for the settlement mode at the Receiver. When set at the Receiver this indicates the actual settlement mode in use. - :param ~uamqp.messaging.Source source: The source for Messages. + :param ~pyamqp.Source source: The source for Messages. If no source is specified on an outgoing Link, then there is no source currently attached to the Link. A Link with no source will never produce outgoing Messages. - :param ~uamqp.messaging.Target target: The target for Messages. + :param ~pyamqp.Target target: The target for Messages. If no target is specified on an incoming Link, then there is no target currently attached to the Link. A Link with no target will never permit incoming Messages. - :param dict unsettled: Unsettled delivery state. + :param Dict[AnyStr, SETTLEMENT_TYPES] unsettled: Unsettled delivery state. This is used to indicate any unsettled delivery states when a suspended link is resumed. The map is keyed by delivery-tag with values indicating the delivery state. The local and remote delivery states for a given delivery-tag MUST be compared to resolve any in-doubt deliveries. If necessary, deliveries MAY be resent, @@ -249,50 +211,51 @@ This field indicates the maximum message size supported by the link endpoint. Any attempt to deliver a message larger than this results in a message-size-exceeded link-error. If this field is zero or unset, there is no maximum size imposed by the link endpoint. - :param list(str) offered_capabilities: The extension capabilities the sender supports. + :param List[AnyStr] offered_capabilities: The extension capabilities the sender supports. A list of commonly defined session capabilities and their meanings can be found here: http://www.amqp.org/specification/1.0/link-capabilities. - :param list(str) desired_capabilities: The extension capabilities the sender may use if the receiver + :param List[AnyStr] desired_capabilities: The extension capabilities the sender may use if the receiver supports them. - :param dict properties: Link properties. + :param Dict[AnyStr, AMQP_STRUCTURED_TYPES] properties: Link properties. The properties map contains a set of fields intended to indicate information about the link and its container. A list of commonly defined link properties and their meanings can be found here: http://www.amqp.org/specification/1.0/link-properties. """ - - -FlowFrame = namedtuple( - 'flow', - [ - 'next_incoming_id', - 'incoming_window', - 'next_outgoing_id', - 'outgoing_window', - 'handle', - 'delivery_count', - 'link_credit', - 'available', - 'drain', - 'echo', - 'properties' - ]) -FlowFrame.__new__.__defaults__ = (None, None, None, None, None, None, None) -FlowFrame._code = 0x00000013 # pylint:disable=protected-access -FlowFrame._definition = ( # pylint:disable=protected-access - FIELD("next_incoming_id", AMQPTypes.uint, False, None, False), - FIELD("incoming_window", AMQPTypes.uint, True, None, False), - FIELD("next_outgoing_id", AMQPTypes.uint, True, None, False), - FIELD("outgoing_window", AMQPTypes.uint, True, None, False), - FIELD("handle", AMQPTypes.uint, False, None, False), - FIELD("delivery_count", AMQPTypes.uint, False, None, False), - FIELD("link_credit", AMQPTypes.uint, False, None, False), - FIELD("available", AMQPTypes.uint, False, None, False), - FIELD("drain", AMQPTypes.boolean, False, False, False), - FIELD("echo", AMQPTypes.boolean, False, False, False), - FIELD("properties", FieldDefinition.fields, False, None, False)) -if _CAN_ADD_DOCSTRING: - FlowFrame.__doc__ = """ - FLOW performative. Update link state. + _code = 0x00000012 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.string, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.ubyte, False), + FIELD(AMQPTypes.ubyte, False), + FIELD(ObjDefinition.source, False), + FIELD(ObjDefinition.target, False), + FIELD(AMQPTypes.map, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.ulong, False), + FIELD(AMQPTypes.symbol, True), + FIELD(AMQPTypes.symbol, True), + FIELD(FieldDefinition.fields, False) + ] + name: str + handle: int + role: bool + send_settle_mode: int = 2 + rcv_settle_mode: int = 0 + source: Optional[Source] = None + target: Optional[Target] = None + unsettled: Dict[AnyStr, SETTLEMENT_TYPES] = None + incomplete_unsettled: bool = False + initial_delivery_count: Optional[int] = None + max_message_size: Optional[int] = None + offered_capabilities: Optional[List[AnyStr]] = None + desired_capabilities: Optional[List[AnyStr]] = None + properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + + +class FlowFrame(Performative): + """FLOW performative. Update link state. Updates the flow state for the specified Link. @@ -327,45 +290,39 @@ sender. When flow state is sent from the receiver to the sender, this field contains the desired drain mode of the receiver. When the handle field is not set, this field MUST NOT be set. :param bool echo: Request link state from other endpoint. - :param dict properties: Link state properties. + :param Dict[AnyStr, AMQP_STRUCTURED_TYPES] properties: Link state properties. A list of commonly defined link state properties and their meanings can be found here: http://www.amqp.org/specification/1.0/link-state-properties. """ - - -TransferFrame = namedtuple( - 'transfer', - [ - 'handle', - 'delivery_id', - 'delivery_tag', - 'message_format', - 'settled', - 'more', - 'rcv_settle_mode', - 'state', - 'resume', - 'aborted', - 'batchable', - 'payload' - ]) -TransferFrame._code = 0x00000014 # pylint:disable=protected-access -TransferFrame._definition = ( # pylint:disable=protected-access - FIELD("handle", AMQPTypes.uint, True, None, False), - FIELD("delivery_id", AMQPTypes.uint, False, None, False), - FIELD("delivery_tag", AMQPTypes.binary, False, None, False), - FIELD("message_format", AMQPTypes.uint, False, 0, False), - FIELD("settled", AMQPTypes.boolean, False, None, False), - FIELD("more", AMQPTypes.boolean, False, False, False), - FIELD("rcv_settle_mode", AMQPTypes.ubyte, False, None, False), - FIELD("state", ObjDefinition.delivery_state, False, None, False), - FIELD("resume", AMQPTypes.boolean, False, False, False), - FIELD("aborted", AMQPTypes.boolean, False, False, False), - FIELD("batchable", AMQPTypes.boolean, False, False, False), - None) -if _CAN_ADD_DOCSTRING: - TransferFrame.__doc__ = """ - TRANSFER performative. Transfer a Message. + _code: int = 0x00000013 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.boolean, False), + FIELD(FieldDefinition.fields, False) + ] + next_incoming_id: int + incoming_window: int + next_outgoing_id: int + outgoing_window: int + handle: Optional[int] = None + delivery_count: Optional[int] = None + link_credit: Optional[int] = None + available: Optional[int] = None + drain: bool = False + echo: bool = False + properties: Optional[Dict[AnyStr, AMQP_STRUCTURED_TYPES]] = None + + +class TransferFrame(Performative): + """TRANSFER performative. Transfer a Message. The transfer frame is used to send Messages across a Link. Messages may be carried by a single transfer up to the maximum negotiated frame size for the Connection. Larger Messages may be split across several @@ -432,29 +389,37 @@ for the delivery. The batchable value does not form part of the transfer state, and is not retained if a link is suspended and subsequently resumed. """ - - -DispositionFrame = namedtuple( - 'disposition', - [ - 'role', - 'first', - 'last', - 'settled', - 'state', - 'batchable' - ]) -DispositionFrame._code = 0x00000015 # pylint:disable=protected-access -DispositionFrame._definition = ( # pylint:disable=protected-access - FIELD("role", AMQPTypes.boolean, True, None, False), - FIELD("first", AMQPTypes.uint, True, None, False), - FIELD("last", AMQPTypes.uint, False, None, False), - FIELD("settled", AMQPTypes.boolean, False, False, False), - FIELD("state", ObjDefinition.delivery_state, False, None, False), - FIELD("batchable", AMQPTypes.boolean, False, False, False)) -if _CAN_ADD_DOCSTRING: - DispositionFrame.__doc__ = """ - DISPOSITION performative. Inform remote peer of delivery state changes. + _code: int = 0x00000014 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.binary, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.ubyte, False), + FIELD(ObjDefinition.delivery_state, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.boolean, False), + None + ] + handle: int + delivery_id: Optional[int] = None + delivery_tag: Optional[bytes] = None + message_format: int = 0 + settled: Optional[bool] = None + more: bool = False + rcv_settle_mode: Optional[int] = None + state: Optional[SETTLEMENT_TYPES] = None + resume: bool = False + aborted: bool = False + batchable: bool = False + payload: Optional[bytes] = None + + +class DispositionFrame(Performative): + """DISPOSITION performative. Inform remote peer of delivery state changes. The disposition frame is used to inform the remote peer of local changes in the state of deliveries. The disposition frame may reference deliveries from many different links associated with a session, @@ -476,93 +441,101 @@ this is taken to be the same as first. :param bool settled: Indicates deliveries are settled. If true, indicates that the referenced deliveries are considered settled by the issuing endpoint. - :param bytes state: Indicates state of deliveries. + :param ~pyamqp.SETTLEMENT_TYPES state: Indicates state of deliveries. Communicates the state of all the deliveries referenced by this disposition. :param bool batchable: Batchable hint. If true, then the issuer is hinting that there is no need for the peer to urgently communicate the impact of the updated delivery states. This hint may be used to artificially increase the amount of batching an implementation uses when communicating delivery states, and thereby save bandwidth. """ - -DetachFrame = namedtuple('detach', ['handle', 'closed', 'error']) -DetachFrame._code = 0x00000016 # pylint:disable=protected-access -DetachFrame._definition = ( # pylint:disable=protected-access - FIELD("handle", AMQPTypes.uint, True, None, False), - FIELD("closed", AMQPTypes.boolean, False, False, False), - FIELD("error", ObjDefinition.error, False, None, False)) -if _CAN_ADD_DOCSTRING: - DetachFrame.__doc__ = """ - DETACH performative. Detach the Link Endpoint from the Session. + _code: int = 0x00000015 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.boolean, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(ObjDefinition.delivery_state, False), + FIELD(AMQPTypes.boolean, False) + ] + role: bool + first: int + last: Optional[int] = None + settled: bool = False + state: Optional[SETTLEMENT_TYPES] = None + batchable: bool = False + + +class DetachFrame(Performative): + """DETACH performative. Detach the Link Endpoint from the Session. Detach the Link Endpoint from the Session. This un-maps the handle and makes it available for use by other Links :param int handle: The local handle of the link to be detached. :param bool handle: If true then the sender has closed the link. - :param ~uamqp.error.AMQPError error: Error causing the detach. + :param ~pyamqp.AMQPError error: Error causing the detach. If set, this field indicates that the Link is being detached due to an error condition. The value of the field should contain details on the cause of the error. """ + _code: int = 0x00000016 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.uint, False), + FIELD(AMQPTypes.boolean, False), + FIELD(ObjDefinition.error, False) + ] + handle: int + closed: bool = False + error: Optional[AMQPError] = None -EndFrame = namedtuple('end', ['error']) -EndFrame._code = 0x00000017 # pylint:disable=protected-access -EndFrame._definition = (FIELD("error", ObjDefinition.error, False, None, False),) # pylint:disable=protected-access -if _CAN_ADD_DOCSTRING: - EndFrame.__doc__ = """ - END performative. End the Session. +class EndFrame(Performative): + """END performative. End the Session. Indicates that the Session has ended. - :param ~uamqp.error.AMQPError error: Error causing the end. + :param ~pyamqp.AMQPError error: Error causing the end. If set, this field indicates that the Session is being ended due to an error condition. The value of the field should contain details on the cause of the error. """ + _code: int = 0x00000017 + _definition: List[Optional[FIELD]] = [FIELD(ObjDefinition.error, False)] + error: Optional[AMQPError] = None -CloseFrame = namedtuple('close', ['error']) -CloseFrame._code = 0x00000018 # pylint:disable=protected-access -CloseFrame._definition = (FIELD("error", ObjDefinition.error, False, None, False),) # pylint:disable=protected-access -if _CAN_ADD_DOCSTRING: - CloseFrame.__doc__ = """ - CLOSE performative. Signal a Connection close. +class CloseFrame(Performative): + """CLOSE performative. Signal a Connection close. Sending a close signals that the sender will not be sending any more frames (or bytes of any other kind) on the Connection. Orderly shutdown requires that this frame MUST be written by the sender. It is illegal to send any more frames (or bytes of any other kind) after sending a close frame. - :param ~uamqp.error.AMQPError error: Error causing the close. + :param ~pyamqp.AMQPError error: Error causing the close. If set, this field indicates that the Connection is being closed due to an error condition. The value of the field should contain details on the cause of the error. """ + _code: int = 0x00000018 + _definition: List[Optional[FIELD]] = [FIELD(ObjDefinition.error, False)] + error: Optional[AMQPError] = None -SASLMechanism = namedtuple('sasl_mechanism', ['sasl_server_mechanisms']) -SASLMechanism._code = 0x00000040 # pylint:disable=protected-access -SASLMechanism._definition = (FIELD('sasl_server_mechanisms', AMQPTypes.symbol, True, None, True),) # pylint:disable=protected-access -if _CAN_ADD_DOCSTRING: - SASLMechanism.__doc__ = """ - Advertise available sasl mechanisms. +class SASLMechanism(Performative): + """Advertise available sasl mechanisms. dvertises the available SASL mechanisms that may be used for authentication. - :param list(bytes) sasl_server_mechanisms: Supported sasl mechanisms. + :param List[AnyStr] sasl_server_mechanisms: Supported sasl mechanisms. A list of the sasl security mechanisms supported by the sending peer. It is invalid for this list to be null or empty. If the sending peer does not require its partner to authenticate with it, then it should send a list of one element with its value as the SASL mechanism ANONYMOUS. The server mechanisms are ordered in decreasing level of preference. """ + _code: int = 0x00000040 + _definition: List[Optional[FIELD]] = [FIELD(AMQPTypes.symbol, True)] + sasl_server_mechanisms: List[AnyStr] -SASLInit = namedtuple('sasl_init', ['mechanism', 'initial_response', 'hostname']) -SASLInit._code = 0x00000041 # pylint:disable=protected-access -SASLInit._definition = ( # pylint:disable=protected-access - FIELD('mechanism', AMQPTypes.symbol, True, None, False), - FIELD('initial_response', AMQPTypes.binary, False, None, False), - FIELD('hostname', AMQPTypes.string, False, None, False)) -if _CAN_ADD_DOCSTRING: - SASLInit.__doc__ = """ - Initiate sasl exchange. +class SASLInit(Performative): + """Initiate sasl exchange. Selects the sasl mechanism and provides the initial response if needed. @@ -583,43 +556,44 @@ in RFC-4366, if a TLS layer is used, in which case this field SHOULD benull or contain the same value. It is undefined what a different value to those already specific means. """ + _code: int = 0x00000041 + _definition: List[Optional[FIELD]] = [ + FIELD('mechanism', AMQPTypes.symbol, True, None, False), + FIELD('initial_response', AMQPTypes.binary, False, None, False), + FIELD('hostname', AMQPTypes.string, False, None, False) + ] + mechanism: AnyStr + initial_response: Optional[bytes] = None + hostname: Optional[AnyStr] = None -SASLChallenge = namedtuple('sasl_challenge', ['challenge']) -SASLChallenge._code = 0x00000042 # pylint:disable=protected-access -SASLChallenge._definition = (FIELD('challenge', AMQPTypes.binary, True, None, False),) # pylint:disable=protected-access -if _CAN_ADD_DOCSTRING: - SASLChallenge.__doc__ = """ - Security mechanism challenge. +class SASLChallenge(Performative): + """Security mechanism challenge. Send the SASL challenge data as defined by the SASL specification. :param bytes challenge: Security challenge data. Challenge information, a block of opaque binary data passed to the security mechanism. """ + _code: int = 0x00000042 + _definition: List[Optional[FIELD]] = [FIELD(AMQPTypes.binary, False)] + challenge: bytes -SASLResponse = namedtuple('sasl_response', ['response']) -SASLResponse._code = 0x00000043 # pylint:disable=protected-access -SASLResponse._definition = (FIELD('response', AMQPTypes.binary, True, None, False),) # pylint:disable=protected-access -if _CAN_ADD_DOCSTRING: - SASLResponse.__doc__ = """ - Security mechanism response. +class SASLResponse(Performative): + """Security mechanism response. Send the SASL response data as defined by the SASL specification. :param bytes response: Security response data. """ + _code: int = 0x00000043 + _definition: List[Optional[FIELD]] = [FIELD(AMQPTypes.binary, False)] + response: bytes -SASLOutcome = namedtuple('sasl_outcome', ['code', 'additional_data']) -SASLOutcome._code = 0x00000044 # pylint:disable=protected-access -SASLOutcome._definition = ( # pylint:disable=protected-access - FIELD('code', AMQPTypes.ubyte, True, None, False), - FIELD('additional_data', AMQPTypes.binary, False, None, False)) -if _CAN_ADD_DOCSTRING: - SASLOutcome.__doc__ = """ - Indicates the outcome of the sasl dialog. +class SASLOutcome(Performative): + """Indicates the outcome of the sasl dialog. This frame indicates the outcome of the SASL dialog. Upon successful completion of the SASL dialog the Security Layer has been established, and the peers must exchange protocol headers to either starta nested @@ -631,3 +605,10 @@ The additional-data field carries additional data on successful authentication outcomeas specified by the SASL specification (RFC-4422). If the authentication is unsuccessful, this field is not set. """ + _code: int = 0x00000044 + _definition: List[Optional[FIELD]] = [ + FIELD(AMQPTypes.ubyte, False), + FIELD(AMQPTypes.binary, False) + ] + code: int + additional_data: Optional[bytes] = None diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py index 554c254d00cb..8d0d999b089a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py @@ -6,12 +6,10 @@ import uuid import logging -from io import BytesIO -from typing import Optional, Union +from typing import Optional, Union, TYPE_CHECKING, Callable from ._decode import decode_payload -from .constants import DEFAULT_LINK_CREDIT, Role -from .endpoints import Target +from .endpoints import Source, Target from .link import Link from .message import Message, Properties, Header from .constants import ( @@ -19,7 +17,8 @@ SessionState, SessionTransferState, LinkDeliverySettleReason, - LinkState + LinkState, + Role ) from .performatives import ( AttachFrame, @@ -28,46 +27,86 @@ DispositionFrame, FlowFrame, ) -from .outcomes import ( - Received, - Accepted, - Rejected, - Released, - Modified -) - +from .outcomes import SETTLEMENT_TYPES +if TYPE_CHECKING: + from .session import Session _LOGGER = logging.getLogger(__name__) class ReceiverLink(Link): + """A definition of a Link that has the predefined role of a receiver.""" + + def __init__( + self, + * + session: "Session", + handle: int, + source: Union[str, Source], + on_transfer: Callable[[TransferFrame, Message], Optional[SETTLEMENT_TYPES]], + target: Optional[Union[str, Target]] = None, + name: Optional[str] = None, + **kwargs): + """Create a new Receiver link. - def __init__(self, session, handle, source_address, **kwargs): - name = kwargs.pop('name', None) or str(uuid.uuid4()) - role = Role.Receiver - if 'target_address' not in kwargs: - kwargs['target_address'] = "receiver-link-{}".format(name) - super(ReceiverLink, self).__init__(session, handle, name, role, source_address=source_address, **kwargs) - self._on_transfer = kwargs.pop('on_transfer') + This constructor should not be called directly - instead this object will be returned + from calling :func:~pyamqp.Session.create_receiver_link(). - def _process_incoming_message(self, frame, message): + :param ~pyamqp.Session session: The session to which this link will be established within. + :param int handle: The next available handle within the session to assign to the link. + :param source: The source endpoint to connect to and start receiving from. This could + be just a string address, or a fully formed AMQP 'source' type. + :paramtype source: Union[str, ~pyamqp.Source] + :param on_transfer: A callback function to be run with ever incoming Transfer frame and it's + message payload. Optionally this function can return an Outcome object, in which case the Message + will be immediately settled. Otherwise if None is returned, the message will not be actively + settled. + :paramtype on_transfer: Callable[[TransferFrame, Message], None] + :keyword target: An optional target for the receiver link. If supplied, it will be used as the + target address, if omitted a value will be generated in the format 'receiver-link-[name]'. + :paramtype target: Union[str, ~pyamqp.Target`] + :keyword str name: An optional name for the receiver link. If omitted, a UUID will be generated. + """ + name = name or str(uuid.uuid4()) + self._on_transfer = on_transfer + if not target: + target = "receiver-link-{}".format(name) + super().__init__( + session=session, + handle=handle, + name=name, + role=Role.Receiver, + source=source, + target=target, + **kwargs + ) + + def _incoming_message( + self, + frame: TransferFrame, + message: Message + ) -> Optional[SETTLEMENT_TYPES]: try: return self._on_transfer(frame, message) except Exception as e: - _LOGGER.error("Handler function failed with error: %r", e) + _LOGGER.error( + "Handler function 'on_transfer' failed with error: %r", + e, + extra=self.network_trace_params + ) return None - def _incoming_attach(self, frame): - super(ReceiverLink, self)._incoming_attach(frame) + def _incoming_attach(self, frame: AttachFrame) -> None: + super()._incoming_attach(frame) if frame[9] is None: # initial_delivery_count - _LOGGER.info("Cannot get initial-delivery-count. Detaching link") + _LOGGER.info("Cannot get initial-delivery-count. Detaching link.", extra=self.network_trace_params) self._remove_pending_deliveries() self._set_state(LinkState.DETACHED) # TODO: Send detach now? self.delivery_count = frame[9] self.current_link_credit = self.link_credit self._outgoing_flow() - def _incoming_transfer(self, frame): + def _incoming_transfer(self, frame: TransferFrame) -> None: if self.network_trace: _LOGGER.info("<- %r", TransferFrame(*frame), extra=self.network_trace_params) self.current_link_credit -= 1 @@ -83,7 +122,7 @@ def _incoming_transfer(self, frame): self._received_payload = bytearray() else: message = decode_payload(frame[11]) - delivery_state = self._process_incoming_message(frame, message) + delivery_state = self._incoming_message(frame, message) if not frame[4] and delivery_state: # settled self._outgoing_disposition(first=frame[1], settled=True, state=delivery_state) if self.current_link_credit <= 0: @@ -95,9 +134,9 @@ def _outgoing_disposition( first: int, last: Optional[int], settled: Optional[bool], - state: Optional[Union[Received, Accepted, Rejected, Released, Modified]], + state: Optional[SETTLEMENT_TYPES], batchable: Optional[bool] - ): + ) -> None: disposition_frame = DispositionFrame( role=self.role, first=first, @@ -116,9 +155,21 @@ def send_disposition( first_delivery_id: int, last_delivery_id: Optional[int] = None, settled: Optional[bool] = None, - delivery_state: Optional[Union[Received, Accepted, Rejected, Released, Modified]] = None, + delivery_state: Optional[SETTLEMENT_TYPES] = None, batchable: Optional[bool] = None - ): + ) -> None: + """Send a message disposition to a received transfer. + + :keyword int first_delivery_id: The delivery ID of the message to be settled. If settling a + range of messages, this will be the ID of the first. + :keyword int last_delivery_id: If a range of delivery IDs are being settled, this is the last + ID in the range. Default is None, meaning only the first delivery ID will be settled. + :keyword bool settled: Whether the disposition indicates that the message is settled. + :keyword delivery_state: If the message is being settled, the outcome of the settlement. + :paramtype delivery_state: Union[~pyamqp.Received, ~pyamqp.Rejected, ~pyamqp.Accepted, ~pyamqp.Modified, ~pyamqp.Released] + :keyword bool batchable: + :rtype: None + """ if self._is_closed: raise ValueError("Link already closed.") self._outgoing_disposition( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/types.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/types.py index db478af591c8..fd1c4f022fe1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/types.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/types.py @@ -5,13 +5,35 @@ #-------------------------------------------------------------------------- from enum import Enum +from uuid import uuid +from datetime import datetime +from typing import ( + NamedTuple, + Generic, + Literal, + TypeVar, + Union, + Dict, + List, + Optional, + Tuple +) +from typing_extensions import TypedDict + TYPE = 'TYPE' VALUE = 'VALUE' +AMQP_PRIMATIVE_TYPES = Union[int, str, bytes, None, bool, float, uuid, datetime] +AMQP_STRUCTURED_TYPES = Union[ + AMQP_PRIMATIVE_TYPES, + Dict[AMQP_PRIMATIVE_TYPES, AMQP_PRIMATIVE_TYPES], + List[AMQP_PRIMATIVE_TYPES] +] + -class AMQPTypes(object): # pylint: disable=no-init +class AMQPTypes(Enum): null = 'NULL' boolean = 'BOOL' ubyte = 'UBYTE' @@ -51,40 +73,37 @@ class ObjDefinition(Enum): error = "error" -class ConstructorBytes(object): # pylint: disable=no-init - null = b'\x40' - bool = b'\x56' - bool_true = b'\x41' - bool_false = b'\x42' - ubyte = b'\x50' - byte = b'\x51' - ushort = b'\x60' - short = b'\x61' - uint_0 = b'\x43' - uint_small = b'\x52' - int_small = b'\x54' - uint_large = b'\x70' - int_large = b'\x71' - ulong_0 = b'\x44' - ulong_small = b'\x53' - long_small = b'\x55' - ulong_large = b'\x80' - long_large = b'\x81' - float = b'\x72' - double = b'\x82' - timestamp = b'\x83' - uuid = b'\x98' - binary_small = b'\xA0' - binary_large = b'\xB0' - string_small = b'\xA1' - string_large = b'\xB1' - symbol_small = b'\xA3' - symbol_large = b'\xB3' - list_0 = b'\x45' - list_small = b'\xC0' - list_large = b'\xD0' - map_small = b'\xC1' - map_large = b'\xD1' - array_small = b'\xE0' - array_large = b'\xF0' - descriptor = b'\x00' +class FIELD(NamedTuple): + type: Union[AMQPTypes, FieldDefinition, ObjDefinition] + multiple: bool + + +class Performative(NamedTuple): + """Base for performatives.""" + _code: int = 0x00000000 + _definition: List[Optional[FIELD]] = [] + + +T = TypeVar('T', AMQPTypes) +V = TypeVar('V', AMQP_STRUCTURED_TYPES) + + +class AMQPDefinedType(TypedDict, Generic[T, V]): + """A wrapper for data that is going to be passed into the AMQP encoder.""" + TYPE: Optional[T] + VALUE: Optional[V] + + +class AMQPFieldType(TypedDict, Generic[V]): + """A wrapper for data that will be encoded as AMQP fields.""" + TYPE: Literal[AMQPTypes.map] + VALUE: List[Tuple[AMQPDefinedType[Literal[AMQPTypes.symbol], bytes], V]] + + +class AMQPAnnotationsType(TypedDict, Generic[V]): + """A wrapper for data that will be encoded as AMQP annotations.""" + TYPE: Literal[AMQPTypes.map] + VALUE: List[Tuple[AMQPDefinedType[Union[Literal[AMQPTypes.symbol], Literal[AMQPTypes.ulong]], Union[int, bytes]], V]] + + +NullDefinedType = AMQPDefinedType[Literal[AMQPTypes.null], None] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/utils.py index 72bf2dcce67a..1e99dd439949 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/utils.py @@ -4,62 +4,44 @@ # license information. #-------------------------------------------------------------------------- -import six import datetime from base64 import b64encode from hashlib import sha256 from hmac import HMAC +from typing import List, Union, Literal, Optional from urllib.parse import urlencode, quote_plus import time -from .types import TYPE, VALUE, AMQPTypes +from .types import AMQP_PRIMATIVE_TYPES, TYPE, VALUE, AMQPTypes, AMQPDefinedType from ._encode import encode_payload +from .message import Message -class UTC(datetime.tzinfo): - """Time Zone info for handling UTC""" +TZ_UTC = datetime.timezone.utc - def utcoffset(self, dt): - """UTF offset for UTC is 0.""" - return datetime.timedelta(0) - def tzname(self, dt): - """Timestamp representation.""" - return "Z" - - def dst(self, dt): - """No daylight saving for UTC.""" - return datetime.timedelta(hours=1) - - -try: - from datetime import timezone # pylint: disable=ungrouped-imports - - TZ_UTC = timezone.utc # type: ignore -except ImportError: - TZ_UTC = UTC() # type: ignore - - -def utc_from_timestamp(timestamp): +def utc_from_timestamp(timestamp: str) -> datetime.datetime: + """Convert string timestamp to datetime.datetime with UTC timezone.""" return datetime.datetime.fromtimestamp(timestamp, tz=TZ_UTC) -def utc_now(): +def utc_now() -> datetime.datetime: + """Get current datetime.datetime with UTC timezone""" return datetime.datetime.now(tz=TZ_UTC) -def encode(value, encoding='UTF-8'): - return value.encode(encoding) if isinstance(value, six.text_type) else value - - -def generate_sas_token(audience, policy, key, expiry=None): - """ - Generate a sas token according to the given audience, policy, key and expiry +def generate_sas_token( + audience: str, + policy: str, + key: str, + expiry: Optional[int] = None + ) -> str: + """Generate a sas token according to the given audience, policy, key and expiry. :param str audience: :param str policy: :param str key: - :param int expiry: abs expiry time + :param int expiry: Absolute expiry time. :rtype: str """ if not expiry: @@ -82,60 +64,117 @@ def generate_sas_token(audience, policy, key, expiry=None): return 'SharedAccessSignature ' + urlencode(result) -def add_batch(batch, message): - # Add a message to a batch +def add_batch(batch: Message, message: Message) -> None: + """Add a message to a batch. + + This will encode the message and add the bytes to the array in the + data field of the message. + + :param ~pyamqp.Message batch: The batch message to add to. + :param ~pyamqp.Message message: The message to append to the batch. + """ output = bytearray() encode_payload(output, message) batch[5].append(output) -def encode_str(data, encoding='utf-8'): +def _encode_str(data: Union[str, bytes], encoding: str) -> bytes: + """Encode a string with supplied encoding, otherwise return data unaltered. + + :param Union[str, bytes] data: A segment of an AMQP data payload. Either string or bytes. + :param str encoding: The encoding to use for any string data. + :rtype: bytes + """ try: return data.encode(encoding) except AttributeError: return data -def normalized_data_body(data, **kwargs): - # A helper method to normalize input into AMQP Data Body format +def normalized_data_body( + data: Union[str, bytes, List[Union[str, bytes]]], + **kwargs + ) -> List[bytes]: + """A helper method to normalize input into AMQP Data Body format. + + :param data: An AMQP data body to be formatted into a list of bytes. This might be bytes, string + or already formatted into a list of strings/bytes. + :keyword str encoding: The encoding to use for any string data. Default is UTF-8. + :rtype: List[bytes] + """ encoding = kwargs.get("encoding", "utf-8") if isinstance(data, list): - return [encode_str(item, encoding) for item in data] + return [_encode_str(item, encoding) for item in data] else: - return [encode_str(data, encoding)] + return [_encode_str(data, encoding)] def normalized_sequence_body(sequence): - # A helper method to normalize input into AMQP Sequence Body format + """A helper method to normalize input into AMQP Sequence Body format. + """ + # TODO: Why is this returning a list of lists? if isinstance(sequence, list) and all([isinstance(b, list) for b in sequence]): return sequence elif isinstance(sequence, list): return [sequence] -def get_message_encoded_size(message): +def get_message_encoded_size(message: Message) -> int: + """Get the size of a message once it has been encoded to an AMQP payload. + + :param ~pyamqp.Message message: The message to get the length of. + :rtype: int + """ output = bytearray() encode_payload(output, message) return len(output) -def amqp_long_value(value): - # A helper method to wrap a Python int as AMQP long +def amqp_long_value(value: int) -> AMQPDefinedType[Literal[AMQPTypes.long], int]: + """A helper method to wrap a Python int as AMQP long. + + :param int value: An integer to be defined as a long. + :rtype: Dict[str, Union[Literal[AMQPTypes.long], int]] + """ # TODO: wrapping one line in a function is expensive, find if there's a better way to do it return {TYPE: AMQPTypes.long, VALUE: value} -def amqp_uint_value(value): - # A helper method to wrap a Python int as AMQP uint +def amqp_uint_value(value: int) -> AMQPDefinedType[Literal[AMQPTypes.uint], int]: + """A helper method to wrap a Python int as AMQP uint. + + :param int value: An integer to be defined as a uint. + :rtype: Dict[str, Union[Literal[AMQPTypes.uint], int]] + """ return {TYPE: AMQPTypes.uint, VALUE: value} -def amqp_string_value(value): +def amqp_string_value(value: Union[str, bytes]) -> AMQPDefinedType[Literal[AMQPTypes.string], Union[str, bytes]]: + """A helper method to wrap a Python string or bytes as an AMQP string. + + This method will not encode string data to bytes, which will happen during + AMQP encode. + + :param Union[str, bytes] value: Bytes or string or be defined as a string. + :rtype: Dict[str, Union[Literal[AMQPTypes.string], int]] + """ return {TYPE: AMQPTypes.string, VALUE: value} -def amqp_symbol_value(value): +def amqp_symbol_value(value: Union[str, bytes]) -> AMQPDefinedType[Literal[AMQPTypes.symbol], Union[str, bytes]]: + """A helper method to wrap a Python string/bytes as AMQP symbol. + + :param int value: An integer to be defined as a long. + :rtype: Dict[str, Union[Literal[AMQPTypes.symbol], str, bytes]] + """ return {TYPE: AMQPTypes.symbol, VALUE: value} -def amqp_array_value(value): + +def amqp_array_value(value: List[AMQP_PRIMATIVE_TYPES]) -> AMQPDefinedType[Literal[AMQPTypes.array], List[AMQP_PRIMATIVE_TYPES]]: + """A helper method to wrap a Python list as an AMQP array. + + :param value: A list of homogeneous primary data types to define as an array. + :paramtype value: List[AMQP_PRIMATIVE_TYPES] + :rtype: Dict[str, Union[Literal[AMQPTypes.array], List[AMQP_PRIMATIVE_TYPES]]] + """ return {TYPE: AMQPTypes.array, VALUE: value}