Skip to content

Commit

Permalink
Use existing DagBag for 'dag_details' & trigger Endpoints (#8501)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored Apr 21, 2020
1 parent b88ca51 commit a8cedf8
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
17 changes: 0 additions & 17 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DagBag, DagModel
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
Expand Down Expand Up @@ -425,19 +424,3 @@ def is_utcdatetime(self, col_name):
(('is_utcdatetime', DateTimeField, AirflowDateTimePickerWidget),) +
FieldConverter.conversion_table
)


def get_dag(orm_dag: DagModel, store_serialized_dags=False):
"""Creates a dagbag to load and return a DAG.
Calling it from UI should set store_serialized_dags = STORE_SERIALIZED_DAGS.
There may be a delay for scheduler to write serialized DAG into database,
loads from file in this case.
FIXME: remove it when webserver does not access to DAG folder in future.
"""
dag = DagBag(
dag_folder=orm_dag.fileloc, store_serialized_dags=store_serialized_dags).get_dag(orm_dag.dag_id)
if store_serialized_dags and dag is None:
dag = DagBag(
dag_folder=orm_dag.fileloc, store_serialized_dags=False).get_dag(orm_dag.dag_id)
return dag
7 changes: 2 additions & 5 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
from airflow.www.forms import (
ConnectionForm, DagRunForm, DateTimeForm, DateTimeWithNumRunsForm, DateTimeWithNumRunsWithDagRunsForm,
)
from airflow.www.utils import get_dag
from airflow.www.widgets import AirflowModelListWidget

PAGE_SIZE = conf.getint('webserver', 'page_size')
Expand Down Expand Up @@ -575,9 +574,7 @@ def code(self, session=None):
@provide_session
def dag_details(self, session=None):
dag_id = request.args.get('dag_id')
dag_orm = DagModel.get_dagmodel(dag_id, session=session)
# FIXME: items needed for this view should move to the database
dag = get_dag(dag_orm, STORE_SERIALIZED_DAGS)
dag = dagbag.get_dag(dag_id)
title = "DAG details"
root = request.args.get('root', '')

Expand Down Expand Up @@ -1057,7 +1054,7 @@ def trigger(self, session=None):
conf=conf
)

dag = get_dag(dag_orm, STORE_SERIALIZED_DAGS)
dag = dagbag.get_dag(dag_id)
dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
Expand Down
25 changes: 25 additions & 0 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,19 @@ def test_dag_details(self):
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('DAG details', resp)

@parameterized.expand(["graph", "tree", "dag_details"])
@mock.patch('airflow.www.views.dagbag.get_dag')
def test_view_uses_existing_dagbag(self, endpoint, mock_get_dag):
"""
Test that Graph, Tree & Dag Details View uses the DagBag already created in views.py
instead of creating a new one.
"""
mock_get_dag.return_value = DAG(dag_id='example_bash_operator')
url = f'{endpoint}?dag_id=example_bash_operator'
resp = self.client.get(url, follow_redirects=True)
mock_get_dag.assert_called_once_with('example_bash_operator')
self.check_content_in_response('example_bash_operator', resp)

def test_dag_details_trigger_origin_tree_view(self):
dag = self.dagbag.dags['test_tree_view']
dag.create_dagrun(
Expand Down Expand Up @@ -2208,6 +2221,18 @@ def test_trigger_dag_form(self):
self.assertEqual(resp.status_code, 200)
self.check_content_in_response('Trigger DAG: {}'.format(test_dag_id), resp)

@mock.patch('airflow.www.views.dagbag.get_dag')
def test_trigger_endpoint_uses_existing_dagbag(self, mock_get_dag):
"""
Test that Trigger Endpoint uses the DagBag already created in views.py
instead of creating a new one.
"""
mock_get_dag.return_value = DAG(dag_id='example_bash_operator')
url = 'trigger?dag_id=example_bash_operator'
resp = self.client.post(url, data={}, follow_redirects=True)
mock_get_dag.assert_called_once_with('example_bash_operator')
self.check_content_in_response('example_bash_operator', resp)


class TestExtraLinks(TestBase):
def setUp(self):
Expand Down

0 comments on commit a8cedf8

Please sign in to comment.