From 3875e84abe65aaf1c46304ad024bdd05eb11dc35 Mon Sep 17 00:00:00 2001 From: swathipil <76007337+swathipil@users.noreply.github.com> Date: Thu, 21 Oct 2021 12:27:58 -0700 Subject: [PATCH] [SchemaRegistry] add async version of avro serializer (#21026) fixes: #20709 --- .../CHANGELOG.md | 2 + .../_schema_registry_avro_serializer.py | 14 +- .../serializer/avroserializer/_utils.py | 13 + .../serializer/avroserializer/aio/__init__.py | 30 +++ .../avroserializer/aio/_async_lru.py | 230 ++++++++++++++++++ .../_schema_registry_avro_serializer_async.py | 163 +++++++++++++ .../dev_requirements.txt | 2 +- .../samples/README.md | 15 +- .../async_samples/avro_serializer_async.py | 93 +++++++ .../eventhub_receive_integration_async.py | 73 ++++++ .../eventhub_send_integration_async.py | 80 ++++++ .../{ => sync_samples}/avro_serializer.py | 0 .../eventhub_receive_integration.py | 0 .../eventhub_send_integration.py | 0 .../tests/conftest.py | 16 ++ ...serializer_with_auto_register_schemas.yaml | 24 +- ...ializer_without_auto_register_schemas.yaml | 24 +- ...serializer_with_auto_register_schemas.yaml | 102 ++++++++ ...ializer_without_auto_register_schemas.yaml | 102 ++++++++ .../tests/test_avro_serializer.py | 6 + .../tests/test_avro_serializer_async.py | 95 ++++++++ 21 files changed, 1043 insertions(+), 41 deletions(-) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py create mode 100644 
sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/{ => sync_samples}/avro_serializer.py (100%) rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/{ => sync_samples}/eventhub_receive_integration.py (100%) rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/{ => sync_samples}/eventhub_send_integration.py (100%) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md b/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md index eb4f375965e6..ea56671d39ce 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- Async version of `AvroSerializer` has been added under `azure.schemaregistry.serializer.avroserializer.aio`. 
+ ### Breaking Changes ### Bugs Fixed diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py index 58613a1f9a52..387c9007a299 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py @@ -29,10 +29,10 @@ from backports.functools_lru_cache import lru_cache from io import BytesIO from typing import Any, Dict, Mapping -import avro from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX from ._avro_serializer import AvroObjectSerializer +from ._utils import parse_schema class AvroSerializer(object): @@ -45,7 +45,7 @@ class AvroSerializer(object): :paramtype client: ~azure.schemaregistry.SchemaRegistryClient :keyword str group_name: Required. Schema group under which schema should be registered. :keyword bool auto_register_schemas: When true, register new schemas passed to serialize. - Otherwise, and by default, fail if it has not been pre-registered in the registry. + Otherwise, and by default, serialization will fail if the schema has not been pre-registered in the registry. 
""" @@ -89,8 +89,7 @@ def _get_schema_id(self, schema_name, schema_str, **kwargs): :param schema_name: Name of the schema :type schema_name: str - :param schema: Schema object - :type schema: avro.schema.Schema + :param str schema_str: Schema string :return: Schema Id :rtype: str """ @@ -114,11 +113,6 @@ def _get_schema(self, schema_id, **kwargs): ).schema_definition return schema_str - @classmethod - @lru_cache(maxsize=128) - def _parse_schema(cls, schema): - return avro.schema.parse(schema) - def serialize(self, value, **kwargs): # type: (Mapping[str, Any], Any) -> bytes """ @@ -137,7 +131,7 @@ def serialize(self, value, **kwargs): except KeyError as e: raise TypeError("'{}' is a required keyword.".format(e.args[0])) - cached_schema = AvroSerializer._parse_schema(raw_input_schema) + cached_schema = parse_schema(raw_input_schema) record_format_identifier = b"\0\0\0\0" schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) data_bytes = self._avro_serializer.serialize(value, cached_schema) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py new file mode 100644 index 000000000000..6a89471e996b --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py @@ -0,0 +1,13 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. 
+# -------------------------------------------------------------------------------------------- +try: + from functools import lru_cache +except ImportError: + from backports.functools_lru_cache import lru_cache +import avro + +@lru_cache(maxsize=128) +def parse_schema(schema): + return avro.schema.parse(schema) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py new file mode 100644 index 000000000000..14d743e38828 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py @@ -0,0 +1,30 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from ._schema_registry_avro_serializer_async import AvroSerializer + +__all__ = [ + "AvroSerializer" +] diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py new file mode 100644 index 000000000000..c51dfdf0c9dd --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py @@ -0,0 +1,230 @@ +# -------------------------------------------------------------------------- +# The MIT License +# +# Copyright (c) 2018 aio-libs team https://github.com/aio-libs/ +# Copyright (c) 2017 Ocean S. A. https://ocean.io/ +# Copyright (c) 2016-2017 WikiBusiness Corporation http://wikibusiness.org/ +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# -------------------------------------------------------------------------- +# Copying over `async_lru.py`[https://github.com/aio-libs/async-lru/blob/master/async_lru.py] +# from `aio-libs`[https://github.com/aio-libs/async-lru] for the following reasons: +# 1. There has not been an official release of `async_lru` in 2 years. +# 2. The last update to the library was a year ago, so it seems the library is +# not being actively maintained. + +import asyncio +from collections import OrderedDict +from functools import _CacheInfo, _make_key, partial, wraps + + +__version__ = "1.0.2" + +__all__ = ("alru_cache",) + + +def unpartial(fn): + while hasattr(fn, "func"): + fn = fn.func + + return fn + + +def _done_callback(fut, task): + if task.cancelled(): + fut.cancel() + return + + exc = task.exception() + if exc is not None: + fut.set_exception(exc) + return + + fut.set_result(task.result()) + + +def _cache_invalidate(wrapped, typed, *args, **kwargs): + # pylint: disable=protected-access + key = _make_key(args, kwargs, typed) + + exists = key in wrapped._cache + + if exists: + wrapped._cache.pop(key) + + return exists + + +def _cache_clear(wrapped): + # pylint: disable=protected-access + wrapped.hits = wrapped.misses = 0 + wrapped._cache = OrderedDict() + wrapped.tasks = set() + + +def _open(wrapped): + if not wrapped.closed: + raise RuntimeError("alru_cache is not closed") + + # pylint: disable=protected-access + was_closed = ( + wrapped.hits == wrapped.misses == len(wrapped.tasks) == 
len(wrapped._cache) == 0 + ) + + if not was_closed: + raise RuntimeError("alru_cache was not closed correctly") + + wrapped.closed = False + + +def _close(wrapped, *, cancel=False, return_exceptions=True): + if wrapped.closed: + raise RuntimeError("alru_cache is closed") + + wrapped.closed = True + + if cancel: + for task in wrapped.tasks: + if not task.done(): # not sure is it possible + task.cancel() + + return _wait_closed(wrapped, return_exceptions=return_exceptions) + + +async def _wait_closed(wrapped, *, return_exceptions): + wait_closed = asyncio.gather(*wrapped.tasks, return_exceptions=return_exceptions) + + wait_closed.add_done_callback(partial(_close_waited, wrapped)) + + ret = await wait_closed + + # hack to get _close_waited callback to be executed + await asyncio.sleep(0) + + return ret + + +def _close_waited(wrapped, _): + wrapped.cache_clear() + + +def _cache_info(wrapped, maxsize): + # pylint: disable=protected-access + return _CacheInfo( + wrapped.hits, + wrapped.misses, + maxsize, + len(wrapped._cache), + ) + + +def __cache_touch(wrapped, key): + # pylint: disable=protected-access + try: + wrapped._cache.move_to_end(key) + except KeyError: # not sure is it possible + pass + + +def _cache_hit(wrapped, key): + wrapped.hits += 1 + __cache_touch(wrapped, key) + + +def _cache_miss(wrapped, key): + wrapped.misses += 1 + __cache_touch(wrapped, key) + + +def alru_cache( + fn=None, + maxsize=128, + typed=False, + *, + cache_exceptions=True, +): + def wrapper(fn): + # pylint: disable=protected-access + _origin = unpartial(fn) + + if not asyncio.iscoroutinefunction(_origin): + raise RuntimeError("Coroutine function is required, got {}".format(fn)) + + # functools.partialmethod support + if hasattr(fn, "_make_unbound_method"): + fn = fn._make_unbound_method() + + @wraps(fn) + async def wrapped(*fn_args, **fn_kwargs): + if wrapped.closed: + raise RuntimeError("alru_cache is closed for {}".format(wrapped)) + + loop = asyncio.get_event_loop() + + key = 
_make_key(fn_args, fn_kwargs, typed) + + fut = wrapped._cache.get(key) + + if fut is not None: + if not fut.done(): + _cache_hit(wrapped, key) + return await asyncio.shield(fut) + + exc = fut._exception + + if exc is None or cache_exceptions: + _cache_hit(wrapped, key) + return fut.result() + + # exception here and cache_exceptions == False + wrapped._cache.pop(key) + + fut = loop.create_future() + task = loop.create_task(fn(*fn_args, **fn_kwargs)) + task.add_done_callback(partial(_done_callback, fut)) + + wrapped.tasks.add(task) + task.add_done_callback(wrapped.tasks.remove) + + wrapped._cache[key] = fut + + if maxsize is not None and len(wrapped._cache) > maxsize: + wrapped._cache.popitem(last=False) + + _cache_miss(wrapped, key) + return await asyncio.shield(fut) + + _cache_clear(wrapped) + wrapped._origin = _origin + wrapped.closed = False + wrapped.cache_info = partial(_cache_info, wrapped, maxsize) + wrapped.cache_clear = partial(_cache_clear, wrapped) + wrapped.invalidate = partial(_cache_invalidate, wrapped, typed) + wrapped.close = partial(_close, wrapped) + wrapped.open = partial(_open, wrapped) + + return wrapped + + if fn is None: + return wrapper + + if callable(fn) or hasattr(fn, "_make_unbound_method"): + return wrapper(fn) + + raise NotImplementedError("{} decorating is not supported".format(fn)) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py new file mode 100644 index 000000000000..e0e9f28e43f7 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py @@ -0,0 +1,163 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft 
Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from io import BytesIO +from typing import Any, Dict, Mapping +from ._async_lru import alru_cache +from .._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX +from .._avro_serializer import AvroObjectSerializer +from .._utils import parse_schema + + +class AvroSerializer(object): + """ + AvroSerializer provides the ability to serialize and deserialize data according + to the given avro schema. It would automatically register, get and cache the schema. + + :keyword client: Required. The schema registry client + which is used to register schema and retrieve schema from the service. + :paramtype client: ~azure.schemaregistry.aio.SchemaRegistryClient + :keyword str group_name: Required. Schema group under which schema should be registered. 
+ :keyword bool auto_register_schemas: When true, register new schemas passed to serialize. + Otherwise, and by default, serialization will fail if the schema has not been pre-registered in the registry. + + """ + + def __init__(self, **kwargs): + # type: (Any) -> None + try: + self._schema_group = kwargs.pop("group_name") + self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient" + except KeyError as e: + raise TypeError("'{}' is a required keyword.".format(e.args[0])) + self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec")) + self._auto_register_schemas = kwargs.get("auto_register_schemas", False) + self._auto_register_schema_func = ( + self._schema_registry_client.register_schema + if self._auto_register_schemas + else self._schema_registry_client.get_schema_properties + ) + + async def __aenter__(self): + # type: () -> SchemaRegistryAvroSerializer + await self._schema_registry_client.__aenter__() + return self + + async def __aexit__(self, *exc_details): + # type: (Any) -> None + await self._schema_registry_client.__aexit__(*exc_details) + + async def close(self): + # type: () -> None + """This method is to close the sockets opened by the client. + It need not be used when using with a context manager. + """ + await self._schema_registry_client.close() + + @alru_cache(maxsize=128, cache_exceptions=False) + async def _get_schema_id(self, schema_name, schema_str, **kwargs): + # type: (str, str, Any) -> str + """ + Get schema id from local cache with the given schema. + If there is no item in the local cache, get schema id from the service and cache it. 
+ + :param schema_name: Name of the schema + :type schema_name: str + :param str schema_str: Schema string + :return: Schema Id + :rtype: str + """ + schema_properties = await self._auto_register_schema_func( + self._schema_group, schema_name, schema_str, "Avro", **kwargs + ) + return schema_properties.id + + @alru_cache(maxsize=128, cache_exceptions=False) + async def _get_schema(self, schema_id, **kwargs): + # type: (str, Any) -> str + """ + Get schema definition from local cache with the given schema id. + If there is no item in the local cache, get schema from the service and cache it. + + :param str schema_id: Schema id + :return: Schema definition + """ + schema = await self._schema_registry_client.get_schema( + schema_id, **kwargs + ) + return schema.schema_definition + + async def serialize(self, value, **kwargs): + # type: (Mapping[str, Any], Any) -> bytes + """ + Encode data with the given schema. The returned bytes consist of: the first 4 bytes + denoting the record format identifier, the following 32 bytes denoting the schema id returned by the schema registry + service, and the remaining bytes, which are the real data payload. + + :param value: The data to be encoded. + :type value: Mapping[str, Any] + :keyword schema: Required. The schema used to encode the data. 
+ :paramtype schema: str + :rtype: bytes + """ + try: + raw_input_schema = kwargs.pop("schema") + except KeyError as e: + raise TypeError("'{}' is a required keyword.".format(e.args[0])) + + cached_schema = parse_schema(raw_input_schema) + record_format_identifier = b"\0\0\0\0" + schema_id = await self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) + data_bytes = self._avro_serializer.serialize(value, cached_schema) + + stream = BytesIO() + + stream.write(record_format_identifier) + stream.write(schema_id.encode("utf-8")) + stream.write(data_bytes) + stream.flush() + + payload = stream.getvalue() + stream.close() + return payload + + async def deserialize(self, value, **kwargs): + # type: (bytes, Any) -> Dict[str, Any] + """ + Decode bytes data. + + :param bytes value: The bytes data needs to be decoded. + :rtype: Dict[str, Any] + """ + # record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier. + schema_id = value[ + SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH) + ].decode("utf-8") + schema_definition = await self._get_schema(schema_id, **kwargs) + + dict_value = self._avro_serializer.deserialize( + value[DATA_START_INDEX:], schema_definition + ) + return dict_value diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt b/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt index c2d5098aa720..d53cebff6287 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt @@ -1,4 +1,4 @@ -e ../../../tools/azure-devtools -e ../../../tools/azure-sdk-tools -e ../../identity/azure-identity -../azure-schemaregistry \ No newline at end of file +aiohttp>=3.0; python_version >= '3.5' \ No newline at end of file diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md 
b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md index efb9eca6a478..a7cdd44591c0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md @@ -14,12 +14,12 @@ These are code samples that show common scenario operations with the Schema Regi Several Schema Registry Avro Serializer Python SDK samples are available to you in the SDK's GitHub repository. These samples provide example code for additional scenarios commonly encountered while working with Schema Registry Avro Serializer: -* [avro_serializer.py][avro_serializer_sample] - Examples for common Schema Registry Avro Serializer tasks: +* [avro_serializer.py][avro_serializer_sample] ([async version][avro_serializer_async_sample]) - Examples for common Schema Registry Avro Serializer tasks: * Serialize data according to the given schema * Deserialize data -* [eventhub_send_integration.py][eventhub_send_integration_sample] - Examples for integration with EventHub in sending tasks: +* [eventhub_send_integration.py][eventhub_send_integration_sample] ([async version][eventhub_send_integration_async_sample]) - Examples for integration with EventHub in sending tasks: * Serialize data with the given schema and send `EventData` to Event Hubs. -* [eventhub_receive_integration.py][eventhub_receive_integration_sample] - Examples for integration with EventHub in receiving tasks: +* [eventhub_receive_integration.py][eventhub_receive_integration_sample] ([async version][eventhub_receive_integration_async_sample]) - Examples for integration with EventHub in receiving tasks: * Receive `EventData` from Event Hubs and deserialize the received bytes. ## Prerequisites @@ -50,7 +50,10 @@ Check out the [API reference documentation][api_reference] to learn more about what you can do with the Azure Schema Registry Avro Serializer library. 
-[avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py -[eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py -[eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py +[avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py +[eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py +[eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py +[avro_serializer_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py +[eventhub_send_integration_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py +[eventhub_receive_integration_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py [api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry-avroserializer/latest/index.html diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py 
b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py new file mode 100644 index 000000000000..20595010e117 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py @@ -0,0 +1,93 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
+# +# -------------------------------------------------------------------------- +import os +import asyncio + +from azure.identity.aio import ClientSecretCredential +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer + +TENANT_ID=os.environ['AZURE_TENANT_ID'] +CLIENT_ID=os.environ['AZURE_CLIENT_ID'] +CLIENT_SECRET=os.environ['AZURE_CLIENT_SECRET'] + +SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE'] +GROUP_NAME=os.environ['SCHEMAREGISTRY_GROUP'] +SCHEMA_STRING = """ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +}""" + + +token_credential = ClientSecretCredential( + tenant_id=TENANT_ID, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET +) + + +async def serialize(serializer): + dict_data_ben = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + dict_data_alice = {"name": u"Alice", "favorite_number": 15, "favorite_color": u"green"} + + # Schema would be automatically registered into Schema Registry and cached locally. + payload_ben = await serializer.serialize(dict_data_ben, schema=SCHEMA_STRING) + # The second call won't trigger a service call. + payload_alice = await serializer.serialize(dict_data_alice, schema=SCHEMA_STRING) + + print('Encoded bytes are: ', payload_ben) + print('Encoded bytes are: ', payload_alice) + return [payload_ben, payload_alice] + + +async def deserialize(serializer, bytes_payload): + # serializer.deserialize would extract the schema id from the payload, + # retrieve schema from Schema Registry and cache the schema locally. + # If the schema id is in the local cache, the call won't trigger a service call. 
+ dict_data = await serializer.deserialize(bytes_payload) + + print('Deserialized data is: ', dict_data) + return dict_data + + +async def main(): + schema_registry = SchemaRegistryClient(fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential) + serializer = AvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True) + bytes_data_ben, bytes_data_alice = await serialize(serializer) + dict_data_ben = await deserialize(serializer, bytes_data_ben) + dict_data_alice = await deserialize(serializer, bytes_data_alice) + await serializer.close() + await token_credential.close() + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py new file mode 100644 index 000000000000..df9b21482378 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Examples to show receiving events from EventHub with AvroSerializer integrated for data deserialization. 
+""" + +# pylint: disable=C0111 +import os +import asyncio +from azure.eventhub.aio import EventHubConsumerClient +from azure.identity.aio import DefaultAzureCredential +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer + +EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] + +SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE'] +GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP'] + + +# create an EventHubConsumerClient instance +eventhub_consumer = EventHubConsumerClient.from_connection_string( + conn_str=EVENTHUB_CONNECTION_STR, + consumer_group='$Default', + eventhub_name=EVENTHUB_NAME, +) +# create a AvroSerializer instance +azure_credential = DefaultAzureCredential() +avro_serializer = AvroSerializer( + client=SchemaRegistryClient( + fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, + credential=azure_credential + ), + group_name=GROUP_NAME, + auto_register_schemas=True +) + +async def on_event(partition_context, event): + print("Received event from partition: {}.".format(partition_context.partition_id)) + + bytes_payload = b"".join(b for b in event.body) + print('The received bytes of the EventData is {}.'.format(bytes_payload)) + + # Use the deserialize method to convert bytes to dict object. + # The deserialize method would extract the schema id from the payload, and automatically retrieve the Avro Schema + # from the Schema Registry Service. The schema would be cached locally for future usage. + deserialized_data = await avro_serializer.deserialize(bytes_payload) + print('The dict data after deserialization is {}'.format(deserialized_data)) + + +async def main(): + try: + async with eventhub_consumer, avro_serializer: + await eventhub_consumer.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. 
+ ) + except KeyboardInterrupt: + print('Stopped receiving.') + finally: + await avro_serializer.close() + await azure_credential.close() + await eventhub_consumer.close() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py new file mode 100644 index 000000000000..4d1dfe687d7a --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Examples to show sending event to EventHub with AvroSerializer integrated for data serialization. 
+""" + +# pylint: disable=C0111 + +import os +import asyncio +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient +from azure.identity.aio import DefaultAzureCredential +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer + +EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] + +SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE'] +GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP'] + +SCHEMA_STRING = """ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +}""" + +# create an EventHubProducerClient instance +eventhub_producer = EventHubProducerClient.from_connection_string( + conn_str=EVENTHUB_CONNECTION_STR, + eventhub_name=EVENTHUB_NAME +) +# create a AvroSerializer instance +azure_credential = DefaultAzureCredential() +avro_serializer = AvroSerializer( + client=SchemaRegistryClient( + fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, + credential=azure_credential + ), + group_name=GROUP_NAME, + auto_register_schemas=True +) + +async def send_event_data_batch(producer, serializer): + event_data_batch = await producer.create_batch() + + dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"} + # Use the serialize method to convert dict object to bytes with the given avro schema. + # The serialize method would automatically register the schema into the Schema Registry Service and + # schema would be cached locally for future usage. 
+ payload_bytes = await serializer.serialize(value=dict_data, schema=SCHEMA_STRING) + print('The bytes of serialized dict data is {}.'.format(payload_bytes)) + + event_data = EventData(body=payload_bytes) # pass the bytes data to the body of an EventData + event_data_batch.add(event_data) + await producer.send_batch(event_data_batch) + print('Send is done.') + + +async def main(): + + await send_event_data_batch(eventhub_producer, avro_serializer) + await avro_serializer.close() + await azure_credential.close() + await eventhub_producer.close() + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py diff --git 
a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py new file mode 100644 index 000000000000..dc548de50415 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py @@ -0,0 +1,16 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- + +import os +import sys +import uuid + +import pytest + +# Ignore async tests for Python < 3.5 +collect_ignore_glob = [] +if sys.version_info < (3, 5): + collect_ignore_glob.append("*_async.py") diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 8cc336c7d639..13f51e4c25c6 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -22,18 +22,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 04 Oct 2021 18:52:03 GMT + - Wed, 20 Oct 2021 19:04:59 GMT location: - 
https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -72,18 +72,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 04 Oct 2021 18:52:03 GMT + - Wed, 20 Oct 2021 19:05:00 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -111,7 +111,7 @@ interactions: User-Agent: - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) method: GET - uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + uri: 
https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview response: body: string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' @@ -119,13 +119,13 @@ interactions: content-type: - application/json date: - - Mon, 04 Oct 2021 18:52:03 GMT + - Wed, 20 Oct 2021 19:05:00 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index e4ed567df1ee..6ecb68a231d0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -22,18 +22,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview 
response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 04 Oct 2021 18:52:04 GMT + - Wed, 20 Oct 2021 19:05:01 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -72,18 +72,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 04 Oct 2021 18:52:05 GMT + - Wed, 20 Oct 2021 19:05:02 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -111,7 +111,7 @@ interactions: User-Agent: - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 
(Windows-10-10.0.19041-SP0) method: GET - uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview response: body: string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' @@ -119,13 +119,13 @@ interactions: content-type: - application/json date: - - Mon, 04 Oct 2021 18:52:05 GMT + - Wed, 20 Oct 2021 19:05:02 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml new file mode 100644 index 000000000000..068d169df789 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -0,0 +1,102 @@ +interactions: +- request: + body: '{"type": "record", "name": "User", "namespace": 
"example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' + headers: + Accept: + - application/json + Content-Length: + - '221' + Content-Type: + - application/json + Serialization-Type: + - Avro + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: PUT + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview + response: + body: + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' + headers: + content-type: application/json + date: Wed, 20 Oct 2021 19:05:03 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' + headers: + Accept: + - application/json + Content-Length: + - '221' + Content-Type: + - application/json + 
Serialization-Type: + - Avro + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: POST + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview + response: + body: + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' + headers: + content-type: application/json + date: Wed, 20 Oct 2021 19:05:03 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: null + headers: + Accept: + - text/plain; charset=utf-8 + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: GET + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + response: + body: + string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' + headers: + content-type: application/json + date: Wed, 20 Oct 2021 19:05:04 GMT + 
location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview +version: 1 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml new file mode 100644 index 000000000000..a1c8635eef54 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -0,0 +1,102 @@ +interactions: +- request: + body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' + headers: + Accept: + - application/json + Content-Length: + - '221' + Content-Type: + - application/json + Serialization-Type: + - Avro + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 
(Windows-10-10.0.19041-SP0) + method: PUT + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview + response: + body: + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' + headers: + content-type: application/json + date: Wed, 20 Oct 2021 19:05:05 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' + headers: + Accept: + - application/json + Content-Length: + - '221' + Content-Type: + - application/json + Serialization-Type: + - Avro + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: POST + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview + response: + body: + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' + headers: + content-type: application/json + 
date: Wed, 20 Oct 2021 19:05:06 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: null + headers: + Accept: + - text/plain; charset=utf-8 + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: GET + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + response: + body: + string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' + headers: + content-type: application/json + date: Wed, 20 Oct 2021 19:05:06 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + 
schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview +version: 1 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py index b925800685a7..03cda0b26ad0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py @@ -1,3 +1,9 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the ""Software""), to # deal in the Software without restriction, including without limitation the diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py new file mode 100644 index 000000000000..b06e1363b46c --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py @@ -0,0 +1,95 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
+# +# -------------------------------------------------------------------------- +import functools +import pytest +import uuid +import avro +import avro.io +from io import BytesIO +import pytest + +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer +from azure.identity.aio import ClientSecretCredential +from azure.core.exceptions import ClientAuthenticationError, ServiceRequestError, HttpResponseError + +from devtools_testutils import AzureTestCase, PowerShellPreparer + +SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup") + +class AvroSerializerAsyncTests(AzureTestCase): + + def create_client(self, fully_qualified_namespace): + credential = self.get_credential(SchemaRegistryClient, is_async=True) + return self.create_client_from_credential(SchemaRegistryClient, credential, fully_qualified_namespace=fully_qualified_namespace, is_async=True) + + @pytest.mark.asyncio + @SchemaRegistryPowerShellPreparer() + async def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs): + sr_client = self.create_client(schemaregistry_fully_qualified_namespace) + sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True) + + async with sr_client: + schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema_str = "{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], 
\"name\": \"favorite_color\"}]}" + schema = avro.schema.parse(schema_str) + + dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) + + assert encoded_data[0:4] == b'\0\0\0\0' + schema_properties = await sr_client.get_schema_properties(schemaregistry_group, schema.fullname, str(schema), "Avro") + schema_id = schema_properties.id + assert encoded_data[4:36] == schema_id.encode("utf-8") + + decoded_data = await sr_avro_serializer.deserialize(encoded_data) + assert decoded_data["name"] == u"Ben" + assert decoded_data["favorite_number"] == 7 + assert decoded_data["favorite_color"] == u"red" + + @pytest.mark.asyncio + @SchemaRegistryPowerShellPreparer() + async def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs): + sr_client = self.create_client(schemaregistry_fully_qualified_namespace) + sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True) + + async with sr_client: + schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema_str = "{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], \"name\": \"favorite_color\"}]}" + schema = avro.schema.parse(schema_str) + + dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) + + assert encoded_data[0:4] == b'\0\0\0\0' + schema_properties = await sr_client.get_schema_properties(schemaregistry_group, schema.fullname, str(schema), 
"Avro") + schema_id = schema_properties.id + assert encoded_data[4:36] == schema_id.encode("utf-8") + + decoded_data = await sr_avro_serializer.deserialize(encoded_data) + assert decoded_data["name"] == u"Ben" + assert decoded_data["favorite_number"] == 7 + assert decoded_data["favorite_color"] == u"red"