Skip to content

Commit

Permalink
[AIRFLOW-6195] Fixed TaskInstance attrs not correct on UI (#6758)
Browse files Browse the repository at this point in the history
(cherry picked from commit d4a8afb)
  • Loading branch information
baolsen authored and kaxil committed Dec 17, 2019
1 parent 0dba304 commit de007d5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ def _per_task_process(task, key, ti, session=None):
self.log.debug('Sending %s to executor', ti)
# Skip scheduled state, we are executing immediately
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm
ti.queued_dttm = timezone.utcnow()
session.merge(ti)

cfg_path = None
Expand Down
5 changes: 2 additions & 3 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1096,9 +1096,7 @@ def _change_state_for_executable_task_instances(self, task_instances,
# set TIs to queued state
for task_instance in tis_to_set_to_queued:
task_instance.state = State.QUEUED
task_instance.queued_dttm = (timezone.utcnow()
if not task_instance.queued_dttm
else task_instance.queued_dttm)
task_instance.queued_dttm = timezone.utcnow()
session.merge(task_instance)

# Generate a list of SimpleTaskInstance for the use of queuing
Expand Down Expand Up @@ -1226,6 +1224,7 @@ def _change_state_for_tasks_failed_to_execute(self, session):
# set TIs to queued state
for task_instance in tis_to_set_to_scheduled:
task_instance.state = State.SCHEDULED
task_instance.queued_dttm = None
self.executor.queued_tasks.pop(task_instance.key)

task_instance_str = "\n\t".join(
Expand Down
13 changes: 12 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))
# If adding new fields here then remember to add them to
# refresh_from_db() or they wont display in the UI correctly

__table_args__ = (
Index('ti_dag_state', dag_id, state),
Expand Down Expand Up @@ -474,14 +476,23 @@ def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_
else:
ti = qry.first()
if ti:
self.state = ti.state
# Fields ordered per model definition
self.start_date = ti.start_date
self.end_date = ti.end_date
self.duration = ti.duration
self.state = ti.state
# Get the raw value of try_number column, don't read through the
# accessor here otherwise it will be incremeneted by one already.
self.try_number = ti._try_number
self.max_tries = ti.max_tries
self.hostname = ti.hostname
self.unixname = ti.unixname
self.job_id = ti.job_id
self.pool = ti.pool
self.queue = ti.queue
self.priority_weight = ti.priority_weight
self.operator = ti.operator
self.queued_dttm = ti.queued_dttm
self.pid = ti.pid
if refresh_executor_config:
self.executor_config = ti.executor_config
Expand Down

0 comments on commit de007d5

Please sign in to comment.