From 6070377d9b83a0553541a3700d2c13b0884f05bc Mon Sep 17 00:00:00 2001 From: Madison Bowden Date: Tue, 4 Feb 2020 16:17:54 -0800 Subject: [PATCH 1/4] [AIRFLOW-6728] Change various stats methods from GET to POST --- airflow/www/views.py | 26 ++++++++++------- tests/www/test_views.py | 62 ++++++++++++++++++++++++----------------- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 985e1d0c53bbcd..f40207aa417854 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -328,7 +328,7 @@ def get_int_arg(value, default=0): num_runs=num_runs, tags=tags) - @expose('/dag_stats') + @expose('/dag_stats', methods=['POST']) @has_access @provide_session def dag_stats(self, session=None): @@ -341,9 +341,10 @@ def dag_stats(self, session=None): dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state))\ .group_by(dr.dag_id, dr.state) - # Filter by get parameters + # Filter by post parameters + request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id + unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id } if selected_dag_ids: @@ -376,7 +377,7 @@ def dag_stats(self, session=None): return wwwutils.json_response(payload) - @expose('/task_stats') + @expose('/task_stats', methods=['POST']) @has_access @provide_session def task_stats(self, session=None): @@ -392,9 +393,10 @@ def task_stats(self, session=None): if 'all_dags' in allowed_dag_ids: allowed_dag_ids = {dag_id for dag_id, in session.query(models.DagModel.dag_id)} - # Filter by get parameters + # Filter by post parameters + request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id + unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id } if selected_dag_ids: @@ -476,7 +478,7 @@ def task_stats(self, session=None): }) 
return wwwutils.json_response(payload) - @expose('/last_dagruns') + @expose('/last_dagruns', methods=['POST']) @has_access @provide_session def last_dagruns(self, session=None): @@ -487,8 +489,10 @@ def last_dagruns(self, session=None): if 'all_dags' in allowed_dag_ids: allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)] + # Filter by post parameters + request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id + unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id } if selected_dag_ids: @@ -1100,7 +1104,7 @@ def dagrun_clear(self): return self._clear_dag_tis(dag, start_date, end_date, origin, recursive=True, confirmed=confirmed) - @expose('/blocked') + @expose('/blocked', methods=['POST']) @has_access @provide_session def blocked(self, session=None): @@ -1109,8 +1113,10 @@ def blocked(self, session=None): if 'all_dags' in allowed_dag_ids: allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)] + # Filter by post parameters + request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id + unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id } if selected_dag_ids: diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 938fa6bd15eace..7edeb95817458b 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -475,20 +475,20 @@ def test_rendered(self): def test_blocked(self): url = 'blocked' - resp = self.client.get(url, follow_redirects=True) + resp = self.client.post(url, follow_redirects=True) self.assertEqual(200, resp.status_code) def test_dag_stats(self): - resp = self.client.get('dag_stats', follow_redirects=True) + resp = self.client.post('dag_stats', follow_redirects=True) self.assertEqual(resp.status_code, 200) def test_task_stats(self): - resp = 
self.client.get('task_stats', follow_redirects=True) + resp = self.client.post('task_stats', follow_redirects=True) self.assertEqual(resp.status_code, 200) def test_task_stats_only_noncompleted(self): conf.set("webserver", "show_recent_stats_for_completed_runs", "False") - resp = self.client.get('task_stats', follow_redirects=True) + resp = self.client.post('task_stats', follow_redirects=True) self.assertEqual(resp.status_code, 200) def test_dag_details(self): @@ -507,19 +507,22 @@ def test_graph(self): self.check_content_in_response('runme_1', resp) def test_last_dagruns(self): - resp = self.client.get('last_dagruns', follow_redirects=True) + resp = self.client.post('last_dagruns', follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) def test_last_dagruns_success_when_selecting_dags(self): - resp = self.client.get('last_dagruns?dag_ids=example_subdag_operator', follow_redirects=True) + resp = self.client.post('last_dagruns', + json={'dag_ids': ['example_subdag_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) self.assertNotIn('example_bash_operator', stats) self.assertIn('example_subdag_operator', stats) # Multiple - resp = self.client.get('last_dagruns?dag_ids=example_subdag_operator,example_bash_operator', - follow_redirects=True) + resp = self.client.post('last_dagruns', + json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) self.assertIn('example_bash_operator', stats) @@ -1413,33 +1416,36 @@ def test_index_for_all_dag_user(self): def test_dag_stats_success(self): self.logout() self.login() - resp = self.client.get('dag_stats', follow_redirects=True) + resp = self.client.post('dag_stats', follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) def test_dag_stats_failure(self): self.logout() 
self.login() - resp = self.client.get('dag_stats', follow_redirects=True) + resp = self.client.post('dag_stats', follow_redirects=True) self.check_content_not_in_response('example_subdag_operator', resp) def test_dag_stats_success_for_all_dag_user(self): self.logout() self.login(username='all_dag_user', password='all_dag_user') - resp = self.client.get('dag_stats', follow_redirects=True) + resp = self.client.post('dag_stats', follow_redirects=True) self.check_content_in_response('example_subdag_operator', resp) self.check_content_in_response('example_bash_operator', resp) def test_dag_stats_success_when_selecting_dags(self): - resp = self.client.get('dag_stats?dag_ids=example_subdag_operator', follow_redirects=True) + resp = self.client.post('dag_stats', + json={'dag_ids': ['example_subdag_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) self.assertNotIn('example_bash_operator', stats) self.assertIn('example_subdag_operator', stats) # Multiple - resp = self.client.get('dag_stats?dag_ids=example_subdag_operator,example_bash_operator', - follow_redirects=True) + resp = self.client.post('dag_stats', + json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) self.assertIn('example_bash_operator', stats) @@ -1449,20 +1455,20 @@ def test_dag_stats_success_when_selecting_dags(self): def test_task_stats_success(self): self.logout() self.login() - resp = self.client.get('task_stats', follow_redirects=True) + resp = self.client.post('task_stats', follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) def test_task_stats_failure(self): self.logout() self.login() - resp = self.client.get('task_stats', follow_redirects=True) + resp = self.client.post('task_stats', follow_redirects=True) self.check_content_not_in_response('example_subdag_operator', 
resp) def test_task_stats_success_for_all_dag_user(self): self.logout() self.login(username='all_dag_user', password='all_dag_user') - resp = self.client.get('task_stats', follow_redirects=True) + resp = self.client.post('task_stats', follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) self.check_content_in_response('example_subdag_operator', resp) @@ -1471,15 +1477,18 @@ def test_task_stats_success_when_selecting_dags(self): self.login(username='all_dag_user', password='all_dag_user') - resp = self.client.get('task_stats?dag_ids=example_subdag_operator', follow_redirects=True) + resp = self.client.post('task_stats', + json={'dag_ids': ['example_subdag_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) self.assertNotIn('example_bash_operator', stats) self.assertIn('example_subdag_operator', stats) # Multiple - resp = self.client.get('task_stats?dag_ids=example_subdag_operator,example_bash_operator', - follow_redirects=True) + resp = self.client.post('task_stats', + json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) self.assertIn('example_bash_operator', stats) @@ -1649,7 +1658,7 @@ def test_blocked_success(self): url = 'blocked' self.logout() self.login() - resp = self.client.get(url, follow_redirects=True) + resp = self.client.post(url, follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) def test_blocked_success_for_all_dag_user(self): @@ -1657,20 +1666,23 @@ def test_blocked_success_for_all_dag_user(self): self.logout() self.login(username='all_dag_user', password='all_dag_user') - resp = self.client.get(url, follow_redirects=True) + resp = self.client.post(url, follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) 
self.check_content_in_response('example_subdag_operator', resp) def test_blocked_success_when_selecting_dags(self): - resp = self.client.get('blocked?dag_ids=example_subdag_operator', follow_redirects=True) + resp = self.client.post('blocked', + json={'dag_ids': ['example_subdag_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))} self.assertNotIn('example_bash_operator', blocked_dags) self.assertIn('example_subdag_operator', blocked_dags) # Multiple - resp = self.client.get('blocked?dag_ids=example_subdag_operator,example_bash_operator', - follow_redirects=True) + resp = self.client.post('blocked', + json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + follow_redirects=True) self.assertEqual(resp.status_code, 200) blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))} self.assertIn('example_bash_operator', blocked_dags) From 8b72c8c86eb15566a1ea5bd1ab284d0666a11871 Mon Sep 17 00:00:00 2001 From: Madison Bowden Date: Tue, 4 Feb 2020 16:49:13 -0800 Subject: [PATCH 2/4] [AIRFLOW-6728] Update d3.json requests on front end to use POST This also involves sending in the CSRF token. --- airflow/www/templates/airflow/dags.html | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 4c43aec964d87c..7d351c6951b2f5 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -524,10 +524,18 @@

DAGs

if (encoded_dag_ids.length > 0) { // dags on page fetch stats - d3.json("{{ url_for('Airflow.blocked') }}?dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler); - d3.json("{{ url_for('Airflow.last_dagruns') }}?dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler); - d3.json("{{ url_for('Airflow.dag_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler); - d3.json("{{ url_for('Airflow.task_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler); + d3.json("{{ url_for('Airflow.blocked') }}") + .header("X-CSRFToken", "{{ csrf_token() }}") + .post(JSON.stringify({'dag_ids': encoded_dag_ids}), blockedHandler); + d3.json("{{ url_for('Airflow.last_dagruns') }}") + .header("X-CSRFToken", "{{ csrf_token() }}") + .post(JSON.stringify({'dag_ids': encoded_dag_ids}), lastDagRunsHandler); + d3.json("{{ url_for('Airflow.dag_stats') }}") + .header("X-CSRFToken", "{{ csrf_token() }}") + .post(JSON.stringify({'dag_ids': encoded_dag_ids}), dagStatsHandler); + d3.json("{{ url_for('Airflow.task_stats') }}") + .header("X-CSRFToken", "{{ csrf_token() }}") + .post(JSON.stringify({'dag_ids': encoded_dag_ids}), taskStatsHandler); } else { // no dags, hide the loading gifs From e812504eb604af9aeaca45e9be74182f7145c1a5 Mon Sep 17 00:00:00 2001 From: Madison Bowden Date: Wed, 5 Feb 2020 18:25:33 -0800 Subject: [PATCH 3/4] [AIRFLOW-6728] Convert endpoints from JSON POST to form POST --- airflow/www/templates/airflow/dags.html | 8 ++++---- airflow/www/views.py | 12 ++++-------- tests/www/test_views.py | 16 ++++++++-------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 7d351c6951b2f5..dffc007f58b8ce 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -526,16 +526,16 @@

DAGs

// dags on page fetch stats d3.json("{{ url_for('Airflow.blocked') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post(JSON.stringify({'dag_ids': encoded_dag_ids}), blockedHandler); + .post("dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler); d3.json("{{ url_for('Airflow.last_dagruns') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post(JSON.stringify({'dag_ids': encoded_dag_ids}), lastDagRunsHandler); + .post("dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler); d3.json("{{ url_for('Airflow.dag_stats') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post(JSON.stringify({'dag_ids': encoded_dag_ids}), dagStatsHandler); + .post("dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler); d3.json("{{ url_for('Airflow.task_stats') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post(JSON.stringify({'dag_ids': encoded_dag_ids}), taskStatsHandler); + .post("dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler); } else { // no dags, hide the loading gifs diff --git a/airflow/www/views.py b/airflow/www/views.py index f40207aa417854..b1b22f2c8a7b36 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -342,9 +342,8 @@ def dag_stats(self, session=None): .group_by(dr.dag_id, dr.state) # Filter by post parameters - request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id + unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id } if selected_dag_ids: @@ -394,9 +393,8 @@ def task_stats(self, session=None): allowed_dag_ids = {dag_id for dag_id, in session.query(models.DagModel.dag_id)} # Filter by post parameters - request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id + unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id } if selected_dag_ids: @@ -490,9 +488,8 @@ def last_dagruns(self, session=None): allowed_dag_ids = [dag_id 
for dag_id, in session.query(models.DagModel.dag_id)] # Filter by post parameters - request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id + unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id } if selected_dag_ids: @@ -1114,9 +1111,8 @@ def blocked(self, session=None): allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)] # Filter by post parameters - request_dag_ids = request.get_json() or {} selected_dag_ids = { - unquote(dag_id) for dag_id in request_dag_ids.get('dag_ids', []) if dag_id + unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id } if selected_dag_ids: diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 7edeb95817458b..9bbd456bd2ac45 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -512,7 +512,7 @@ def test_last_dagruns(self): def test_last_dagruns_success_when_selecting_dags(self): resp = self.client.post('last_dagruns', - json={'dag_ids': ['example_subdag_operator']}, + data={'dag_ids': ['example_subdag_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) @@ -521,7 +521,7 @@ def test_last_dagruns_success_when_selecting_dags(self): # Multiple resp = self.client.post('last_dagruns', - json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) @@ -1435,7 +1435,7 @@ def test_dag_stats_success_for_all_dag_user(self): def test_dag_stats_success_when_selecting_dags(self): resp = self.client.post('dag_stats', - json={'dag_ids': ['example_subdag_operator']}, + data={'dag_ids': ['example_subdag_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = 
json.loads(resp.data.decode('utf-8')) @@ -1444,7 +1444,7 @@ def test_dag_stats_success_when_selecting_dags(self): # Multiple resp = self.client.post('dag_stats', - json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) @@ -1478,7 +1478,7 @@ def test_task_stats_success_when_selecting_dags(self): password='all_dag_user') resp = self.client.post('task_stats', - json={'dag_ids': ['example_subdag_operator']}, + data={'dag_ids': ['example_subdag_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) @@ -1487,7 +1487,7 @@ def test_task_stats_success_when_selecting_dags(self): # Multiple resp = self.client.post('task_stats', - json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) stats = json.loads(resp.data.decode('utf-8')) @@ -1672,7 +1672,7 @@ def test_blocked_success_for_all_dag_user(self): def test_blocked_success_when_selecting_dags(self): resp = self.client.post('blocked', - json={'dag_ids': ['example_subdag_operator']}, + data={'dag_ids': ['example_subdag_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))} @@ -1681,7 +1681,7 @@ def test_blocked_success_when_selecting_dags(self): # Multiple resp = self.client.post('blocked', - json={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, + data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']}, follow_redirects=True) self.assertEqual(resp.status_code, 200) blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))} From 
88269604e6aed2c1cff6e108b112a91990a79705 Mon Sep 17 00:00:00 2001 From: Madison Bowden Date: Thu, 6 Feb 2020 09:23:44 -0800 Subject: [PATCH 4/4] [AIRFLOW-6728] Use URLSearchParams object for DAG IDs --- airflow/www/templates/airflow/dags.html | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index dffc007f58b8ce..229f1ef9bafec4 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -281,11 +281,11 @@

DAGs

return false; } - var encoded_dag_ids = []; + var encoded_dag_ids = new URLSearchParams(); $.each($("[id^=toggle]"), function(i, v) { var dag_id = $(v).attr('dag_id'); - encoded_dag_ids.push(encodeURIComponent(dag_id)); + encoded_dag_ids.append('dag_ids', dag_id); $(v).change (function() { if ($(v).prop('checked')) { @@ -522,20 +522,20 @@

DAGs

}); } - if (encoded_dag_ids.length > 0) { + if (encoded_dag_ids.has('dag_ids')) { // dags on page fetch stats d3.json("{{ url_for('Airflow.blocked') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post("dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler); + .post(encoded_dag_ids, blockedHandler); d3.json("{{ url_for('Airflow.last_dagruns') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post("dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler); + .post(encoded_dag_ids, lastDagRunsHandler); d3.json("{{ url_for('Airflow.dag_stats') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post("dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler); + .post(encoded_dag_ids, dagStatsHandler); d3.json("{{ url_for('Airflow.task_stats') }}") .header("X-CSRFToken", "{{ csrf_token() }}") - .post("dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler); + .post(encoded_dag_ids, taskStatsHandler); } else { // no dags, hide the loading gifs