[AIRFLOW-3702] Add backfill option to run backwards (apache#4676)
dima-asana authored and ashb committed Mar 6, 2019
1 parent 71c6bb1 commit 85f0cf3
Showing 5 changed files with 96 additions and 5 deletions.
10 changes: 9 additions & 1 deletion airflow/bin/cli.py
@@ -218,6 +218,7 @@ def backfill(args, dag=None):
verbose=args.verbose,
conf=run_conf,
rerun_failed_tasks=args.rerun_failed_tasks,
run_backwards=args.run_backwards
)


@@ -1576,6 +1577,13 @@ class CLIFactory(object):
"all the failed tasks for the backfill date range "
"instead of throwing exceptions"),
"store_true"),
'run_backwards': Arg(
("-B", "--run_backwards",),
(
"if set, the backfill will run tasks from the most "
"recent day first. if there are tasks that depend_on_past "
"this option will throw an exception"),
"store_true"),

# list_tasks
'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
@@ -1921,7 +1929,7 @@ class CLIFactory(object):
'mark_success', 'local', 'donot_pickle',
'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf',
'reset_dag_run', 'rerun_failed_tasks',
'reset_dag_run', 'rerun_failed_tasks', 'run_backwards'
)
}, {
'func': list_dag_runs,
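For reference, a minimal sketch of exercising the new flag through the CLI layer, modeled on the tests later in this commit; the DAG id and dates are placeholders:

from airflow.bin import cli

# Equivalent to `airflow backfill -s 2019-01-01 -e 2019-01-07 -B example_dag`
# on the shell; -B / --run_backwards queues the most recent execution date first.
parser = cli.CLIFactory.get_parser()
args = parser.parse_args([
    'backfill', 'example_dag',
    '-s', '2019-01-01',
    '-e', '2019-01-07',
    '-B',
])
cli.backfill(args)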
3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
@@ -18,6 +18,7 @@
# under the License.

from builtins import range
from collections import OrderedDict

from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -39,7 +40,7 @@ def __init__(self, parallelism=PARALLELISM):
:type parallelism: int
"""
self.parallelism = parallelism
self.queued_tasks = {}
self.queued_tasks = OrderedDict()
self.running = {}
self.event_buffer = {}

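The switch from a plain dict to an OrderedDict matters because, on the Python 2 and pre-3.7 Python 3 interpreters Airflow still supported at the time, a plain dict does not guarantee iteration order, while a backwards backfill (and the ordering check in the new test) relies on tasks keeping the order in which they were queued. A small illustrative sketch — the tuple keys here only approximate the executor's real task-instance keys:

from collections import OrderedDict

queued_tasks = OrderedDict()
# Queue two execution dates newest-first, as a backwards backfill would.
queued_tasks[('example_dag', 'dummy', '2019-01-02')] = 'task instance for Jan 2'
queued_tasks[('example_dag', 'dummy', '2019-01-01')] = 'task instance for Jan 1'

# Iteration follows insertion order, so same-priority tasks are handed to the
# executor in the order the backfill queued them.
assert list(queued_tasks) == [
    ('example_dag', 'dummy', '2019-01-02'),
    ('example_dag', 'dummy', '2019-01-01'),
]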
19 changes: 17 additions & 2 deletions airflow/jobs.py
@@ -30,7 +30,7 @@
import sys
import threading
import time
from collections import defaultdict
from collections import defaultdict, OrderedDict
from time import sleep

import six
@@ -1833,7 +1833,7 @@ def __init__(self,
:param total_runs: Number of total dag runs able to run
:type total_runs: int
"""
self.to_run = to_run or dict()
self.to_run = to_run or OrderedDict()
self.running = running or dict()
self.skipped = skipped or set()
self.succeeded = succeeded or set()
@@ -1859,6 +1859,7 @@ def __init__(
verbose=False,
conf=None,
rerun_failed_tasks=False,
run_backwards=False,
*args, **kwargs):
"""
:param dag: DAG object.
@@ -1885,6 +1886,8 @@ def __init__(
:param rerun_failed_tasks: whether to automatically rerun failed tasks in the backfill
:type rerun_failed_tasks: bool
:param run_backwards: Whether to process the dates from most to least recent
:type run_backwards: bool
:param args:
:param kwargs:
"""
@@ -1901,6 +1904,7 @@ def __init__(
self.verbose = verbose
self.conf = conf
self.rerun_failed_tasks = rerun_failed_tasks
self.run_backwards = run_backwards
super(BackfillJob, self).__init__(*args, **kwargs)

def _update_counters(self, ti_status):
@@ -2141,8 +2145,10 @@ def _process_backfill_task_instances(self,
# or leaf to root, as otherwise tasks might be
# determined deadlocked while they are actually
# waiting for their upstream to finish

for task in self.dag.topological_sort():
for key, ti in list(ti_status.to_run.items()):

if task.task_id != ti.task_id:
continue

@@ -2228,6 +2234,7 @@ def _process_backfill_task_instances(self,
self.log.debug('Sending %s to executor', ti)
# Skip scheduled state, we are executing immediately
ti.state = State.QUEUED
ti.queued_dttm = ti.queued_dttm or timezone.utcnow()
session.merge(ti)

cfg_path = None
@@ -2409,6 +2416,14 @@ def _execute(self, session=None):
# Get intervals between the start/end dates, which will turn into dag runs
run_dates = self.dag.get_run_dates(start_date=start_date,
end_date=self.bf_end_date)
if self.run_backwards:
tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
if tasks_that_depend_on_past:
raise AirflowException(
'You cannot backfill backwards because one or more tasks depend_on_past: {}'.format(
",".join(tasks_that_depend_on_past)))
run_dates = run_dates[::-1]

if len(run_dates) == 0:
self.log.info("No run dates were found for the given dates and dag interval.")
return
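To make the new guard and date reversal concrete, a self-contained sketch of the same logic with toy stand-ins for the DAG's tasks and run dates:

from datetime import datetime, timedelta


class Task(object):
    """Toy stand-in for an operator; only the attributes the guard reads."""
    def __init__(self, task_id, depends_on_past=False):
        self.task_id = task_id
        self.depends_on_past = depends_on_past


run_dates = [datetime(2019, 1, 1) + timedelta(days=i) for i in range(5)]
tasks = [Task('extract'), Task('load')]
run_backwards = True

if run_backwards:
    tasks_that_depend_on_past = [t.task_id for t in tasks if t.depends_on_past]
    if tasks_that_depend_on_past:
        raise Exception(
            'You cannot backfill backwards because one or more tasks '
            'depend_on_past: {}'.format(",".join(tasks_that_depend_on_past)))
    run_dates = run_dates[::-1]  # most recent execution date is processed first

print([d.date().isoformat() for d in run_dates])
# ['2019-01-05', '2019-01-04', '2019-01-03', '2019-01-02', '2019-01-01']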
7 changes: 7 additions & 0 deletions airflow/models.py
@@ -4274,6 +4274,7 @@ def run(
verbose=False,
conf=None,
rerun_failed_tasks=False,
run_backwards=False,
):
"""
Runs the DAG.
@@ -4304,6 +4305,11 @@ def run(
:type verbose: bool
:param conf: user defined dictionary passed from CLI
:type conf: dict
:param rerun_failed_tasks: whether to automatically rerun failed tasks in the backfill
:type rerun_failed_tasks: bool
:param run_backwards: whether to process the dates from most to least recent
:type run_backwards: bool
"""
from airflow.jobs import BackfillJob
if not executor and local:
@@ -4324,6 +4330,7 @@
verbose=verbose,
conf=conf,
rerun_failed_tasks=rerun_failed_tasks,
run_backwards=run_backwards,
)
job.run()

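A minimal sketch of driving the same behaviour from Python through `DAG.run`, mirroring the test added below; the DAG id and dates are placeholders, and running it assumes a working Airflow metadata database:

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

start = datetime.datetime(2019, 1, 1)

dag = DAG('example_backwards', start_date=start, schedule_interval='@daily')
DummyOperator(task_id='dummy', dag=dag, owner='airflow')

# Backfill with the most recent execution date first; raises AirflowException
# if any task in the DAG sets depends_on_past=True.
dag.run(
    start_date=start,
    end_date=start + datetime.timedelta(days=5),
    local=True,
    run_backwards=True,
)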
62 changes: 61 additions & 1 deletion tests/jobs.py
@@ -617,6 +617,41 @@ def test_cli_backfill_depends_on_past(self):
self.assertEqual(ti.state, State.SUCCESS)
dag.clear()

def test_cli_backfill_depends_on_past_backwards(self):
"""
Test that the CLI respects the -B argument and raises when it is used on a DAG with tasks that depend_on_past
"""
dag_id = 'test_depends_on_past'
start_date = DEFAULT_DATE + datetime.timedelta(days=1)
end_date = start_date + datetime.timedelta(days=1)
args = [
'backfill',
dag_id,
'-l',
'-s',
start_date.isoformat(),
'-e',
end_date.isoformat(),
]
dag = self.dagbag.get_dag(dag_id)
dag.clear()

cli.backfill(self.parser.parse_args(args + ['-I']))
ti = TI(dag.get_task('test_dop_task'), end_date)
ti.refresh_from_db()
# runs fine forwards
self.assertEqual(ti.state, State.SUCCESS)

# raises backwards
expected_msg = 'You cannot backfill backwards because one or more tasks depend_on_past: {}'.format(
'test_dop_task')
self.assertRaisesRegexp(
AirflowException,
expected_msg,
cli.backfill,
self.parser.parse_args(args + ['-B']))

def test_cli_receives_delay_arg(self):
"""
Tests that the --delay argument is passed correctly to the BackfillJob
@@ -1141,7 +1176,8 @@ def get_test_dag_for_backfill(schedule_interval=None):
DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')
owner='airflow',
)
return dag

test_dag = get_test_dag_for_backfill()
@@ -1158,6 +1194,30 @@ def get_test_dag_for_backfill(schedule_interval=None):
start_date=DEFAULT_DATE - datetime.timedelta(hours=3),
end_date=DEFAULT_DATE,))

def test_backfill_run_backwards(self):
dag = self.dagbag.get_dag("test_start_date_scheduling")
dag.clear()

job = BackfillJob(
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=5),
run_backwards=True
)
job.run()

session = settings.Session()
tis = session.query(TI).filter(
TI.dag_id == 'test_start_date_scheduling',
TI.task_id == 'dummy'
).order_by(TI.execution_date).all()

queued_times = [ti.queued_dttm for ti in tis]
self.assertEqual(queued_times, sorted(queued_times, reverse=True))
self.assertTrue(all(ti.state == State.SUCCESS for ti in tis))

dag.clear()
session.close()


class LocalTaskJobTest(unittest.TestCase):
def setUp(self):