From 5325492bbf508ac5aa7ba4128197fd26eed0305b Mon Sep 17 00:00:00 2001 From: vladimir Date: Sun, 24 Jan 2021 14:01:01 +0300 Subject: [PATCH] Fix TaskNotFound in log endpoint --- .../api_connexion/endpoints/log_endpoint.py | 6 +++- .../endpoints/test_log_endpoint.py | 28 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/endpoints/log_endpoint.py index d0ebf5684db849..5fcff8ef4fc47c 100644 --- a/airflow/api_connexion/endpoints/log_endpoint.py +++ b/airflow/api_connexion/endpoints/log_endpoint.py @@ -22,6 +22,7 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import BadRequest, NotFound from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema +from airflow.exceptions import TaskNotFound from airflow.models import DagRun from airflow.security import permissions from airflow.utils.log.log_reader import TaskLogReader @@ -71,7 +72,10 @@ def get_log(session, dag_id, dag_run_id, task_id, task_try_number, full_content= dag = current_app.dag_bag.get_dag(dag_id) if dag: - ti.task = dag.get_task(ti.task_id) + try: + ti.task = dag.get_task(ti.task_id) + except TaskNotFound: + pass return_type = request.accept_mimetypes.best_match(['text/plain', 'application/json']) diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py index 509cbe7c0b6d2c..33c63e3e82f9a9 100644 --- a/tests/api_connexion/endpoints/test_log_endpoint.py +++ b/tests/api_connexion/endpoints/test_log_endpoint.py @@ -197,6 +197,34 @@ def test_should_respond_200_text_plain(self, session): == f"\n*** Reading local file: {expected_filename}\nLog for testing.\n" ) + @provide_session + def test_get_logs_of_removed_task(self, session): + self._create_dagrun(session) + + # Recreate DAG without tasks + dagbag = self.app.dag_bag # pylint: disable=no-member + dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time)) + dagbag.bag_dag(dag=dag, root_dag=dag) + + key = self.app.config["SECRET_KEY"] + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/" + f"taskInstances/{self.TASK_ID}/logs/1?token={token}", + headers={'Accept': 'text/plain'}, + environ_overrides={'REMOTE_USER': "test"}, + ) + expected_filename = "{}/{}/{}/{}/1.log".format( + self.log_dir, self.DAG_ID, self.TASK_ID, self.default_time.replace(':', '.') + ) + assert 200 == response.status_code + assert ( + response.data.decode('utf-8') + == f"\n*** Reading local file: {expected_filename}\nLog for testing.\n" + ) + @provide_session def test_get_logs_response_with_ti_equal_to_none(self, session): self._create_dagrun(session)