Skip to content

Commit

Permalink
Group filters by dag_id execution_date and try_number
Browse files Browse the repository at this point in the history
clear_task_instances took 2.240063190460205
  • Loading branch information
yuqian90 committed Feb 8, 2021
1 parent 36e7270 commit 6b67c43
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pickle
import signal
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from typing import IO, Any, Dict, Iterable, List, NamedTuple, Optional, Tuple, Union
Expand Down Expand Up @@ -172,17 +173,34 @@ def clear_task_instances(

if tr_filter:
# Clear all reschedules related to the ti to clear
delete_qry = TR.__table__.delete().where(
or_(
and_(
TR.dag_id == dag_id,
TR.task_id == task_id,
TR.execution_date == execution_date,
TR.try_number == try_number,
)
for dag_id, task_id, execution_date, try_number in tr_filter

# This is an optimization for the common case where all tis are for a small number
# of dag_id, execution_date and try_number. Use a nested dict of dag_id,
# execution_date, try_number and task_id to construct the where clause in a
# hierarchical manner. This speeds up the delete statement by more than 40x for
# large number of tis (50k+).
task_id_by_key = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
for dag_id, task_id, execution_date, try_number in tr_filter:
task_id_by_key[dag_id][execution_date][try_number].add(task_id)

conditions = or_(
and_(
TR.dag_id == dag_id,
or_(
and_(
TR.execution_date == execution_date,
or_(
and_(TR.try_number == try_number, TR.task_id.in_(task_ids))
for try_number, task_ids in task_tries.items()
),
)
for execution_date, task_tries in dates.items()
),
)
for dag_id, dates in task_id_by_key.items()
)

delete_qry = TR.__table__.delete().where(conditions)
session.execute(delete_qry)

if job_ids:
Expand Down

0 comments on commit 6b67c43

Please sign in to comment.