Skip to content

Commit

Permalink
[AIRFLOW-6871] optimize tree view for large DAGs (#7492)
Browse files Browse the repository at this point in the history
This change reduces page size by more than 10X and
reduces page load time by 3-5X. As a result, the
tree view can now load large DAGs that were causing
5XX error without the patch.

List of optimizations applied to the view handler:
* only seralize used task instance attributes to json instead of the
  whole ORM object
* encode task instance attributes as array instead of dict
* encode datetime in unix timestamp instead of iso formmat string
* push task instance attribute construction into client side JS
* remove redundant task instance attributes
* simplify reduce_nodes() logic, remove unnecessary if statements
* seralize JSON as string to be used with JSON.parse on the client side
  to speed up browser JS parse time
* remove spaces in seralized JSON string to reduce payload size

Co-Authored-By: QP Hou <[email protected]>

(cherry-picked from c1c2d6a)
  • Loading branch information
kaxil committed Apr 2, 2020
1 parent 2981dac commit 0c34e6b
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 59 deletions.
100 changes: 86 additions & 14 deletions airflow/www_rbac/templates/airflow/tree.html
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,59 @@
<script>
$('span.status_square').tooltip({html: true});

function ts_to_dtstr(ts) {
var dt = new Date(ts * 1000);
return dt.toISOString();
}

function is_dag_run(d) {
return d.run_id !== undefined;
}

var now_ts = Date.now()/1000;

function populate_taskinstance_properties(node) {
// populate task instance properties for display purpose
var j;
for (j=0; j<node.instances.length; j++) {
var dr_instance = data.instances[j];
var row = node.instances[j];

if (row === null) {
node.instances[j] = {
task_id: node.name,
execution_date: dr_instance.execution_date,
};
continue;
}

var task_instance = {
state: row[0],
try_number: row[1],
start_ts: row[2],
duration: row[3],
};
node.instances[j] = task_instance;

task_instance.task_id = node.name;
task_instance.execution_date = dr_instance.execution_date;
task_instance.external_trigger = dr_instance.external_trigger;

// compute start_date and end_date if applicable
if (task_instance.start_ts !== null) {
task_instance.start_date = ts_to_dtstr(task_instance.start_ts);
if (task_instance.state === "running") {
task_instance.duration = now_ts - task_instance.start_ts;
} else if (task_instance.duration !== null) {
task_instance.end_date = ts_to_dtstr(task_instance.start_ts + task_instance.duration);
}
}
}
}

var devicePixelRatio = window.devicePixelRatio || 1;
var data = {{ data|tojson|safe }};
// JSON.parse is faster for large payloads than an object literal (because the JSON grammer is simpler!)
var data = JSON.parse('{{ data }}');
var barHeight = 20;
var axisHeight = 40;
var square_x = parseInt(500 * devicePixelRatio);
Expand All @@ -103,8 +154,25 @@
var nodes = tree.nodes(data);
var nodeobj = {};
for (i=0; i<nodes.length; i++) {
node = nodes[i];
nodeobj[node.name] = node;
var node = nodes[i];
nodeobj[node.name] = node;

if (node.name === "[DAG]") {
// skip synthetic root node since it's doesn't contain actual task instances
continue;
}

if (node.start_ts !== undefined) {
node.start_date = ts_to_dtstr(node.start_ts);
}
if (node.end_ts !== undefined) {
node.end_date = ts_to_dtstr(node.end_ts);
}
if (node.depends_on_past === undefined) {
node.depends_on_past = false;
}

populate_taskinstance_properties(node);
}

var diagonal = d3.svg.diagonal()
Expand Down Expand Up @@ -213,6 +281,7 @@
if (d.operator != undefined) {
tt += "operator: " + escapeHtml(d.operator) + "<br/>";
}

tt += "depends_on_past: " + escapeHtml(d.depends_on_past) + "<br/>";
tt += "upstream: " + escapeHtml(d.num_dep) + "<br/>";
tt += "retries: " + escapeHtml(d.retries) + "<br/>";
Expand All @@ -228,7 +297,7 @@
.on('mouseout', function(d, i) {
taskTip.hide(d)
d3.select(this).transition()
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.style("stroke-width", function(d) {return is_dag_run(d)? "2": "1"})
})
.attr("height", barHeight)
.attr("width", function(d, i) {return barWidth - d.y;})
Expand Down Expand Up @@ -262,26 +331,29 @@
})
.attr("class", function(d) {return "state " + d.state})
.attr("data-toggle", "tooltip")
.attr("rx", function(d) {return (d.run_id != undefined)? "5": "0"})
.attr("ry", function(d) {return (d.run_id != undefined)? "5": "0"})
.style("shape-rendering", function(d) {return (d.run_id != undefined)? "auto": "crispEdges"})
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.attr("rx", function(d) {return is_dag_run(d)? "5": "0"})
.attr("ry", function(d) {return is_dag_run(d)? "5": "0"})
.style("shape-rendering", function(d) {return is_dag_run(d)? "auto": "crispEdges"})
.style("stroke-width", function(d) {return is_dag_run(d)? "2": "1"})
.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
.on("mouseover", function(d){
var s = "";
if (d.task_id != undefined) {
s += "Task_id: " + escapeHtml(d.task_id) + "<br>";
}
s += "Run: " + escapeHtml(d.execution_date) + "<br>";
if(d.run_id != undefined){
if(is_dag_run(d)){
s += "run_id: <nobr>" + escapeHtml(d.run_id) + "</nobr><br>";
}
if (d.operator != undefined) {
s += "Operator: " + escapeHtml(d.operator) + "<br>"
}

var task = nodeobj[d.task_id];

s += "Operator: " + escapeHtml(task.operator) + "<br>"
if(d.start_date != undefined){
s += "Started: " + escapeHtml(d.start_date) + "<br>";
s += "Ended: " + escapeHtml(d.end_date) + "<br>";
if(d.end_date != undefined){
s += "Ended: " + escapeHtml(d.end_date) + "<br>";
}
if (d.duration != undefined) {
s += "Duration: " + escapeHtml(convertSecsToHumanReadable(d.duration)) + "<br>";
}
Expand All @@ -295,7 +367,7 @@
.on('mouseout', function(d,i) {
taskTip.hide(d)
d3.select(this).transition()
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.style("stroke-width", function(d) {return is_dag_run(d)? "2": "1"})
})
.attr('x', function(d, i) {return (i*(square_size+square_spacing));})
.attr('y', -square_size/2)
Expand Down
126 changes: 81 additions & 45 deletions airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from datetime import timedelta
from urllib.parse import unquote

import six
from six.moves.urllib.parse import quote

import markdown
Expand All @@ -45,6 +46,7 @@
from flask_appbuilder.models.sqla.filters import BaseFilter
from flask_babel import lazy_gettext
import lazy_object_proxy
from jinja2.utils import htmlsafe_json_dumps # type: ignore
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import and_, desc, func, or_, union_all
Expand Down Expand Up @@ -140,7 +142,6 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
######################################################################################
# BaseViews
######################################################################################

@app.errorhandler(404)
def circles(error):
return render_template(
Expand Down Expand Up @@ -1361,7 +1362,6 @@ def success(self):
@action_logging
@provide_session
def tree(self, session=None):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = dagbag.get_dag(dag_id)
Expand All @@ -1378,7 +1378,10 @@ def tree(self, session=None):

base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
if num_runs:
num_runs = int(num_runs)
else:
num_runs = conf.getint('webserver', 'default_dag_run_display_number')

if base_date:
base_date = timezone.parse(base_date)
Expand All @@ -1396,7 +1399,8 @@ def tree(self, session=None):
.all()
)
dag_runs = {
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs
}

dates = sorted(list(dag_runs.keys()))
max_date = max(dates) if dates else None
Expand All @@ -1406,82 +1410,114 @@ def tree(self, session=None):
start_date=min_date, end_date=base_date, session=session)
task_instances = {}
for ti in tis:
tid = alchemy_to_dict(ti)
dr = dag_runs.get(ti.execution_date)
tid['external_trigger'] = dr['external_trigger'] if dr else False
task_instances[(ti.task_id, ti.execution_date)] = tid
task_instances[(ti.task_id, ti.execution_date)] = ti

expanded = []
expanded = set()
# The default recursion traces every path so that tree view has full
# expand/collapse functionality. After 5,000 nodes we stop and fall
# back on a quick DFS search for performance. See PR #320.
node_count = [0]
node_limit = 5000 / max(1, len(dag.leaves))

def encode_ti(ti):
if not ti:
return None

# NOTE: order of entry is important here because client JS relies on it for
# tree node reconstruction. Remember to change JS code in tree.html
# whenever order is altered.
data = [
ti.state,
ti.try_number,
None, # start_ts
None, # duration
]

if ti.start_date:
# round to seconds to reduce payload size
if six.PY2:
data[2] = int(pendulum.instance(ti.start_date).timestamp())
else:
data[2] = int(ti.start_date.timestamp())
if ti.duration is not None:
data[3] = int(ti.duration)

return data

def recurse_nodes(task, visited):
visited.add(task)
node_count[0] += 1
visited.add(task)
task_id = task.task_id

children = [
recurse_nodes(t, visited) for t in task.downstream_list
if node_count[0] < node_limit or t not in visited]

# D3 tree uses children vs _children to define what is
# expanded or not. The following block makes it such that
# repeated nodes are collapsed by default.
children_key = 'children'
if task.task_id not in expanded:
expanded.append(task.task_id)
elif children:
children_key = "_children"

def set_duration(tid):
if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
tid["start_date"] is not None):
d = timezone.utcnow() - pendulum.parse(tid["start_date"])
tid["duration"] = d.total_seconds()
return tid

return {
node = {
'name': task.task_id,
'instances': [
set_duration(task_instances.get((task.task_id, d))) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
for d in dates],
children_key: children,
encode_ti(task_instances.get((task_id, d)))
for d in dates
],
'num_dep': len(task.downstream_list),
'operator': task.task_type,
'retries': task.retries,
'owner': task.owner,
'start_date': task.start_date,
'end_date': task.end_date,
'depends_on_past': task.depends_on_past,
'ui_color': task.ui_color,
'extra_links': task.extra_links,
}

if task.downstream_list:
children = [
recurse_nodes(t, visited) for t in task.downstream_list
if node_count[0] < node_limit or t not in visited]

# D3 tree uses children vs _children to define what is
# expanded or not. The following block makes it such that
# repeated nodes are collapsed by default.
if task.task_id not in expanded:
children_key = 'children'
expanded.add(task.task_id)
else:
children_key = "_children"
node[children_key] = children

if task.depends_on_past:
node['depends_on_past'] = task.depends_on_past
if task.start_date:
# round to seconds to reduce payload size
if six.PY2:
node['start_ts'] = int(pendulum.instance(task.start_date).timestamp())
else:
node['start_ts'] = int(task.start_date.timestamp())
if task.end_date:
# round to seconds to reduce payload size
if six.PY2:
node['end_ts'] = int(pendulum.instance(task.end_date).timestamp())
else:
node['end_ts'] = int(task.end_date.timestamp())
if task.extra_links:
node['extra_links'] = task.extra_links
return node

data = {
'name': '[DAG]',
'children': [recurse_nodes(t, set()) for t in dag.roots],
'instances': [
dag_runs.get(d) or {'execution_date': d.isoformat()}
for d in dates],
for d in dates
],
}

session.commit()

form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
external_logs = conf.get('elasticsearch', 'frontend')

return self.render_template(
'airflow/tree.html',
operators=sorted({op.task_type: op for op in dag.tasks}.values(),
key=lambda x: x.task_type),
root=root,
form=form,
dag=dag, data=data, blur=blur, num_runs=num_runs,
dag=dag,
# avoid spaces to reduce payload size
data=htmlsafe_json_dumps(data, separators=(',', ':')),
blur=blur, num_runs=num_runs,
show_external_logs=bool(external_logs))

@expose('/graph')
Expand Down

0 comments on commit 0c34e6b

Please sign in to comment.