diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py index acb97180d5c5b..26578f8c18cd8 100644 --- a/airflow/jobs/backfill_job.py +++ b/airflow/jobs/backfill_job.py @@ -488,7 +488,7 @@ def _per_task_process(task, key, ti, session=None): self.log.debug('Sending %s to executor', ti) # Skip scheduled state, we are executing immediately ti.state = State.QUEUED - ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm + ti.queued_dttm = timezone.utcnow() session.merge(ti) cfg_path = None diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 53293f366d755..188216584111d 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1261,9 +1261,7 @@ def _change_state_for_executable_task_instances(self, task_instances, # set TIs to queued state for task_instance in tis_to_set_to_queued: task_instance.state = State.QUEUED - task_instance.queued_dttm = (timezone.utcnow() - if not task_instance.queued_dttm - else task_instance.queued_dttm) + task_instance.queued_dttm = timezone.utcnow() session.merge(task_instance) # Generate a list of SimpleTaskInstance for the use of queuing @@ -1391,6 +1389,7 @@ def _change_state_for_tasks_failed_to_execute(self, session): # set TIs to queued state for task_instance in tis_to_set_to_scheduled: task_instance.state = State.SCHEDULED + task_instance.queued_dttm = None self.executor.queued_tasks.pop(task_instance.key) task_instance_str = "\n\t".join( diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 716d51d1b7349..1118919e23b5c 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -160,6 +160,8 @@ class TaskInstance(Base, LoggingMixin): queued_dttm = Column(UtcDateTime) pid = Column(Integer) executor_config = Column(PickleType(pickler=dill)) + # If adding new fields here then remember to add them to + # refresh_from_db() or they wont display in the UI correctly __table_args__ = ( Index('ti_dag_state', dag_id, state), @@ -443,14 +445,23 @@ def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_ else: ti = qry.first() if ti: - self.state = ti.state + # Fields ordered per model definition self.start_date = ti.start_date self.end_date = ti.end_date + self.duration = ti.duration + self.state = ti.state # Get the raw value of try_number column, don't read through the # accessor here otherwise it will be incremented by one already. self.try_number = ti._try_number self.max_tries = ti.max_tries self.hostname = ti.hostname + self.unixname = ti.unixname + self.job_id = ti.job_id + self.pool = ti.pool + self.queue = ti.queue + self.priority_weight = ti.priority_weight + self.operator = ti.operator + self.queued_dttm = ti.queued_dttm self.pid = ti.pid if refresh_executor_config: self.executor_config = ti.executor_config