Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6195] Fixed TaskInstance attrs not correct on UI #6758

Merged
merged 13 commits into from
Dec 11, 2019
2 changes: 1 addition & 1 deletion airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ def _per_task_process(task, key, ti, session=None):
self.log.debug('Sending %s to executor', ti)
# Skip scheduled state, we are executing immediately
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() if not ti.queued_dttm else ti.queued_dttm
ti.queued_dttm = timezone.utcnow()
session.merge(ti)

cfg_path = None
Expand Down
5 changes: 2 additions & 3 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1261,9 +1261,7 @@ def _change_state_for_executable_task_instances(self, task_instances,
# set TIs to queued state
for task_instance in tis_to_set_to_queued:
task_instance.state = State.QUEUED
task_instance.queued_dttm = (timezone.utcnow()
if not task_instance.queued_dttm
else task_instance.queued_dttm)
task_instance.queued_dttm = timezone.utcnow()
session.merge(task_instance)

# Generate a list of SimpleTaskInstance for the use of queuing
Expand Down Expand Up @@ -1391,6 +1389,7 @@ def _change_state_for_tasks_failed_to_execute(self, session):
# set TIs to queued state
for task_instance in tis_to_set_to_scheduled:
task_instance.state = State.SCHEDULED
task_instance.queued_dttm = None
self.executor.queued_tasks.pop(task_instance.key)

task_instance_str = "\n\t".join(
Expand Down
13 changes: 12 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
pid = Column(Integer)
executor_config = Column(PickleType(pickler=dill))
# If adding new fields here then remember to add them to
# refresh_from_db() or they wont display in the UI correctly

__table_args__ = (
Index('ti_dag_state', dag_id, state),
Expand Down Expand Up @@ -443,14 +445,23 @@ def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_
else:
ti = qry.first()
if ti:
self.state = ti.state
# Fields ordered per model definition
self.start_date = ti.start_date
self.end_date = ti.end_date
self.duration = ti.duration
self.state = ti.state
# Get the raw value of try_number column, don't read through the
# accessor here otherwise it will be incremented by one already.
self.try_number = ti._try_number
self.max_tries = ti.max_tries
self.hostname = ti.hostname
self.unixname = ti.unixname
self.job_id = ti.job_id
self.pool = ti.pool
self.queue = ti.queue
self.priority_weight = ti.priority_weight
self.operator = ti.operator
self.queued_dttm = ti.queued_dttm
self.pid = ti.pid
if refresh_executor_config:
self.executor_config = ti.executor_config
Expand Down