From f7ee54e00d92820edc5e51d1c8630b5d6cc0fc12 Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Sun, 18 Feb 2024 10:53:38 +0100 Subject: [PATCH] Fix Airflow serialization for namedtuple (#37168) Namedtuple is serialized like 'builtins.tuple' The serialize method (in airflow/serialization/serializers/builtin.py) does qualname() on the namedtuple, which returns an arbitrary name. If this is used as classname, it will fail to deserialize: there won't be any deserializer for it. --- airflow/serialization/serde.py | 29 +++++++++++++++++++++++------ tests/serialization/test_serde.py | 9 +++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/airflow/serialization/serde.py b/airflow/serialization/serde.py index fd7eb33af72846..42e5a3a658852a 100644 --- a/airflow/serialization/serde.py +++ b/airflow/serialization/serde.py @@ -134,6 +134,20 @@ def serialize(o: object, depth: int = 0) -> U | None: cls = type(o) qn = qualname(o) + classname = None + + # Serialize namedtuple like tuples + # We also override the classname returned by the builtin.py serializer. The classname + # has to be "builtins.tuple", so that the deserializer can deserialize the object into tuple. + if _is_namedtuple(o): + qn = "builtins.tuple" + classname = qn + + # if there is a builtin serializer available use that + if qn in _serializers: + data, serialized_classname, version, is_serialized = _serializers[qn].serialize(o) + if is_serialized: + return encode(classname or serialized_classname, version, serialize(data, depth + 1)) # custom serializers dct = { @@ -141,12 +155,6 @@ def serialize(o: object, depth: int = 0) -> U | None: VERSION: getattr(cls, "__version__", DEFAULT_VERSION), } - # if there is a builtin serializer available use that - if qn in _serializers: - data, classname, version, is_serialized = _serializers[qn].serialize(o) - if is_serialized: - return encode(classname, version, serialize(data, depth + 1)) - # object / class brings their own if hasattr(o, "serialize"): data = getattr(o, "serialize")() @@ -337,6 +345,15 @@ def _is_pydantic(cls: Any) -> bool: return hasattr(cls, "model_config") and hasattr(cls, "model_fields") and hasattr(cls, "model_fields_set") +def _is_namedtuple(cls: Any) -> bool: + """Return True if the class is a namedtuple. + + Checking is done by attributes as it is significantly faster than + using isinstance. + """ + return hasattr(cls, "_asdict") and hasattr(cls, "_fields") and hasattr(cls, "_field_defaults") + + def _register(): """Register builtin serializers and deserializers for types that don't have any themselves.""" _serializers.clear() diff --git a/tests/serialization/test_serde.py b/tests/serialization/test_serde.py index fc37aa60639621..6298de53e63989 100644 --- a/tests/serialization/test_serde.py +++ b/tests/serialization/test_serde.py @@ -18,6 +18,7 @@ import datetime import enum +from collections import namedtuple from dataclasses import dataclass from importlib import import_module from typing import ClassVar @@ -185,6 +186,14 @@ def test_ser_plain_dict(self): i = {SCHEMA_ID: "cannot"} serialize(i) + def test_ser_namedtuple(self): + CustomTuple = namedtuple("CustomTuple", ["id", "value"]) + data = CustomTuple(id=1, value="something") + + i = deserialize(serialize(data)) + e = (1, "something") + assert i == e + def test_no_serializer(self): with pytest.raises(TypeError, match="^cannot serialize"): i = Exception