diff --git a/core/dbt/include/global_project/macros/adapters/common.sql b/core/dbt/include/global_project/macros/adapters/common.sql index b30ba91a85b..b0e96d7c3b9 100644 --- a/core/dbt/include/global_project/macros/adapters/common.sql +++ b/core/dbt/include/global_project/macros/adapters/common.sql @@ -60,7 +60,6 @@ ); {% endmacro %} - {% macro create_view_as(relation, sql) -%} {{ adapter_macro('create_view_as', relation, sql) }} {%- endmacro %} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql index 2870448b22a..644569cbf56 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql @@ -1,15 +1,11 @@ {% macro snowflake__create_table_as(temporary, relation, sql) -%} - {% if temporary %} - use schema {{ adapter.quote_as_configured(schema, 'schema') }}; - {% endif %} - {%- set transient = config.get('transient', default=true) -%} - create {% if temporary -%} + create or replace {% if temporary -%} temporary {%- elif transient -%} transient - {%- endif %} table {{ relation.include(database=(not temporary), schema=(not temporary)) }} + {%- endif %} table {{ relation }} as ( {{ sql }} ); diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql new file mode 100644 index 00000000000..ef7a2ec8e35 --- /dev/null +++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql @@ -0,0 +1,67 @@ + +{% materialization incremental, adapter='snowflake' -%} + + {%- set unique_key = config.get('unique_key') -%} + {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + {%- set identifier = model['alias'] -%} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set target_relation = 
api.Relation.create(database=database, + schema=schema, + identifier=identifier, + type='table') -%} + + {%- set tmp_relation = api.Relation.create(database=database, + schema=schema, + identifier=identifier ~ "__dbt_tmp", + type='table') -%} + + -- setup + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {# -- If the destination is a view, we have no choice but to drop it; resetting old_relation to none makes the build step below create the table from scratch #} + {% if old_relation is not none and old_relation.type == 'view' %} + {{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }} + {{ adapter.drop_relation(old_relation) }} + {% set old_relation = none %} + {% endif %} + + -- build model + {% if full_refresh_mode or old_relation is none -%} + + {%- call statement('main') -%} + {{ create_table_as(false, target_relation, sql) }} + {%- endcall -%} + + {%- else -%} + + {%- call statement() -%} + {{ create_table_as(true, tmp_relation, sql) }} + {%- endcall -%} + + {{ adapter.expand_target_column_types(temp_table=tmp_relation.identifier, + to_relation=target_relation) }} + {% set incremental_sql %} + ( + select * from {{ tmp_relation }} + ) + {% endset %} + + {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} + {%- call statement('main') -%} + {{ get_merge_sql(target_relation, incremental_sql, unique_key, dest_columns) }} + {% endcall %} + + {%- endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {{ adapter.commit() }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + +{%- endmaterialization %} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql index ac92f2ef26e..caa506f7e7b 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql +++ 
b/plugins/snowflake/dbt/include/snowflake/macros/materializations/merge.sql @@ -1,3 +1,17 @@ -{% macro snowflake__get_merge_sql(target, source, unique_key, dest_columns) %} - {{ common_get_merge_sql(target, source, unique_key, dest_columns) }} +{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns) %} + {%- set dest_cols_csv = dest_columns | map(attribute="name") | join(', ') -%} + {%- if unique_key is none -%} + {# Workaround for Snowflake not accepting an "on false" merge. + When no unique key is provided we do a plain insert of the destination columns; + otherwise we use the preferred merge. #} + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source_sql }} + ); + {%- else -%} + {# A unique key is present: delegate to common_get_merge_sql. #} + {{ common_get_merge_sql(target, source_sql, unique_key, dest_columns) }} + {%- endif -%} + {% endmacro %} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/table.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/table.sql index 0fd14b1cc76..10b50074190 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/table.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/table.sql @@ -1,68 +1,32 @@ {% materialization table, adapter='snowflake' %} {%- set identifier = model['alias'] -%} - {%- set tmp_identifier = model['name'] + '__dbt_tmp' -%} - {%- set backup_identifier = model['name'] + '__dbt_backup' -%} {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%} - {%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier, - schema=schema, - database=database, type='table') -%} - - /* - See ../view/view.sql for more information about this relation. 
- */ - - -- drop the backup relation if it exists, then make a new one that uses the old relation's type - {%- set backup_relation = adapter.get_relation(database=database, schema=schema, identifier=backup_identifier) -%} - {% if backup_relation is not none -%} - {{ adapter.drop_relation(backup_relation) }} - {%- endif %} - {%- set backup_relation_type = 'table' if old_relation is none else old_relation.type -%} - {%- set backup_relation = api.Relation.create(identifier=backup_identifier, - schema=schema, - database=database, - type=backup_relation_type) -%} - - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} - - -- drop the temp relations if they exists for some reason - {{ adapter.drop_relation(intermediate_relation) }} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {#-- Drop the existing relation if it is not a table (e.g. a view) to "convert" it into a table. This may lead to + -- downtime, but it should be a relatively infrequent occurrence #} + {% if old_relation is not none and not old_relation.is_table %} + {{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }} + {{ drop_relation_if_exists(old_relation) }} + {% endif %} - -- build model + --build model {% call statement('main') -%} - {{ create_table_as(False, intermediate_relation, sql) }} + {{ create_table_as(false, target_relation, sql) }} {%- endcall %} - -- cleanup - {% if old_relation is not none %} - {% if old_relation.type == 'view' %} - {#-- This is the primary difference between Snowflake and Redshift. 
Renaming this view - -- would cause an error if the view has become invalid due to upstream schema changes #} - {{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }} - {{ drop_relation_if_exists(old_relation) }} - {% else %} - {{ adapter.rename_relation(target_relation, backup_relation) }} - {% endif %} - {% endif %} - - {{ adapter.rename_relation(intermediate_relation, target_relation) }} - {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here {{ adapter.commit() }} - -- finally, drop the existing/backup relation after the commit - {{ drop_relation_if_exists(backup_relation) }} - {{ run_hooks(post_hooks, inside_transaction=False) }} {% endmaterialization %} diff --git a/test/integration/017_runtime_materialization_tests/test_runtime_materialization.py b/test/integration/017_runtime_materialization_tests/test_runtime_materialization.py index 2236fe82fa0..be70d3c3f87 100644 --- a/test/integration/017_runtime_materialization_tests/test_runtime_materialization.py +++ b/test/integration/017_runtime_materialization_tests/test_runtime_materialization.py @@ -52,16 +52,3 @@ def test_postgres_delete__dbt_tmp_relation(self): self.assertTableDoesNotExist('view__dbt_tmp') self.assertTablesEqual("seed","view") - - - @use_profile('snowflake') - def test_snowflake_backup_different_type(self): - self.run_sql_file( - 'test/integration/017_runtime_materialization_tests/create_backup_and_original.sql' - ) - results = self.run_dbt(['run', '--model', 'materialized']) - self.assertEqual(len(results), 1) - - self.assertTableDoesNotExist('materialized__dbt_tmp') - self.assertTableDoesNotExist('materialized__dbt_backup') - self.assertTablesEqual("seed", "materialized")