From 0f1693a9d7ebf931dc88970147df72eaa755fea3 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 13 Aug 2019 21:44:54 -0400 Subject: [PATCH 1/4] (#525) drop existing relation at end of full-refresh incremental build --- .../macros/materializations/helpers.sql | 10 ++ .../materializations/incremental/helpers.sql | 20 ++++ .../incremental/incremental.sql | 95 +++++++------------ .../macros/materializations/incremental.sql | 63 +++++------- .../macros/materializations/incremental.sql | 90 +++++++++--------- 5 files changed, 131 insertions(+), 147 deletions(-) create mode 100644 core/dbt/include/global_project/macros/materializations/incremental/helpers.sql diff --git a/core/dbt/include/global_project/macros/materializations/helpers.sql b/core/dbt/include/global_project/macros/materializations/helpers.sql index dc21ac5d3ba..871bfd1c408 100644 --- a/core/dbt/include/global_project/macros/materializations/helpers.sql +++ b/core/dbt/include/global_project/macros/materializations/helpers.sql @@ -48,8 +48,18 @@ {{ make_hook_config(sql, inside_transaction=False) }} {% endmacro %} + {% macro drop_relation_if_exists(relation) %} {% if relation is not none %} {{ adapter.drop_relation(relation) }} {% endif %} {% endmacro %} + + +{% macro load_relation(relation) %} + {% do return(adapter.get_relation( + database=relation.database, + schema=relation.schema, + identifier=relation.identifier + )) -%} +{% endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql b/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql new file mode 100644 index 00000000000..db693646c04 --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql @@ -0,0 +1,20 @@ + +{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + {%- if unique_key is not none -%} + delete + from {{ target_relation }} + where ({{ unique_key }}) in ( + select ({{ unique_key }}) + from {{ tmp_relation.include(schema=False, database=False) }} + ); + {%- endif %} + + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ tmp_relation }} + ); +{%- endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql index 0c5f7a8d248..4a06ac476ca 100644 --- a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql @@ -1,81 +1,50 @@ -{% macro dbt__incremental_delete(target_relation, tmp_relation) -%} - - {%- set unique_key = config.require('unique_key') -%} - - delete - from {{ target_relation }} - where ({{ unique_key }}) in ( - select ({{ unique_key }}) - from {{ tmp_relation.include(schema=False, database=False) }} - ); - -{%- endmacro %} {% materialization incremental, default -%} - {%- set unique_key = config.get('unique_key') -%} - - {%- set identifier = model['alias'] -%} - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%} - {%- set tmp_relation = make_temp_relation(target_relation) %} - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + {% set unique_key = config.get('unique_key') %} + {% set full_refresh_mode = flags.FULL_REFRESH %} - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%} + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(this) %} - {%- set should_drop = (full_refresh_mode or exists_not_as_table) -%} - - -- setup - {% if old_relation is none -%} - -- noop - {%- elif should_drop -%} - {{ adapter.drop_relation(old_relation) }} - {%- set old_relation = none -%} - {%- endif %} + {# -- set the type so our rename / drop uses the correct syntax #} + {% set backup_type = existing_relation.type | default("table") %} + {% set backup_relation = make_temp_relation(this, "__dbt_backup").incorporate(type=backup_type) %} {{ run_hooks(pre_hooks, inside_transaction=False) }} -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - -- build model - {% if full_refresh_mode or old_relation is none -%} - {%- call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall -%} - {%- else -%} - {%- call statement() -%} - - {{ dbt.create_table_as(True, tmp_relation, sql) }} - - {%- endcall -%} - - {{ adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation) }} - - {%- call statement('main') -%} - {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %} - - {% if unique_key is not none -%} - - {{ dbt__incremental_delete(target_relation, tmp_relation) }} - - {%- endif %} - - insert into {{ target_relation }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ tmp_relation }} - ); - {% endcall %} - {%- endif %} + {% set to_drop = [] %} + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% do to_drop.append(backup_relation) %} + {% else %} + {% set tmp_relation = make_temp_relation(target_relation) %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} + {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} {{ run_hooks(post_hooks, inside_transaction=True) }} -- `COMMIT` happens here - {{ adapter.commit() }} + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do drop_relation(rel) %} + {% endfor %} {{ run_hooks(post_hooks, inside_transaction=False) }} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql index 7c8525fccb8..9899eac8cc8 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql @@ -2,50 +2,37 @@ {% materialization incremental, adapter='bigquery' -%} {%- set unique_key = config.get('unique_key') -%} - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - {%- set identifier = model['alias'] -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - - {%- set target_relation = api.Relation.create(database=database, identifier=identifier, schema=schema, type='table') -%} + {%- set target_relation = this %} + {%- set existing_relation = load_relation(this) %} + {%- set tmp_relation = make_temp_relation(this) %} - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%} - - {%- set should_drop = (full_refresh_mode or exists_not_as_table) -%} - {%- set force_create = (full_refresh_mode) -%} + {{ run_hooks(pre_hooks) }} - -- setup - {% if old_relation is none -%} - -- noop - {%- elif should_drop -%} - {{ adapter.drop_relation(old_relation) }} - {%- set old_relation = none -%} - {%- endif %} + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view %} + {#-- There's no way to atomically replace a view with a table on BQ --#} + {{ adapter.drop_relation(existing_relation) }} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif full_refresh_mode %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} - {% set source_sql -%} {#-- wrap sql in parens to make it a subquery --#} - ( - {{ sql }} - ) - {%- endset -%} - - - {{ run_hooks(pre_hooks) }} - - -- build model - {% if force_create or old_relation is none -%} - {%- call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall -%} - {%- else -%} - {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {%- call statement('main') -%} - {{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns) }} - {% endcall %} - {%- endif %} + {% set source_sql -%} + ( + {{ sql }} + ) + {%- endset -%} + {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %} + {% endif %} + + {%- call statement('main') -%} + {{ build_sql }} + {% endcall %} {{ run_hooks(post_hooks) }} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql index 32c75a2d386..92836ec0557 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/materializations/incremental.sql @@ -1,18 +1,5 @@ -{% materialization incremental, adapter='snowflake' -%} - - {%- set unique_key = config.get('unique_key') -%} - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - {%- set identifier = model['alias'] -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(database=database, - schema=schema, - identifier=identifier, - type='table') -%} - - {%- set tmp_relation = make_temp_relation(target_relation) %} - +{% macro dbt_snowflake_validate_get_incremental_strategy(config) %} {#-- Find and validate the incremental strategy #} {%- set strategy = config.get("incremental_strategy", default="merge") -%} @@ -24,47 +11,58 @@ {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} {% endif %} - -- setup - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} + {% do return(strategy) %} +{% endmacro %} - {# -- If the destination is a view, then we have no choice but to drop it #} - {% if old_relation is not none and old_relation.type == 'view' %} - {{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }} - {{ adapter.drop_relation(old_relation) }} - {% set old_relation = none %} +{% macro dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} + {% if strategy == 'merge' %} + {% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %} + {% elif strategy == 'delete+insert' %} + {% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %} + {% else %} + {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} {% endif %} +{% endmacro %} - -- build model - {% if full_refresh_mode or old_relation is none -%} +{% materialization incremental, adapter='snowflake' -%} - {%- call statement('main') -%} - {{ create_table_as(false, target_relation, sql) }} + {%- set unique_key = config.get('unique_key') -%} + {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - {%- endcall -%} + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(this) %} - {%- else -%} + {#-- Validate early so we don't run SQL if the strategy is invalid --#} + {% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%} - {%- call statement() -%} - {{ create_table_as(true, tmp_relation, sql) }} - {%- endcall -%} + -- setup + {{ run_hooks(pre_hooks, inside_transaction=False) }} - {{ adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation) }} + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view %} + {#-- Can't overwrite a view with a table - we must drop --#} + {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif full_refresh_mode %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {%- call statement('main') -%} - {% if strategy == 'merge' %} - {{ get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) }} - {% elif strategy == 'delete+insert' %} - {{ get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) }} - {% else %} - {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} - {% endif %} - {% endcall %} - - {%- endif %} + {% set build_sql = dbt_snowflake_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} + {% endif %} + + {%- call statement('main') -%} + {{ build_sql }} + {%- endcall -%} {{ run_hooks(post_hooks, inside_transaction=True) }} From 95a0587499daf897517e45d7887a2a6a690295e7 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 14 Oct 2019 22:48:52 -0400 Subject: [PATCH 2/4] handle changing partition/cluster configs on BQ --- .../bigquery/dbt/adapters/bigquery/impl.py | 42 +++++++++++++++++++ .../dbt/include/bigquery/macros/adapters.sql | 9 ++-- .../macros/materializations/incremental.sql | 8 ++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 596aff2a66c..d9ce05f8cac 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -346,6 +346,48 @@ def execute_model(self, model, materialization, sql_override=None, return res + def _get_table(self, relation): + logger.debug('Fetching metadata for relation {}'.format(relation)) + conn = self.connections.get_thread_connection() + client = conn.handle + table_ref = self.connections.table_ref( + relation.database, + relation.schema, + relation.identifier, + conn + ) + + # Handle 404 + try: + return client.get_table(table_ref) + except (google.cloud.exceptions.NotFound) as e: + return None + + @available.parse_none + def is_replaceable(self, relation, conf_partition, conf_cluster): + """ + Check if a given partition and clustering column spec for a table + can replace an existing relation in the database. BigQuery does not + allow tables to be replaced with another table that has a different + partitioning spec. This method returns True if the given config spec is + identical to that of the existing table. + """ + table = self._get_table(relation) + if not table: + return True + + table_partition = table.time_partitioning + if table_partition is not None: + table_partition = table_partition.field + + table_cluster = table.clustering_fields + + if isinstance(conf_cluster, str): + conf_cluster = [conf_cluster] + + return table_partition == conf_partition \ + and table_cluster == conf_cluster + @available.parse_none def alter_table_add_columns(self, relation, columns): diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index 60d5d4e509e..719860ea40c 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -37,9 +37,12 @@ {% do opts.update({'expiration_timestamp': 'TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)'}) %} {% endif %} - OPTIONS({% for opt_key, opt_val in opts.items() %} - {{ opt_key }}={{ opt_val }}{{ "," if not loop.last }} - {% endfor %}) + {% set options -%} + OPTIONS({% for opt_key, opt_val in opts.items() %} + {{ opt_key }}={{ opt_val }}{{ "," if not loop.last }} + {% endfor %}) + {%- endset %} + {% do return(options) %} {%- endmacro -%} {% macro bigquery__create_table_as(temporary, relation, sql) -%} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql index 9899eac8cc8..d2e62e4f8e6 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql @@ -8,6 +8,9 @@ {%- set existing_relation = load_relation(this) %} {%- set tmp_relation = make_temp_relation(this) %} + {%- set partition_by = config.get('partition_by', none) -%} + {%- set cluster_by = config.get('cluster_by', none) -%} + {{ run_hooks(pre_hooks) }} {% if existing_relation is none %} @@ -17,6 +20,11 @@ {{ adapter.drop_relation(existing_relation) }} {% set build_sql = create_table_as(False, target_relation, sql) %} {% elif full_refresh_mode %} + {#-- If the partition/cluster config has changed, then we must drop and recreate --#} + {% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %} + {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %} + {{ adapter.drop_relation(existing_relation) }} + {% endif %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% else %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} From 43b8293a07c941cc89f04b0544644207c0d46838 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 14 Oct 2019 23:13:09 -0400 Subject: [PATCH 3/4] pep8; code cleanup --- .../bigquery/dbt/adapters/bigquery/impl.py | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index d9ce05f8cac..0cb5f2700de 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -346,23 +346,6 @@ def execute_model(self, model, materialization, sql_override=None, return res - def _get_table(self, relation): - logger.debug('Fetching metadata for relation {}'.format(relation)) - conn = self.connections.get_thread_connection() - client = conn.handle - table_ref = self.connections.table_ref( - relation.database, - relation.schema, - relation.identifier, - conn - ) - - # Handle 404 - try: - return client.get_table(table_ref) - except (google.cloud.exceptions.NotFound) as e: - return None - @available.parse_none def is_replaceable(self, relation, conf_partition, conf_cluster): """ @@ -372,8 +355,13 @@ def is_replaceable(self, relation, conf_partition, conf_cluster): partitioning spec. This method returns True if the given config spec is identical to that of the existing table. """ - table = self._get_table(relation) - if not table: + try: + table = self.connections.get_bq_table( + database=relation.database, + schema=relation.schema, + identifier=relation.identifier + ) + except google.cloud.exceptions.NotFound: return True table_partition = table.time_partitioning From 50f4f8a5b0bcfb7f2840ef90cd27eb6d201696f0 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 15 Oct 2019 11:31:09 -0400 Subject: [PATCH 4/4] pr feedback --- .../macros/materializations/incremental/helpers.sql | 2 +- plugins/bigquery/dbt/adapters/bigquery/impl.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql b/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql index db693646c04..aec81e02dbe 100644 --- a/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql +++ b/core/dbt/include/global_project/macros/materializations/incremental/helpers.sql @@ -8,7 +8,7 @@ from {{ target_relation }} where ({{ unique_key }}) in ( select ({{ unique_key }}) - from {{ tmp_relation.include(schema=False, database=False) }} + from {{ tmp_relation }} ); {%- endif %} diff --git a/plugins/bigquery/dbt/adapters/bigquery/impl.py b/plugins/bigquery/dbt/adapters/bigquery/impl.py index 0cb5f2700de..b1b1da58b12 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/impl.py +++ b/plugins/bigquery/dbt/adapters/bigquery/impl.py @@ -346,7 +346,7 @@ def execute_model(self, model, materialization, sql_override=None, return res - @available.parse_none + @available.parse(lambda *a, **k: True) def is_replaceable(self, relation, conf_partition, conf_cluster): """ Check if a given partition and clustering column spec for a table