Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6728] Change various DAG info methods to POST #7364

Merged
merged 4 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,18 @@ <h2>DAGs</h2>

if (encoded_dag_ids.length > 0) {
// dags on page fetch stats
d3.json("{{ url_for('Airflow.blocked') }}?dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}?dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler);
d3.json("{{ url_for('Airflow.blocked') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post("dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post("dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post("dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}")
.header("X-CSRFToken", "{{ csrf_token() }}")
.post("dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? I would expect that this would post the encoded as a single parameter..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't have to handle the encoding and building of forms ourselves:

var p = new URLSearchParams();
p.append("dag_id", "not+encoded.here");
p.append("dag_id", "second dag/id");
d3.json("/a").post(p)

And that sends this request:

POST /a HTTP/1.1
Host: 0.0.0.0:8080
User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:72.0) Gecko/20100101 Firefox/72.0
Accept: application/json,*/*
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Content-Type: application/x-www-form-urlencoded;charset=UTF-8
Content-Length: 48
Origin: http://0.0.0.0:8080
Connection: keep-alive
Referer: http://0.0.0.0:8080/home

dag_id=not%2Bencoded.here&dag_id=second+dag%2Fid

Could you make this change @madison-ookla ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, can do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! I've tested this locally and it works 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ready for you now @ashb @mik-laj 😁

}
else {
// no dags, hide the loading gifs
Expand Down
22 changes: 12 additions & 10 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def get_int_arg(value, default=0):
num_runs=num_runs,
tags=tags)

@expose('/dag_stats')
@expose('/dag_stats', methods=['POST'])
@has_access
@provide_session
def dag_stats(self, session=None):
Expand All @@ -341,9 +341,9 @@ def dag_stats(self, session=None):
dag_state_stats = session.query(dr.dag_id, dr.state, sqla.func.count(dr.state))\
.group_by(dr.dag_id, dr.state)

# Filter by get parameters
# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -376,7 +376,7 @@ def dag_stats(self, session=None):

return wwwutils.json_response(payload)

@expose('/task_stats')
@expose('/task_stats', methods=['POST'])
@has_access
@provide_session
def task_stats(self, session=None):
Expand All @@ -392,9 +392,9 @@ def task_stats(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = {dag_id for dag_id, in session.query(models.DagModel.dag_id)}

# Filter by get parameters
# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -476,7 +476,7 @@ def task_stats(self, session=None):
})
return wwwutils.json_response(payload)

@expose('/last_dagruns')
@expose('/last_dagruns', methods=['POST'])
@has_access
@provide_session
def last_dagruns(self, session=None):
Expand All @@ -487,8 +487,9 @@ def last_dagruns(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down Expand Up @@ -1100,7 +1101,7 @@ def dagrun_clear(self):
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=True, confirmed=confirmed)

@expose('/blocked')
@expose('/blocked', methods=['POST'])
@has_access
@provide_session
def blocked(self, session=None):
Expand All @@ -1109,8 +1110,9 @@ def blocked(self, session=None):
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

# Filter by post parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
unquote(dag_id) for dag_id in request.form.getlist('dag_ids') if dag_id
}

if selected_dag_ids:
Expand Down
62 changes: 37 additions & 25 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,20 +475,20 @@ def test_rendered(self):

def test_blocked(self):
url = 'blocked'
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.assertEqual(200, resp.status_code)

def test_dag_stats(self):
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_task_stats(self):
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_task_stats_only_noncompleted(self):
conf.set("webserver", "show_recent_stats_for_completed_runs", "False")
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.assertEqual(resp.status_code, 200)

def test_dag_details(self):
Expand All @@ -507,19 +507,22 @@ def test_graph(self):
self.check_content_in_response('runme_1', resp)

def test_last_dagruns(self):
resp = self.client.get('last_dagruns', follow_redirects=True)
resp = self.client.post('last_dagruns', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_last_dagruns_success_when_selecting_dags(self):
resp = self.client.get('last_dagruns?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('last_dagruns',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('last_dagruns?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('last_dagruns',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand Down Expand Up @@ -1413,33 +1416,36 @@ def test_index_for_all_dag_user(self):
def test_dag_stats_success(self):
self.logout()
self.login()
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_dag_stats_failure(self):
self.logout()
self.login()
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_not_in_response('example_subdag_operator', resp)

def test_dag_stats_success_for_all_dag_user(self):
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get('dag_stats', follow_redirects=True)
resp = self.client.post('dag_stats', follow_redirects=True)
self.check_content_in_response('example_subdag_operator', resp)
self.check_content_in_response('example_bash_operator', resp)

def test_dag_stats_success_when_selecting_dags(self):
resp = self.client.get('dag_stats?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('dag_stats',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('dag_stats?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('dag_stats',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand All @@ -1449,20 +1455,20 @@ def test_dag_stats_success_when_selecting_dags(self):
def test_task_stats_success(self):
self.logout()
self.login()
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_task_stats_failure(self):
self.logout()
self.login()
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_not_in_response('example_subdag_operator', resp)

def test_task_stats_success_for_all_dag_user(self):
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get('task_stats', follow_redirects=True)
resp = self.client.post('task_stats', follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)
self.check_content_in_response('example_subdag_operator', resp)

Expand All @@ -1471,15 +1477,18 @@ def test_task_stats_success_when_selecting_dags(self):
self.login(username='all_dag_user',
password='all_dag_user')

resp = self.client.get('task_stats?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('task_stats',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertNotIn('example_bash_operator', stats)
self.assertIn('example_subdag_operator', stats)

# Multiple
resp = self.client.get('task_stats?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('task_stats',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
stats = json.loads(resp.data.decode('utf-8'))
self.assertIn('example_bash_operator', stats)
Expand Down Expand Up @@ -1649,28 +1658,31 @@ def test_blocked_success(self):
url = 'blocked'
self.logout()
self.login()
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)

def test_blocked_success_for_all_dag_user(self):
url = 'blocked'
self.logout()
self.login(username='all_dag_user',
password='all_dag_user')
resp = self.client.get(url, follow_redirects=True)
resp = self.client.post(url, follow_redirects=True)
self.check_content_in_response('example_bash_operator', resp)
self.check_content_in_response('example_subdag_operator', resp)

def test_blocked_success_when_selecting_dags(self):
resp = self.client.get('blocked?dag_ids=example_subdag_operator', follow_redirects=True)
resp = self.client.post('blocked',
data={'dag_ids': ['example_subdag_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))}
self.assertNotIn('example_bash_operator', blocked_dags)
self.assertIn('example_subdag_operator', blocked_dags)

# Multiple
resp = self.client.get('blocked?dag_ids=example_subdag_operator,example_bash_operator',
follow_redirects=True)
resp = self.client.post('blocked',
data={'dag_ids': ['example_subdag_operator', 'example_bash_operator']},
follow_redirects=True)
self.assertEqual(resp.status_code, 200)
blocked_dags = {blocked['dag_id'] for blocked in json.loads(resp.data.decode('utf-8'))}
self.assertIn('example_bash_operator', blocked_dags)
Expand Down