From 221c0453e44e1a49b048ef5f31ad6ce1c3b20033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 10 Jan 2022 13:14:47 +0100 Subject: [PATCH 1/2] ISSUE #677 * Add `batch_mode` argument. --- minos/common/model/abc.py | 14 +++++++++----- minos/common/protocol/avro/base.py | 7 +++++-- .../test_model/test_declarative/test_avro.py | 8 ++++---- .../test_model/test_dynamic/test_abc.py | 4 ++-- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/minos/common/model/abc.py b/minos/common/model/abc.py index fdf2a2f2..78cbab21 100644 --- a/minos/common/model/abc.py +++ b/minos/common/model/abc.py @@ -102,27 +102,31 @@ def from_model_type(cls: Type[T], model_type: ModelType, *args, **kwargs) -> T: """ @classmethod - def from_avro_str(cls: Type[T], raw: str) -> Union[T, list[T]]: + def from_avro_str(cls: Type[T], raw: str, **kwargs) -> Union[T, list[T]]: """Build a single instance or a sequence of instances from bytes :param raw: A ``str`` representation of the model. :return: A single instance or a sequence of instances. """ raw = b64decode(raw.encode()) - return cls.from_avro_bytes(raw) + return cls.from_avro_bytes(raw, **kwargs) @classmethod - def from_avro_bytes(cls: Type[T], raw: bytes) -> Union[T, list[T]]: + def from_avro_bytes(cls: Type[T], raw: bytes, batch_mode: bool = False, **kwargs) -> Union[T, list[T]]: """Build a single instance or a sequence of instances from bytes :param raw: A ``bytes`` representation of the model. + :param batch_mode: If ``True`` the data is processed as a list of models, otherwise the data is processed as a + single model. + :param kwargs: Additional named arguments. :return: A single instance or a sequence of instances. """ schema = MinosAvroProtocol.decode_schema(raw) data = MinosAvroProtocol.decode(raw) - if isinstance(data, list): + if batch_mode: return [cls.from_avro(schema, entry) for entry in data] + return cls.from_avro(schema, data) @classmethod @@ -168,7 +172,7 @@ def to_avro_bytes(cls: Type[T], models: list[T]) -> bytes: avro_schema = models[0].avro_schema # noinspection PyTypeChecker - return MinosAvroProtocol().encode([model.avro_data for model in models], avro_schema) + return MinosAvroProtocol().encode([model.avro_data for model in models], avro_schema, batch_mode=True) # noinspection PyMethodParameters @property_or_classproperty diff --git a/minos/common/protocol/avro/base.py b/minos/common/protocol/avro/base.py index 05d7f585..a4359b29 100644 --- a/minos/common/protocol/avro/base.py +++ b/minos/common/protocol/avro/base.py @@ -22,7 +22,7 @@ class MinosAvroProtocol(MinosBinaryProtocol): """Minos Avro Protocol class.""" @classmethod - def encode(cls, value: Any, schema: Any, *args, **kwargs) -> bytes: + def encode(cls, value: Any, schema: Any, *args, batch_mode: bool = False, **kwargs) -> bytes: """Encoder in avro for database Values all the headers are converted in fields with double underscore name the body is a set fields coming from the data type. @@ -30,11 +30,14 @@ def encode(cls, value: Any, schema: Any, *args, **kwargs) -> bytes: :param value: The data to be stored. :param schema: The schema relative to the data. :param args: Additional positional arguments. + :param batch_mode: If ``True`` the data is processed as a list of models, otherwise the data is processed as a + single model. :param kwargs: Additional named arguments. :return: A bytes object. """ - if not isinstance(value, list): + if not batch_mode: value = [value] + if not isinstance(schema, list): schema = [schema] diff --git a/tests/test_common/test_model/test_declarative/test_avro.py b/tests/test_common/test_model/test_declarative/test_avro.py index 984e5c0b..8280b009 100644 --- a/tests/test_common/test_model/test_declarative/test_avro.py +++ b/tests/test_common/test_model/test_declarative/test_avro.py @@ -270,11 +270,11 @@ def test_from_avro_bytes_single(self): decoded_customer = Customer.from_avro_bytes(avro_bytes) self.assertEqual(customer, decoded_customer) - def test_from_avro_bytes_sequence(self): + def test_from_avro_bytes_in_batch(self): customers = [Customer(1234), Customer(5678)] avro_bytes = Customer.to_avro_bytes(customers) self.assertIsInstance(avro_bytes, bytes) - decoded_customer = Customer.from_avro_bytes(avro_bytes) + decoded_customer = Customer.from_avro_bytes(avro_bytes, batch_mode=True) self.assertEqual(customers, decoded_customer) def test_from_avro_bytes_composed(self): @@ -310,11 +310,11 @@ def test_from_avro_str_single(self): decoded_customer = Customer.from_avro_str(avro_str) self.assertEqual(customer, decoded_customer) - def test_from_avro_str_sequence(self): + def test_from_avro_str_in_batch(self): customers = [Customer(1234), Customer(5678)] avro_str = Customer.to_avro_str(customers) - decoded_customer = Customer.from_avro_str(avro_str) + decoded_customer = Customer.from_avro_str(avro_str, batch_mode=True) self.assertEqual(customers, decoded_customer) diff --git a/tests/test_common/test_model/test_dynamic/test_abc.py b/tests/test_common/test_model/test_dynamic/test_abc.py index 0354549a..26da4e75 100644 --- a/tests/test_common/test_model/test_dynamic/test_abc.py +++ b/tests/test_common/test_model/test_dynamic/test_abc.py @@ -15,9 +15,9 @@ def test_from_avro_bytes(self): model = DynamicModel.from_avro_bytes(original.avro_bytes) self.assertEqual("hello", model.text) - def test_from_avro_bytes_multiple(self): + def test_from_avro_bytes_in_batch(self): encoded = Foo.to_avro_bytes([Foo("hello"), Foo("bye")]) - decoded = DynamicModel.from_avro_bytes(encoded) + decoded = DynamicModel.from_avro_bytes(encoded, batch_mode=True) self.assertEqual("hello", decoded[0].text) self.assertEqual("bye", decoded[1].text) From f9ed4204659364e58c07df299d246279c0eec376 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Mon, 10 Jan 2022 15:16:04 +0100 Subject: [PATCH 2/2] ISSUE #677 * Deprecate `flatten` in favor of `batch_mode`. --- minos/common/model/abc.py | 7 ++++--- minos/common/protocol/abc.py | 2 +- minos/common/protocol/avro/base.py | 14 ++++++++++---- minos/common/protocol/avro/databases.py | 3 +-- minos/common/protocol/avro/messages.py | 2 +- minos/common/protocol/json.py | 2 +- .../test_protocol/test_avro/test_base.py | 19 +++++++++++++++++++ 7 files changed, 37 insertions(+), 12 deletions(-) diff --git a/minos/common/model/abc.py b/minos/common/model/abc.py index 78cbab21..cf9a7a35 100644 --- a/minos/common/model/abc.py +++ b/minos/common/model/abc.py @@ -111,6 +111,7 @@ def from_avro_str(cls: Type[T], raw: str, **kwargs) -> Union[T, list[T]]: raw = b64decode(raw.encode()) return cls.from_avro_bytes(raw, **kwargs) + # noinspection PyUnusedLocal @classmethod def from_avro_bytes(cls: Type[T], raw: bytes, batch_mode: bool = False, **kwargs) -> Union[T, list[T]]: """Build a single instance or a sequence of instances from bytes @@ -122,7 +123,7 @@ def from_avro_bytes(cls: Type[T], raw: bytes, batch_mode: bool = False, **kwargs :return: A single instance or a sequence of instances. """ schema = MinosAvroProtocol.decode_schema(raw) - data = MinosAvroProtocol.decode(raw) + data = MinosAvroProtocol.decode(raw, batch_mode=batch_mode) if batch_mode: return [cls.from_avro(schema, entry) for entry in data] @@ -172,7 +173,7 @@ def to_avro_bytes(cls: Type[T], models: list[T]) -> bytes: avro_schema = models[0].avro_schema # noinspection PyTypeChecker - return MinosAvroProtocol().encode([model.avro_data for model in models], avro_schema, batch_mode=True) + return MinosAvroProtocol.encode([model.avro_data for model in models], avro_schema, batch_mode=True) # noinspection PyMethodParameters @property_or_classproperty @@ -284,7 +285,7 @@ def avro_bytes(self) -> bytes: :return: A bytes object. """ # noinspection PyTypeChecker - return MinosAvroProtocol().encode(self.avro_data, self.avro_schema) + return MinosAvroProtocol.encode(self.avro_data, self.avro_schema) # noinspection PyMethodParameters,PyUnusedLocal @self_or_classmethod diff --git a/minos/common/protocol/abc.py b/minos/common/protocol/abc.py index 6505ee39..0f315381 100644 --- a/minos/common/protocol/abc.py +++ b/minos/common/protocol/abc.py @@ -20,7 +20,7 @@ def encode(cls, *args, **kwargs) -> bytes: @classmethod @abc.abstractmethod - def decode(cls, data: bytes) -> Any: + def decode(cls, data: bytes, *args, **kwargs) -> Any: """Decodes the given bytes data. :param data: bytes data to be decoded. diff --git a/minos/common/protocol/avro/base.py b/minos/common/protocol/avro/base.py index a4359b29..1fd1154d 100644 --- a/minos/common/protocol/avro/base.py +++ b/minos/common/protocol/avro/base.py @@ -63,12 +63,13 @@ def _write_data(value: list[dict[str, Any]], schema: dict[str, Any]): return content @classmethod - def decode(cls, data: bytes, flatten: bool = True, *args, **kwargs) -> Union[dict[str, Any], list[dict[str, Any]]]: + def decode(cls, data: bytes, *args, batch_mode: bool = False, **kwargs) -> Any: """Decode the given bytes of data into a single dictionary or a sequence of dictionaries. :param data: A bytes object. - :param flatten: If ``True`` tries to return the values as flat as possible. :param args: Additional positional arguments. + :param batch_mode: If ``True`` the data is processed as a list of models, otherwise the data is processed as a + single model. :param kwargs: Additional named arguments. :return: A dictionary or a list of dictionaries. """ @@ -79,11 +80,16 @@ def decode(cls, data: bytes, flatten: bool = True, *args, **kwargs) -> Union[dic except Exception as exc: raise MinosProtocolException(f"Error decoding the avro bytes: {exc}") - if flatten and len(ans) == 1: - return ans[0] + if not batch_mode: + if len(ans) > 1: + raise MinosProtocolException( + f"The 'batch_mode' argument was set to {False!r} but data contains multiple values: {ans!r}" + ) + ans = ans[0] return ans + # noinspection PyUnusedLocal @classmethod def decode_schema(cls, data: bytes, *args, **kwargs) -> Union[dict[str, Any], list[dict[str, Any]]]: """Decode the given bytes of data into a single dictionary or a sequence of dictionaries. diff --git a/minos/common/protocol/avro/databases.py b/minos/common/protocol/avro/databases.py index 1feeee3e..a3a5ecdc 100644 --- a/minos/common/protocol/avro/databases.py +++ b/minos/common/protocol/avro/databases.py @@ -30,11 +30,10 @@ def encode(cls, value: Any, *args, **kwargs) -> bytes: return super().encode(final_data, _AVRO_SCHEMA) @classmethod - def decode(cls, data: bytes, flatten: bool = True, *args, **kwargs) -> Union[dict[str, Any], list[dict[str, Any]]]: + def decode(cls, data: bytes, *args, **kwargs) -> Union[dict[str, Any], list[dict[str, Any]]]: """Decode the given bytes of data into a single dictionary or a sequence of dictionaries. :param data: A bytes object. - :param flatten: If ``True`` tries to return the values as flat as possible. :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: A dictionary or a list of dictionaries. diff --git a/minos/common/protocol/avro/messages.py b/minos/common/protocol/avro/messages.py index fe38155a..68ff6788 100644 --- a/minos/common/protocol/avro/messages.py +++ b/minos/common/protocol/avro/messages.py @@ -39,7 +39,7 @@ def decode(cls, data: bytes, *args, **kwargs) -> dict: :return: A dictionary or a list of dictionaries. """ data_return = {"headers": dict()} - for schema_dict in super().decode(data, flatten=False): + for schema_dict in super().decode(data, batch_mode=True): logger.debug("Avro: get the request/response in dict format") data_return["headers"] = schema_dict["headers"] # check wich type is body diff --git a/minos/common/protocol/json.py b/minos/common/protocol/json.py index 9871e2d8..f50c64e0 100644 --- a/minos/common/protocol/json.py +++ b/minos/common/protocol/json.py @@ -24,7 +24,7 @@ def encode(cls, data: Any, *args, **kwargs) -> bytes: return orjson.dumps(data) @classmethod - def decode(cls, data: bytes) -> Any: + def decode(cls, data: bytes, *args, **kwargs) -> Any: """Decodes the given bytes data. :param data: bytes data to be decoded. diff --git a/tests/test_common/test_protocol/test_avro/test_base.py b/tests/test_common/test_protocol/test_avro/test_base.py index 14786f38..696d2c8a 100644 --- a/tests/test_common/test_protocol/test_avro/test_base.py +++ b/tests/test_common/test_protocol/test_avro/test_base.py @@ -2,6 +2,7 @@ from minos.common import ( MinosAvroProtocol, + MinosProtocolException, ) @@ -104,6 +105,24 @@ def test_union_schema(self): data = MinosAvroProtocol.decode(serialized) self.assertEqual("one", data) + def test_batch_mode(self): + serialized = MinosAvroProtocol.encode(["one", 1], [["string", "int"]], batch_mode=True) + + schema = MinosAvroProtocol.decode_schema(serialized) + self.assertEqual(["string", "int"], schema) + + data = MinosAvroProtocol.decode(serialized, batch_mode=True) + self.assertEqual(["one", 1], data) + + def test_batch_mode_raises(self): + serialized = MinosAvroProtocol.encode(["one", 1], [["string", "int"]], batch_mode=True) + + schema = MinosAvroProtocol.decode_schema(serialized) + self.assertEqual(["string", "int"], schema) + + with self.assertRaises(MinosProtocolException): + MinosAvroProtocol.decode(serialized) + if __name__ == "__main__": unittest.main()