Skip to content

Commit

Permalink
[AIRFLOW-2231] Fix relativedelta DAG schedule_interval (apache#3174)
Browse files Browse the repository at this point in the history
Fixes issues when specifying a DAG with a schedule_interval of type relativedelta.
  • Loading branch information
kylebrooks-8451 authored and kaxil committed Aug 7, 2018
1 parent 4f138df commit 8949aac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
8 changes: 4 additions & 4 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from builtins import str, object, bytes, ImportError as BuiltinImportError
import copy
from collections import namedtuple, defaultdict
from collections import namedtuple, defaultdict, Hashable
from datetime import timedelta

import dill
Expand Down Expand Up @@ -3233,7 +3233,7 @@ def __init__(
)

self.schedule_interval = schedule_interval
if schedule_interval in cron_presets:
if isinstance(schedule_interval, Hashable) and schedule_interval in cron_presets:
self._schedule_interval = cron_presets.get(schedule_interval)
elif schedule_interval == '@once':
self._schedule_interval = None
Expand Down Expand Up @@ -3333,7 +3333,7 @@ def following_schedule(self, dttm):
cron = croniter(self._schedule_interval, dttm)
following = timezone.make_aware(cron.get_next(datetime), self.timezone)
return timezone.convert_to_utc(following)
elif isinstance(self._schedule_interval, timedelta):
elif self._schedule_interval is not None:
return dttm + self._schedule_interval

def previous_schedule(self, dttm):
Expand All @@ -3348,7 +3348,7 @@ def previous_schedule(self, dttm):
cron = croniter(self._schedule_interval, dttm)
prev = timezone.make_aware(cron.get_prev(datetime), self.timezone)
return timezone.convert_to_utc(prev)
elif isinstance(self._schedule_interval, timedelta):
elif self._schedule_interval is not None:
return dttm - self._schedule_interval

def get_run_dates(self, start_date, end_date=None):
Expand Down
40 changes: 40 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,46 @@ def test_schedule_dag_no_previous_runs(self):
self.assertFalse(dag_run.external_trigger)
dag.clear()

def test_schedule_dag_relativedelta(self):
"""
Tests scheduling a dag with a relativedelta schedule_interval
"""
delta = relativedelta(hours=+1)
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_relativedelta',
schedule_interval=delta)
dag.add_task(models.BaseOperator(
task_id="faketastic",
owner='Also fake',
start_date=datetime(2015, 1, 2, 0, 0)))

dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
self.assertIsNotNone(dag_run)
self.assertEqual(dag.dag_id, dag_run.dag_id)
self.assertIsNotNone(dag_run.run_id)
self.assertNotEqual('', dag_run.run_id)
self.assertEqual(
datetime(2015, 1, 2, 0, 0),
dag_run.execution_date,
msg='dag_run.execution_date did not match expectation: {0}'
.format(dag_run.execution_date)
)
self.assertEqual(State.RUNNING, dag_run.state)
self.assertFalse(dag_run.external_trigger)
dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
self.assertIsNotNone(dag_run2)
self.assertEqual(dag.dag_id, dag_run2.dag_id)
self.assertIsNotNone(dag_run2.run_id)
self.assertNotEqual('', dag_run2.run_id)
self.assertEqual(
datetime(2015, 1, 2, 0, 0) + delta,
dag_run2.execution_date,
msg='dag_run2.execution_date did not match expectation: {0}'
.format(dag_run2.execution_date)
)
self.assertEqual(State.RUNNING, dag_run2.state)
self.assertFalse(dag_run2.external_trigger)
dag.clear()

def test_schedule_dag_fake_scheduled_previous(self):
"""
Test scheduling a dag where there is a prior DagRun
Expand Down

0 comments on commit 8949aac

Please sign in to comment.