Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
swathipil committed Aug 2, 2022
1 parent 9625aad commit 0a6d1b8
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 51 deletions.
3 changes: 1 addition & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,8 @@ def __init__(
**kwargs,
) -> None:
# TODO: this changes API, check with Anna if valid -
# Need move out message creation to right before sending.
# Might need move out message creation to right before sending.
# Might take more time to loop through events and add them all to batch in `send` than in `add` here
# Default async vs sync might cause issues.
self._amqp_transport = kwargs.pop("amqp_transport", UamqpTransport)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def _send_event_data(
last_exception: Optional[Exception] = None,
) -> None:
if self._unsent_events:
self._amqp_transport.send_messages(
await self._amqp_transport.send_messages(
self, timeout_time, last_exception, _LOGGER
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,16 @@ def to_outgoing_amqp_message(annotated_message):
"""

@staticmethod
async def get_batch_message_encoded_size(message):
def get_batch_message_encoded_size(message):
"""
Gets the batch message encoded size given an underlying Message.
:param uamqp.BatchMessage message: Message to get encoded size of.
:rtype: int
"""
return await message.gather()[0].get_message_encoded_size()

@staticmethod
@abstractmethod
async def get_message_encoded_size(message):
"""
Gets the message encoded size given an underlying Message.
:param uamqp.Message or pyamqp.Message message: Message to get encoded size of.
:rtype: int
"""

@staticmethod
@abstractmethod
async def get_remote_max_message_size(handler):
def get_remote_max_message_size(handler):
"""
Returns max peer message size.
:param AMQPClient handler: Client to get remote max message size on link from.
Expand All @@ -67,15 +57,15 @@ async def get_remote_max_message_size(handler):

@staticmethod
@abstractmethod
async def create_retry_policy(config):
def create_retry_policy(config):
"""
Creates the error retry policy.
:param ~azure.eventhub._configuration.Configuration config: Configuration.
"""

@staticmethod
@abstractmethod
async def create_link_properties(link_properties):
def create_link_properties(link_properties):
"""
Creates and returns the link properties.
:param dict[bytes, int] link_properties: The dict of symbols and corresponding values.
Expand All @@ -84,7 +74,7 @@ async def create_link_properties(link_properties):

@staticmethod
@abstractmethod
async def create_send_client(*, config, **kwargs):
def create_send_client(*, config, **kwargs):
"""
Creates and returns the send client.
:param ~azure.eventhub._configuration.Configuration config: The configuration.
Expand Down Expand Up @@ -113,7 +103,7 @@ async def send_messages(producer, timeout_time, last_exception, logger):

@staticmethod
@abstractmethod
async def set_message_partition_key(message, partition_key, **kwargs):
def set_message_partition_key(message, partition_key, **kwargs):
"""Set the partition key as an annotation on a uamqp message.
:param message: The message to update.
Expand All @@ -123,18 +113,7 @@ async def set_message_partition_key(message, partition_key, **kwargs):

@staticmethod
@abstractmethod
async def add_batch(batch_message, outgoing_event_data, event_data):
"""
Add EventData to the data body of the BatchMessage.
:param batch_message: BatchMessage to add data to.
:param outgoing_event_data: Transformed EventData for sending.
:param event_data: EventData to add to internal batch events. uamqp use only.
:rtype: None
"""

@staticmethod
@abstractmethod
async def create_source(source, offset, selector):
def create_source(source, offset, selector):
"""
Creates and returns the Source.
Expand All @@ -145,7 +124,7 @@ async def create_source(source, offset, selector):

@staticmethod
@abstractmethod
async def create_receive_client(*, config, **kwargs):
def create_receive_client(*, config, **kwargs):
"""
Creates and returns the receive client.
:param ~azure.eventhub._configuration.Configuration config: The configuration.
Expand Down Expand Up @@ -192,22 +171,14 @@ async def create_token_auth(auth_uri, get_token, token_type, config, **kwargs):

@staticmethod
@abstractmethod
async def create_mgmt_client(address, mgmt_auth, config):
def create_mgmt_client(address, mgmt_auth, config):
"""
Creates and returns the mgmt AMQP client.
:param _Address address: Required. The Address.
:param JWTTokenAuth mgmt_auth: Auth for client.
:param ~azure.eventhub._configuration.Configuration config: The configuration.
"""

@staticmethod
@abstractmethod
async def get_updated_token(mgmt_auth):
"""
Return updated auth token.
:param mgmt_auth: Auth.
"""

@staticmethod
@abstractmethod
async def mgmt_client_request(mgmt_client, mgmt_msg, **kwargs):
Expand All @@ -223,7 +194,7 @@ async def mgmt_client_request(mgmt_client, mgmt_msg, **kwargs):

@staticmethod
@abstractmethod
async def get_error(error, message, *, condition=None):
def get_error(error, message, *, condition=None):
"""
Gets error and passes in error message, and, if applicable, condition.
:param error: The error to raise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,6 @@ class UamqpTransportAsync(UamqpTransport, AmqpTransportAsync):
Class which defines uamqp-based methods used by the producer and consumer.
"""

@staticmethod
async def get_batch_message_encoded_size(message):
"""
Gets the batch message encoded size given an underlying Message.
:param uamqp.BatchMessage message: Message to get encoded size of.
:rtype: int
"""
return await message.gather()[0].get_message_encoded_size()

@staticmethod
def create_send_client(*, config, **kwargs): # pylint:disable=unused-argument
"""
Expand Down

0 comments on commit 0a6d1b8

Please sign in to comment.