From 8949aaca604f77ce8f24ff5255d18225a77eef5a Mon Sep 17 00:00:00 2001 From: brookskd Date: Tue, 7 Aug 2018 10:53:18 -0400 Subject: [PATCH] [AIRFLOW-2231] Fix relativedelta DAG schedule_interval (#3174) Fixes issues when specifying a DAG with a schedule_interval of type relativedelta. --- airflow/models.py | 8 ++++---- tests/core.py | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/airflow/models.py b/airflow/models.py index 288bd4c93736f1..d2fd3c05320aa6 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -26,7 +26,7 @@ from builtins import str, object, bytes, ImportError as BuiltinImportError import copy -from collections import namedtuple, defaultdict +from collections import namedtuple, defaultdict, Hashable from datetime import timedelta import dill @@ -3233,7 +3233,7 @@ def __init__( ) self.schedule_interval = schedule_interval - if schedule_interval in cron_presets: + if isinstance(schedule_interval, Hashable) and schedule_interval in cron_presets: self._schedule_interval = cron_presets.get(schedule_interval) elif schedule_interval == '@once': self._schedule_interval = None @@ -3333,7 +3333,7 @@ def following_schedule(self, dttm): cron = croniter(self._schedule_interval, dttm) following = timezone.make_aware(cron.get_next(datetime), self.timezone) return timezone.convert_to_utc(following) - elif isinstance(self._schedule_interval, timedelta): + elif self._schedule_interval is not None: return dttm + self._schedule_interval def previous_schedule(self, dttm): @@ -3348,7 +3348,7 @@ def previous_schedule(self, dttm): cron = croniter(self._schedule_interval, dttm) prev = timezone.make_aware(cron.get_prev(datetime), self.timezone) return timezone.convert_to_utc(prev) - elif isinstance(self._schedule_interval, timedelta): + elif self._schedule_interval is not None: return dttm - self._schedule_interval def get_run_dates(self, start_date, end_date=None): diff --git a/tests/core.py b/tests/core.py index d336b1bd1ff1b2..b0471bc807d259 100644 --- a/tests/core.py +++ b/tests/core.py @@ -153,6 +153,46 @@ def test_schedule_dag_no_previous_runs(self): self.assertFalse(dag_run.external_trigger) dag.clear() + def test_schedule_dag_relativedelta(self): + """ + Tests scheduling a dag with a relativedelta schedule_interval + """ + delta = relativedelta(hours=+1) + dag = DAG(TEST_DAG_ID + 'test_schedule_dag_relativedelta', + schedule_interval=delta) + dag.add_task(models.BaseOperator( + task_id="faketastic", + owner='Also fake', + start_date=datetime(2015, 1, 2, 0, 0))) + + dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) + self.assertIsNotNone(dag_run) + self.assertEqual(dag.dag_id, dag_run.dag_id) + self.assertIsNotNone(dag_run.run_id) + self.assertNotEqual('', dag_run.run_id) + self.assertEqual( + datetime(2015, 1, 2, 0, 0), + dag_run.execution_date, + msg='dag_run.execution_date did not match expectation: {0}' + .format(dag_run.execution_date) + ) + self.assertEqual(State.RUNNING, dag_run.state) + self.assertFalse(dag_run.external_trigger) + dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) + self.assertIsNotNone(dag_run2) + self.assertEqual(dag.dag_id, dag_run2.dag_id) + self.assertIsNotNone(dag_run2.run_id) + self.assertNotEqual('', dag_run2.run_id) + self.assertEqual( + datetime(2015, 1, 2, 0, 0) + delta, + dag_run2.execution_date, + msg='dag_run2.execution_date did not match expectation: {0}' + .format(dag_run2.execution_date) + ) + self.assertEqual(State.RUNNING, dag_run2.state) + self.assertFalse(dag_run2.external_trigger) + dag.clear() + def test_schedule_dag_fake_scheduled_previous(self): """ Test scheduling a dag where there is a prior DagRun