From a6daeb544e815fe350a96d24ae3bb14aee4079a7 Mon Sep 17 00:00:00 2001 From: bolkedebruin Date: Tue, 7 May 2019 22:42:28 +0200 Subject: [PATCH] [AIRFLOW-4472] Use json.dumps/loads for templating lineage data (#5253) jinja2 cannot use dict/lists as templates hence converting it to json solves this while keeping complexity down. --- airflow/lineage/datasets.py | 11 +++++++++-- airflow/models/baseoperator.py | 6 +++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/airflow/lineage/datasets.py b/airflow/lineage/datasets.py index d2e9e920093849..7cf0867f3b15b0 100644 --- a/airflow/lineage/datasets.py +++ b/airflow/lineage/datasets.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import json import six from typing import List @@ -62,7 +63,11 @@ def __getattr__(self, attr): if attr in self.attributes: if self.context: env = Environment() - return env.from_string(self._data.get(attr)).render(**self.context) + # dump to json here in order to be able to manage dicts and lists + rendered = env.from_string( + json.dumps(self._data.get(attr)) + ).render(**self.context) + return json.loads(rendered) return self._data.get(attr) @@ -82,7 +87,9 @@ def as_dict(self): env = Environment() if self.context: for key, value in six.iteritems(attributes): - attributes[key] = env.from_string(value).render(**self.context) + attributes[key] = json.loads( + env.from_string(json.dumps(value)).render(**self.context) + ) d = { "typeName": self.type_name, diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index d40467527553d3..7d9bd9d7450b38 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -25,7 +25,7 @@ import sys import warnings from datetime import timedelta, datetime -from typing import Iterable, Optional, Dict, Callable, Set +from typing import Callable, Dict, Iterable, List, Optional, Set import jinja2 import six @@ -368,8 +368,8 @@ def __init__( self._log = logging.getLogger("airflow.task.operators") # lineage - self.inlets = [] # type: Iterable[DataSet] - self.outlets = [] # type: Iterable[DataSet] + self.inlets = [] # type: List[DataSet] + self.outlets = [] # type: List[DataSet] self.lineage_data = None self._inlets = {