diff --git a/airflow/jobs.py b/airflow/jobs.py index 39e3462faae664..00d4864b1d3a99 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -168,15 +168,17 @@ def heartbeat(self): if job.state == State.SHUTDOWN: self.kill() - # Figure out how long to sleep for - sleep_for = 0 - if job.latest_heartbeat: - sleep_for = max( - 0, - self.heartrate - (timezone.utcnow() - - job.latest_heartbeat).total_seconds()) - - sleep(sleep_for) + is_unit_test = conf.getboolean('core', 'unit_test_mode') + if not is_unit_test: + # Figure out how long to sleep for + sleep_for = 0 + if job.latest_heartbeat: + seconds_remaining = self.heartrate - \ + (timezone.utcnow() - job.latest_heartbeat)\ + .total_seconds() + sleep_for = max(0, seconds_remaining) + + sleep(sleep_for) # Update last heartbeat time with create_session() as session: diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 42046d2fbb5f16..e16c9d9a0a7f89 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -133,6 +133,11 @@ def abort(): class BackfillJobTest(unittest.TestCase): def setUp(self): + with create_session() as session: + session.query(models.DagRun).delete() + session.query(models.Pool).delete() + session.query(models.TaskInstance).delete() + self.parser = cli.CLIFactory.get_parser() self.dagbag = DagBag(include_examples=True) @@ -1207,7 +1212,7 @@ def test_backfill_run_backwards(self): job = BackfillJob( dag=dag, start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE + datetime.timedelta(days=5), + end_date=DEFAULT_DATE + datetime.timedelta(days=1), run_backwards=True ) job.run()