Skip to content

Commit

Permalink
[AIRFLOW-6728] Change various DAG info methods to POST (#7364)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil committed Mar 17, 2020
1 parent ec76d28 commit 0be3c5e
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 36 deletions.
24 changes: 16 additions & 8 deletions airflow/www_rbac/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ <h2>DAGs</h2>
return false;
}

var encoded_dag_ids = [];
var encoded_dag_ids = new URLSearchParams();

$.each($("[id^=toggle]"), function(i, v) {
var dag_id = encodeURIComponent($(v).attr('dag_id'));
encoded_dag_ids.push(dag_id);
var dag_id = $(v).attr('dag_id');
encoded_dag_ids.append('dag_ids', dag_id);

$(v).change (function() {
var dag_id = $(v).attr('dag_id');
Expand Down Expand Up @@ -515,12 +515,20 @@ <h2>DAGs</h2>
});
}

if (encoded_dag_ids.length > 0) {
if (encoded_dag_ids.has('dag_ids')) {
// dags on page fetch stats
d3.json("{{ url_for('Airflow.blocked') }}?dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}?dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler);
d3.json("{{ url_for('Airflow.blocked') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, taskStatsHandler);
}
else {
// no dags, hide the loading gifs
Expand Down
22 changes: 12 additions & 10 deletions airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def get_int_arg(value, default=0):
num_runs=num_runs,
tags=tags)

@expose('/dag_stats')
@expose('/dag_stats', methods=['POST'])
@has_access
@provide_session
def dag_stats(self, session=None):
Expand All @@ -345,9 +345,9 @@ def dag_stats(self, session=None):
dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state))\
.group_by(dr.dag_id, dr.state)

# Filter by get parameters
# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -380,7 +380,7 @@ def dag_stats(self, session=None):

return wwwutils.json_response(payload)

@expose('/task_stats')
@expose('/task_stats', methods=['POST'])
@has_access
@provide_session
def task_stats(self, session=None):
Expand All @@ -395,9 +395,9 @@ def task_stats(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = {dag_id for dag_id, in session.query(models.DagModel.dag_id)}

# Filter by get parameters
# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -474,7 +474,7 @@ def task_stats(self, session=None):
})
return wwwutils.json_response(payload)

@expose('/last_dagruns')
@expose('/last_dagruns', methods=['POST'])
@has_access
@provide_session
def last_dagruns(self, session=None):
Expand All @@ -485,8 +485,9 @@ def last_dagruns(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -1113,7 +1114,7 @@ def dagrun_clear(self):
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=True, confirmed=confirmed)

@expose('/blocked')
@expose('/blocked', methods=['POST'])
@has_access
@provide_session
def blocked(self, session=None):
Expand All @@ -1122,8 +1123,9 @@ def blocked(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down
85 changes: 67 additions & 18 deletions tests/www_rbac/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,20 @@ def test_pickle_info(self):

def test_blocked(self):
url = 'blocked'
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.assertEqual(200, resp.status_code)

def test_dag_stats(self):
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_task_stats(self):
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_task_stats_only_noncompleted(self):
conf.set("webserver", "show_recent_stats_for_completed_runs", "False")
resp = self.client.post('task_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_dag_details(self):
Expand All @@ -554,9 +559,28 @@ def test_graph(self):
self.check_content_in_response('runme_1', resp)

def test_last_dagruns(self):
resp = self.client.get('last_dagruns', follow_redirects=True)
resp = self.client.post('last_dagruns', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_last_dagruns_success_when_selecting_dags(self):
resp = self.client.post('last_dagruns',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.post('last_dagruns',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)
self.check_content_not_in_response('example_xcom', resp)

def test_tree(self):
url = 'tree?dag_id=example_bash_operator'
resp = self.client.get(url, follow_redirects=True)
Expand Down Expand Up @@ -1405,33 +1429,36 @@ def test_index_for_all_dag_user(self):
def test_dag_stats_success(self):
self.logout()
self.login()
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_dag_stats_failure(self):
self.logout()
self.login()
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_not_in_response('example_subdag_operator', resp)

def test_dag_stats_success_for_all_dag_user(self):
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_in_response('example_subdag_operator', resp)
self.check_content_in_response('example_bash_operator', resp)

def test_dag_stats_success_when_selecting_dags(self):
resp = self.client.get('dag_stats?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('dag_stats',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('dag_stats?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('dag_stats',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand All @@ -1441,20 +1468,20 @@ def test_dag_stats_success_when_selecting_dags(self):
def test_task_stats_success(self):
self.logout()
self.login()
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_task_stats_failure(self):
self.logout()
self.login()
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_not_in_response('example_subdag_operator', resp)

def test_task_stats_success_for_all_dag_user(self):
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)
self.check_content_in_response('example_subdag_operator', resp)

Expand All @@ -1463,15 +1490,18 @@ def test_task_stats_success_when_selecting_dags(self):
self.login(username='all_dag_user',
password='all_dag_user')

resp = self.client.get('task_stats?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('task_stats',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('task_stats?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('task_stats',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand Down Expand Up @@ -1650,18 +1680,37 @@ def test_blocked_success(self):
url = 'blocked'
self.logout()
self.login()
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_blocked_success_for_all_dag_user(self):
url = 'blocked'
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)
self.check_content_in_response('example_subdag_operator', resp)

def test_blocked_success_when_selecting_dags(self):
resp = self.client.post('blocked',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))}
self.assertNotIn('example_bash_operator', blocked_dags)
self.assertIn('example_subdag_operator', blocked_dags)

# Multiple
resp = self.client.post('blocked',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))}
self.assertIn('example_bash_operator', blocked_dags)
self.assertIn('example_subdag_operator', blocked_dags)
self.check_content_not_in_response('example_xcom', resp)

def test_failed_success(self):
self.logout()
self.login()
Expand Down

0 comments on commit 0be3c5e

Please sign in to comment.