diff --git a/airflow/lineage/datasets.py b/airflow/lineage/datasets.py index 260277065b6f0..3d61e5d3f4843 100644 --- a/airflow/lineage/datasets.py +++ b/airflow/lineage/datasets.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import json import six from typing import List @@ -62,7 +63,11 @@ def __getattr__(self, attr): if attr in self.attributes: if self.context: env = Environment() - return env.from_string(self._data.get(attr)).render(**self.context) + # dump to json here in order to be able to manage dicts and lists + rendered = env.from_string( + json.dumps(self._data.get(attr)) + ).render(**self.context) + return json.loads(rendered) return self._data.get(attr) @@ -82,7 +87,9 @@ def as_dict(self): env = Environment() if self.context: for key, value in six.iteritems(attributes): - attributes[key] = env.from_string(value).render(**self.context) + attributes[key] = json.loads( + env.from_string(json.dumps(value)).render(**self.context) + ) d = { "typeName": self.type_name, diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index b61c9808d442a..52037c56a756c 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -428,8 +428,8 @@ def __init__( self._log = logging.getLogger("airflow.task.operators") # lineage - self.inlets = [] # type: Iterable[DataSet] - self.outlets = [] # type: Iterable[DataSet] + self.inlets = [] # type: List[DataSet] + self.outlets = [] # type: List[DataSet] self.lineage_data = None self._inlets = {