Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Service Bus Namespace API Topic, Subscription, Rule and NamespaceProperties #12227

Merged
merged 25 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7d9ae38
Since resource names can be programmatically constructed, add scrubbe…
KieranBrantnerMagee Jun 8, 2020
ce93e76
Merge remote-tracking branch 'upstream/master' into kibrantn/misc/fix…
KieranBrantnerMagee Jun 9, 2020
63179c3
In CI (but not locally) self.test_class_instance was not set before _…
KieranBrantnerMagee Jun 9, 2020
5b97cae
Merge remote-tracking branch 'upstream/master' into kibrantn/misc/fix…
KieranBrantnerMagee Jun 25, 2020
3d31d36
Merge SB management client to central repo dev branch (#12205)
YijunXieMS Jun 25, 2020
f5cd8f6
resource_moniker isn't guaranteed to be populated, so use moniker ins…
KieranBrantnerMagee Jun 26, 2020
ff84d1c
Update validation and type hints
YijunXieMS Jun 27, 2020
69ba610
Update models from swagger
YijunXieMS Jun 27, 2020
ba58755
Fix pylint
YijunXieMS Jun 29, 2020
3c4df52
Fix update_subscription
YijunXieMS Jun 29, 2020
000cf04
Fix mypy errors
YijunXieMS Jun 29, 2020
944b86a
Remove start_index and max_page_size of list operations
YijunXieMS Jun 30, 2020
c75c34a
add test and fix minor bugs
yunhaoling Jun 30, 2020
4cedf5b
Merge branch 'servicebus_mgmt_client' of https://github.com/Azure/azu…
yunhaoling Jun 30, 2020
3a0be8b
fix pylint
yunhaoling Jun 30, 2020
7e8e863
Merge remote-tracking branch 'kieran/kibrantn/misc/fix-cached-prepare…
yunhaoling Jun 30, 2020
efa82aa
skip test of list with param and update recordings
yunhaoling Jun 30, 2020
8755448
Update msrest dependency to 0.6.17
YijunXieMS Jun 30, 2020
ce77f03
Fix code review feedback
YijunXieMS Jul 1, 2020
f4aa572
Fix pylint and mypy error
YijunXieMS Jul 1, 2020
1516c00
Small fix type annotation
YijunXieMS Jul 1, 2020
0effa08
Do not call copy() in update_xxx
YijunXieMS Jul 1, 2020
f487487
Update ivar
YijunXieMS Jul 1, 2020
6e11494
Link to swagger file in repo
YijunXieMS Jul 1, 2020
9914b0f
clear queue/topic before the actual test
yunhaoling Jul 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import cast
from xml.etree.ElementTree import ElementTree


import urllib.parse as urlparse

from azure.servicebus.management import _constants as constants
from ...management._handle_response_error import _handle_response_error

# This module defines functions get_next_template and extract_data_template.
# Application code uses functools.partial to substantialize their params and builds an
# azure.core.async_paging.AsyncItemPaged instance with the two substantialized functions.

# The following is an ATOM feed XML list of QueueDescription with page size = 2.
# Tag <feed> has 2 (the page size) children <entry> tags.
# Tag <link rel="next" .../> tells the link to the next page.
# The whole XML will be deserialized into an XML ElementTree.
# Then model class QueueDescriptionFeed deserializes the ElementTree into a QueueDescriptionFeed instance.
# (QueueDescriptionFeed is defined in file ../../management/_generated/models/_models.py and _models_py3.py)
# Function get_next_template gets the next page of XML data like this one and returns the ElementTree.
# Function extract_data_template deserialize data from the ElementTree and provide link to the next page.
# azure.core.async_paging.AsyncItemPaged orchestrates the data flow between them.

# <feed xmlns="http://www.w3.org/2005/Atom">
# <title type="text">Queues</title>
# <id>https://servicebusname.servicebus.windows.net/$Resources/queues?$skip=0&amp;$top=2&amp;api-version=2017-04</id>
# <updated>2020-06-30T23:49:41Z</updated>
# <link rel="self" href="https://servicebusname.servicebus.windows.net/$Resources/queues?
# $skip=0&amp;$top=2&amp;api-version=2017-04"/>
# <link rel="next" href="https://servicebusname.servicebus.windows.net/$Resources/queues?
# %24skip=2&amp;%24top=2&amp;api-version=2017-04"/>
#
# <entry xml:base="https://servicebusname.servicebus.windows.net/$Resources/queues?
# $skip=0&amp;$top=2&amp;api-version=2017-04">
# <id>https://servicebusname.servicebus.windows.net/5?api-version=2017-04</id>
# <title type="text">5</title>
# <published>2020-06-05T00:24:34Z</published>
# <updated>2020-06-25T05:57:29Z</updated>
# <author>
# <name>servicebusname</name>
# </author>
# <link rel="self" href="../5?api-version=2017-04"/>
# <content type="application/xml">
# <QueueDescription xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"
# xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
# ...
# </QueueDescription>
# </content>
# </entry>
# <entry xml:base="https://servicebusname.servicebus.windows.net/$Resources/queues?
# $skip=0&amp;$top=2&amp;api-version=2017-04">
# <id>https://servicebusname.servicebus.windows.net/6?api-version=2017-04</id>
# <title type="text">6</title>
# <published>2020-06-15T19:49:35Z</published>
# <updated>2020-06-15T19:49:35Z</updated>
# <author>
# <name>servicebusname</name>
# </author>
# <link rel="self" href="../6?api-version=2017-04"/>
# <content type="application/xml">
# <QueueDescription xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect"
# xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
# ...
# </QueueDescription>
# </content>
# </entry>
# </feed>

async def extract_data_template(feed_class, convert, feed_element):
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved
annatisch marked this conversation as resolved.
Show resolved Hide resolved
"""A function that will be partialized to build a function used by AsyncItemPaged.

It deserializes the ElementTree returned from function `get_next_template`, returns data in an iterator and
the link to next page.

azure.core.async_paging.AsyncItemPaged will use the returned next page to call a partial function created
from `get_next_template` to fetch data of next page.

"""
deserialized = feed_class.deserialize(feed_element)
list_of_qd = [convert(x) if convert else x for x in deserialized.entry]
next_link = None
# when the response xml has two <link> tags, the 2nd if the next-page link.
if deserialized.link and len(deserialized.link) == 2:
next_link = deserialized.link[1].href
return next_link, iter(list_of_qd) # when next_page is None, AsyncPagedItem will stop fetch next page data.


async def get_next_template(list_func, *args, start_index=0, max_page_size=100, **kwargs):
"""Call list_func to get the XML data and deserialize it to XML ElementTree.

azure.core.async_paging.AsyncItemPaged will call `extract_data_template` and use the returned
XML ElementTree to call a partial function created from `extrat_data_template`.

"""
api_version = constants.API_VERSION
if args[0]: # It's next link. It's None for the first page.
queries = urlparse.parse_qs(urlparse.urlparse(args[0]).query)
start_index = int(queries[constants.LIST_OP_SKIP][0])
max_page_size = int(queries[constants.LIST_OP_TOP][0])
api_version = queries[constants.API_VERSION_PARAM_NAME][0]
with _handle_response_error():
feed_element = cast(
ElementTree,
await list_func(
skip=start_index, top=max_page_size,
api_version=api_version,
**kwargs
)
)
return feed_element
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,29 @@

from ._management_client import ServiceBusManagementClient
from ._generated.models import AuthorizationRule, MessageCountDetails, \
AccessRights, EntityAvailabilityStatus, EntityStatus
from ._models import QueueRuntimeInfo, QueueDescription
AccessRights, EntityAvailabilityStatus, EntityStatus, \
NamespaceProperties, MessagingSku, NamespaceType

from ._models import QueueRuntimeInfo, QueueDescription, TopicRuntimeInfo, TopicDescription, \
SubscriptionDescription, SubscriptionRuntimeInfo, RuleDescription, \
TrueRuleFilter, FalseRuleFilter, SqlRuleFilter, CorrelationRuleFilter, \
SqlRuleAction

__all__ = [
"ServiceBusManagementClient",
'ServiceBusManagementClient',
'AuthorizationRule',
'MessageCountDetails',
'QueueDescription',
'QueueRuntimeInfo',
'TopicDescription',
'TopicRuntimeInfo',
'SubscriptionDescription',
'SubscriptionRuntimeInfo',
'AccessRights',
'EntityAvailabilityStatus',
'EntityStatus',
'RuleDescription',
'CorrelationRuleFilter', 'SqlRuleFilter', 'TrueRuleFilter', 'FalseRuleFilter',
'SqlRuleAction',
'NamespaceProperties', 'MessagingSku', 'NamespaceType',
]
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

API_VERSION_PARAM_NAME = "api-version"
API_VERSION = "2017-04"
ENTRY_TAG = "{http://www.w3.org/2005/Atom}entry"
CONTENT_TAG = "{http://www.w3.org/2005/Atom}content"
Expand All @@ -11,3 +12,7 @@
TITLE_TAG = "{http://www.w3.org/2005/Atom}title"

ENTITY_TYPE_QUEUES = "queues"
ENTITY_TYPE_TOPICS = "topics"

LIST_OP_SKIP = "$skip"
LIST_OP_TOP = "$top"
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@
from typing import Any, Optional

from ._configuration import ServiceBusManagementClientConfiguration
from .operations import QueueOperations
from .operations import EntityOperations
from .operations import ServiceBusManagementClientOperationsMixin
from .operations import SubscriptionOperations
from .operations import RuleOperations
from .operations import NamespaceOperations
from . import models


class ServiceBusManagementClient(ServiceBusManagementClientOperationsMixin):
"""Azure Service Bus client for managing Queues, Topics, and Subscriptions.

:ivar queue: QueueOperations operations
:vartype queue: azure.servicebus.management._generated.operations.QueueOperations
:ivar entity: EntityOperations operations
:vartype entity: azure.servicebus.management._generated.operations.EntityOperations
:ivar subscription: SubscriptionOperations operations
:vartype subscription: azure.servicebus.management._generated.operations.SubscriptionOperations
:ivar rule: RuleOperations operations
:vartype rule: azure.servicebus.management._generated.operations.RuleOperations
:ivar namespace: NamespaceOperations operations
:vartype namespace: azure.servicebus.management._generated.operations.NamespaceOperations
:param endpoint: The Service Bus fully qualified domain name.
:type endpoint: str
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
Expand All @@ -45,7 +54,13 @@ def __init__(
self._serialize = Serializer(client_models)
self._deserialize = Deserializer(client_models)

self.queue = QueueOperations(
self.entity = EntityOperations(
self._client, self._config, self._serialize, self._deserialize)
self.subscription = SubscriptionOperations(
self._client, self._config, self._serialize, self._deserialize)
self.rule = RuleOperations(
self._client, self._config, self._serialize, self._deserialize)
self.namespace = NamespaceOperations(
self._client, self._config, self._serialize, self._deserialize)

def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@
from msrest import Deserializer, Serializer

from ._configuration_async import ServiceBusManagementClientConfiguration
from .operations_async import QueueOperations
from .operations_async import EntityOperations
from .operations_async import ServiceBusManagementClientOperationsMixin
from .operations_async import SubscriptionOperations
from .operations_async import RuleOperations
from .operations_async import NamespaceOperations
from .. import models


class ServiceBusManagementClient(ServiceBusManagementClientOperationsMixin):
"""Azure Service Bus client for managing Queues, Topics, and Subscriptions.

:ivar queue: QueueOperations operations
:vartype queue: azure.servicebus.management._generated.aio.operations_async.QueueOperations
:ivar entity: EntityOperations operations
:vartype entity: azure.servicebus.management._generated.aio.operations_async.EntityOperations
:ivar subscription: SubscriptionOperations operations
:vartype subscription: azure.servicebus.management._generated.aio.operations_async.SubscriptionOperations
:ivar rule: RuleOperations operations
:vartype rule: azure.servicebus.management._generated.aio.operations_async.RuleOperations
:ivar namespace: NamespaceOperations operations
:vartype namespace: azure.servicebus.management._generated.aio.operations_async.NamespaceOperations
:param endpoint: The Service Bus fully qualified domain name.
:type endpoint: str
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
Expand All @@ -40,7 +49,13 @@ def __init__(
self._serialize = Serializer(client_models)
self._deserialize = Deserializer(client_models)

self.queue = QueueOperations(
self.entity = EntityOperations(
self._client, self._config, self._serialize, self._deserialize)
self.subscription = SubscriptionOperations(
self._client, self._config, self._serialize, self._deserialize)
self.rule = RuleOperations(
self._client, self._config, self._serialize, self._deserialize)
self.namespace = NamespaceOperations(
self._client, self._config, self._serialize, self._deserialize)

async def close(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from ._queue_operations_async import QueueOperations
from ._entity_operations_async import EntityOperations
from ._service_bus_management_client_operations_async import ServiceBusManagementClientOperationsMixin
from ._subscription_operations_async import SubscriptionOperations
from ._rule_operations_async import RuleOperations
from ._namespace_operations_async import NamespaceOperations

__all__ = [
'QueueOperations',
'EntityOperations',
'ServiceBusManagementClientOperationsMixin',
'SubscriptionOperations',
'RuleOperations',
'NamespaceOperations',
]
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

class QueueOperations:
"""QueueOperations async operations.
class EntityOperations:
"""EntityOperations async operations.

You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
Expand All @@ -41,17 +41,17 @@ def __init__(self, client, config, serializer, deserializer) -> None:

async def get(
self,
queue_name: str,
entity_name: str,
enrich: Optional[bool] = False,
api_version: Optional[str] = "2017_04",
**kwargs
) -> object:
"""Get the details about the Queue with the given queueName.
"""Get the details about the Queue or Topic with the given entityName.

Get Queue.
Get Queue or Topic.

:param queue_name: The name of the queue relative to the Service Bus namespace.
:type queue_name: str
:param entity_name: The name of the queue or topic relative to the Service Bus namespace.
:type entity_name: str
:param enrich: A query parameter that sets enrich to true or false.
:type enrich: bool
:param api_version: Api Version.
Expand All @@ -69,7 +69,7 @@ async def get(
url = self.get.metadata['url'] # type: ignore
path_format_arguments = {
'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
'queueName': self._serialize.url("queue_name", queue_name, 'str', min_length=1),
'entityName': self._serialize.url("entity_name", entity_name, 'str', min_length=1),
}
url = self._client.format_url(url, **path_format_arguments)

Expand Down Expand Up @@ -100,21 +100,21 @@ async def get(
return cls(pipeline_response, deserialized, {})

return deserialized
get.metadata = {'url': '/{queueName}'} # type: ignore
get.metadata = {'url': '/{entityName}'} # type: ignore

async def put(
self,
queue_name: str,
entity_name: str,
request_body: object,
api_version: Optional[str] = "2017_04",
if_match: Optional[str] = None,
**kwargs
) -> object:
"""Create or update a queue at the provided queuePath.
"""Create or update a queue or topic at the provided entityName.

:param queue_name: The name of the queue relative to the Service Bus namespace.
:type queue_name: str
:param request_body: Parameters required to make or edit a queue.
:param entity_name: The name of the queue or topic relative to the Service Bus namespace.
:type entity_name: str
:param request_body: Parameters required to make or edit a queue or topic.
:type request_body: object
:param api_version: Api Version.
:type api_version: str
Expand All @@ -131,13 +131,13 @@ async def put(
cls = kwargs.pop('cls', None) # type: ClsType[object]
error_map = {404: ResourceNotFoundError, 409: ResourceExistsError}
error_map.update(kwargs.pop('error_map', {}))
content_type = kwargs.pop("content_type", "application/xml")
content_type = kwargs.pop("content_type", "application/atom+xml")

# Construct URL
url = self.put.metadata['url'] # type: ignore
path_format_arguments = {
'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
'queueName': self._serialize.url("queue_name", queue_name, 'str', min_length=1),
'entityName': self._serialize.url("entity_name", entity_name, 'str', min_length=1),
}
url = self._client.format_url(url, **path_format_arguments)

Expand Down Expand Up @@ -178,20 +178,20 @@ async def put(
return cls(pipeline_response, deserialized, {})

return deserialized
put.metadata = {'url': '/{queueName}'} # type: ignore
put.metadata = {'url': '/{entityName}'} # type: ignore

async def delete(
self,
queue_name: str,
entity_name: str,
api_version: Optional[str] = "2017_04",
**kwargs
) -> object:
"""Delete the Queue with the given queueName.
"""Delete the Queue or Topic with the given entityName.

Delete Queue.
Delete Queue or Topic.

:param queue_name: The name of the queue relative to the Service Bus namespace.
:type queue_name: str
:param entity_name: The name of the queue or topic relative to the Service Bus namespace.
:type entity_name: str
:param api_version: Api Version.
:type api_version: str
:keyword callable cls: A custom type or function that will be passed the direct response
Expand All @@ -207,7 +207,7 @@ async def delete(
url = self.delete.metadata['url'] # type: ignore
path_format_arguments = {
'endpoint': self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
'queueName': self._serialize.url("queue_name", queue_name, 'str', min_length=1),
'entityName': self._serialize.url("entity_name", entity_name, 'str', min_length=1),
}
url = self._client.format_url(url, **path_format_arguments)

Expand Down Expand Up @@ -236,4 +236,4 @@ async def delete(
return cls(pipeline_response, deserialized, {})

return deserialized
delete.metadata = {'url': '/{queueName}'} # type: ignore
delete.metadata = {'url': '/{entityName}'} # type: ignore
Loading