[AIRFLOW-3518] Performance fixes for topological_sort #4322

Merged
merged 2 commits into from
Dec 15, 2018
29 changes: 15 additions & 14 deletions airflow/models.py
@@ -23,7 +23,7 @@
 from __future__ import unicode_literals

 import copy
-from collections import defaultdict, namedtuple
+from collections import defaultdict, namedtuple, OrderedDict

 from builtins import ImportError as BuiltinImportError, bytes, object, str
 from future.standard_library import install_aliases
@@ -2662,10 +2662,10 @@ def __init__(
         }

     def __eq__(self, other):
-        return (
-            type(self) == type(other) and
-            all(self.__dict__.get(c, None) == other.__dict__.get(c, None)
-                for c in self._comps))
+        if (type(self) == type(other) and
+                self.task_id == other.task_id):
Member

Do we need to also check that self.dag_id == other.dag_id?

Member

(And if we do, we should probably add a test that covers that.)

Contributor

I think this is implicitly well covered by the cycle detection tests in tests/models.py.

Member

dag1 = DAG('dag2')
task1 = MyOp(task_id='start', dag=dag1)

dag2 = DAG('dag1')
task2 = MyOp(task_id='start', dag=dag2)

Will task1 == task2? From this diff alone, it sort of looks like it might.

Contributor

So comparing the task_ids is just an optimization. If they differ, we return False immediately. If they match, the attributes task_id, dag_id, owner, email, email_on_retry, retry_delay, retry_exponential_backoff, max_retry_delay, start_date, schedule_interval, depends_on_past, wait_for_downstream, adhoc, priority_weight, sla, execution_timeout, on_failure_callback, on_success_callback and on_retry_callback are compared to determine whether the tasks are really equal.

Member

Oh sorry, yes! If they don't match, then we return False. Right.
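
The behaviour discussed in this thread can be sketched as follows. This is a minimal illustration, not Airflow's code: `Task` is a hypothetical stand-in for an operator, and its `_comps` tuple is a shortened, assumed version of the attribute list quoted above. It shows that the `task_id` check only short-circuits the negative case, so two tasks with the same `task_id` in different DAGs still compare unequal.

```python
class Task(object):
    """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""

    # Shortened, assumed list of attributes used by the full comparison.
    _comps = ('task_id', 'dag_id', 'owner')

    def __init__(self, task_id, dag_id, owner='airflow'):
        self.task_id = task_id
        self.dag_id = dag_id
        self.owner = owner

    def __eq__(self, other):
        # Cheap pre-check: a different type or task_id means "not equal".
        if type(self) == type(other) and self.task_id == other.task_id:
            # Only now pay for the attribute-by-attribute comparison.
            return all(self.__dict__.get(c, None) == other.__dict__.get(c, None)
                       for c in self._comps)
        return False

    def __ne__(self, other):
        return not self == other


task1 = Task(task_id='start', dag_id='dag2')
task2 = Task(task_id='start', dag_id='dag1')
task3 = Task(task_id='start', dag_id='dag1')
other = Task(task_id='end', dag_id='dag1')

print(task1 == task2)  # same task_id, different dag_id
print(task2 == task3)  # every compared attribute matches
print(task2 == other)  # rejected by the task_id pre-check
```

Because `dag_id` is among the compared attributes, the pre-check never turns an unequal pair into an equal one; it only avoids the full comparison when the ids already differ.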

+            return all(self.__dict__.get(c, None) == other.__dict__.get(c, None) for c in self._comps)
+        return False

     def __ne__(self, other):
         return not self == other
@@ -3443,12 +3443,13 @@ def __repr__(self):
         return "<DAG: {self.dag_id}>".format(self=self)

     def __eq__(self, other):
-        return (
-            type(self) == type(other) and
+        if (type(self) == type(other) and
+                self.dag_id == other.dag_id):
+
             # Use getattr() instead of __dict__ as __dict__ doesn't return
             # correct values for properties.
-            all(getattr(self, c, None) == getattr(other, c, None)
-                for c in self._comps))
+            return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)
+        return False

     def __ne__(self, other):
         return not self == other
@@ -3904,8 +3905,8 @@ def topological_sort(self):
         :return: list of tasks in topological order
         """

-        # copy the the tasks so we leave it unmodified
-        graph_unsorted = self.tasks[:]
+        # convert into an OrderedDict to speedup lookup while keeping order the same
+        graph_unsorted = OrderedDict((task.task_id, task) for task in self.tasks)

         graph_sorted = []
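
Why the switch from a list to an `OrderedDict` helps can be illustrated with a rough sketch. `Node` is a hypothetical class, not part of Airflow, that counts how often its `__eq__` runs. A membership test on a list compares the probe against elements one by one, while a membership test on a dict keyed by `task_id` hashes a string and never calls `Node.__eq__`.

```python
from collections import OrderedDict


class Node(object):
    """Hypothetical task-like object that counts __eq__ calls."""

    eq_calls = 0

    def __init__(self, task_id):
        self.task_id = task_id

    def __eq__(self, other):
        Node.eq_calls += 1
        return type(self) == type(other) and self.task_id == other.task_id

    def __ne__(self, other):
        return not self == other


nodes = [Node('task_%d' % i) for i in range(1000)]
probe = Node('task_999')  # equal to the last node, but a distinct object

# List membership walks the list and calls __eq__ until a match is found.
Node.eq_calls = 0
found_in_list = probe in nodes
list_eq_calls = Node.eq_calls

# Dict membership hashes the string key; Node.__eq__ is not involved.
by_id = OrderedDict((n.task_id, n) for n in nodes)
Node.eq_calls = 0
found_in_dict = probe.task_id in by_id
dict_eq_calls = Node.eq_calls

print(found_in_list, list_eq_calls)
print(found_in_dict, dict_eq_calls)
```

The `OrderedDict` also iterates in insertion order, so the tasks are still visited in the same order as in the original list.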

@@ -3928,14 +3929,14 @@ def topological_sort(self):
             # not, we need to bail out as the graph therefore can't be
             # sorted.
             acyclic = False
-            for node in list(graph_unsorted):
+            for node in list(graph_unsorted.values()):
                 for edge in node.upstream_list:
-                    if edge in graph_unsorted:
+                    if edge.task_id in graph_unsorted:
                         break
                 # no edges in upstream tasks
                 else:
                     acyclic = True
-                    graph_unsorted.remove(node)
+                    del graph_unsorted[node.task_id]
                     graph_sorted.append(node)

             if not acyclic:
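
Putting the changed lines together, the sorting loop can be sketched as a standalone function. This is a simplified sketch under stated assumptions, not the method from `airflow/models.py`: `Task` and `set_upstream` here are hypothetical minimal stand-ins, the function takes a plain list instead of reading `self.tasks`, and `ValueError` is an assumed placeholder for whatever exception the real method raises on a cycle.

```python
from collections import OrderedDict


class Task(object):
    """Hypothetical minimal task: an id plus a list of upstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_list = []

    def set_upstream(self, other):
        self.upstream_list.append(other)


def topological_sort(tasks):
    """Return tasks ordered so that each task follows its upstream tasks."""
    # Keyed by task_id for fast membership tests; insertion order is kept.
    graph_unsorted = OrderedDict((task.task_id, task) for task in tasks)
    graph_sorted = []

    while graph_unsorted:
        acyclic = False
        # Iterate over a copy of the values so entries can be deleted safely.
        for node in list(graph_unsorted.values()):
            for edge in node.upstream_list:
                if edge.task_id in graph_unsorted:
                    break  # an upstream task is still unresolved
            else:
                # No unresolved upstream tasks: the node can be emitted.
                acyclic = True
                del graph_unsorted[node.task_id]
                graph_sorted.append(node)

        if not acyclic:
            # A full pass resolved nothing, so the graph contains a cycle.
            # ValueError is an assumption made for this sketch.
            raise ValueError("A cyclic dependency occurred")

    return graph_sorted


a, b, c = Task('a'), Task('b'), Task('c')
b.set_upstream(a)
c.set_upstream(b)
order = [t.task_id for t in topological_sort([c, b, a])]
print(order)
```

The `for ... else` construct carries the logic: the `else` branch runs only when the inner loop finishes without hitting `break`, that is, when none of the node's upstream tasks are still waiting in `graph_unsorted`.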