All the DAG runs succeeded, but the scheduler frequently exited with the error above.
-
Yes, at first glance this looks like the same issue as #27300. No fix or root cause is known yet.
-
What database are you using, @tongtie?
-
I found that all the errors appear here, so I deleted the mapped task. With default settings, the scheduler runs well and no longer exits with the error.
-
The code here isn't enough to reproduce this. @tongtie, could you make it self-contained so that one can easily copy and run it?
-
After reading about SQLAlchemy's synchronization strategies (https://docs.sqlalchemy.org/en/14/orm/session_basics.html#selecting-a-synchronization-strategy), I suspect that for this particular StaleDataError we need to use the "fetch" strategy (synchronize_session="fetch") in this update query: airflow/airflow/models/abstractoperator.py, lines 553 to 558 in 94b3b89
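To make the failure mode concrete, here is a minimal sketch, using the standard-library sqlite3 module rather than SQLAlchemy, of the kind of check that surfaces as a StaleDataError: an ORM flush issues one UPDATE per modified object, keyed on its primary key, and compares the number of matched rows with what it expected. The table layout, the StaleRowError class and the flush_state_change helper are hypothetical simplifications, and the scenario assumes another writer (for example a second scheduler) deleted the row before our flush.

```python
import sqlite3


class StaleRowError(Exception):
    """Hypothetical stand-in for sqlalchemy.orm.exc.StaleDataError."""


def flush_state_change(conn: sqlite3.Connection, ti_id: int, new_state: str) -> None:
    """Hypothetical helper mimicking how an ORM flushes one dirty object."""
    # The UPDATE is keyed on the primary key the session remembers.
    cur = conn.execute(
        "UPDATE task_instance SET state = ? WHERE id = ?", (new_state, ti_id)
    )
    # Exactly one row is expected; any other count means the in-memory copy
    # no longer matches the database.
    if cur.rowcount != 1:
        raise StaleRowError(
            "UPDATE statement on table 'task_instance' expected to update "
            f"1 row(s); {cur.rowcount} were matched."
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, map_index INTEGER, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance (id, map_index, state) VALUES (?, ?, ?)",
    [(1, 0, "scheduled"), (2, 1, "scheduled"), (3, 2, "scheduled")],
)

# Our "session" still holds task instance 3 in memory, but another writer
# deletes that row before we flush.
conn.execute("DELETE FROM task_instance WHERE id = ?", (3,))

error_message = None
try:
    flush_state_change(conn, 3, "removed")
except StaleRowError as exc:
    error_message = str(exc)

print(error_message)
```

The sketch only shows why a row that changed underneath the session trips the check; it does not by itself show that the "fetch" strategy avoids the error.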
-
Row-locking the rows this update touches looks like the surest solution:

```diff
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index e83b7838c0..ac76a9c000 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -35,6 +35,7 @@ from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.task_group import MappedTaskGroup
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import WeightRule
+from airflow.utils.sqlalchemy import with_row_locks, skip_locked
 
 
 TaskStateChangeCallback = Callable[[Context], None]
@@ -550,13 +551,20 @@ class AbstractOperator(LoggingMixin, DAGNode):
         # Any (old) task instances with inapplicable indexes (>= the total
         # number we need) are set to "REMOVED".
-        session.query(TaskInstance).filter(
+        query = session.query(TaskInstance).filter(
             TaskInstance.dag_id == self.dag_id,
             TaskInstance.task_id == self.task_id,
             TaskInstance.run_id == run_id,
             TaskInstance.map_index >= total_expanded_ti_count,
-        ).update({TaskInstance.state: TaskInstanceState.REMOVED})
-
+        )
+        to_update = with_row_locks(
+            query,
+            of=TaskInstance,
+            session=session,
+            **skip_locked(session=session)
+        )
+        for ti in to_update:
+            ti.state = TaskInstanceState.REMOVED
         session.flush()
         return all_expanded_tis, total_expanded_ti_count - 1
```

The error:
-
When I run hundreds of tasks, I frequently encounter this scheduler error.
This is my Airflow info:
This is my DAG:
The DAG code looks like this (I removed the business logic):