Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6871] optimize tree view for large DAGs #7492

Merged
merged 1 commit into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 86 additions & 14 deletions airflow/www/templates/airflow/tree.html
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,59 @@
<script>
$('span.status_square').tooltip({html: true});

function ts_to_dtstr(ts) {
var dt = new Date(ts * 1000);
return dt.toISOString();
}

function is_dag_run(d) {
return d.run_id !== undefined;
}

var now_ts = Date.now()/1000;

function populate_taskinstance_properties(node) {
// populate task instance properties for display purpose
var j;
for (j=0; j<node.instances.length; j++) {
var dr_instance = data.instances[j];
var row = node.instances[j];

if (row === null) {
node.instances[j] = {
task_id: node.name,
execution_date: dr_instance.execution_date,
};
continue;
}

var task_instance = {
state: row[0],
try_number: row[1],
start_ts: row[2],
duration: row[3],
};
node.instances[j] = task_instance;

task_instance.task_id = node.name;
task_instance.execution_date = dr_instance.execution_date;
task_instance.external_trigger = dr_instance.external_trigger;

// compute start_date and end_date if applicable
if (task_instance.start_ts !== null) {
task_instance.start_date = ts_to_dtstr(task_instance.start_ts);
if (task_instance.state === "running") {
task_instance.duration = now_ts - task_instance.start_ts;
} else if (task_instance.duration !== null) {
task_instance.end_date = ts_to_dtstr(task_instance.start_ts + task_instance.duration);
}
}
}
}

var devicePixelRatio = window.devicePixelRatio || 1;
var data = {{ data|tojson|safe }};
// JSON.parse is faster for large payloads than an object literal (because the JSON grammer is simpler!)
var data = JSON.parse('{{ data }}');
var barHeight = 20;
var axisHeight = 40;
var square_x = parseInt(500 * devicePixelRatio);
Expand All @@ -104,8 +155,25 @@
var nodes = tree.nodes(data);
var nodeobj = {};
for (i=0; i<nodes.length; i++) {
node = nodes[i];
nodeobj[node.name] = node;
var node = nodes[i];
nodeobj[node.name] = node;

if (node.name === "[DAG]") {
// skip synthetic root node since it's doesn't contain actual task instances
continue;
}

if (node.start_ts !== undefined) {
node.start_date = ts_to_dtstr(node.start_ts);
}
if (node.end_ts !== undefined) {
node.end_date = ts_to_dtstr(node.end_ts);
}
if (node.depends_on_past === undefined) {
node.depends_on_past = false;
}

populate_taskinstance_properties(node);
}

var diagonal = d3.svg.diagonal()
Expand Down Expand Up @@ -214,6 +282,7 @@
if (d.operator != undefined) {
tt += "operator: " + escapeHtml(d.operator) + "<br/>";
}

tt += "depends_on_past: " + escapeHtml(d.depends_on_past) + "<br/>";
tt += "upstream: " + escapeHtml(d.num_dep) + "<br/>";
tt += "retries: " + escapeHtml(d.retries) + "<br/>";
Expand All @@ -229,7 +298,7 @@
.on('mouseout', function(d, i) {
taskTip.hide(d)
d3.select(this).transition()
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.style("stroke-width", function(d) {return is_dag_run(d)? "2": "1"})
})
.attr("height", barHeight)
.attr("width", function(d, i) {return barWidth - d.y;})
Expand Down Expand Up @@ -263,26 +332,29 @@
})
.attr("class", function(d) {return "state " + d.state})
.attr("data-toggle", "tooltip")
.attr("rx", function(d) {return (d.run_id != undefined)? "5": "0"})
.attr("ry", function(d) {return (d.run_id != undefined)? "5": "0"})
.style("shape-rendering", function(d) {return (d.run_id != undefined)? "auto": "crispEdges"})
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.attr("rx", function(d) {return is_dag_run(d)? "5": "0"})
.attr("ry", function(d) {return is_dag_run(d)? "5": "0"})
.style("shape-rendering", function(d) {return is_dag_run(d)? "auto": "crispEdges"})
.style("stroke-width", function(d) {return is_dag_run(d)? "2": "1"})
.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
.on("mouseover", function(d){
var s = "";
if (d.task_id != undefined ) {
s += "Task_id: " + escapeHtml(d.task_id) + "<br>";
}
s += "Run: " + escapeHtml(d.execution_date) + "<br>";
if(d.run_id != undefined){
if(is_dag_run(d)){
s += "run_id: <nobr>" + escapeHtml(d.run_id) + "</nobr><br>";
}
if (d.operator != undefined) {
s += "Operator: " + escapeHtml(d.operator) + "<br>"
}

var task = nodeobj[d.task_id];

s += "Operator: " + escapeHtml(task.operator) + "<br>"
if(d.start_date != undefined){
s += "Started: " + escapeHtml(d.start_date) + "<br>";
s += "Ended: " + escapeHtml(d.end_date) + "<br>";
if(d.end_date != undefined){
s += "Ended: " + escapeHtml(d.end_date) + "<br>";
}
if (d.duration != undefined) {
s += "Duration: " + escapeHtml(convertSecsToHumanReadable(d.duration)) + "<br>";
}
Expand All @@ -296,7 +368,7 @@
.on('mouseout', function(d,i) {
taskTip.hide(d)
d3.select(this).transition()
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.style("stroke-width", function(d) {return is_dag_run(d)? "2": "1"})
})
.attr('x', function(d, i) {return (i*(square_size+square_spacing));})
.attr('y', -square_size/2)
Expand Down
126 changes: 77 additions & 49 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import socket
import traceback
from collections import defaultdict
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote, unquote

import lazy_object_proxy
Expand All @@ -41,6 +42,7 @@
from flask_appbuilder.actions import action
from flask_appbuilder.models.sqla.filters import BaseFilter
from flask_babel import lazy_gettext
from jinja2.utils import htmlsafe_json_dumps # type: ignore
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import and_, desc, func, or_, union_all
Expand Down Expand Up @@ -139,7 +141,6 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
######################################################################################
# BaseViews
######################################################################################

@app.errorhandler(404)
def circles(error):
return render_template(
Expand Down Expand Up @@ -1336,7 +1337,6 @@ def success(self):
@gzipped
@action_logging
def tree(self):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = dagbag.get_dag(dag_id)
Expand All @@ -1353,7 +1353,10 @@ def tree(self):

base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
if num_runs:
num_runs = int(num_runs)
else:
num_runs = conf.getint('webserver', 'default_dag_run_display_number')

if base_date:
base_date = timezone.parse(base_date)
Expand All @@ -1371,90 +1374,115 @@ def tree(self):
.all()
)
dag_runs = {
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs
}

dates = sorted(list(dag_runs.keys()))
max_date = max(dates) if dates else None
min_date = min(dates) if dates else None

tis = dag.get_task_instances(start_date=min_date, end_date=base_date)
task_instances = {}
task_instances: Dict[Tuple[str, datetime], models.TaskInstance] = {}
for ti in tis:
tid = alchemy_to_dict(ti)
dr = dag_runs.get(ti.execution_date)
tid['external_trigger'] = dr['external_trigger'] if dr else False
task_instances[(ti.task_id, ti.execution_date)] = tid
task_instances[(ti.task_id, ti.execution_date)] = ti

expanded = []
expanded = set()
# The default recursion traces every path so that tree view has full
# expand/collapse functionality. After 5,000 nodes we stop and fall
# back on a quick DFS search for performance. See PR #320.
node_count = [0]
node_count = 0
node_limit = 5000 / max(1, len(dag.leaves))

def encode_ti(ti: Optional[models.TaskInstance]) -> Optional[List]:
if not ti:
return None

# NOTE: order of entry is important here because client JS relies on it for
# tree node reconstruction. Remember to change JS code in tree.html
# whenever order is altered.
data = [
ti.state,
ti.try_number,
None, # start_ts
None, # duration
]

if ti.start_date:
# round to seconds to reduce payload size
data[2] = int(ti.start_date.timestamp())
if ti.duration is not None:
data[3] = int(ti.duration)

return data

def recurse_nodes(task, visited):
nonlocal node_count
node_count += 1
visited.add(task)
node_count[0] += 1

children = [
recurse_nodes(t, visited) for t in task.downstream_list
if node_count[0] < node_limit or t not in visited]

# D3 tree uses children vs _children to define what is
# expanded or not. The following block makes it such that
# repeated nodes are collapsed by default.
children_key = 'children'
if task.task_id not in expanded:
expanded.append(task.task_id)
elif children:
children_key = "_children"

def set_duration(tid):
if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
tid["start_date"] is not None):
d = timezone.utcnow() - timezone.parse(tid["start_date"])
tid["duration"] = d.total_seconds()
return tid

return {
task_id = task.task_id

node = {
'name': task.task_id,
'instances': [
set_duration(task_instances.get((task.task_id, d))) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
for d in dates],
children_key: children,
encode_ti(task_instances.get((task_id, d)))
for d in dates
],
'num_dep': len(task.downstream_list),
'operator': task.task_type,
'retries': task.retries,
'owner': task.owner,
'start_date': task.start_date,
'end_date': task.end_date,
'depends_on_past': task.depends_on_past,
'ui_color': task.ui_color,
'extra_links': task.extra_links,
}

if task.downstream_list:
children = [
recurse_nodes(t, visited) for t in task.downstream_list
if node_count < node_limit or t not in visited]

# D3 tree uses children vs _children to define what is
# expanded or not. The following block makes it such that
# repeated nodes are collapsed by default.
if task.task_id not in expanded:
children_key = 'children'
ashb marked this conversation as resolved.
Show resolved Hide resolved
expanded.add(task.task_id)
else:
children_key = "_children"
node[children_key] = children

if task.depends_on_past:
node['depends_on_past'] = task.depends_on_past
if task.start_date:
# round to seconds to reduce payload size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much does it reduce it by? Is stripping of all ms noticable? (Could we perhaps limit to 3 or 6 sig. fig.?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not much for task node since we don't have too many of them, that's why i didn't add the rounding in the first place here. It did make a big difference for task instance node since we have lots of them, IIRC, probably around 10-20% size reduction.

I can change it to round to 3 sig. fig. everywhere to see what the performance implication would be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb round to 3 sig. fig. increases the overall payload size by 15%. The question now is do we care about millisecond accuracy for task start/end time enough to take this 15% performance hit?

So far, I found second granularity has been good enough for us, but I might be missing other use-cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not needed.

Is it worth making it a config option do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code path has a very hot function call loop that's very sensitive to if statements. For the large DAG that we have, adding one extra if statement increases the response time by more than 400ms. That's why simplify reduce_nodes() logic, remove unnecessary if statements is in the optimization list :)

That and based on the understanding that we are rewriting Airflow web into a proper SPA, I think it's best not to introduce a config for this change. I would prefer us giving round to second a try and come back to add more sig. fig. or add a config later on if any real use-case comes up. It's better to not engineer solutions when we don't have a good use-case in mind.

If you are really concerned about the precision, we can perhaps change it to round to 1 sig. fig. for a 7% performance hit. I can't really think of a case where knowing 0.01 second difference of runtime is important.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good reasoning. I'll copy some/most of this in to the commit message for future-proofing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this unresolved as a reminder to myself.

node['start_ts'] = int(task.start_date.timestamp())
if task.end_date:
# round to seconds to reduce payload size
node['end_ts'] = int(task.end_date.timestamp())
if task.extra_links:
node['extra_links'] = task.extra_links
return node

data = {
'name': '[DAG]',
'children': [recurse_nodes(t, set()) for t in dag.roots],
'instances': [
dag_runs.get(d) or {'execution_date': d.isoformat()}
for d in dates],
for d in dates
],
}

session.commit()

form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
external_logs = conf.get('elasticsearch', 'frontend')

return self.render_template(
'airflow/tree.html',
operators=sorted({op.task_type: op for op in dag.tasks}.values(), key=lambda x: x.task_type),
root=root,
form=form,
dag=dag, data=data, blur=blur, num_runs=num_runs,
dag=dag,
# avoid spaces to reduce payload size
data=htmlsafe_json_dumps(data, separators=(',', ':')),
blur=blur, num_runs=num_runs,
show_external_logs=bool(external_logs))

@expose('/graph')
Expand Down