From 8f643b8b10315a40b328e2b8fa4ea27bd6e1dab6 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Thu, 4 Feb 2021 01:59:31 +0000
Subject: [PATCH] Fix broken SLA Mechanism (#14056)

closes https://github.com/apache/airflow/issues/14050

We were not de-serializing `BaseOperator.sla` properly, hence we were
returning a float instead of a `timedelta` object.

Example: 100.0 instead of timedelta(seconds=100)

And because we had a check in `_manage_sla` in `SchedulerJob` and
`DagFileProcessor`, we were skipping the SLA checks.

SchedulerJob: https://github.com/apache/airflow/blob/88bdcfa0df5bcb4c489486e05826544b428c8f43/airflow/jobs/scheduler_job.py#L1766-L1768
DagFileProcessor: https://github.com/apache/airflow/blob/88bdcfa0df5bcb4c489486e05826544b428c8f43/airflow/jobs/scheduler_job.py#L395-L397

(cherry picked from commit 604a37eee50715db345c5a7afed085c9afe8530d)
---
 airflow/jobs/scheduler_job.py                 | 2 ++
 airflow/serialization/serialized_objects.py   | 2 +-
 tests/jobs/test_scheduler_job.py              | 3 +++
 tests/serialization/test_dag_serialization.py | 4 ++++
 4 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3b213abf6604..f25ae02431dd 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -393,6 +393,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
         We are assuming that the scheduler runs often, so we only check for
         tasks that should have succeeded in the past hour.
         """
+        self.log.info("Running SLA Checks for %s", dag.dag_id)
         if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
             self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
             return
@@ -552,6 +553,7 @@ def execute_callbacks(
         :param session: DB session.
         """
         for request in callback_requests:
+            self.log.debug("Processing Callback Request: %s", request)
             try:
                 if isinstance(request, TaskCallbackRequest):
                     self._execute_task_callbacks(dagbag, request)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index f59ce6a6f3c3..38df10a5a0a4 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -451,7 +451,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = set(v)
             elif k == "subdag":
                 v = SerializedDAG.deserialize_dag(v)
-            elif k in {"retry_delay", "execution_timeout"}:
+            elif k in {"retry_delay", "execution_timeout", "sla"}:
                 v = cls._deserialize_timedelta(v)
             elif k in encoded_op["template_fields"]:
                 pass
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0d9745284fa0..38678333798f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3525,6 +3525,9 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self):
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         DummyOperator(task_id='task1', dag=dag, sla=timedelta(seconds=60))
 
+        # Used Serialized DAG as Serialized DAG is used in Scheduler
+        dag = SerializedDAG.from_json(SerializedDAG.to_json(dag))
+
         with patch.object(settings, "CHECK_SLAS", True):
             scheduler_job = SchedulerJob(subdir=os.devnull)
             mock_agent = mock.MagicMock()
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 1b7524acfe17..2046e226d953 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -60,6 +60,7 @@
                 "depends_on_past": False,
                 "retries": 1,
                 "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                "sla": {"__type": "timedelta", "__var": 100.0},
             },
         },
         "start_date": 1564617600.0,
@@ -84,6 +85,7 @@
                 "owner": "airflow",
                 "retries": 1,
                 "retry_delay": 300.0,
+                "sla": 100.0,
                 "_downstream_task_ids": [],
                 "_inlets": [],
                 "_is_dummy": False,
@@ -111,6 +113,7 @@
                 "task_id": "custom_task",
                 "retries": 1,
                 "retry_delay": 300.0,
+                "sla": 100.0,
                 "_downstream_task_ids": [],
                 "_inlets": [],
                 "_is_dummy": False,
@@ -158,6 +161,7 @@ def make_simple_dag():
             "retries": 1,
             "retry_delay": timedelta(minutes=5),
             "depends_on_past": False,
+            "sla": timedelta(seconds=100),
         },
         start_date=datetime(2019, 8, 1),
         is_paused_upon_creation=False,