From fb169536a32b7247ee58ef4bf1e3deccfce23341 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 6 May 2024 08:24:38 -0700 Subject: [PATCH] Only heartbeat if necessary in backfill loop (#39399) Currently, backfill sleeps for a minute in every iteration, which is extremely slow. The reason is that it waits synchronously until a heartbeat is necessary. Since the loop is otherwise fast, this results in waits of up to a minute between syncing. With this change, if we don't add time.sleep(1), the loop will be very fast and generate tons of logs. So we sleep for one second at the end of each iteration to slow it down just a bit. --- airflow/jobs/backfill_job_runner.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 6be72b2c95c34..ace6a001311a1 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -46,7 +46,7 @@ from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS from airflow.timetables.base import DagRunInfo from airflow.utils import helpers, timezone -from airflow.utils.configuration import conf as airflow_conf, tmp_configuration_copy +from airflow.utils.configuration import tmp_configuration_copy from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState @@ -475,10 +475,8 @@ def _process_backfill_task_instances( """ executed_run_dates = [] - is_unit_test = airflow_conf.getboolean("core", "unit_test_mode") - while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked: - self.log.debug("*** Clearing out not_ready list ***") + self.log.debug("Clearing out not_ready list") ti_status.not_ready.clear() # we need to execute the tasks bottom to top @@ -697,7 +695,7 @@ def _per_task_process(key, ti: TaskInstance, session): self.log.debug(e) perform_heartbeat( - 
job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=is_unit_test + job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True ) # execute the tasks in the queue executor.heartbeat() @@ -749,6 +747,7 @@ def to_keep(key: TaskInstanceKey) -> bool: self._log_progress(ti_status) session.commit() + time.sleep(1) # return updated status return executed_run_dates