Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbt build #3490

Merged
merged 6 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## dbt 0.21.0

### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743] (https://github.com/dbt-labs/dbt/issues/2743), [#3490] (https://github.com/dbt-labs/dbt/issues/3490))

### Fixes
- Fix docs generation for cross-db sources in REDSHIFT RA3 node ([#3236](https://github.com/fishtown-analytics/dbt/issues/3236), [#3408](https://github.com/fishtown-analytics/dbt/pull/3408))
Expand Down
7 changes: 6 additions & 1 deletion core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ class ParsedSourceDefinition(
UnparsedBaseNode,
HasUniqueID,
HasRelationMetadata,
HasFqn
HasFqn,
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved

):
name: str
source_name: str
Expand Down Expand Up @@ -689,6 +690,10 @@ def is_ephemeral_model(self):
def depends_on_nodes(self):
return []

@property
def depends_on(self):
return {'nodes': []}

@property
def refs(self):
return []
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TestStatus(StrEnum):
Error = NodeStatus.Error
Fail = NodeStatus.Fail
Warn = NodeStatus.Warn
Skipped = NodeStatus.Skipped


class FreshnessStatus(StrEnum):
Expand Down
82 changes: 53 additions & 29 deletions core/dbt/graph/queue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import networkx as nx # type: ignore
import threading
from queue import PriorityQueue
from typing import (
Dict, Set, Optional
)

import networkx as nx # type: ignore
from queue import PriorityQueue
from typing import Dict, Set, List, Generator, Optional

from .graph import UniqueId
from dbt.contracts.graph.parsed import ParsedSourceDefinition, ParsedExposure
Expand All @@ -21,9 +19,8 @@ class GraphQueue:
that separate threads do not call `.empty()` or `__len__()` and `.get()` at
the same time, as there is an unlocked race!
"""
def __init__(
self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]
):

def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]):
self.graph = graph
self.manifest = manifest
self._selected = selected
Expand All @@ -37,7 +34,7 @@ def __init__(
# this lock controls most things
self.lock = threading.Lock()
# store the 'score' of each node as a number. Lower is higher priority.
self._scores = self._calculate_scores()
self._scores = self._get_scores(self.graph)
# populate the initial queue
self._find_new_additions()
# awaits after task end
Expand All @@ -56,30 +53,59 @@ def _include_in_cost(self, node_id: UniqueId) -> bool:
return False
return True

def _calculate_scores(self) -> Dict[UniqueId, int]:
"""Calculate the 'value' of each node in the graph based on how many
blocking descendants it has. We use this score for the internal
priority queue's ordering, so the quality of this metric is important.
@staticmethod
def _grouped_topological_sort(
graph: nx.DiGraph,
) -> Generator[List[str], None, None]:
"""Topological sort of given graph that groups ties.
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved

Adapted from `nx.topological_sort`, this function returns a topo sort of a graph however
instead of arbitrarily ordering ties in the sort order, ties are grouped together in
lists.

Args:
graph: The graph to be sorted.

The score is stored as a negative number because the internal
PriorityQueue picks lowest values first.
Returns:
A generator that yields lists of nodes, one list per graph depth level.
"""
kwigley marked this conversation as resolved.
Show resolved Hide resolved
indegree_map = {v: d for v, d in graph.in_degree() if d > 0}
zero_indegree = [v for v, d in graph.in_degree() if d == 0]

while zero_indegree:
yield zero_indegree
new_zero_indegree = []
for v in zero_indegree:
for _, child in graph.edges(v):
indegree_map[child] -= 1
if not indegree_map[child]:
new_zero_indegree.append(child)
zero_indegree = new_zero_indegree

We could do this in one pass over the graph instead of len(self.graph)
passes but this is easy. For large graphs this may hurt performance.
def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]:
"""Scoring nodes for processing order.

This operates on the graph, so it would require a lock if called from
outside __init__.
Scores are calculated by the graph depth level. Lowest score (0) should be processed first.

:return Dict[str, int]: The score dict, mapping unique IDs to integer
scores. Lower scores are higher priority.
Args:
graph: The graph to be scored.

Returns:
A dictionary consisting of `node name`:`score` pairs.
"""
# split graph by connected subgraphs
subgraphs = (
graph.subgraph(x) for x in nx.connected_components(nx.Graph(graph))
)

# score all nodes in all subgraphs
scores = {}
for node in self.graph.nodes():
score = -1 * len([
d for d in nx.descendants(self.graph, node)
if self._include_in_cost(d)
])
scores[node] = score
for subgraph in subgraphs:
grouped_nodes = self._grouped_topological_sort(subgraph)
for level, group in enumerate(grouped_nodes):
for node in group:
scores[node] = level

return scores

def get(
Expand Down Expand Up @@ -133,8 +159,6 @@ def _already_known(self, node: UniqueId) -> bool:
def _find_new_additions(self) -> None:
"""Find any nodes in the graph that need to be added to the internal
queue and add them.

Callers must hold the lock.
"""
for node, in_degree in self.graph.in_degree():
if not self._already_known(node) and in_degree == 0:
Expand Down
28 changes: 27 additions & 1 deletion core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dbt.version
import dbt.flags as flags
import dbt.task.run as run_task
import dbt.task.build as build_task
import dbt.task.compile as compile_task
import dbt.task.debug as debug_task
import dbt.task.clean as clean_task
Expand Down Expand Up @@ -377,6 +378,30 @@ def _build_init_subparser(subparsers, base_subparser):
return sub


def _build_build_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
'build',
parents=[base_subparser],
help='''
Run all Seeds, Models, Snapshots, and tests in DAG order
'''
)
sub.set_defaults(
cls=build_task.BuildTask,
which='build',
rpc_method='build'
)
sub.add_argument(
'-x',
'--fail-fast',
action='store_true',
help='''
Stop execution upon a first failure.
'''
)
return sub


def _build_clean_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
'clean',
Expand Down Expand Up @@ -1038,6 +1063,7 @@ def parse_args(args, cls=DBTArgumentParser):
_build_deps_subparser(subs, base_subparser)
_build_list_subparser(subs, base_subparser)

build_sub = _build_build_subparser(subs, base_subparser)
snapshot_sub = _build_snapshot_subparser(subs, base_subparser)
rpc_sub = _build_rpc_subparser(subs, base_subparser)
run_sub = _build_run_subparser(subs, base_subparser)
Expand All @@ -1051,7 +1077,7 @@ def parse_args(args, cls=DBTArgumentParser):
rpc_sub, seed_sub, parse_sub)
# --models, --exclude
# list_sub sets up its own arguments.
_add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub)
_add_selection_arguments(build_sub, run_sub, compile_sub, generate_sub, test_sub)
_add_selection_arguments(snapshot_sub, seed_sub, models_name='select')
# --defer
_add_defer_argument(run_sub, test_sub)
Expand Down
43 changes: 43 additions & 0 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from .compile import CompileTask

from .run import ModelRunner as run_model_runner
from .snapshot import SnapshotRunner as snapshot_model_runner
from .seed import SeedRunner as seed_runner
from .test import TestRunner as test_runner

from dbt.graph import ResourceTypeSelector
from dbt.exceptions import InternalException
from dbt.node_types import NodeType


class BuildTask(CompileTask):
iknox-fa marked this conversation as resolved.
Show resolved Hide resolved
"""The Build task processes all assets of a given process and attempts to 'build'
them in an opinionated fashion. Every resource type outlined in RUNNER_MAP
will be processed by the mapped runner class.

I.E. a resource of type Model is handled by the ModelRunner which is imported
as run_model_runner.
"""

RUNNER_MAP = {
NodeType.Model: run_model_runner,
NodeType.Snapshot: snapshot_model_runner,
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
}

def get_node_selector(self) -> ResourceTypeSelector:
if self.manifest is None or self.graph is None:
raise InternalException(
'manifest and graph must be set to get node selection'
)

return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=[x for x in self.RUNNER_MAP.keys()],
)

def get_runner_type(self, node):
return self.RUNNER_MAP.get(node.resource_type)
2 changes: 1 addition & 1 deletion core/dbt/task/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_node_selector(self) -> ResourceTypeSelector:
resource_types=NodeType.executable(),
)

def get_runner_type(self):
def get_runner_type(self, _):
return CompileRunner

def task_end_messages(self, results):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def get_node_selector(self):
previous_state=self.previous_state,
)

def get_runner_type(self):
def get_runner_type(self, _):
return FreshnessRunner

def write_result(self, result):
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/task/rpc/sql_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ def interpret_results(self, results):
class RemoteCompileTask(RemoteRunSQLTask, CompileTask):
METHOD_NAME = 'compile_sql'

def get_runner_type(self):
def get_runner_type(self, _):
return RPCCompileRunner


class RemoteRunTask(RemoteRunSQLTask, RunTask):
METHOD_NAME = 'run_sql'

def get_runner_type(self):
def get_runner_type(self, _):
return RPCExecuteRunner
2 changes: 1 addition & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def get_node_selector(self) -> ResourceTypeSelector:
resource_types=[NodeType.Model],
)

def get_runner_type(self):
def get_runner_type(self, _):
return ModelRunner

def task_end_messages(self, results):
Expand Down
10 changes: 6 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
print_run_end_messages,
print_cancel_line,
)

from dbt import ui
from dbt.task.base import ConfiguredTask
from dbt.adapters.base import BaseRelation
Expand All @@ -37,8 +38,9 @@
InternalException,
NotImplementedException,
RuntimeException,
FailFastException
FailFastException,
)

from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, Graph
from dbt.parser.manifest import ManifestLoader

Expand Down Expand Up @@ -127,7 +129,7 @@ def _runtime_initialize(self):

self.job_queue = self.get_graph_queue()

# we use this a couple times. order does not matter.
# we use this a couple of times. order does not matter.
self._flattened_nodes = []
for uid in self.job_queue.get_selected_nodes():
if uid in self.manifest.nodes:
Expand All @@ -148,7 +150,7 @@ def _runtime_initialize(self):
def raise_on_first_error(self):
return False

def get_runner_type(self):
def get_runner_type(self, node):
raise NotImplementedException('Not Implemented')

def result_path(self):
Expand All @@ -165,7 +167,7 @@ def get_runner(self, node):
run_count = self.run_count
num_nodes = self.num_nodes

cls = self.get_runner_type()
cls = self.get_runner_type(node)
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_node_selector(self):
resource_types=[NodeType.Seed],
)

def get_runner_type(self):
def get_runner_type(self, _):
return SeedRunner

def task_end_messages(self, results):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ def get_node_selector(self):
resource_types=[NodeType.Snapshot],
)

def get_runner_type(self):
def get_runner_type(self, _):
return SnapshotRunner
2 changes: 1 addition & 1 deletion core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,5 @@ def get_node_selector(self) -> TestSelector:
previous_state=self.previous_state,
)

def get_runner_type(self):
def get_runner_type(self, _):
return TestRunner
10 changes: 10 additions & 0 deletions test/integration/069_build_test/data/countries.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"iso3","name","iso2","iso_numeric","cow_alpha","cow_numeric","fao_code","un_code","wb_code","imf_code","fips","geonames_name","geonames_id","r_name","aiddata_name","aiddata_code","oecd_name","oecd_code","historical_name","historical_iso3","historical_iso2","historical_iso_numeric"
"ABW","Aruba","AW","533","","","","533","ABW","314","AA","Aruba","3577279","ARUBA","Aruba","12","Aruba","373","","","",""
"AFG","Afghanistan","AF","4","AFG","700","2","4","AFG","512","AF","Afghanistan","1149361","AFGHANISTAN","Afghanistan","1","Afghanistan","625","","","",""
"AGO","Angola","AO","24","ANG","540","7","24","AGO","614","AO","Angola","3351879","ANGOLA","Angola","7","Angola","225","","","",""
"AIA","Anguilla","AI","660","","","","660","AIA","312","AV","Anguilla","3573511","ANGUILLA","Anguilla","8","Anguilla","376","","","",""
"ALA","Aland Islands","AX","248","","","","248","ALA","","","Aland Islands","661882","ALAND ISLANDS","","","","","","","",""
"ALB","Albania","AL","8","ALB","339","3","8","ALB","914","AL","Albania","783754","ALBANIA","Albania","3","Albania","71","","","",""
"AND","Andorra","AD","20","AND","232","6","20","ADO","","AN","Andorra","3041565","ANDORRA","","","","","","","",""
"ANT","Netherlands Antilles","AN","530","","","","","ANT","353","NT","Netherlands Antilles","","NETHERLANDS ANTILLES","Netherlands Antilles","211","Netherlands Antilles","361","Netherlands Antilles","ANT","AN","530"
"ARE","United Arab Emirates","AE","784","UAE","696","225","784","ARE","466","AE","United Arab Emirates","290557","UNITED ARAB EMIRATES","United Arab Emirates","140","United Arab Emirates","576","","","",""
3 changes: 3 additions & 0 deletions test/integration/069_build_test/models/model_0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

select * from {{ ref('countries') }}
3 changes: 3 additions & 0 deletions test/integration/069_build_test/models/model_1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

select * from {{ ref('snap_0') }}
3 changes: 3 additions & 0 deletions test/integration/069_build_test/models/model_2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

select * from {{ ref('snap_1') }}
Loading