Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snowflake create or replace #1409

Merged
merged 34 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9772c1c
Adding incremental logic to Snowflake plugins
Feb 18, 2019
56801f9
Adding changes based on Drew's recommendations
Mar 2, 2019
38254a8
make create or replace snowflake macro
bastienboutonnet Apr 16, 2019
9222c79
implement create or replace in table mater
bastienboutonnet Apr 16, 2019
a35ad18
implement insert when no unique key and full refresh solution
bastienboutonnet Apr 20, 2019
2d5525e
add some logging
bastienboutonnet Apr 20, 2019
6a104c1
test
bastienboutonnet Apr 20, 2019
d168bdd
Merge branch 'snowflake_create_or_replace' of github.com:bastienbouto…
bastienboutonnet Apr 20, 2019
91d869e
revert test
bastienboutonnet Apr 20, 2019
fb26ce5
Merge branch 'snowflake_create_or_replace' of github.com:bastienbouto…
bastienboutonnet Apr 20, 2019
dacce7c
fix insert cols call and temp workaround call of snowflake
bastienboutonnet Apr 21, 2019
54c02ef
some logging and temp call workaround
bastienboutonnet Apr 21, 2019
2830b6a
make default create or replace macro to allow snowflake adapter pick …
bastienboutonnet Apr 22, 2019
95c9f76
remove snowflake__ direct call in incremental
bastienboutonnet Apr 22, 2019
0433369
remove testing logging messages
bastienboutonnet Apr 22, 2019
e83edd3
fixme/todo regarding non destructive flag
bastienboutonnet Apr 22, 2019
9591b86
(PR fdbk) rm extra macro
bastienboutonnet Apr 26, 2019
f99efbf
remove references to temp and backup relations
bastienboutonnet Apr 26, 2019
5c1c588
more explicit comments and quick formatting
bastienboutonnet Apr 26, 2019
43a9db5
quick todo marker
bastienboutonnet Apr 27, 2019
3ab8238
Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_replace
bastienboutonnet Apr 27, 2019
4f62978
Revert "Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_…
bastienboutonnet Apr 27, 2019
08820a2
fixing my jetlagged introduced bugs
bastienboutonnet Apr 27, 2019
0432c1d
conflic resolve
bastienboutonnet Apr 27, 2019
90f8e0b
Revert "Revert "Merge branch 'dev/wilt-chamberlain' into snowflake_cr…
bastienboutonnet Apr 27, 2019
afe236d
cleaning up some commented out stuff
bastienboutonnet Apr 27, 2019
8af7984
remove non-destructive logic
bastienboutonnet Apr 28, 2019
85eac05
cleaner select
bastienboutonnet Apr 28, 2019
3ef519d
todo and comments clean up
bastienboutonnet Apr 28, 2019
7a2279e
move unique key workaround to snowflake macro
bastienboutonnet Apr 28, 2019
3a7dcd9
Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_replace
drewbanin Apr 30, 2019
1f97fe4
Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_replace
drewbanin Apr 30, 2019
8d74550
fix tests
drewbanin May 10, 2019
90abc2d
(closes #1455) Qualify Snowflake temp tables with a database and schema
drewbanin May 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
);
{% endmacro %}


{% macro create_view_as(relation, sql) -%}
{{ adapter_macro('create_view_as', relation, sql) }}
{%- endmacro %}
Expand Down
8 changes: 2 additions & 6 deletions plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
{% macro snowflake__create_table_as(temporary, relation, sql) -%}
{% if temporary %}
use schema {{ adapter.quote_as_configured(schema, 'schema') }};
{% endif %}

{%- set transient = config.get('transient', default=true) -%}

create {% if temporary -%}
create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation.include(database=(not temporary), schema=(not temporary)) }}
{%- endif %} table {{ relation }}
as (
{{ sql }}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

{% materialization incremental, adapter='snowflake' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set identifier = model['alias'] -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(database=database,
schema=schema,
identifier=identifier,
type='table') -%}

{%- set tmp_relation = api.Relation.create(database=database,
schema=schema,
identifier=identifier ~ "__dbt_tmp",
type='table') -%}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{# -- If the destination is a view, then we have no choice but to drop it #}
{% if old_relation is not none and old_relation.type == 'view' %}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ adapter.drop_relation(old_relation) }}
{% set old_relation = none %}
{% endif %}

-- build model
{% if full_refresh_mode or old_relation is none -%}

{%- call statement('main') -%}
{{ create_table_as(false, target_relation, sql) }}
{%- endcall -%}

{%- else -%}

{%- call statement() -%}
{{ create_table_as(true, tmp_relation, sql) }}
{%- endcall -%}

{{ adapter.expand_target_column_types(temp_table=tmp_relation.identifier,
to_relation=target_relation) }}
{% set incremental_sql %}
(
select * from {{ tmp_relation }}
)
{% endset %}

{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{%- call statement('main') -%}
{{ get_merge_sql(target_relation, incremental_sql, unique_key, dest_columns) }}
{% endcall %}

{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
{% macro snowflake__get_merge_sql(target, source, unique_key, dest_columns) %}
{{ common_get_merge_sql(target, source, unique_key, dest_columns) }}
{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns) %}
{%- set dest_cols_csv = dest_columns | map(attribute="name") | join(', ') -%}
{%- if unique_key is none -%}
{# workaround for Snowflake not being happy with "on false" merge.
when no unique key is provided we'll do a regular insert, other times we'll
use the preferred merge. #}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source_sql }}
);
{%- else -%}
{# call regular merge when a unique key is present. #}
{{ common_get_merge_sql(target, source_sql, unique_key, dest_columns) }}
{%- endif -%}

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,68 +1,32 @@
{% materialization table, adapter='snowflake' %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema,
database=database, type='table') -%}

/*
See ../view/view.sql for more information about this relation.
*/

-- drop the backup relation if it exists, then make a new one that uses the old relation's type
{%- set backup_relation = adapter.get_relation(database=database, schema=schema, identifier=backup_identifier) -%}
{% if backup_relation is not none -%}
{{ adapter.drop_relation(backup_relation) }}
{%- endif %}
{%- set backup_relation_type = 'table' if old_relation is none else old_relation.type -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema,
database=database,
type=backup_relation_type) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

-- drop the temp relations if they exists for some reason
{{ adapter.drop_relation(intermediate_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

-- build model
--build model
{% call statement('main') -%}
{{ create_table_as(False, intermediate_relation, sql) }}
{{ create_table_as(false, target_relation, sql) }}
{%- endcall %}

-- cleanup
{% if old_relation is not none %}
{% if old_relation.type == 'view' %}
{#-- This is the primary difference between Snowflake and Redshift. Renaming this view
-- would cause an error if the view has become invalid due to upstream schema changes #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% else %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,3 @@ def test_postgres_delete__dbt_tmp_relation(self):

self.assertTableDoesNotExist('view__dbt_tmp')
self.assertTablesEqual("seed","view")


@use_profile('snowflake')
def test_snowflake_backup_different_type(self):
self.run_sql_file(
'test/integration/017_runtime_materialization_tests/create_backup_and_original.sql'
)
results = self.run_dbt(['run', '--model', 'materialized'])
self.assertEqual(len(results), 1)

self.assertTableDoesNotExist('materialized__dbt_tmp')
self.assertTableDoesNotExist('materialized__dbt_backup')
self.assertTablesEqual("seed", "materialized")