diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 6be72b2c95c34..ace6a001311a1 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -46,7 +46,7 @@
 from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
 from airflow.timetables.base import DagRunInfo
 from airflow.utils import helpers, timezone
-from airflow.utils.configuration import conf as airflow_conf, tmp_configuration_copy
+from airflow.utils.configuration import tmp_configuration_copy
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -475,10 +475,8 @@ def _process_backfill_task_instances(
         """
         executed_run_dates = []
 
-        is_unit_test = airflow_conf.getboolean("core", "unit_test_mode")
-
         while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked:
-            self.log.debug("*** Clearing out not_ready list ***")
+            self.log.debug("Clearing out not_ready list")
             ti_status.not_ready.clear()
 
             # we need to execute the tasks bottom to top
@@ -697,7 +695,7 @@ def _per_task_process(key, ti: TaskInstance, session):
                     self.log.debug(e)
 
             perform_heartbeat(
-                job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=is_unit_test
+                job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
             )
             # execute the tasks in the queue
             executor.heartbeat()
@@ -749,6 +747,7 @@ def to_keep(key: TaskInstanceKey) -> bool:
 
             self._log_progress(ti_status)
             session.commit()
+            time.sleep(1)
 
         # return updated status
         return executed_run_dates