diff --git a/CHANGELOG.md b/CHANGELOG.md index ae869002248..f566793c77f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## dbt 0.21.0 ### Features +- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743](https://github.com/dbt-labs/dbt/issues/2743), [#3490](https://github.com/dbt-labs/dbt/issues/3490)) ### Fixes - Fix docs generation for cross-db sources in REDSHIFT RA3 node ([#3236](https://github.com/fishtown-analytics/dbt/issues/3236), [#3408](https://github.com/fishtown-analytics/dbt/pull/3408)) diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 26f75c9722f..d2dbe48c225 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -592,7 +592,8 @@ class ParsedSourceDefinition( UnparsedBaseNode, HasUniqueID, HasRelationMetadata, - HasFqn + HasFqn, + ): name: str source_name: str @@ -689,6 +690,10 @@ def is_ephemeral_model(self): def depends_on_nodes(self): return [] + @property + def depends_on(self): + return {'nodes': []} + @property def refs(self): return [] diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 307940a4819..843d375c219 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -78,6 +78,7 @@ class TestStatus(StrEnum): Error = NodeStatus.Error Fail = NodeStatus.Fail Warn = NodeStatus.Warn + Skipped = NodeStatus.Skipped class FreshnessStatus(StrEnum): diff --git a/core/dbt/graph/queue.py b/core/dbt/graph/queue.py index ee01504fef1..5205cce655f 100644 --- a/core/dbt/graph/queue.py +++ b/core/dbt/graph/queue.py @@ -1,10 +1,8 @@ +import networkx as nx # type: ignore import threading -from queue import PriorityQueue -from typing import ( - Dict, Set, Optional -) -import networkx as nx # type: ignore +from queue import PriorityQueue +from typing import Dict, Set, List, Generator, Optional from .graph import UniqueId from dbt.contracts.graph.parsed import ParsedSourceDefinition, 
ParsedExposure @@ -21,9 +19,8 @@ class GraphQueue: that separate threads do not call `.empty()` or `__len__()` and `.get()` at the same time, as there is an unlocked race! """ - def __init__( - self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId] - ): + + def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]): self.graph = graph self.manifest = manifest self._selected = selected @@ -37,7 +34,7 @@ def __init__( # this lock controls most things self.lock = threading.Lock() # store the 'score' of each node as a number. Lower is higher priority. - self._scores = self._calculate_scores() + self._scores = self._get_scores(self.graph) # populate the initial queue self._find_new_additions() # awaits after task end @@ -56,30 +53,59 @@ def _include_in_cost(self, node_id: UniqueId) -> bool: return False return True - def _calculate_scores(self) -> Dict[UniqueId, int]: - """Calculate the 'value' of each node in the graph based on how many - blocking descendants it has. We use this score for the internal - priority queue's ordering, so the quality of this metric is important. + @staticmethod + def _grouped_topological_sort( + graph: nx.DiGraph, + ) -> Generator[List[str], None, None]: + """Topological sort of given graph that groups ties. + + Adapted from `nx.topological_sort`, this function returns a topo sort of a graph however + instead of arbitrarily ordering ties in the sort order, ties are grouped together in + lists. + + Args: + graph: The graph to be sorted. - The score is stored as a negative number because the internal - PriorityQueue picks lowest values first. + Returns: + A generator that yields lists of nodes, one list per graph depth level. 
+ """ + indegree_map = {v: d for v, d in graph.in_degree() if d > 0} + zero_indegree = [v for v, d in graph.in_degree() if d == 0] + + while zero_indegree: + yield zero_indegree + new_zero_indegree = [] + for v in zero_indegree: + for _, child in graph.edges(v): + indegree_map[child] -= 1 + if not indegree_map[child]: + new_zero_indegree.append(child) + zero_indegree = new_zero_indegree - We could do this in one pass over the graph instead of len(self.graph) - passes but this is easy. For large graphs this may hurt performance. + def _get_scores(self, graph: nx.DiGraph) -> Dict[str, int]: + """Scoring nodes for processing order. - This operates on the graph, so it would require a lock if called from - outside __init__. + Scores are calculated by the graph depth level. Lowest score (0) should be processed first. - :return Dict[str, int]: The score dict, mapping unique IDs to integer - scores. Lower scores are higher priority. + Args: + graph: The graph to be scored. + + Returns: + A dictionary consisting of `node name`:`score` pairs. """ + # split graph by connected subgraphs + subgraphs = ( + graph.subgraph(x) for x in nx.connected_components(nx.Graph(graph)) + ) + + # score all nodes in all subgraphs scores = {} - for node in self.graph.nodes(): - score = -1 * len([ - d for d in nx.descendants(self.graph, node) - if self._include_in_cost(d) - ]) - scores[node] = score + for subgraph in subgraphs: + grouped_nodes = self._grouped_topological_sort(subgraph) + for level, group in enumerate(grouped_nodes): + for node in group: + scores[node] = level + return scores def get( @@ -133,8 +159,6 @@ def _already_known(self, node: UniqueId) -> bool: def _find_new_additions(self) -> None: """Find any nodes in the graph that need to be added to the internal queue and add them. - - Callers must hold the lock. 
""" for node, in_degree in self.graph.in_degree(): if not self._already_known(node) and in_degree == 0: diff --git a/core/dbt/main.py b/core/dbt/main.py index ae53b13af46..5068cdfbd33 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -11,6 +11,7 @@ import dbt.version import dbt.flags as flags import dbt.task.run as run_task +import dbt.task.build as build_task import dbt.task.compile as compile_task import dbt.task.debug as debug_task import dbt.task.clean as clean_task @@ -377,6 +378,30 @@ def _build_init_subparser(subparsers, base_subparser): return sub +def _build_build_subparser(subparsers, base_subparser): + sub = subparsers.add_parser( + 'build', + parents=[base_subparser], + help=''' + Run all Seeds, Models, Snapshots, and tests in DAG order + ''' + ) + sub.set_defaults( + cls=build_task.BuildTask, + which='build', + rpc_method='build' + ) + sub.add_argument( + '-x', + '--fail-fast', + action='store_true', + help=''' + Stop execution upon a first failure. + ''' + ) + return sub + + def _build_clean_subparser(subparsers, base_subparser): sub = subparsers.add_parser( 'clean', @@ -1038,6 +1063,7 @@ def parse_args(args, cls=DBTArgumentParser): _build_deps_subparser(subs, base_subparser) _build_list_subparser(subs, base_subparser) + build_sub = _build_build_subparser(subs, base_subparser) snapshot_sub = _build_snapshot_subparser(subs, base_subparser) rpc_sub = _build_rpc_subparser(subs, base_subparser) run_sub = _build_run_subparser(subs, base_subparser) @@ -1051,7 +1077,7 @@ def parse_args(args, cls=DBTArgumentParser): rpc_sub, seed_sub, parse_sub) # --models, --exclude # list_sub sets up its own arguments. 
- _add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub) + _add_selection_arguments(build_sub, run_sub, compile_sub, generate_sub, test_sub) _add_selection_arguments(snapshot_sub, seed_sub, models_name='select') # --defer _add_defer_argument(run_sub, test_sub) diff --git a/core/dbt/task/build.py b/core/dbt/task/build.py new file mode 100644 index 00000000000..8d93003a137 --- /dev/null +++ b/core/dbt/task/build.py @@ -0,0 +1,43 @@ +from .compile import CompileTask + +from .run import ModelRunner as run_model_runner +from .snapshot import SnapshotRunner as snapshot_model_runner +from .seed import SeedRunner as seed_runner +from .test import TestRunner as test_runner + +from dbt.graph import ResourceTypeSelector +from dbt.exceptions import InternalException +from dbt.node_types import NodeType + + +class BuildTask(CompileTask): + """The Build task processes all assets of a given process and attempts to 'build' + them in an opinionated fashion. Every resource type outlined in RUNNER_MAP + will be processed by the mapped runner class. + + I.E. a resource of type Model is handled by the ModelRunner which is imported + as run_model_runner. 
+ """ + + RUNNER_MAP = { + NodeType.Model: run_model_runner, + NodeType.Snapshot: snapshot_model_runner, + NodeType.Seed: seed_runner, + NodeType.Test: test_runner, + } + + def get_node_selector(self) -> ResourceTypeSelector: + if self.manifest is None or self.graph is None: + raise InternalException( + 'manifest and graph must be set to get node selection' + ) + + return ResourceTypeSelector( + graph=self.graph, + manifest=self.manifest, + previous_state=self.previous_state, + resource_types=[x for x in self.RUNNER_MAP.keys()], + ) + + def get_runner_type(self, node): + return self.RUNNER_MAP.get(node.resource_type) diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index 7b93ba6d3c5..80b94107af2 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -56,7 +56,7 @@ def get_node_selector(self) -> ResourceTypeSelector: resource_types=NodeType.executable(), ) - def get_runner_type(self): + def get_runner_type(self, _): return CompileRunner def task_end_messages(self, results): diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 33bc18f218a..d1a8fb9ac61 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -155,7 +155,7 @@ def get_node_selector(self): previous_state=self.previous_state, ) - def get_runner_type(self): + def get_runner_type(self, _): return FreshnessRunner def write_result(self, result): diff --git a/core/dbt/task/rpc/sql_commands.py b/core/dbt/task/rpc/sql_commands.py index 557c5bac9a5..6580dcfdb5b 100644 --- a/core/dbt/task/rpc/sql_commands.py +++ b/core/dbt/task/rpc/sql_commands.py @@ -202,12 +202,12 @@ def interpret_results(self, results): class RemoteCompileTask(RemoteRunSQLTask, CompileTask): METHOD_NAME = 'compile_sql' - def get_runner_type(self): + def get_runner_type(self, _): return RPCCompileRunner class RemoteRunTask(RemoteRunSQLTask, RunTask): METHOD_NAME = 'run_sql' - def get_runner_type(self): + def get_runner_type(self, _): return RPCExecuteRunner diff --git 
a/core/dbt/task/run.py b/core/dbt/task/run.py index 8469d849e07..ffc5e93642d 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -452,7 +452,7 @@ def get_node_selector(self) -> ResourceTypeSelector: resource_types=[NodeType.Model], ) - def get_runner_type(self): + def get_runner_type(self, _): return ModelRunner def task_end_messages(self, results): diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index ed9bab6e2f7..e046b498d07 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -12,6 +12,7 @@ print_run_end_messages, print_cancel_line, ) + from dbt import ui from dbt.task.base import ConfiguredTask from dbt.adapters.base import BaseRelation @@ -37,8 +38,9 @@ InternalException, NotImplementedException, RuntimeException, - FailFastException + FailFastException, ) + from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, Graph from dbt.parser.manifest import ManifestLoader @@ -127,7 +129,7 @@ def _runtime_initialize(self): self.job_queue = self.get_graph_queue() - # we use this a couple times. order does not matter. + # we use this a couple of times. order does not matter. 
self._flattened_nodes = [] for uid in self.job_queue.get_selected_nodes(): if uid in self.manifest.nodes: @@ -148,7 +150,7 @@ def _runtime_initialize(self): def raise_on_first_error(self): return False - def get_runner_type(self): + def get_runner_type(self, node): raise NotImplementedException('Not Implemented') def result_path(self): @@ -165,7 +167,7 @@ def get_runner(self, node): run_count = self.run_count num_nodes = self.num_nodes - cls = self.get_runner_type() + cls = self.get_runner_type(node) return cls(self.config, adapter, node, run_count, num_nodes) def call_runner(self, runner): diff --git a/core/dbt/task/seed.py b/core/dbt/task/seed.py index d2a0b2c4776..edd60f52c85 100644 --- a/core/dbt/task/seed.py +++ b/core/dbt/task/seed.py @@ -57,7 +57,7 @@ def get_node_selector(self): resource_types=[NodeType.Seed], ) - def get_runner_type(self): + def get_runner_type(self, _): return SeedRunner def task_end_messages(self, results): diff --git a/core/dbt/task/snapshot.py b/core/dbt/task/snapshot.py index c247dd77478..4515f11c583 100644 --- a/core/dbt/task/snapshot.py +++ b/core/dbt/task/snapshot.py @@ -38,5 +38,5 @@ def get_node_selector(self): resource_types=[NodeType.Snapshot], ) - def get_runner_type(self): + def get_runner_type(self, _): return SnapshotRunner diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index beabf662f56..28c29f343c9 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -192,5 +192,5 @@ def get_node_selector(self) -> TestSelector: previous_state=self.previous_state, ) - def get_runner_type(self): + def get_runner_type(self, _): return TestRunner diff --git a/test/integration/069_build_test/data/countries.csv b/test/integration/069_build_test/data/countries.csv new file mode 100644 index 00000000000..82db396fd6f --- /dev/null +++ b/test/integration/069_build_test/data/countries.csv @@ -0,0 +1,10 @@ 
+"iso3","name","iso2","iso_numeric","cow_alpha","cow_numeric","fao_code","un_code","wb_code","imf_code","fips","geonames_name","geonames_id","r_name","aiddata_name","aiddata_code","oecd_name","oecd_code","historical_name","historical_iso3","historical_iso2","historical_iso_numeric" +"ABW","Aruba","AW","533","","","","533","ABW","314","AA","Aruba","3577279","ARUBA","Aruba","12","Aruba","373","","","","" +"AFG","Afghanistan","AF","4","AFG","700","2","4","AFG","512","AF","Afghanistan","1149361","AFGHANISTAN","Afghanistan","1","Afghanistan","625","","","","" +"AGO","Angola","AO","24","ANG","540","7","24","AGO","614","AO","Angola","3351879","ANGOLA","Angola","7","Angola","225","","","","" +"AIA","Anguilla","AI","660","","","","660","AIA","312","AV","Anguilla","3573511","ANGUILLA","Anguilla","8","Anguilla","376","","","","" +"ALA","Aland Islands","AX","248","","","","248","ALA","","","Aland Islands","661882","ALAND ISLANDS","","","","","","","","" +"ALB","Albania","AL","8","ALB","339","3","8","ALB","914","AL","Albania","783754","ALBANIA","Albania","3","Albania","71","","","","" +"AND","Andorra","AD","20","AND","232","6","20","ADO","","AN","Andorra","3041565","ANDORRA","","","","","","","","" +"ANT","Netherlands Antilles","AN","530","","","","","ANT","353","NT","Netherlands Antilles","","NETHERLANDS ANTILLES","Netherlands Antilles","211","Netherlands Antilles","361","Netherlands Antilles","ANT","AN","530" +"ARE","United Arab Emirates","AE","784","UAE","696","225","784","ARE","466","AE","United Arab Emirates","290557","UNITED ARAB EMIRATES","United Arab Emirates","140","United Arab Emirates","576","","","","" \ No newline at end of file diff --git a/test/integration/069_build_test/models/model_0.sql b/test/integration/069_build_test/models/model_0.sql new file mode 100644 index 00000000000..2fe54b32418 --- /dev/null +++ b/test/integration/069_build_test/models/model_0.sql @@ -0,0 +1,3 @@ +{{ config(materialized='table') }} + +select * from {{ ref('countries') }} \ No 
newline at end of file diff --git a/test/integration/069_build_test/models/model_1.sql b/test/integration/069_build_test/models/model_1.sql new file mode 100644 index 00000000000..d8efda2c3b2 --- /dev/null +++ b/test/integration/069_build_test/models/model_1.sql @@ -0,0 +1,3 @@ +{{ config(materialized='table') }} + +select * from {{ ref('snap_0') }} \ No newline at end of file diff --git a/test/integration/069_build_test/models/model_2.sql b/test/integration/069_build_test/models/model_2.sql new file mode 100644 index 00000000000..25bea5224cf --- /dev/null +++ b/test/integration/069_build_test/models/model_2.sql @@ -0,0 +1,3 @@ +{{ config(materialized='table') }} + +select * from {{ ref('snap_1') }} \ No newline at end of file diff --git a/test/integration/069_build_test/models/model_99.sql b/test/integration/069_build_test/models/model_99.sql new file mode 100644 index 00000000000..38c103e823b --- /dev/null +++ b/test/integration/069_build_test/models/model_99.sql @@ -0,0 +1,3 @@ +{{ config(materialized='table') }} + +select '1' as "num" \ No newline at end of file diff --git a/test/integration/069_build_test/models/test.yml b/test/integration/069_build_test/models/test.yml new file mode 100644 index 00000000000..6f9133aa487 --- /dev/null +++ b/test/integration/069_build_test/models/test.yml @@ -0,0 +1,15 @@ +version: 2 + +models: + - name: model_0 + columns: + - name: iso3 + tests: + - unique + - not_null + - name: model_2 + columns: + - name: iso3 + tests: + - unique + - not_null diff --git a/test/integration/069_build_test/snapshots/snap_0.sql b/test/integration/069_build_test/snapshots/snap_0.sql new file mode 100644 index 00000000000..03e8e491f21 --- /dev/null +++ b/test/integration/069_build_test/snapshots/snap_0.sql @@ -0,0 +1,16 @@ +{% snapshot snap_0 %} + +{{ + config( + target_database=database, + target_schema=schema, + unique_key='iso3', + + strategy='timestamp', + updated_at='snap_0_updated_at', + ) +}} + +select *, current_timestamp as 
snap_0_updated_at from {{ ref('model_0') }} + +{% endsnapshot %} \ No newline at end of file diff --git a/test/integration/069_build_test/snapshots/snap_1.sql b/test/integration/069_build_test/snapshots/snap_1.sql new file mode 100644 index 00000000000..90455ed4625 --- /dev/null +++ b/test/integration/069_build_test/snapshots/snap_1.sql @@ -0,0 +1,39 @@ +{% snapshot snap_1 %} + +{{ + config( + target_database=database, + target_schema=schema, + unique_key='iso3', + + strategy='timestamp', + updated_at='snap_1_updated_at', + ) +}} + +SELECT + iso3, + "name", + iso2, + iso_numeric, + cow_alpha, + cow_numeric, + fao_code, + un_code, + wb_code, + imf_code, + fips, + geonames_name, + geonames_id, + r_name, + aiddata_name, + aiddata_code, + oecd_name, + oecd_code, + historical_name, + historical_iso3, + historical_iso2, + historical_iso_numeric, + current_timestamp as snap_1_updated_at from {{ ref('model_1') }} + +{% endsnapshot %} \ No newline at end of file diff --git a/test/integration/069_build_test/snapshots/snap_99.sql b/test/integration/069_build_test/snapshots/snap_99.sql new file mode 100644 index 00000000000..5288dbbb805 --- /dev/null +++ b/test/integration/069_build_test/snapshots/snap_99.sql @@ -0,0 +1,15 @@ +{% snapshot snap_99 %} + +{{ + config( + target_database=database, + target_schema=schema, + strategy='timestamp', + unique_key='num', + updated_at='snap_99_updated_at', + ) +}} + +select *, current_timestamp as snap_99_updated_at from {{ ref('model_99') }} + +{% endsnapshot %} \ No newline at end of file diff --git a/test/integration/069_build_test/test_build.py b/test/integration/069_build_test/test_build.py new file mode 100644 index 00000000000..3e4a7450302 --- /dev/null +++ b/test/integration/069_build_test/test_build.py @@ -0,0 +1,37 @@ +from test.integration.base import DBTIntegrationTest, use_profile +import yaml + + +class TestBuild(DBTIntegrationTest): + @property + def schema(self): + return "build_test_069" + + @property + def models(self): + 
return "models" + + @property + def project_config(self): + return { + "config-version": 2, + "snapshot-paths": ["snapshots"], + "data-paths": ["data"], + "source-paths": ["models"], + "seeds": { + "quote_columns": False, + }, + } + + def build(self, expect_pass=True, extra_args=None, **kwargs): + args = ["build"] + if kwargs: + args.extend(("--args", yaml.safe_dump(kwargs))) + if extra_args: + args.extend(extra_args) + + return self.run_dbt(args, expect_pass=expect_pass) + + @use_profile("postgres") + def test__postgres_build_happy_path(self): + self.build() diff --git a/test/unit/test_linker.py b/test/unit/test_linker.py index e9f18a36059..b3b3627ff76 100644 --- a/test/unit/test_linker.py +++ b/test/unit/test_linker.py @@ -179,4 +179,4 @@ def test__find_cycles__no_cycles(self): for (l, r) in actual_deps: self.linker.dependency(l, r) - self.assertIsNone(self.linker.find_cycles()) + self.assertIsNone(self.linker.find_cycles()) \ No newline at end of file diff --git a/test/unit/test_registry_get_request_exception.py b/test/unit/test_registry_get_request_exception.py index 3ebb46221ab..254169d9894 100644 --- a/test/unit/test_registry_get_request_exception.py +++ b/test/unit/test_registry_get_request_exception.py @@ -6,4 +6,4 @@ class testRegistryGetRequestException(unittest.TestCase): def test_registry_request_error_catching(self): # using non routable IP to test connection error logic in the _get function - self.assertRaises(RegistryException, _get, '', 'http://10.255.255.1') + self.assertRaises(RegistryException, _get, '', 'http://0.0.0.0')