
airflow dag success, but tasks not yet started, not scheduled. #15559

Closed

trollhe opened this issue Apr 28, 2021 · 5 comments · Fixed by #15714
Labels
affected_version:2.0 (Issues Reported for 2.0) · area:core · kind:bug (This is clearly a bug)

Comments


trollhe commented Apr 28, 2021

Hi team,

My DAG runs on a 1-minute schedule. Some of the DAG runs are marked success, but the tasks inside those runs never started:

[screenshot: DAG runs marked success with tasks still in no-status]

How can I fix it?

@trollhe trollhe added the kind:feature (Feature Requests) label Apr 28, 2021
@xinbinhuang xinbinhuang added the kind:bug (This is clearly a bug) label and removed the kind:feature (Feature Requests) label Apr 28, 2021
@DHARESHWAR
I am facing a similar issue on Airflow 2.0.1: the DAG is marked successful, but the task didn't start. It happens when I run two Airflow schedulers.

[screenshot taken 2021-04-28 at 12:23 PM]

@vikramkoka vikramkoka added the affected_version:2.0 (Issues Reported for 2.0) and area:core labels May 3, 2021
SamWheating (Contributor) commented May 3, 2021

We've been experiencing the same issues (Airflow 2.0.2, 2x schedulers, MySQL 8).

Furthermore, when a task has depends_on_past=True, this causes the DAG to lock up completely, as no future runs can be created.
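To illustrate the mechanism (a sketch on my part; the operator and task_id below are placeholders, not the affected DAG's code): with depends_on_past=True, each task instance waits on the success of the same task in the previous run, so a run falsely marked success with its task still in no-status blocks everything after it.

```python
# Illustrative only: a task that depends on its own previous run.
# If the previous run's task instance never reached a terminal state
# (the symptom in this issue), this task can never be scheduled.
from airflow.operators.bash import BashOperator

blocked_task = BashOperator(
    task_id="send_heartbeat",       # placeholder task_id
    bash_command="echo heartbeat",
    depends_on_past=True,           # waits on the prior run's instance
)
```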

I'm currently trying to recreate this by running some high-frequency DAGs with and without multiple schedulers; I'll update here with my findings.

Update:

I was able to get this to happen by running 3 schedulers with a DAG running every 1 minute:
[screenshot: 1-minute DAG with runs marked success but tasks not started]
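For reference, here is a minimal sketch of the kind of 1-minute heartbeat DAG in play (the dag_id, start_date, and operator are my assumptions, not the exact DAG):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="send-airflow-heartbeat_every_minute",  # assumed name
    schedule_interval=timedelta(minutes=1),        # run every minute
    start_date=datetime(2021, 5, 1),
    catchup=False,
    max_active_runs=1,  # matches the "max_active_runs (1 of 1)" log below
) as dag:
    BashOperator(task_id="send_heartbeat", bash_command="echo heartbeat")
```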

From the scheduler logs:

```
DEBUG - number of tis tasks for <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-03 20:09:00+00:00: scheduled__2021-05-03T20:09:00+00:00, externally triggered: False>: 0 task(s)
```

So it looks like the scheduler was running DagRun.update_state() before any TaskInstances had been created, which caused the DagRun to be marked as successful.
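In other words, the state check sees zero unfinished task instances and concludes the run is done. A simplified, self-contained sketch of that failure mode (illustrative pseudologic, not the actual Airflow source):

```python
FINISHED = {"success", "failed", "skipped"}

def update_state_sketch(ti_states):
    """Return the run state implied by the visible task instance states."""
    unfinished = [s for s in ti_states if s not in FINISHED]
    # An empty list (no TIs visible yet) also has no unfinished tasks,
    # so the run is marked success even though nothing ever ran.
    return "running" if unfinished else "success"

assert update_state_sketch([]) == "success"          # the bug: 0 TIs -> success
assert update_state_sketch(["queued"]) == "running"  # the intended path
```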

Do you think this could be either:

  1. A race condition in the scheduler code?
  2. A result of improper scheduler locking? (We're using MySQL 8 with use_row_level_locking=True)

Update 2:

I did some more investigating today and found that while the DagRun and TaskInstances are created by one scheduler, they are soon after marked as successful by a different scheduler.

Here are some selected logs from one occurrence of this issue to demonstrate this (I added debug log entries to the TaskInstance and DagRun constructors so that I could see when and where they were being created):

airflow-scheduler-1:

```
2021-05-04 20:12:00,123 dag.py: DEBUG - Created DagRun <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-04 20:11:00+00:00: scheduled__2021-05-04T20:11:00+00:00, externally triggered: False>
2021-05-04 20:12:00,126 taskinstance.py: DEBUG - Created TaskInstance <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-04 20:11:00+00:00 [None]>
2021-05-04 20:12:00,134 scheduler_job.py: INFO - DAG airflow-utils.send-airflow-heartbeat_every_minute is at (or above) max_active_runs (1 of 1), not creating any more runs
```

airflow-scheduler-2:

```
2021-05-04 20:12:00,156 dagrun.py: DEBUG - number of tis tasks for <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-04 20:11:00+00:00: scheduled__2021-05-04T20:11:00+00:00, externally triggered: False>: 0 task(s)
2021-05-04 20:12:00,156 dagrun.py: INFO - Marking run <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-04 20:11:00+00:00: scheduled__2021-05-04T20:11:00+00:00, externally triggered: False> successful
```

So within ~30ms of the DagRun being created, another scheduler instance marked it as complete based on having no tasks.

@MatthewRBruce (Contributor)

Okay, so we dug into this, and here's what we found (TL;DR: we think we're getting bitten by MySQL's default isolation level, REPEATABLE READ):

```python
from airflow import settings
from airflow.models import dagrun
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.types import DagRunType
from airflow.utils.state import State
from sqlalchemy import and_, func, not_, or_, tuple_
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks

session = settings.Session()
cls = dagrun.DagRun
```
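As a quick sanity check (my addition, not part of the original session), you can confirm the session's effective isolation level directly; on MySQL 8 the variable is transaction_isolation (tx_isolation on older 5.7 servers):

```python
# Confirm what isolation level this session actually runs under.
# Expect 'REPEATABLE-READ' on a default MySQL/InnoDB installation.
print(session.execute("SELECT @@transaction_isolation").scalar())
```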

Here we execute a query to get the recent DagRuns for airflow-utils.send-airflow-heartbeat_every_minute, and we see runs up to 13:37:00:

```python
>>> dag_run_query = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
>>> for dag_run in dag_run_query:
...   print(dag_run)
...
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:32:00+00:00: scheduled__2021-05-05T13:32:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:31:00+00:00: scheduled__2021-05-05T13:31:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:30:00+00:00: scheduled__2021-05-05T13:30:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:29:00+00:00: scheduled__2021-05-05T13:29:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:28:00+00:00: scheduled__2021-05-05T13:28:00+00:00, externally triggered: False>
```

Next we wait ~5 minutes and run the query again, but with FOR UPDATE, and we get the new rows. (This is how the scheduler determines which DAG runs to schedule: https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1477)

```python
>>> dag_query_run_update = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
>>> for dag_run in dag_query_run_update:
...   print(dag_run)
...
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:42:00+00:00: scheduled__2021-05-05T13:42:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:41:00+00:00: scheduled__2021-05-05T13:41:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:40:00+00:00: scheduled__2021-05-05T13:40:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:39:00+00:00: scheduled__2021-05-05T13:39:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:38:00+00:00: scheduled__2021-05-05T13:38:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
```

So, there are some new DagRuns, great. When the scheduler goes to get the related task instances (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L307-L310), it executes the query, but within the original snapshot:

```python
>>> ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
>>> for ti in ti_query:
...   print(ti)
...
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:37:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:36:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:35:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:34:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:33:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:32:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:31:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:30:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:29:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:28:00+00:00 [success]>
```

As we can see above, this query can only see the TIs up to 13:37:00, so it finds 0 tasks for the newer runs, which means this check (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L444) will mark the DAG run as successful.
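The key property here is that under REPEATABLE READ the snapshot is fixed at the transaction's first read and only refreshed when the transaction ends. A quick way to see this from the same session (my addition, not a captured log; the comment states what we'd expect, not an observed result):

```python
# End the current transaction; the next query starts a fresh snapshot.
session.rollback()

# Re-running the same TaskInstance query should now see the newer TIs
# (past 13:37:00) that were invisible inside the old snapshot.
ti_query = session.query(TaskInstance).filter(
    TaskInstance.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute'
).order_by(TaskInstance.execution_date.desc()).limit(10)
for ti in ti_query:
    print(ti)
```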

MatthewRBruce (Contributor) commented May 5, 2021

And confirmed: with READ COMMITTED, we get the expected results:

```python
>>> dag_run_query = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
>>> for dag_run in dag_run_query:
...   print(dag_run)
...
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:31:00+00:00: scheduled__2021-05-05T14:31:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:30:00+00:00: scheduled__2021-05-05T14:30:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:29:00+00:00: scheduled__2021-05-05T14:29:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:28:00+00:00: scheduled__2021-05-05T14:28:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:27:00+00:00: scheduled__2021-05-05T14:27:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:26:00+00:00: scheduled__2021-05-05T14:26:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:25:00+00:00: scheduled__2021-05-05T14:25:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:24:00+00:00: scheduled__2021-05-05T14:24:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:23:00+00:00: scheduled__2021-05-05T14:23:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:22:00+00:00: scheduled__2021-05-05T14:22:00+00:00, externally triggered: False>
```

wait a few minutes

```python
>>> dag_query_run_update = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
>>> for dag_run in dag_query_run_update:
...   print(dag_run)
...
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:34:00+00:00: scheduled__2021-05-05T14:34:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:33:00+00:00: scheduled__2021-05-05T14:33:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:32:00+00:00: scheduled__2021-05-05T14:32:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:31:00+00:00: scheduled__2021-05-05T14:31:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:30:00+00:00: scheduled__2021-05-05T14:30:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:29:00+00:00: scheduled__2021-05-05T14:29:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:28:00+00:00: scheduled__2021-05-05T14:28:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:27:00+00:00: scheduled__2021-05-05T14:27:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:26:00+00:00: scheduled__2021-05-05T14:26:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:25:00+00:00: scheduled__2021-05-05T14:25:00+00:00, externally triggered: False>
>>> ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
>>> for ti in ti_query:
...   print(ti)
...
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:34:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:33:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:32:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:31:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:30:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:29:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:28:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:27:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:26:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:25:00+00:00 [success]>
```

@SamWheating (Contributor)

Just following up to confirm that switching our database's isolation level from REPEATABLE READ to READ COMMITTED fixed the original problem in our case and allowed us to run multiple schedulers without issue.

It's worth noting that the default for Postgres is READ COMMITTED, so Postgres users likely did not encounter this issue.

@trollhe, could you confirm the following?

  1. You're also using MySQL 8 with REPEATABLE READ isolation level.
  2. Changing the isolation level to READ COMMITTED fixes the original issue.

Fixing this Issue

I think there are a few possible ways to "fix" this issue:

  1. Add a note in the docs reminding users to configure the correct isolation level when using MySQL 8.
  2. Add a check in create_engine_args() to set isolation_level=READ COMMITTED when MySQL is being used (see the sketch after this list).
  3. Add a config option to provide additional engine args, or to set the isolation level.
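For options 2 and 3, SQLAlchemy already supports setting the isolation level at engine-creation time, so the change would look roughly like this (a hedged sketch using the standard SQLAlchemy API; the DSN is a placeholder and this is not an existing Airflow setting):

```python
from sqlalchemy import create_engine

# isolation_level is a standard create_engine() argument; with the MySQL
# dialects it issues SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED
# on each new connection.
engine = create_engine(
    "mysql+mysqldb://user:pass@mysql-host/airflow",  # placeholder DSN
    isolation_level="READ COMMITTED",
)
```

Alternatively, the same effect can be achieved server-side via MySQL's transaction-isolation = READ-COMMITTED setting in my.cnf.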

Any thoughts on the preferred fix? Feel free to assign this to me and I can implement one of these fixes this week.
