Skip to content

Commit

Permalink
new topo sort logic
Browse files Browse the repository at this point in the history
  • Loading branch information
iknox-fa committed Jul 1, 2021
1 parent 7525695 commit fef1291
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
66 changes: 59 additions & 7 deletions core/dbt/graph/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import threading

from queue import PriorityQueue
from typing import (
Set, Optional
)
from typing import Dict, Set, List, Generator, Optional

from .graph import UniqueId
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure
Expand All @@ -21,9 +19,8 @@ class GraphQueue:
that separate threads do not call `.empty()` or `__len__()` and `.get()` at
the same time, as there is an unlocked race!
"""
def __init__(
self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]
):

def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]):
self.graph = graph
self.manifest = manifest
self._selected = selected
Expand All @@ -38,7 +35,7 @@ def __init__(
self.lock = threading.Lock()
# store the 'score' of each node as a number. Lower is higher priority.
# TODO: incorporate _include_in_cost (or remove dead code, still needed?)
self._scores = {y: x for x, y in enumerate(nx.topological_sort(self.graph))}
self._scores = self._get_scores(self.graph)
# populate the initial queue
self._find_new_additions()
# awaits after task end
Expand All @@ -57,6 +54,61 @@ def _include_in_cost(self, node_id: UniqueId) -> bool:
return False
return True

@staticmethod
def _grouped_topological_sort(
    graph: nx.DiGraph,
) -> Generator[List[str], None, None]:
    """Topological sort of given graph that groups ties.

    Adapted from `nx.topological_sort`, this function returns a topo sort of
    a graph; however, instead of arbitrarily ordering ties in the sort
    order, ties are grouped together in lists.

    Args:
        graph: The graph to be sorted.

    Returns:
        A generator that yields lists of nodes, one list per graph depth
        level.

    Raises:
        nx.NetworkXUnfeasible: If, once the generator is exhausted, some
            nodes never reached an in-degree of zero (the graph contains a
            cycle or a self-loop). Without this check such nodes would be
            silently left out of the ordering.
    """
    # Read the in-degree view once, splitting nodes into "still blocked"
    # (positive in-degree) and "ready" (the first level).
    indegree_map = {}
    zero_indegree = []
    for v, d in graph.in_degree():
        if d > 0:
            indegree_map[v] = d
        else:
            zero_indegree.append(v)

    while zero_indegree:
        yield zero_indegree
        new_zero_indegree = []
        for v in zero_indegree:
            for _, child in graph.edges(v):
                indegree_map[child] -= 1
                if not indegree_map[child]:
                    new_zero_indegree.append(child)
                    # Drop released nodes so that whatever is left in the
                    # map at the end is exactly the set of unsortable nodes.
                    del indegree_map[child]
        zero_indegree = new_zero_indegree

    # Same guard as `nx.topological_sort`: leftover nodes mean a cycle.
    if indegree_map:
        raise nx.NetworkXUnfeasible(
            "Graph contains a cycle or graph changed during iteration"
        )

def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]:
    """Assign every node a processing-order score.

    The graph is split into its weakly connected pieces, and within each
    piece a node's score is the index of the depth level it falls into in
    the grouped topological sort. Nodes scored 0 should be processed first.

    Args:
        graph: The graph to be scored.

    Returns:
        A dictionary consisting of `node name`:`score` pairs.
    """
    # Connectivity is computed on an undirected copy so that edge direction
    # does not split a dependency chain into separate pieces.
    undirected = nx.Graph(graph)

    scores = {}
    for component in nx.connected_components(undirected):
        levels = self._grouped_topological_sort(graph.subgraph(component))
        scores.update(
            (node_id, depth)
            for depth, members in enumerate(levels)
            for node_id in members
        )

    return scores

def get(
self, block: bool = True, timeout: Optional[float] = None
) -> GraphMemberNode:
Expand Down
1 change: 0 additions & 1 deletion test/unit/test_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def test_linker_add_dependency(self):
self.assert_would_join(queue)
self.assertTrue(queue.empty())

@pytest.mark.skip('TODO: determine if needed (in theory we never have disjoint graphs)')
def test_linker_add_disjoint_dependencies(self):
actual_deps = [('A', 'B')]
additional_node = 'Z'
Expand Down

0 comments on commit fef1291

Please sign in to comment.