diff --git a/core/dbt/include/global_project/macros/materializations/helpers.sql b/core/dbt/include/global_project/macros/materializations/helpers.sql index dc21ac5d3ba..871bfd1c408 100644 --- a/core/dbt/include/global_project/macros/materializations/helpers.sql +++ b/core/dbt/include/global_project/macros/materializations/helpers.sql @@ -48,8 +48,18 @@ {{ make_hook_config(sql, inside_transaction=False) }} {% endmacro %} + {% macro drop_relation_if_exists(relation) %} {% if relation is not none %} {{ adapter.drop_relation(relation) }} {% endif %} {% endmacro %} + + +{% macro load_relation(relation) %} {#-- Return whatever adapter.get_relation finds for this relation's database, schema and identifier. --#} + {% do return(adapter.get_relation( + database=relation.database, + schema=relation.schema, + identifier=relation.identifier + )) -%} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql b/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql new file mode 100644 index 00000000000..aec81e02dbe --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql @@ -0,0 +1,20 @@ + +{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %} {#-- Emit SQL that inserts every row of tmp_relation into target_relation, using target_relation's quoted column list. When unique_key is given, first delete the target rows whose unique_key value appears in tmp_relation. NOTE(review): statement_name is accepted but not referenced in this macro body. --#} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + {%- if unique_key is not none -%} + delete + from {{ target_relation }} + where ({{ unique_key }}) in ( + select ({{ unique_key }}) + from {{ tmp_relation }} + ); + {%- endif %} + + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ tmp_relation }} + ); +{%- endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql index 0c5f7a8d248..4a06ac476ca 100644 --- 
a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql @@ -1,81 +1,49 @@ -{% macro dbt__incremental_delete(target_relation, tmp_relation) -%} - - {%- set unique_key = config.require('unique_key') -%} - - delete - from {{ target_relation }} - where ({{ unique_key }}) in ( - select ({{ unique_key }}) - from {{ tmp_relation.include(schema=False, database=False) }} - ); - -{%- endmacro %} {% materialization incremental, default -%} - {%- set unique_key = config.get('unique_key') -%} - - {%- set identifier = model['alias'] -%} - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%} - {%- set tmp_relation = make_temp_relation(target_relation) %} - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + {% set unique_key = config.get('unique_key') %} + {% set full_refresh_mode = flags.FULL_REFRESH %} - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%} + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} {#-- compared against none below to detect a first run --#} + {% set tmp_relation = make_temp_relation(this) %} {#-- staging relation, used only by the incremental branch --#} - {%- set should_drop = (full_refresh_mode or exists_not_as_table) -%} - - -- setup - {% if old_relation is none -%} - -- noop - {%- elif should_drop -%} - {{ adapter.drop_relation(old_relation) }} - {%- set old_relation = none -%} - {%- endif %} + {# -- set the type so our rename / drop uses the correct syntax #} + {% set backup_type = existing_relation.type | default("table") %} + {% set backup_relation = make_temp_relation(this, "__dbt_backup").incorporate(type=backup_type) %} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- `BEGIN` happens here: {{ 
run_hooks(pre_hooks, inside_transaction=True) }} - -- build model - {% if full_refresh_mode or old_relation is none -%} - {%- call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall -%} - {%- else -%} - {%- call statement() -%} - - {{ dbt.create_table_as(True, tmp_relation, sql) }} - - {%- endcall -%} - - {{ adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation) }} - - {%- call statement('main') -%} - {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %} - - {% if unique_key is not none -%} - - {{ dbt__incremental_delete(target_relation, tmp_relation) }} - - {%- endif %} - - insert into {{ target_relation }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ tmp_relation }} - ); - {% endcall %} - {%- endif %} + {% set to_drop = [] %} {#-- relations to drop once the transaction has been committed --#} + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} {#-- move the existing relation aside and rebuild; the backup is dropped after the commit --#} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% do to_drop.append(backup_relation) %} + {% else %} {#-- incremental run: tmp_relation was already computed above from the same relation --#} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} + {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here - {{ adapter.commit() }} + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} {#-- drop through the adapter method rather than the bare drop_relation macro. NOTE(review): presumably this also keeps dbt's relation cache in sync; confirm --#} + {% endfor %} {{ run_hooks(post_hooks, inside_transaction=False) }} diff --git 
a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 596aff2a66c..b1b1da58b12 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -346,6 +346,48 @@ def execute_model(self, model, materialization, sql_override=None, return res + @available.parse(lambda *a, **k: True) + def is_replaceable(self, relation, conf_partition, conf_cluster): + """ + Check if a given partition and clustering column spec for a table + can replace an existing relation in the database. BigQuery does not + allow tables to be replaced with another table that has a different + partitioning spec. This method returns True if the given config spec is + identical to that of the existing table. + + :param relation: the existing relation to inspect; None is treated + as nothing to replace and returns True. + :param conf_partition: configured partition spec, compared for + equality with the table's time-partitioning field (None when the + table has no time partitioning). + :param conf_cluster: configured clustering column name or list of + names, compared for equality with the table's clustering fields. + :return: True if the table is missing or both specs match exactly. + """ + if relation is None: + return True + + try: + table = self.connections.get_bq_table( + database=relation.database, + schema=relation.schema, + identifier=relation.identifier + ) + except google.cloud.exceptions.NotFound: + return True + + table_partition = table.time_partitioning + if table_partition is not None: + table_partition = table_partition.field + + table_cluster = table.clustering_fields + + if isinstance(conf_cluster, str): + conf_cluster = [conf_cluster] + + return table_partition == conf_partition \ + and table_cluster == conf_cluster + @available.parse_none def alter_table_add_columns(self, relation, columns): diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index 60d5d4e509e..719860ea40c 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -37,9 +37,12 @@ {% do opts.update({'expiration_timestamp': 'TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)'}) %} {% endif %} - OPTIONS({% for opt_key, opt_val in opts.items() %} - {{ opt_key }}={{ opt_val }}{{ "," if not loop.last }} - {% endfor %}) + {% set options -%} + OPTIONS({% for opt_key, opt_val in 
opts.items() %} + {{ opt_key }}={{ opt_val }}{{ "," if not loop.last }} + {% endfor %}) + {%- endset %} + {% do return(options) %} {%- endmacro -%} {% macro bigquery__create_table_as(temporary, relation, sql) -%} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql index 7c8525fccb8..d2e62e4f8e6 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql @@ -2,50 +2,46 @@ {% materialization incremental, adapter='bigquery' -%} {%- set unique_key = config.get('unique_key') -%} {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - {%- set identifier = model['alias'] -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set target_relation = this %} + {%- set existing_relation = load_relation(this) %} {#-- compared against none below to detect a first run --#} + {%- set tmp_relation = make_temp_relation(this) %} - {%- set target_relation = api.Relation.create(database=database, identifier=identifier, schema=schema, type='table') -%} + {%- set partition_by = config.get('partition_by', none) -%} + {%- set cluster_by = config.get('cluster_by', none) -%} - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%} - - {%- set should_drop = (full_refresh_mode or exists_not_as_table) -%} - {%- set force_create = (full_refresh_mode) -%} + {{ run_hooks(pre_hooks) }} - -- setup - {% if old_relation is none -%} - -- noop - {%- elif should_drop -%} - {{ adapter.drop_relation(old_relation) }} - {%- set old_relation = none -%} - {%- endif %} + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view %} + {#-- There's no way to atomically 
replace a view with a table on BQ --#} + {{ adapter.drop_relation(existing_relation) }} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif full_refresh_mode %} {#-- full_refresh_mode is set at the top of this materialization; if it were left undefined this branch would never be taken --#} + {#-- If the partition/cluster config has changed, then we must drop and recreate --#} + {% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %} + {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %} + {{ adapter.drop_relation(existing_relation) }} + {% endif %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} {#-- incremental run: merge the model query into the existing table --#} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} - {% set source_sql -%} {#-- wrap sql in parens to make it a subquery --#} - ( - {{ sql }} - ) - {%- endset -%} - - - {{ run_hooks(pre_hooks) }} - - -- build model - {% if force_create or old_relation is none -%} - {%- call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall -%} - {%- else -%} - {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {%- call statement('main') -%} - {{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns) }} - {% endcall %} - {%- endif %} + {% set source_sql -%} + ( + {{ sql }} + ) + {%- endset -%} + {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %} + {% endif %} + + {%- call statement('main') -%} + {{ build_sql }} + {% endcall %} {{ run_hooks(post_hooks) }} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql index 32c75a2d386..92836ec0557 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql @@ -1,18 +1,5 @@ -{% materialization incremental, adapter='snowflake' -%} - - {%- set unique_key = config.get('unique_key') -%} 
- {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - {%- set identifier = model['alias'] -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(database=database, - schema=schema, - identifier=identifier, - type='table') -%} - - {%- set tmp_relation = make_temp_relation(target_relation) %} - +{% macro dbt_snowflake_validate_get_incremental_strategy(config) %} {#-- Return the configured incremental_strategy ("merge" when unset) once it has passed validation. --#} {#-- Find and validate the incremental strategy #} {%- set strategy = config.get("incremental_strategy", default="merge") -%} @@ -24,47 +11,58 @@ {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {% endif %} - -- setup - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} + {% do return(strategy) %} +{% endmacro %} - {# -- If the destination is a view, then we have no choice but to drop it #} - {% if old_relation is not none and old_relation.type == 'view' %} - {{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }} - {{ adapter.drop_relation(old_relation) }} - {% set old_relation = none %} +{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} {#-- Return the SQL for the given strategy: 'merge' uses get_merge_sql, 'delete+insert' uses get_delete_insert_merge_sql, and any other value raises a compiler error. --#} + {% if strategy == 'merge' %} + {% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %} + {% elif strategy == 'delete+insert' %} + {% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %} + {% else %} + {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} {% endif %} +{% endmacro %} - -- build model - {% if full_refresh_mode or old_relation is none -%} +{% materialization incremental, adapter='snowflake' -%} - {%- call statement('main') -%} - {{ create_table_as(false, target_relation, sql) }} + {%- set unique_key = config.get('unique_key') -%} + {%- set 
full_refresh_mode = (flags.FULL_REFRESH == True) -%} - {%- endcall -%} + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} {#-- compared against none below to detect a first run --#} + {% set tmp_relation = make_temp_relation(this) %} {#-- staging relation, used only by the incremental branch --#} - {%- else -%} + {#-- Validate early so we don't run SQL if the strategy is invalid --#} + {% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%} - {%- call statement() -%} - {{ create_table_as(true, tmp_relation, sql) }} - {%- endcall -%} + -- setup + {{ run_hooks(pre_hooks, inside_transaction=False) }} - {{ adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation) }} + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% if existing_relation is none %} {#-- nothing exists yet: create the table from the model SQL --#} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view %} + {#-- Can't overwrite a view with a table - we must drop --#} + {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif full_refresh_mode %} {#-- full refresh: same create statement as a first run, with no explicit drop beforehand --#} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} {#-- incremental run: load the model SQL into tmp_relation, call expand_target_column_types, then build the strategy-specific statement --#} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {%- call statement('main') -%} - {% if strategy == 'merge' %} - {{ get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) }} - {% elif strategy == 'delete+insert' %} - {{ get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) }} - {% else %} - {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} - {% endif %} - {% endcall %} - - {%- endif %} + {% set build_sql = dbt_snowflake_get_incremental_sql(strategy, 
tmp_relation, target_relation, unique_key, dest_columns) %} + {% endif %} + + {%- call statement('main') -%} {#-- every branch above leaves its statement in build_sql; it is executed here as 'main' --#} + {{ build_sql }} + {%- endcall -%} {{ run_hooks(post_hooks, inside_transaction=True) }}