
Speed up clear_task_instances by doing a single sql delete for TaskReschedule #14048

Merged — 9 commits merged into apache:master on Feb 10, 2021

Conversation

yuqian90
Contributor

@yuqian90 yuqian90 commented Feb 3, 2021

Clearing a large number of tasks takes a long time. Most of the time (more than 95%) is spent at this line in clear_task_instances. This slowness sometimes causes the webserver to time out because the web_server_worker_timeout is hit.

```
        # Clear all reschedules related to the ti to clear
        session.query(TR).filter(
            TR.dag_id == ti.dag_id,
            TR.task_id == ti.task_id,
            TR.execution_date == ti.execution_date,
            TR.try_number == ti.try_number,
        ).delete()
```

This line is very slow because it deletes TaskReschedule rows one by one in a for loop.

This PR changes this code to delete the TaskReschedule rows in a single SQL query with a set of OR conditions. It is effectively doing the same thing, but much faster: simple profiling shows it is at least seven times faster when deleting thousands of TaskReschedule rows.
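The shape of the change can be sketched as follows. This is a minimal, self-contained illustration using a toy table and an in-memory SQLite database, not Airflow's actual TaskReschedule model: the per-instance conditions are combined with `and_` and then OR'd together into one DELETE statement instead of issuing one DELETE per task instance.

```
# Hypothetical sketch (toy table, not Airflow's models): replace a per-row
# delete loop with a single DELETE whose WHERE clause ORs the conditions.
from sqlalchemy import Column, Integer, String, and_, create_engine, or_
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Reschedule(Base):  # stand-in for Airflow's TaskReschedule
    __tablename__ = "reschedule"
    id = Column(Integer, primary_key=True)
    task_id = Column(String)
    try_number = Column(Integer)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # 3 tasks x 2 try numbers = 6 rows.
    session.add_all(
        Reschedule(task_id=f"task_{i}", try_number=n)
        for i in range(3)
        for n in (1, 2)
    )
    session.commit()

    # One DELETE for all targeted (task_id, try_number) pairs,
    # instead of one DELETE per pair in a loop.
    targets = [("task_0", 1), ("task_1", 1)]
    conditions = [
        and_(Reschedule.task_id == t, Reschedule.try_number == n)
        for t, n in targets
    ]
    session.execute(Reschedule.__table__.delete().where(or_(*conditions)))
    session.commit()

    remaining = session.query(Reschedule).count()
    print(remaining)  # 4: the two targeted rows are gone
```

A single round trip with a larger WHERE clause is far cheaper than thousands of round trips, which is where the speedup in this PR comes from.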

Member

@potiuk potiuk left a comment


Looks good to me. @kaxil - can you take a look at this one as well? It looks like a nice thing to cherry-pick to 2.0.1: it is very localized and improves a rather common case.

@github-actions

github-actions bot commented Feb 3, 2021

The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest master at your convenience, or amend the last commit of the PR and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Feb 3, 2021
@yuqian90
Contributor Author

yuqian90 commented Feb 4, 2021

Thanks @potiuk and @mik-laj for the review. I updated the PR according to your comments.

Also, I noticed that TR.__table__.delete().where(...) is slightly faster than session.query(...).filter(...).delete(), so I changed the PR to use it instead.
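For readers unfamiliar with the distinction: the two delete styles compared above are the ORM-level `Query.delete()` and the Core-level `Table.delete()`. A minimal sketch with a toy table (not Airflow's TaskReschedule) shows both forms side by side; the Core form bypasses some ORM bookkeeping, which is where the small extra margin comes from.

```
# Toy comparison of ORM-level vs Core-level deletes in SQLAlchemy.
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TR(Base):  # stand-in for TaskReschedule
    __tablename__ = "tr"
    id = Column(Integer, primary_key=True)
    try_number = Column(Integer)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(TR(try_number=n) for n in (1, 1, 2))
    session.commit()

    # ORM-level delete: goes through the Query machinery.
    session.query(TR).filter(TR.try_number == 2).delete()

    # Core-level delete: emits the DELETE statement directly.
    session.execute(TR.__table__.delete().where(TR.try_number == 1))
    session.commit()

    remaining = session.query(TR).count()
    print(remaining)  # 0: both deletes removed their rows
```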

@kaxil
Member

kaxil commented Feb 6, 2021

Minor suggestions added, LGTM otherwise

@kaxil kaxil added this to the Airflow 2.0.2 milestone Feb 6, 2021
@kaxil
Member

kaxil commented Feb 6, 2021

Pushed 99a77f3 -- let's wait for the CI and merge after that. Thanks @yuqian90

@yuqian90
Contributor Author

yuqian90 commented Feb 8, 2021

Followed @ashb's suggestion and sped it up even further. Please take another look.

@yuqian90
Contributor Author

yuqian90 commented Feb 8, 2021

Hi @kaxil, this is the experiment I did. test_create_large_dag_with_task_reschedule creates a lot of TaskInstance and TaskReschedule rows in the db, then test_clear_large_dag_with_task_reschedule times how long it takes to call clear_task_instances on them.

```
import copy
import datetime
import time

import pendulum
import pytest

from airflow.models import DAG, TaskReschedule
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.sensors.python import PythonSensor
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType

# Mirrors the DEFAULT_DATE constant used throughout the Airflow test suite.
DEFAULT_DATE = pendulum.datetime(2021, 1, 1)


@pytest.fixture(scope="module")
def big_dag():
    with DAG(
        'test_create_large_dag_with_task_reschedule',
        start_date=DEFAULT_DATE,
        end_date=DEFAULT_DATE + datetime.timedelta(days=30),
    ) as dag:
        for i in range(1000):
            PythonSensor(task_id=f'task_{i}', python_callable=lambda: False, mode="reschedule")

    yield dag


def test_create_large_dag_with_task_reschedule(big_dag):
    # Create 10 dag runs, each with a TaskInstance per task.
    tis = []
    for i in range(10):
        execution_date = DEFAULT_DATE + datetime.timedelta(days=i)
        big_dag.create_dagrun(
            execution_date=execution_date,
            state=State.RUNNING,
            run_type=DagRunType.SCHEDULED,
        )
        for task in big_dag.tasks:
            ti = TI(task=copy.copy(task), execution_date=execution_date)
            tis.append(ti)

    # Create 5 TaskReschedule rows per TaskInstance.
    tss = []
    for ti in tis:
        for i in range(5):
            tss.append(TaskReschedule(
                task=ti.task,
                execution_date=ti.execution_date,
                try_number=1,
                start_date=pendulum.now(),
                end_date=pendulum.now(),
                reschedule_date=pendulum.now()))

    def chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    # Bulk-insert the TaskReschedule rows in chunks of 10000.
    with create_session() as session:
        for chunk in chunks([{"task_id": tr.task_id, "dag_id": tr.dag_id, "execution_date": tr.execution_date,
                              "try_number": tr.try_number, "start_date": tr.start_date, "end_date": tr.end_date,
                              "duration": tr.duration, "reschedule_date": tr.reschedule_date} for tr in tss], 10000):
            session.execute(TaskReschedule.__table__.insert(), chunk)
            session.commit()


def test_clear_large_dag_with_task_reschedule(big_dag):
    with create_session() as session:
        def count_task_reschedule():
            return (
                session.query(TaskReschedule)
                .filter(
                    TaskReschedule.dag_id == big_dag.dag_id,
                    TaskReschedule.try_number == 1,
                )
                .count()
            )

        assert count_task_reschedule() == 1000 * 10 * 5
        qry = session.query(TI).filter(TI.dag_id == big_dag.dag_id).all()
        start = time.time()
        clear_task_instances(qry, session, dag=big_dag)
        end = time.time()
        print(f"clear_task_instances took {end - start}")
        assert count_task_reschedule() == 0
        session.rollback()
```

@kaxil kaxil merged commit 9036ce2 into apache:master Feb 10, 2021
ashb pushed a commit that referenced this pull request Mar 19, 2021
…schedule (#14048)


Some profiling showed a great speed improvement (roughly 40 to 50 times faster) compared to the first iteration, so the overall performance should now be around 300 times faster than the original for-loop deletion.

(cherry picked from commit 9036ce2)
@yuqian90 yuqian90 deleted the slow_clear_task_instances branch May 29, 2021 08:21