Skip to content

Commit

Permalink
Fix TaskNotFound in log endpoint (apache#13872)
Browse files Browse the repository at this point in the history
(cherry picked from commit dfbccd3)
  • Loading branch information
vemikhaylov authored and kaxil committed Feb 4, 2021
1 parent 3d71e7d commit e60ca11
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
6 changes: 5 additions & 1 deletion airflow/api_connexion/endpoints/log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema
from airflow.exceptions import TaskNotFound
from airflow.models import DagRun
from airflow.security import permissions
from airflow.utils.log.log_reader import TaskLogReader
Expand Down Expand Up @@ -71,7 +72,10 @@ def get_log(session, dag_id, dag_run_id, task_id, task_try_number, full_content=

dag = current_app.dag_bag.get_dag(dag_id)
if dag:
ti.task = dag.get_task(ti.task_id)
try:
ti.task = dag.get_task(ti.task_id)
except TaskNotFound:
pass

return_type = request.accept_mimetypes.best_match(['text/plain', 'application/json'])

Expand Down
28 changes: 28 additions & 0 deletions tests/api_connexion/endpoints/test_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,34 @@ def test_should_respond_200_text_plain(self, session):
== f"\n*** Reading local file: {expected_filename}\nLog for testing.\n"
)

@provide_session
def test_get_logs_of_removed_task(self, session):
self._create_dagrun(session)

# Recreate DAG without tasks
dagbag = self.app.dag_bag # pylint: disable=no-member
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
dagbag.bag_dag(dag=dag, root_dag=dag)

key = self.app.config["SECRET_KEY"]
serializer = URLSafeSerializer(key)
token = serializer.dumps({"download_logs": True})

response = self.client.get(
f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
headers={'Accept': 'text/plain'},
environ_overrides={'REMOTE_USER': "test"},
)
expected_filename = "{}/{}/{}/{}/1.log".format(
self.log_dir, self.DAG_ID, self.TASK_ID, self.default_time.replace(':', '.')
)
assert 200 == response.status_code
assert (
response.data.decode('utf-8')
== f"\n*** Reading local file: {expected_filename}\nLog for testing.\n"
)

@provide_session
def test_get_logs_response_with_ti_equal_to_none(self, session):
self._create_dagrun(session)
Expand Down

0 comments on commit e60ca11

Please sign in to comment.