Skip to content

Commit

Permalink
[AIRFLOW-5869] BugFix: Some Deserialized tasks have no start_date (#6519)
Browse files Browse the repository at this point in the history

(cherry picked from commit 3fa64ea)
  • Loading branch information
kaxil authored and potiuk committed Nov 12, 2019
1 parent ecfcd43 commit ef44ed7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"max_active_runs": { "type" : "number"},
"default_args": { "$ref": "#/definitions/dict" },
"start_date": { "$ref": "#/definitions/datetime" },
"end_date": { "$ref": "#/definitions/datetime" },
"dagrun_timeout": { "$ref": "#/definitions/timedelta" },
"doc_md": { "type" : "string"}
},
Expand Down
5 changes: 5 additions & 0 deletions airflow/serialization/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ def deserialize_dag(cls, encoded_dag):
for task in dag.task_dict.values():
task.dag = dag
task = cast(SerializedBaseOperator, task)

for date_attr in ["start_date", "end_date"]:
if getattr(task, date_attr) is None:
setattr(task, date_attr, getattr(dag, date_attr))

if task.subdag is not None:
setattr(task.subdag, 'parent_dag', dag)
task.subdag.is_subdag = True
Expand Down
49 changes: 49 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,55 @@ def validate_operator_extra_links(self, task):
"https://www.google.com"
)

@parameterized.expand([
    (datetime(2019, 8, 1), None, datetime(2019, 8, 1)),
    (datetime(2019, 8, 1), datetime(2019, 8, 2), datetime(2019, 8, 2)),
    (datetime(2019, 8, 1), datetime(2019, 7, 30), datetime(2019, 8, 1)),
])
def test_deserialization_start_date(
    self,
    dag_start_date,
    task_start_date,
    expected_task_start_date,
):
    """Check a task's ``start_date`` after a serialize/deserialize round trip.

    Each case supplies the DAG's start date, the start date passed to the
    task (possibly ``None``) and the start date the task is expected to
    carry once the DAG has been rebuilt from its serialized form.
    """
    source_dag = DAG(dag_id='simple_dag', start_date=dag_start_date)
    BaseOperator(task_id='simple_task', dag=source_dag, start_date=task_start_date)

    encoded = SerializedDAG.to_dict(source_dag)
    encoded_task = encoded["dag"]["tasks"][0]

    # A task with no start_date of its own, or one that is not later than
    # the DAG's, ends up with the DAG's start_date (see dag.add_task()), so
    # the serialized task is expected to omit the field.
    follows_dag_start = not task_start_date or dag_start_date >= task_start_date
    if follows_dag_start:
        self.assertNotIn("start_date", encoded_task)
    else:
        self.assertIn("start_date", encoded_task)

    restored_dag = SerializedDAG.from_dict(encoded)
    restored_task = restored_dag.task_dict["simple_task"]
    self.assertEqual(restored_task.start_date, expected_task_start_date)

@parameterized.expand([
    (datetime(2019, 8, 1), None, datetime(2019, 8, 1)),
    (datetime(2019, 8, 1), datetime(2019, 8, 2), datetime(2019, 8, 1)),
    (datetime(2019, 8, 1), datetime(2019, 7, 30), datetime(2019, 7, 30)),
])
def test_deserialization_end_date(
    self,
    dag_end_date,
    task_end_date,
    expected_task_end_date,
):
    """Check a task's ``end_date`` after a serialize/deserialize round trip.

    Each case supplies the DAG's end date, the end date passed to the task
    (possibly ``None``) and the end date the task is expected to carry once
    the DAG has been rebuilt from its serialized form.
    """
    source_dag = DAG(
        dag_id='simple_dag',
        start_date=datetime(2019, 8, 1),
        end_date=dag_end_date,
    )
    BaseOperator(task_id='simple_task', dag=source_dag, end_date=task_end_date)

    encoded = SerializedDAG.to_dict(source_dag)
    encoded_task = encoded["dag"]["tasks"][0]

    # A task with no end_date of its own, or one that is not earlier than
    # the DAG's, ends up with the DAG's end_date (see dag.add_task()), so
    # the serialized task is expected to omit the field.
    follows_dag_end = not task_end_date or dag_end_date <= task_end_date
    if follows_dag_end:
        self.assertNotIn("end_date", encoded_task)
    else:
        self.assertIn("end_date", encoded_task)

    restored_dag = SerializedDAG.from_dict(encoded)
    restored_task = restored_dag.task_dict["simple_task"]
    self.assertEqual(restored_task.end_date, expected_task_end_date)

@parameterized.expand([
(None, None),
("@weekly", "@weekly"),
Expand Down

0 comments on commit ef44ed7

Please sign in to comment.