Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs&ServiceBus] merge sb and eh pyamqp #26548

Merged
6 changes: 3 additions & 3 deletions eng/pipelines/templates/steps/build-artifacts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ steps:
Write-Host "##vso[task.setvariable variable=PIP_INDEX_URL]https://pypi.python.org/simple"
displayName: Reset PIP Index For APIStubGen

#- template: /eng/pipelines/templates/steps/run_apistub.yml
# parameters:
# ServiceDirectory: ${{ parameters.ServiceDirectory }}
- template: /eng/pipelines/templates/steps/run_apistub.yml
parameters:
ServiceDirectory: ${{ parameters.ServiceDirectory }}

- ${{ parameters.BeforePublishSteps }}

Expand Down
2 changes: 0 additions & 2 deletions eng/tox/allowed_pylint_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,5 @@
"azure-agrifood-farming",
"azure-ai-language-questionanswering",
"azure-ai-language-conversations",
"azure-eventhub",
"azure-eventhub-checkpointstoreblob-aio",
"azure-developer-loadtesting"
]
1 change: 1 addition & 0 deletions eng/tox/mypy_hard_failure_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

MYPY_HARD_FAILURE_OPTED = [
"azure-core",
"azure-eventhub",
"azure-identity",
"azure-keyvault-administration",
"azure-keyvault-certificates",
Expand Down
5 changes: 2 additions & 3 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import collections
from typing import Any, Dict, Tuple, List, Optional, TYPE_CHECKING, cast, Union
try:
from typing import TypeAlias
from typing import TypeAlias # type: ignore
except ImportError:
from typing_extensions import TypeAlias
from datetime import timedelta
Expand All @@ -25,11 +25,10 @@
from azure.core.utils import parse_connection_string as core_parse_connection_string
from azure.core.pipeline.policies import RetryMode


try:
from ._transport._uamqp_transport import UamqpTransport
except ImportError:
UamqpTransport = None
UamqpTransport = None # type: ignore
from ._transport._pyamqp_transport import PyamqpTransport
from .exceptions import ClientClosedError
from ._configuration import Configuration
Expand Down
80 changes: 59 additions & 21 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@

if TYPE_CHECKING:
try:
from uamqp import Message as uamqp_Message, BatchMessage
from uamqp import ( # pylint: disable=unused-import
Message as uamqp_Message,
BatchMessage,
)
except ImportError:
uamqp_Message = None
BatchMessage = None
Expand Down Expand Up @@ -131,8 +134,8 @@ def __init__(
self._raw_amqp_message = AmqpAnnotatedMessage( # type: ignore
data_body=body, annotations={}, application_properties={}
)
self._uamqp_message = None
self._message = None
self._uamqp_message: Optional[Union[LegacyMessage, uamqp_Message]] = None
self._message: Message = None # type: ignore
self._raw_amqp_message.header = AmqpMessageHeader()
self._raw_amqp_message.properties = AmqpMessageProperties()
self.message_id = None
Expand Down Expand Up @@ -231,7 +234,11 @@ def _from_message(
event_data = cls(body="")
# pylint: disable=protected-access
event_data._message = message
event_data._raw_amqp_message = raw_amqp_message if raw_amqp_message else AmqpAnnotatedMessage(message=message)
event_data._raw_amqp_message = (
raw_amqp_message
if raw_amqp_message
else AmqpAnnotatedMessage(message=message)
)
return event_data

def _decode_non_data_body_as_str(self, encoding: str = "UTF-8") -> str:
Expand All @@ -252,7 +259,10 @@ def message(self) -> LegacyMessage:

:rtype: LegacyMessage
"""
warnings.warn("The `message` property is deprecated and will be removed in future versions.", DeprecationWarning)
warnings.warn(
"The `message` property is deprecated and will be removed in future versions.",
DeprecationWarning,
)
if not self._uamqp_message:
self._uamqp_message = LegacyMessage(
self._raw_amqp_message,
Expand All @@ -263,9 +273,12 @@ def message(self) -> LegacyMessage:
@message.setter
def message(self, value: "uamqp_Message") -> None:
"""DEPRECATED: Set the underlying Message.
This is deprecated and will be removed in a later release.
This is deprecated and will be removed in a later release.
"""
warnings.warn("The `message` property is deprecated and will be removed in future versions.", DeprecationWarning)
warnings.warn(
"The `message` property is deprecated and will be removed in future versions.",
DeprecationWarning,
)
self._uamqp_message = value

@property
Expand Down Expand Up @@ -534,14 +547,20 @@ def __init__(
self._partition_key = partition_key

self._message = self._amqp_transport.build_batch_message(data=[])
self._message = self._amqp_transport.set_message_partition_key(self._message, self._partition_key)
self._message = self._amqp_transport.set_message_partition_key(
self._message, self._partition_key
)
self._size = self._amqp_transport.get_batch_message_encoded_size(self._message)
self.max_size_in_bytes = max_size_in_bytes or self._amqp_transport.MAX_MESSAGE_LENGTH_BYTES
self.max_size_in_bytes = (
max_size_in_bytes or self._amqp_transport.MAX_MESSAGE_LENGTH_BYTES
)

self._count = 0
self._internal_events: List[Union[EventData, AmqpAnnotatedMessage]] = []
self._uamqp_message = (
None if PyamqpTransport.TIMEOUT_FACTOR == self._amqp_transport.TIMEOUT_FACTOR else self._message
None
if PyamqpTransport.TIMEOUT_FACTOR == self._amqp_transport.TIMEOUT_FACTOR
else self._message
)

def __repr__(self) -> str:
Expand All @@ -557,15 +576,18 @@ def __len__(self) -> int:
@classmethod
def _from_batch(
cls,
batch_data: Iterable[EventData],
batch_data: Iterable[Union[AmqpAnnotatedMessage, EventData]],
amqp_transport: AmqpTransport,
partition_key: Optional[AnyStr] = None,
*,
max_size_in_bytes: Optional[int] = None,
partition_id: Optional[str] = None,
) -> EventDataBatch:
outgoing_batch_data = [
transform_outbound_single_message(m, EventData, amqp_transport.to_outgoing_amqp_message) for m in batch_data
transform_outbound_single_message(
m, EventData, amqp_transport.to_outgoing_amqp_message
)
for m in batch_data
]
batch_data_instance = cls(
partition_key=partition_key,
Expand Down Expand Up @@ -596,20 +618,27 @@ def message(self) -> Union["BatchMessage", LegacyBatchMessage]:

:rtype: uamqp.BatchMessage or LegacyBatchMessage
"""
warnings.warn("The `message` property is deprecated and will be removed in future versions.", DeprecationWarning)
warnings.warn(
"The `message` property is deprecated and will be removed in future versions.",
DeprecationWarning,
)
if not self._uamqp_message:
message = AmqpAnnotatedMessage(message=Message(*self._message))
self._uamqp_message = LegacyBatchMessage(
message, to_outgoing_amqp_message=PyamqpTransport().to_outgoing_amqp_message
message,
to_outgoing_amqp_message=PyamqpTransport().to_outgoing_amqp_message,
)
return self._uamqp_message

@message.setter
def message(self, value: "BatchMessage") -> None:
"""DEPRECATED: Set the underlying BatchMessage.
This is deprecated and will be removed in a later release.
This is deprecated and will be removed in a later release.
"""
warnings.warn("The `message` property is deprecated and will be removed in future versions.", DeprecationWarning)
warnings.warn(
"The `message` property is deprecated and will be removed in future versions.",
DeprecationWarning,
)
self._uamqp_message = value

@property
Expand Down Expand Up @@ -638,10 +667,15 @@ def add(self, event_data: Union[EventData, AmqpAnnotatedMessage]) -> None:
)

if self._partition_key:
if outgoing_event_data.partition_key and outgoing_event_data.partition_key != self._partition_key:
raise ValueError("The partition key of event_data does not match the partition key of this batch.")
if (
outgoing_event_data.partition_key
and outgoing_event_data.partition_key != self._partition_key
):
raise ValueError(
"The partition key of event_data does not match the partition key of this batch."
)
if not outgoing_event_data.partition_key:
outgoing_event_data._message = self._amqp_transport.set_message_partition_key( # pylint: disable=protected-access
outgoing_event_data._message = self._amqp_transport.set_message_partition_key( # pylint: disable=protected-access
outgoing_event_data._message, # pylint: disable=protected-access
self._partition_key,
)
Expand All @@ -653,11 +687,15 @@ def add(self, event_data: Union[EventData, AmqpAnnotatedMessage]) -> None:
# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
# message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes.
size_after_add = (
self._size + event_data_size + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1]
self._size
+ event_data_size
+ _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1]
)

if size_after_add > self.max_size_in_bytes:
raise ValueError(f"EventDataBatch has reached its size limit: {self.max_size_in_bytes}")
raise ValueError(
f"EventDataBatch has reached its size limit: {self.max_size_in_bytes}"
)

self._amqp_transport.add_batch(self, outgoing_event_data, event_data)
self._size = size_after_add
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ._pyamqp._connection import Connection
from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
from uamqp import Connection as uamqp_Connection
from ._transport._base import AmqpTransport

try:
from typing_extensions import Protocol
Expand Down Expand Up @@ -60,7 +61,7 @@ def __init__(self, **kwargs):
self._channel_max = kwargs.get("channel_max")
self._idle_timeout = kwargs.get("idle_timeout")
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")
self._amqp_transport = kwargs.get("amqp_transport")
self._amqp_transport: AmqpTransport = kwargs.pop("amqp_transport")

def get_connection(
self,
Expand Down
57 changes: 36 additions & 21 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
import logging
from collections import deque
from typing import TYPE_CHECKING, Callable, Dict, Optional, Any, Deque, Union
from typing import TYPE_CHECKING, Callable, Dict, Optional, Any, Deque, Union, cast

from ._common import EventData
from ._client_base import ConsumerProducerMixin
Expand All @@ -20,6 +20,11 @@
)

if TYPE_CHECKING:
from ._pyamqp import types
from ._pyamqp.message import Message
from ._pyamqp.authentication import JWTTokenAuth
from ._pyamqp.client import ReceiveClient

try:
from uamqp import ReceiveClient as uamqp_ReceiveClient, Message as uamqp_Message
from uamqp.types import AMQPType as uamqp_AMQPType
Expand All @@ -29,10 +34,6 @@
uamqp_Message = None
uamqp_AMQPType = None
uamqp_JWTTokenAuth = None
from ._pyamqp import types
from ._pyamqp.client import ReceiveClient
from ._pyamqp.message import Message
from ._pyamqp.authentication import JWTTokenAuth
from ._consumer_client import EventHubConsumerClient


Expand Down Expand Up @@ -75,7 +76,9 @@ class EventHubConsumer(
It is set to `False` by default.
"""

def __init__(self, client: "EventHubConsumerClient", source: str, **kwargs: Any) -> None:
def __init__(
self, client: "EventHubConsumerClient", source: str, **kwargs: Any
) -> None:
event_position = kwargs.get("event_position", None)
prefetch = kwargs.get("prefetch", 300)
owner_level = kwargs.get("owner_level", None)
Expand Down Expand Up @@ -103,39 +106,52 @@ def __init__(self, client: "EventHubConsumerClient", source: str, **kwargs: Any)
self._owner_level = owner_level
self._keep_alive = keep_alive
self._auto_reconnect = auto_reconnect
self._retry_policy = self._amqp_transport.create_retry_policy(self._client._config)
self._retry_policy = self._amqp_transport.create_retry_policy(
self._client._config
)
self._reconnect_backoff = 1
link_properties: Union[Dict[uamqp_AMQPType, uamqp_AMQPType], Dict[types.AMQPTypes, types.AMQPTypes]] = {}
link_properties: Dict[bytes, int] = {}
self._error = None
self._timeout = 0
self._idle_timeout = (idle_timeout * self._amqp_transport.TIMEOUT_FACTOR) if idle_timeout else None
self._idle_timeout = (
(idle_timeout * self._amqp_transport.TIMEOUT_FACTOR)
if idle_timeout
else None
)
self._partition = self._source.split("/")[-1]
self._name = f"EHConsumer-{uuid.uuid4()}-partition{self._partition}"
if owner_level is not None:
link_properties[EPOCH_SYMBOL] = int(owner_level)
link_property_timeout_ms = (
self._client._config.receive_timeout or self._timeout # pylint:disable=protected-access
self._client._config.receive_timeout
or self._timeout # pylint:disable=protected-access
) * self._amqp_transport.TIMEOUT_FACTOR
link_properties[TIMEOUT_SYMBOL] = int(link_property_timeout_ms)
self._link_properties = self._amqp_transport.create_link_properties(link_properties)
self._link_properties: Union[
Dict[uamqp_AMQPType, uamqp_AMQPType], Dict[types.AMQPTypes, types.AMQPTypes]
] = self._amqp_transport.create_link_properties(link_properties)
self._handler: Optional[Union[uamqp_ReceiveClient, ReceiveClient]] = None
self._track_last_enqueued_event_properties = (
track_last_enqueued_event_properties
)
self._message_buffer: Deque[uamqp_Message] = deque()
self._last_received_event: Optional[EventData] = None
self._receive_start_time: Optional[float]= None
self._receive_start_time: Optional[float] = None

def _create_handler(self, auth: Union[uamqp_JWTTokenAuth, JWTTokenAuth]) -> None:
source = self._amqp_transport.create_source(
self._source,
self._offset,
event_position_selector(self._offset, self._offset_inclusive)
event_position_selector(self._offset, self._offset_inclusive),
)
desired_capabilities = (
[RECEIVER_RUNTIME_METRIC_SYMBOL]
if self._track_last_enqueued_event_properties
else None
)
desired_capabilities = [RECEIVER_RUNTIME_METRIC_SYMBOL] if self._track_last_enqueued_event_properties else None

self._handler = self._amqp_transport.create_receive_client(
config=self._client._config, # pylint:disable=protected-access
config=self._client._config, # pylint:disable=protected-access
source=source,
auth=auth,
network_trace=self._client._config.network_tracing, # pylint:disable=protected-access
Expand All @@ -147,7 +163,8 @@ def _create_handler(self, auth: Union[uamqp_JWTTokenAuth, JWTTokenAuth]) -> None
keep_alive_interval=self._keep_alive,
client_name=self._name,
properties=create_properties(
self._client._config.user_agent, amqp_transport=self._amqp_transport # pylint:disable=protected-access
self._client._config.user_agent, # pylint:disable=protected-access
amqp_transport=self._amqp_transport,
),
desired_capabilities=desired_capabilities,
streaming_receive=True,
Expand All @@ -169,8 +186,7 @@ def _next_message_in_buffer(self):
return event_data

def _open(self) -> bool:
"""Open the EventHubConsumer/EventHubProducer using the supplied connection.
"""
"""Open the EventHubConsumer/EventHubProducer using the supplied connection."""
# pylint: disable=protected-access
if not self.running:
if self._handler:
Expand All @@ -180,6 +196,7 @@ def _open(self) -> bool:
conn = self._client._conn_manager.get_connection( # pylint: disable=protected-access
host=self._client._address.hostname, auth=auth
)
self._handler = cast("ReceiveClient", self._handler)
self._handler.open(connection=conn)
while not self._handler.client_ready():
time.sleep(0.05)
Expand All @@ -194,9 +211,7 @@ def receive(self, batch=False, max_batch_size=300, max_wait_time=None):
self._client._config.max_retries # pylint:disable=protected-access
)
self._receive_start_time = self._receive_start_time or time.time()
deadline = self._receive_start_time + (
max_wait_time or 0
)
deadline = self._receive_start_time + (max_wait_time or 0)
if len(self._message_buffer) < max_batch_size:
# TODO: the retry here is a bit tricky as we are using low-level api from the amqp client.
# Currently we create a new client with the latest received event's offset per retry.
Expand Down
Loading