From e6fcd29a6a5042139e5b118b123d230a05a4dec4 Mon Sep 17 00:00:00 2001 From: Andrew Stahlman Date: Mon, 18 Feb 2019 17:13:45 -0800 Subject: [PATCH] [AIRFLOW-3885] ~2.5x speed-up for backfill tests (#4731) The BackfillJobTest suite now takes 57 seconds vs. the baseline of 147 seconds on my laptop. A couple of optimizations: - Don't sleep() if we are running unit tests - Don't backfill more DagRuns than needed (reduced from 5 to 2, since we only need 2 DagRuns to verify that we can run backwards) I've also made a few tests reentrant by clearing out the Pool, DagRun, and TaskInstance tables between runs. --- airflow/jobs.py | 20 +++++++++++--------- tests/jobs.py | 7 ++++++- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index 17a677b34023d5..55b1b92bfa8e34 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -165,15 +165,17 @@ def heartbeat(self): if job.state == State.SHUTDOWN: self.kill() - # Figure out how long to sleep for - sleep_for = 0 - if job.latest_heartbeat: - sleep_for = max( - 0, - self.heartrate - (timezone.utcnow() - - job.latest_heartbeat).total_seconds()) - - sleep(sleep_for) + is_unit_test = conf.getboolean('core', 'unit_test_mode') + if not is_unit_test: + # Figure out how long to sleep for + sleep_for = 0 + if job.latest_heartbeat: + seconds_remaining = self.heartrate - \ + (timezone.utcnow() - job.latest_heartbeat)\ + .total_seconds() + sleep_for = max(0, seconds_remaining) + + sleep(sleep_for) # Update last heartbeat time with create_session() as session: diff --git a/tests/jobs.py b/tests/jobs.py index 90e9f64820188f..8581c03fd01124 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -130,6 +130,11 @@ def abort(): class BackfillJobTest(unittest.TestCase): def setUp(self): + with create_session() as session: + session.query(models.DagRun).delete() + session.query(models.Pool).delete() + session.query(models.TaskInstance).delete() + self.parser = cli.CLIFactory.get_parser() self.dagbag = 
DagBag(include_examples=True) @@ -1201,7 +1206,7 @@ def test_backfill_run_backwards(self): job = BackfillJob( dag=dag, start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=5), + end_date=DEFAULT_DATE + datetime.timedelta(days=1), run_backwards=True ) job.run()