Skip to content

Commit

Permalink
[AIRFLOW-3885] ~2.5x speed-up for backfill tests (apache#4731)
Browse files Browse the repository at this point in the history
The BackfillJobTest suite now takes 57 seconds vs. the baseline of 147
seconds on my laptop.

A couple of optimizations:

- Don't sleep() if we are running unit tests
- Don't backfill more DagRuns than needed (reduced from 5 to 2, since we
  only need 2 DagRuns to verify that we can run backwards)

I've also made a few tests reentrant by clearing out the Pool, DagRun,
and TaskInstance table between runs.
  • Loading branch information
astahlman authored and ashb committed Mar 6, 2019
1 parent 85f0cf3 commit e6fcd29
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
20 changes: 11 additions & 9 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,17 @@ def heartbeat(self):
if job.state == State.SHUTDOWN:
self.kill()

# Figure out how long to sleep for
sleep_for = 0
if job.latest_heartbeat:
sleep_for = max(
0,
self.heartrate - (timezone.utcnow() -
job.latest_heartbeat).total_seconds())

sleep(sleep_for)
is_unit_test = conf.getboolean('core', 'unit_test_mode')
if not is_unit_test:
# Figure out how long to sleep for
sleep_for = 0
if job.latest_heartbeat:
seconds_remaining = self.heartrate - \
(timezone.utcnow() - job.latest_heartbeat)\
.total_seconds()
sleep_for = max(0, seconds_remaining)

sleep(sleep_for)

# Update last heartbeat time
with create_session() as session:
Expand Down
7 changes: 6 additions & 1 deletion tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def abort():
class BackfillJobTest(unittest.TestCase):

def setUp(self):
with create_session() as session:
session.query(models.DagRun).delete()
session.query(models.Pool).delete()
session.query(models.TaskInstance).delete()

self.parser = cli.CLIFactory.get_parser()
self.dagbag = DagBag(include_examples=True)

Expand Down Expand Up @@ -1201,7 +1206,7 @@ def test_backfill_run_backwards(self):
job = BackfillJob(
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=5),
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
run_backwards=True
)
job.run()
Expand Down

0 comments on commit e6fcd29

Please sign in to comment.