re-adds the dbt compile command #407

Merged (6 commits) on May 8, 2017
Changes from 5 commits
6 changes: 3 additions & 3 deletions dbt/compilation.py
@@ -53,7 +53,7 @@ def print_compile_stats(stats):
     stat_line = ", ".join(
         ["{} {}".format(ct, names.get(t)) for t, ct in results.items()])
 
-    logger.info("Compiled {}".format(stat_line))
+    logger.info("Found {}".format(stat_line))
 
 
 def prepend_ctes(model, flat_graph):
@@ -166,8 +166,8 @@ def __write(self, build_filepath, payload):
         # concurrent writes that try to create the same dir can fail
         try:
             os.makedirs(os.path.dirname(target_path))
-        except FileExistsError:
-            logger.debug("Caught concurrent write: {}".format(target_path))
+        except OSError as e:
+            logger.debug("Caught concurrent write: {}\n{}".format(target_path, str(e)))
             pass
 
         dbt.compat.write_file(target_path, payload)
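A note on the exception change above: FileExistsError exists only on Python 3, while a failed makedirs raises OSError on both Python 2 and 3 (on Python 3, FileExistsError is a subclass of OSError), so the broader clause keeps this guard working under either interpreter. A minimal standalone sketch of the race being handled, with an illustrative path:

    import os

    target_dir = 'target/compiled/example_model'  # illustrative path

    # Two writers can both see the directory as missing and race to create it.
    # The loser raises: FileExistsError on Python 3, plain OSError on Python 2.
    try:
        os.makedirs(target_dir)
    except OSError:
        # Directory already exists -- safe to continue and write the file.
        pass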
46 changes: 26 additions & 20 deletions dbt/include/global_project/macros/materializations/table.sql
@@ -1,32 +1,38 @@

+{% macro dbt__simple_create_table(schema, identifier, dist, sort, sql) -%}
+  create table "{{ schema }}"."{{ identifier }}"
+    {{ dist }} {{ sort }} as (
+    {{ sql }}
+  );
+{%- endmacro %}
+
 {% macro dbt__create_table(schema, model, dist, sort, sql, flags, adapter) -%}
 
   {%- set identifier = model['name'] -%}
-  {%- set already_exists = adapter.already_exists(schema, identifier) -%}
   {%- set non_destructive_mode = flags.NON_DESTRUCTIVE == True -%}
 
-  {% if non_destructive_mode and already_exists -%}
-    create temporary table {{ identifier }}__dbt_tmp {{ dist }} {{ sort }} as (
-      {{ sql }}
-    );
+  {% if non_destructive_mode -%}
+    {%- if adapter.already_exists(schema, identifier) -%}
+      create temporary table {{ identifier }}__dbt_tmp {{ dist }} {{ sort }} as (
+        {{ sql }}
+      );
 
-    {% set dest_columns = adapter.get_columns_in_table(schema, identifier) %}
-    {% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}
+      {% set dest_columns = adapter.get_columns_in_table(schema, identifier) %}
+      {% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}
 
-    insert into {{ schema }}.{{ identifier }} ({{ dest_cols_csv }})
-    (
-      select {{ dest_cols_csv }}
-      from "{{ identifier }}__dbt_tmp"
-    );
+      insert into {{ schema }}.{{ identifier }} ({{ dest_cols_csv }})
+      (
+        select {{ dest_cols_csv }}
+        from "{{ identifier }}__dbt_tmp"
+      );
+    {%- else -%}
+      {{ dbt__simple_create_table(schema, identifier, dist, sort, sql) }}
+    {%- endif -%}
-  {%- elif non_destructive_mode -%}
-    create table "{{ schema }}"."{{ identifier }}"
-      {{ dist }} {{ sort }} as (
-      {{ sql }}
-    );
   {%- else -%}
-    create table "{{ schema }}"."{{ identifier }}__dbt_tmp"
-      {{ dist }} {{ sort }} as (
-      {{ sql }}
-    );
+    {% set tmp_identifier = identifier + '__dbt_tmp' %}
+    {{ dbt__simple_create_table(schema, tmp_identifier, dist, sort, sql) }}
   {%- endif %}
 
 {%- endmacro %}
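Taken together, the restructured macro has three outcomes: in non-destructive mode an existing table is kept and repopulated through a temporary table plus insert, a missing table is created directly under its real name, and in the default destructive mode the table is built as identifier__dbt_tmp for the swap performed later in the materialization. A rough Python sketch of just that branching (illustrative names; the real logic is the Jinja above):

    def create_table_strategy(non_destructive_mode, already_exists):
        # Mirrors dbt__create_table's three branches as plain data.
        if non_destructive_mode and already_exists:
            return 'create a temp table, then insert into the existing table'
        elif non_destructive_mode:
            return 'create the table directly under its real name'
        else:
            return 'create <identifier>__dbt_tmp for the later swap'

    print(create_table_strategy(True, False))  # create the table directly ...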
@@ -1,7 +1,6 @@
 {% macro dbt__create_view(schema, model, sql, flags, adapter) -%}
 
   {%- set identifier = model['name'] -%}
-  {%- set already_exists = adapter.already_exists(schema, identifier) -%}
   {%- set non_destructive_mode = flags.NON_DESTRUCTIVE == True -%}
 
   {%- if non_destructive_mode -%}
90 changes: 48 additions & 42 deletions dbt/main.py
@@ -8,6 +8,7 @@
 import dbt.flags as flags
 import dbt.project as project
 import dbt.task.run as run_task
+import dbt.task.compile as compile_task
 import dbt.task.debug as debug_task
 import dbt.task.clean as clean_task
 import dbt.task.deps as deps_task
@@ -289,48 +290,53 @@ def parse_args(args):
     )
     sub.set_defaults(cls=archive_task.ArchiveTask, which='archive')
 
-    sub = subs.add_parser('run', parents=[base_subparser])
-    sub.add_argument(
-        '--models',
-        required=False,
-        nargs='+',
-        help="""
-        Specify the models to include.
-        """
-    )
-    sub.add_argument(
-        '--exclude',
-        required=False,
-        nargs='+',
-        help="""
-        Specify the models to exclude.
-        """
-    )
-    sub.add_argument(
-        '--threads',
-        type=int,
-        required=False,
-        help="""
-        Specify number of threads to use while executing models. Overrides
-        settings in profiles.yml.
-        """
-    )
-    sub.add_argument(
-        '--non-destructive',
-        action='store_true',
-        help="""
-        If specified, DBT will not drop views. Tables will be truncated instead
-        of dropped.
-        """
-    )
-    sub.add_argument(
-        '--full-refresh',
-        action='store_true',
-        help="""
-        If specified, DBT will drop incremental models and fully-recalculate
-        the incremental table from the model definition.
-        """)
-    sub.set_defaults(cls=run_task.RunTask, which='run')
+    run_sub = subs.add_parser('run', parents=[base_subparser])
+    run_sub.set_defaults(cls=run_task.RunTask, which='run')
+
+    compile_sub = subs.add_parser('compile', parents=[base_subparser])
+    compile_sub.set_defaults(cls=compile_task.CompileTask, which='compile')
+
+    for sub in [run_sub, compile_sub]:
+        sub.add_argument(
+            '--models',
+            required=False,
+            nargs='+',
+            help="""
+            Specify the models to include.
+            """
+        )
+        sub.add_argument(
+            '--exclude',
+            required=False,
+            nargs='+',
+            help="""
+            Specify the models to exclude.
+            """
+        )
+        sub.add_argument(
+            '--threads',
+            type=int,
+            required=False,
+            help="""
+            Specify number of threads to use while executing models. Overrides
+            settings in profiles.yml.
+            """
+        )
+        sub.add_argument(
+            '--non-destructive',
+            action='store_true',
+            help="""
+            If specified, DBT will not drop views. Tables will be truncated
+            instead of dropped.
+            """
+        )
+        sub.add_argument(
+            '--full-refresh',
+            action='store_true',
+            help="""
+            If specified, DBT will drop incremental models and
+            fully-recalculate the incremental table from the model definition.
+            """)
 
     sub = subs.add_parser('seed', parents=[base_subparser])
     sub.add_argument(
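The refactor above registers run and compile as separate subparsers and attaches the shared flags to both in one loop, so the two commands cannot drift apart. A self-contained sketch of the same pattern (standalone example, not dbt's actual code):

    import argparse

    parser = argparse.ArgumentParser(prog='dbt')
    subs = parser.add_subparsers(dest='which')

    run_sub = subs.add_parser('run')
    compile_sub = subs.add_parser('compile')

    # Every flag added in this loop is accepted by both subcommands.
    for sub in [run_sub, compile_sub]:
        sub.add_argument('--models', nargs='+', required=False)
        sub.add_argument('--threads', type=int, required=False)

    args = parser.parse_args(['compile', '--models', 'my_model'])
    print(args.which, args.models)  # compile ['my_model']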
2 changes: 2 additions & 0 deletions dbt/parser.py
@@ -311,6 +311,8 @@ def load_and_parse_sql(package_name, root_project, all_projects, root_dir,
         if resource_type == NodeType.Test:
             path = dbt.utils.get_pseudo_test_path(
                 name, file_match.get('relative_path'), 'data_test')
+        elif resource_type == NodeType.Analysis:
+            path = os.path.join('analysis', file_match.get('relative_path'))
         else:
             path = file_match.get('relative_path')
 
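The new NodeType.Analysis branch files compiled analyses under an analysis/ prefix in the target directory instead of their bare relative path. With an assumed file name:

    import os

    relative_path = 'monthly_revenue.sql'  # assumed analysis file
    print(os.path.join('analysis', relative_path))  # analysis/monthly_revenue.sql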
67 changes: 57 additions & 10 deletions dbt/runner.py
@@ -512,6 +512,28 @@ def execute_node(self, node, flat_graph, existing, profile, adapter):
 
         return node, result
 
+    def compile_node(self, node, flat_graph):
+
+        compiler = dbt.compilation.Compiler(self.project)
+        node = compiler.compile_node(node, flat_graph)
+        return node
+
+    def safe_compile_node(self, data):
+        node, flat_graph, existing, schema_name, node_index, num_nodes = data
+
+        result = RunModelResult(node)
+        profile = self.project.run_environment()
+        adapter = get_adapter(profile)
+
+        try:
+            compiled_node = self.compile_node(node, flat_graph)
+            result = RunModelResult(compiled_node)
+
+        finally:
+            adapter.release_connection(profile, node.get('name'))
+
+        return result
+
     def safe_execute_node(self, data):
         node, flat_graph, existing, schema_name, node_index, num_nodes = data
 
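safe_compile_node keeps the same connection discipline as safe_execute_node: compiling a node can still touch the warehouse (for example, macros that call adapter.already_exists), so the connection is released in a finally block whether or not compilation succeeds. The general shape, as a standalone sketch with a hypothetical pool API:

    def with_released_connection(pool, name, work):
        # Acquire a named connection, do the work, always hand it back.
        conn = pool.acquire(name)
        try:
            return work(conn)
        finally:
            pool.release(name)  # runs on success and on error alike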
@@ -531,8 +553,7 @@ def safe_execute_node(self, data):
         profile = self.project.run_environment()
         adapter = get_adapter(profile)
 
-        compiler = dbt.compilation.Compiler(self.project)
-        node = compiler.compile_node(node, flat_graph)
+        node = self.compile_node(node, flat_graph)
 
         if not is_ephemeral:
             node, status = self.execute_node(node, flat_graph, existing,
@@ -625,7 +646,7 @@ def skip_dependent(node):
         return skip_dependent
 
     def execute_nodes(self, flat_graph, node_dependency_list, on_failure,
-                      should_run_hooks=False):
+                      should_run_hooks=False, should_execute=True):
         profile = self.project.run_environment()
         adapter = get_adapter(profile)
         master_connection = adapter.get_connection(profile)
@@ -660,7 +681,8 @@ def execute_nodes(self, flat_graph, node_dependency_list, on_failure,
 
         pool = ThreadPool(num_threads)
 
-        print_counts(flat_nodes)
+        if should_execute:
+            print_counts(flat_nodes)
 
         start_time = time.time()
 
@@ -689,8 +711,13 @@ def get_idx(node):
         nodes_to_execute = [node for node in node_list
                             if not node.get('skip')]
 
+        if should_execute:
+            action = self.safe_execute_node
+        else:
+            action = self.safe_compile_node
+
         for result in pool.imap_unordered(
-                self.safe_execute_node,
+                action,
                 [(node, flat_graph, existing, schema_name,
                   get_idx(node), num_nodes,)
                  for node in nodes_to_execute]):
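Choosing the worker function once and handing it to imap_unordered keeps a single fan-out path for both running and compiling. A runnable sketch of the same dispatch pattern:

    from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool

    def safe_execute(item):
        return ('executed', item)

    def safe_compile(item):
        return ('compiled', item)

    should_execute = False
    action = safe_execute if should_execute else safe_compile

    pool = ThreadPool(4)
    # Results arrive as workers finish, not in submission order.
    for result in pool.imap_unordered(action, ['model_a', 'model_b']):
        print(result)
    pool.close()
    pool.join()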
@@ -701,7 +728,8 @@ def get_idx(node):
             flat_graph['nodes'][result.node.get('unique_id')] = result.node
 
             index = get_idx(result.node)
-            track_model_run(index, num_nodes, result)
+            if should_execute:
+                track_model_run(index, num_nodes, result)
 
             if result.errored:
                 on_failure(result.node)
@@ -720,7 +748,8 @@
 
         execution_time = time.time() - start_time
 
-        print_results_line(node_results, execution_time)
+        if should_execute:
+            print_results_line(node_results, execution_time)
 
         return node_results

@@ -790,7 +819,8 @@ def try_create_schema(self):
 
     def run_types_from_graph(self, include_spec, exclude_spec,
                              resource_types, tags, should_run_hooks=False,
-                             flatten_graph=False):
+                             flatten_graph=False, should_execute=True):
+
         compiler = dbt.compilation.Compiler(self.project)
         compiler.initialize()
         (flat_graph, linker) = compiler.compile()
@@ -823,12 +853,14 @@ def run_types_from_graph(self, include_spec, exclude_spec,
         adapter = get_adapter(profile)
 
         try:
-            self.try_create_schema()
+            if should_execute:
+                self.try_create_schema()
 
             on_failure = self.on_model_failure(linker, selected_nodes)
 
             results = self.execute_nodes(flat_graph, dependency_list,
-                                         on_failure, should_run_hooks)
+                                         on_failure, should_run_hooks,
+                                         should_execute)
 
         finally:
             adapter.cleanup_connections()
@@ -837,6 +869,21 @@
 
     # ------------------------------------
 
+    def compile_models(self, include_spec, exclude_spec):
+        resource_types = [
+            NodeType.Model,
+            NodeType.Test,
+            NodeType.Archive,
+            NodeType.Analysis
+        ]
+
+        return self.run_types_from_graph(include_spec,
+                                         exclude_spec,
+                                         resource_types=resource_types,
+                                         tags=set(),
+                                         should_run_hooks=False,
+                                         should_execute=False)
+
     def run_models(self, include_spec, exclude_spec):
         return self.run_types_from_graph(include_spec,
                                          exclude_spec,
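compile_models reuses the same graph walk as run_models with should_execute=False: every selected node (models, tests, archives, analyses) is compiled and written out, while schema creation, execution, hooks, and run tracking are all skipped. A condensed sketch of that flag-driven reuse (illustrative, not dbt's code):

    def run_types_from_graph_sketch(nodes, should_execute=True):
        results = []
        for node in nodes:
            compiled = node + ' [compiled]'  # compilation always happens
            if should_execute:
                compiled += ' [executed]'    # execution only when asked for
            results.append(compiled)
        return results

    print(run_types_from_graph_sketch(['model_a'], should_execute=False))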
19 changes: 19 additions & 0 deletions dbt/task/compile.py
@@ -0,0 +1,19 @@
+from __future__ import print_function
+
+from dbt.logger import GLOBAL_LOGGER as logger
+from dbt.runner import RunManager
+
+
+class CompileTask:
+    def __init__(self, args, project):
+        self.args = args
+        self.project = project
+
+    def run(self):
+        runner = RunManager(
+            self.project, self.project['target-path'], self.args
+        )
+
+        runner.compile_models(self.args.models, self.args.exclude)
+
+        logger.info('Done.')
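Wired up through dbt/main.py, running dbt compile (optionally with --models / --exclude) builds this task and writes compiled SQL under the project's target path. A hedged sketch of driving it directly; FakeArgs is hypothetical, and a real call needs an actual dbt project object:

    class FakeArgs:
        models = None   # compile everything, or e.g. ['my_model']
        exclude = None

    # from dbt.task.compile import CompileTask
    # CompileTask(FakeArgs(), project).run()  # logs 'Done.' when finished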
2 changes: 0 additions & 2 deletions dbt/task/run.py
@@ -4,8 +4,6 @@
 from dbt.runner import RunManager
 import dbt.utils
 
-THREAD_LIMIT = 9
-
 
 class RunTask:
     def __init__(self, args, project):