Skip to content

Commit

Permalink
[AIRFLOW-6239] Filter dags return by last_dagruns (#6804)
Browse files Browse the repository at this point in the history
Add dag_ids get parameter to last_dagruns endpoint so can filter by the
set of dag_ids present on the dags view. This is intended to speed up
the response time on systems running a large number of dags.

(cherry picked from commit bdcd7cd)
  • Loading branch information
robinedwards authored and ashb committed Dec 18, 2019
1 parent 541edc7 commit bbe425c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airflow/www_rbac/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ <h2>DAGs</h2>
}
});
});
d3.json("{{ url_for('Airflow.last_dagruns') }}", function(error, json) {
d3.json("{{ url_for('Airflow.last_dagruns') }}?dag_ids=" + (encoded_dag_ids.join(',')), function(error, json) {
for(var safe_dag_id in json) {
dag_id = json[safe_dag_id].dag_id;
last_run = json[safe_dag_id].last_run;
Expand Down
21 changes: 16 additions & 5 deletions airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,18 +435,29 @@ def task_stats(self, session=None):
def last_dagruns(self, session=None):
DagRun = models.DagRun

filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
allowed_dag_ids = appbuilder.sm.get_accessible_dag_ids()

if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
}

if selected_dag_ids:
filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
else:
filter_dag_ids = allowed_dag_ids

if not filter_dag_ids:
return
return wwwutils.json_response({})

query = session.query(
DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('last_run')
).group_by(DagRun.dag_id)

if 'all_dags' not in filter_dag_ids:
# Filter to only ask for accesible dags
query = query.filter(DagRun.dag_id.in_(filter_dag_ids.keys()))
# Filter to only ask for accessible and selected dags
query = query.filter(DagRun.dag_id.in_(filter_dag_ids))

resp = {
r.dag_id.replace('.', '__dot__'): {
Expand Down

0 comments on commit bbe425c

Please sign in to comment.