Skip to content

Commit

Permalink
[AIRFLOW-6728] Change various DAG info methods to POST (#7364)
Browse files Browse the repository at this point in the history
If the number of DAGs was large and/or the DAG ids were long, the request URL would exceed the maximum possible query string length.

To work around that, requests to these endpoints are now always made with POST.
  • Loading branch information
madison-ookla authored Feb 7, 2020
1 parent e6d0b57 commit b738c9e
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 42 deletions.
22 changes: 15 additions & 7 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@ <h2>DAGs</h2>
return false;
}

var encoded_dag_ids = [];
var encoded_dag_ids = new URLSearchParams();

$.each($("[id^=toggle]"), function(i, v) {
var dag_id = $(v).attr('dag_id');
encoded_dag_ids.push(encodeURIComponent(dag_id));
encoded_dag_ids.append('dag_ids', dag_id);

$(v).change (function() {
if ($(v).prop('checked')) {
Expand Down Expand Up @@ -521,12 +521,20 @@ <h2>DAGs</h2>
});
}

if (encoded_dag_ids.length > 0) {
if (encoded_dag_ids.has('dag_ids')) {
// dags on page fetch stats
d3.json("{{ url_for('Airflow.blocked') }}?dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}?dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler);
d3.json("{{ url_for('Airflow.blocked') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post(encoded_dag_ids, taskStatsHandler);
}
else {
// no dags, hide the loading gifs
Expand Down
22 changes: 12 additions & 10 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def get_int_arg(value, default=0):
num_runs=num_runs,
tags=tags)

@expose('/dag_stats')
@expose('/dag_stats', methods=['POST'])
@has_access
@provide_session
def dag_stats(self, session=None):
Expand All @@ -341,9 +341,9 @@ def dag_stats(self, session=None):
dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state))\
.group_by(dr.dag_id, dr.state)

# Filter by get parameters
# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -376,7 +376,7 @@ def dag_stats(self, session=None):

return wwwutils.json_response(payload)

@expose('/task_stats')
@expose('/task_stats', methods=['POST'])
@has_access
@provide_session
def task_stats(self, session=None):
Expand All @@ -392,9 +392,9 @@ def task_stats(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = {dag_id for dag_id, in session.query(models.DagModel.dag_id)}

# Filter by get parameters
# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -476,7 +476,7 @@ def task_stats(self, session=None):
})
return wwwutils.json_response(payload)

@expose('/last_dagruns')
@expose('/last_dagruns', methods=['POST'])
@has_access
@provide_session
def last_dagruns(self, session=None):
Expand All @@ -487,8 +487,9 @@ def last_dagruns(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -1122,7 +1123,7 @@ def dagrun_clear(self):
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=True, confirmed=confirmed)

@expose('/blocked')
@expose('/blocked', methods=['POST'])
@has_access
@provide_session
def blocked(self, session=None):
Expand All @@ -1131,8 +1132,9 @@ def blocked(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down
62 changes: 37 additions & 25 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,20 +475,20 @@ def test_rendered(self):

def test_blocked(self):
url = 'blocked'
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.assertEqual(200, resp.status_code)

def test_dag_stats(self):
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_task_stats(self):
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_task_stats_only_noncompleted(self):
conf.set("webserver", "show_recent_stats_for_completed_runs", "False")
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_dag_details(self):
Expand All @@ -507,19 +507,22 @@ def test_graph(self):
self.check_content_in_response('runme_1', resp)

def test_last_dagruns(self):
resp = self.client.get('last_dagruns', follow_redirects=True)
resp = self.client.post('last_dagruns', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_last_dagruns_success_when_selecting_dags(self):
resp = self.client.get('last_dagruns?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('last_dagruns',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('last_dagruns?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('last_dagruns',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand Down Expand Up @@ -1413,33 +1416,36 @@ def test_index_for_all_dag_user(self):
def test_dag_stats_success(self):
self.logout()
self.login()
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_dag_stats_failure(self):
self.logout()
self.login()
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_not_in_response('example_subdag_operator', resp)

def test_dag_stats_success_for_all_dag_user(self):
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_in_response('example_subdag_operator', resp)
self.check_content_in_response('example_bash_operator', resp)

def test_dag_stats_success_when_selecting_dags(self):
resp = self.client.get('dag_stats?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('dag_stats',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('dag_stats?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('dag_stats',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand All @@ -1449,20 +1455,20 @@ def test_dag_stats_success_when_selecting_dags(self):
def test_task_stats_success(self):
self.logout()
self.login()
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_task_stats_failure(self):
self.logout()
self.login()
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_not_in_response('example_subdag_operator', resp)

def test_task_stats_success_for_all_dag_user(self):
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)
self.check_content_in_response('example_subdag_operator', resp)

Expand All @@ -1471,15 +1477,18 @@ def test_task_stats_success_when_selecting_dags(self):
self.login(username='all_dag_user',
password='all_dag_user')

resp = self.client.get('task_stats?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('task_stats',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('task_stats?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('task_stats',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand Down Expand Up @@ -1649,28 +1658,31 @@ def test_blocked_success(self):
url = 'blocked'
self.logout()
self.login()
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_blocked_success_for_all_dag_user(self):
url = 'blocked'
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)
self.check_content_in_response('example_subdag_operator', resp)

def test_blocked_success_when_selecting_dags(self):
resp = self.client.get('blocked?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('blocked',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))}
self.assertNotIn('example_bash_operator', blocked_dags)
self.assertIn('example_subdag_operator', blocked_dags)

# Multiple
resp = self.client.get('blocked?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('blocked',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))}
self.assertIn('example_bash_operator', blocked_dags)
Expand Down

0 comments on commit b738c9e

Please sign in to comment.