Skip to content

Commit

Permalink
Fix broken SLA Mechanism (#14056)
Browse files Browse the repository at this point in the history
closes #14050

We were not de-serializing `BaseOperator.sla` properly, hence
we were returning float instead of `timedelta` object.

Example: 100.0 instead of timedelta(seconds=100)

And because we had a check in _manage_sla in `SchedulerJob` and `DagFileProcessor`,
we were skipping SLA.

SchedulerJob:
https://github.com/apache/airflow/blob/88bdcfa0df5bcb4c489486e05826544b428c8f43/airflow/jobs/scheduler_job.py#L1766-L1768

DagFileProcessor:
https://github.com/apache/airflow/blob/88bdcfa0df5bcb4c489486e05826544b428c8f43/airflow/jobs/scheduler_job.py#L395-L397
(cherry picked from commit 604a37e)
  • Loading branch information
kaxil committed Feb 4, 2021
1 parent 3fbbe3e commit 8f643b8
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 1 deletion.
2 changes: 2 additions & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
We are assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
self.log.info("Running SLA Checks for %s", dag.dag_id)
if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
return
Expand Down Expand Up @@ -552,6 +553,7 @@ def execute_callbacks(
:param session: DB session.
"""
for request in callback_requests:
self.log.debug("Processing Callback Request: %s", request)
try:
if isinstance(request, TaskCallbackRequest):
self._execute_task_callbacks(dagbag, request)
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
v = set(v)
elif k == "subdag":
v = SerializedDAG.deserialize_dag(v)
elif k in {"retry_delay", "execution_timeout"}:
elif k in {"retry_delay", "execution_timeout", "sla"}:
v = cls._deserialize_timedelta(v)
elif k in encoded_op["template_fields"]:
pass
Expand Down
3 changes: 3 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3525,6 +3525,9 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self):
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
DummyOperator(task_id='task1', dag=dag, sla=timedelta(seconds=60))

# Used Serialized DAG as Serialized DAG is used in Scheduler
dag = SerializedDAG.from_json(SerializedDAG.to_json(dag))

with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = SchedulerJob(subdir=os.devnull)
mock_agent = mock.MagicMock()
Expand Down
4 changes: 4 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"depends_on_past": False,
"retries": 1,
"retry_delay": {"__type": "timedelta", "__var": 300.0},
"sla": {"__type": "timedelta", "__var": 100.0},
},
},
"start_date": 1564617600.0,
Expand All @@ -84,6 +85,7 @@
"owner": "airflow",
"retries": 1,
"retry_delay": 300.0,
"sla": 100.0,
"_downstream_task_ids": [],
"_inlets": [],
"_is_dummy": False,
Expand Down Expand Up @@ -111,6 +113,7 @@
"task_id": "custom_task",
"retries": 1,
"retry_delay": 300.0,
"sla": 100.0,
"_downstream_task_ids": [],
"_inlets": [],
"_is_dummy": False,
Expand Down Expand Up @@ -158,6 +161,7 @@ def make_simple_dag():
"retries": 1,
"retry_delay": timedelta(minutes=5),
"depends_on_past": False,
"sla": timedelta(seconds=100),
},
start_date=datetime(2019, 8, 1),
is_paused_upon_creation=False,
Expand Down

0 comments on commit 8f643b8

Please sign in to comment.