From 8c5e408b992d0c964c1be822b9b53d05a21ca812 Mon Sep 17 00:00:00 2001 From: kaxil Date: Fri, 8 Nov 2019 00:38:14 +0000 Subject: [PATCH] [AIRFLOW-5869] BugFix: Creating DagRuns fails for Deserialized tasks with no start_date --- airflow/serialization/schema.json | 1 + airflow/serialization/serialized_dag.py | 5 ++ tests/serialization/test_dag_serialization.py | 49 +++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 90182de3c6eb31..f2b05d535a63e3 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -82,6 +82,7 @@ "max_active_runs": { "type" : "number"}, "default_args": { "$ref": "#/definitions/dict" }, "start_date": { "$ref": "#/definitions/datetime" }, + "end_date": { "$ref": "#/definitions/datetime" }, "dagrun_timeout": { "$ref": "#/definitions/timedelta" }, "doc_md": { "type" : "string"} }, diff --git a/airflow/serialization/serialized_dag.py b/airflow/serialization/serialized_dag.py index 320f79382e9d80..d9d6d28d6ff9c4 100644 --- a/airflow/serialization/serialized_dag.py +++ b/airflow/serialization/serialized_dag.py @@ -120,6 +120,11 @@ def deserialize_dag(cls, encoded_dag: dict) -> "SerializedDAG": for task in dag.task_dict.values(): task.dag = dag task = cast(SerializedBaseOperator, task) + + for date_attr in ["start_date", "end_date"]: + if getattr(task, date_attr) is None: + setattr(task, date_attr, getattr(dag, date_attr)) + if task.subdag is not None: setattr(task.subdag, 'parent_dag', dag) task.subdag.is_subdag = True diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index c555841cf0be84..3c3b91c532f8ad 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -295,6 +295,55 @@ def validate_operator_extra_links(self, task): "https://www.google.com" ) + @parameterized.expand([ + (datetime(2019, 8, 1), None, datetime(2019, 8, 1)), + (datetime(2019, 8, 1), datetime(2019, 8, 2), datetime(2019, 8, 2)), + (datetime(2019, 8, 1), datetime(2019, 7, 30), datetime(2019, 8, 1)), + ]) + def test_deserialization_start_date(self, + dag_start_date, + task_start_date, + expected_task_start_date): + dag = DAG(dag_id='simple_dag', start_date=dag_start_date) + BaseOperator(task_id='simple_task', dag=dag, start_date=task_start_date) + + serialized_dag = SerializedDAG.to_dict(dag) + if not task_start_date or dag_start_date >= task_start_date: + # If dag.start_date > task.start_date -> task.start_date=dag.start_date + # because of the logic in dag.add_task() + self.assertNotIn("start_date", serialized_dag["dag"]["tasks"][0]) + else: + self.assertIn("start_date", serialized_dag["dag"]["tasks"][0]) + + dag = SerializedDAG.from_dict(serialized_dag) + simple_task = dag.task_dict["simple_task"] + self.assertEqual(simple_task.start_date, expected_task_start_date) + + @parameterized.expand([ + (datetime(2019, 8, 1), None, datetime(2019, 8, 1)), + (datetime(2019, 8, 1), datetime(2019, 8, 2), datetime(2019, 8, 1)), + (datetime(2019, 8, 1), datetime(2019, 7, 30), datetime(2019, 7, 30)), + ]) + def test_deserialization_end_date(self, + dag_end_date, + task_end_date, + expected_task_end_date): + dag = DAG(dag_id='simple_dag', start_date=datetime(2019, 8, 1), + end_date=dag_end_date) + BaseOperator(task_id='simple_task', dag=dag, end_date=task_end_date) + + serialized_dag = SerializedDAG.to_dict(dag) + if not task_end_date or dag_end_date <= task_end_date: + # If dag.end_date < task.end_date -> task.end_date=dag.end_date + # because of the logic in dag.add_task() + self.assertNotIn("end_date", serialized_dag["dag"]["tasks"][0]) + else: + self.assertIn("end_date", serialized_dag["dag"]["tasks"][0]) + + dag = SerializedDAG.from_dict(serialized_dag) + simple_task = dag.task_dict["simple_task"] + self.assertEqual(simple_task.end_date, expected_task_end_date) + @parameterized.expand([ (None, None), ("@weekly", "@weekly"),