Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snowflake create or replace #1409

Merged
merged 34 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9772c1c
Adding incremental logic to Snowflake plugins
Feb 18, 2019
56801f9
Adding changes based on Drew's recommendations
Mar 2, 2019
38254a8
make create or replace snowflake macro
bastienboutonnet Apr 16, 2019
9222c79
implement create or replace in table mater
bastienboutonnet Apr 16, 2019
a35ad18
implement insert when no unique key and full refresh solution
bastienboutonnet Apr 20, 2019
2d5525e
add some logging
bastienboutonnet Apr 20, 2019
6a104c1
test
bastienboutonnet Apr 20, 2019
d168bdd
Merge branch 'snowflake_create_or_replace' of github.com:bastienbouto…
bastienboutonnet Apr 20, 2019
91d869e
revert test
bastienboutonnet Apr 20, 2019
fb26ce5
Merge branch 'snowflake_create_or_replace' of github.com:bastienbouto…
bastienboutonnet Apr 20, 2019
dacce7c
fix insert cols call and temp workaround call of snowflake
bastienboutonnet Apr 21, 2019
54c02ef
some logging and temp call workaround
bastienboutonnet Apr 21, 2019
2830b6a
make default create or replace macro to allow snowflake adapter pick …
bastienboutonnet Apr 22, 2019
95c9f76
remove snowflake__ direct call in incremental
bastienboutonnet Apr 22, 2019
0433369
remove testing logging messages
bastienboutonnet Apr 22, 2019
e83edd3
fixme/todo regarding non destructive flag
bastienboutonnet Apr 22, 2019
9591b86
(PR fdbk) rm extra macro
bastienboutonnet Apr 26, 2019
f99efbf
remove references to temp and backup relations
bastienboutonnet Apr 26, 2019
5c1c588
more explicit comments and quick formatting
bastienboutonnet Apr 26, 2019
43a9db5
quick todo marker
bastienboutonnet Apr 27, 2019
3ab8238
Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_replace
bastienboutonnet Apr 27, 2019
4f62978
Revert "Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_…
bastienboutonnet Apr 27, 2019
08820a2
fixing my jetlagged introduced bugs
bastienboutonnet Apr 27, 2019
0432c1d
conflic resolve
bastienboutonnet Apr 27, 2019
90f8e0b
Revert "Revert "Merge branch 'dev/wilt-chamberlain' into snowflake_cr…
bastienboutonnet Apr 27, 2019
afe236d
cleaning up some commented out stuff
bastienboutonnet Apr 27, 2019
8af7984
remove non-destructive logic
bastienboutonnet Apr 28, 2019
85eac05
cleaner select
bastienboutonnet Apr 28, 2019
3ef519d
todo and comments clean up
bastienboutonnet Apr 28, 2019
7a2279e
move unique key workaround to snowflake macro
bastienboutonnet Apr 28, 2019
3a7dcd9
Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_replace
drewbanin Apr 30, 2019
1f97fe4
Merge branch 'dev/wilt-chamberlain' into snowflake_create_or_replace
drewbanin Apr 30, 2019
8d74550
fix tests
drewbanin May 10, 2019
90abc2d
(closes #1455) Qualify Snowflake temp tables with a database and schema
drewbanin May 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
);
{% endmacro %}


{% macro create_view_as(relation, sql) -%}
{{ adapter_macro('create_view_as', relation, sql) }}
{%- endmacro %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

{%- set transient = config.get('transient', default=true) -%}

create {% if temporary -%}
create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@

{% materialization incremental, adapter='snowflake' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set sql_where = config.get('sql_where') -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set identifier = model['alias'] -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(database=database, identifier=identifier, schema=schema, type='table') -%}
{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}
{%- set force_create = full_refresh_mode -%}

-- setup

{% set source_sql -%}
bastienboutonnet marked this conversation as resolved.
Show resolved Hide resolved
-- wrap sql in parens to make it a subquery --
bastienboutonnet marked this conversation as resolved.
Show resolved Hide resolved
(
select * from (
{{ sql }}
)
)
{%- endset -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if force_create or old_relation is none -%}
{%- call statement('main') -%}

      {# -- create or replace logic because we're in a full refresh or table is non-existent. #}
{% if old_relation is not none and old_relation.type == 'view' %}
bastienboutonnet marked this conversation as resolved.
Show resolved Hide resolved
{# -- I'm preserving one of the old checks here for a view, and to make sure Snowflake doesn't
-- complain that we're running a replace table on a view. #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ adapter.drop_relation(old_relation) }}
{% endif %}
{# -- now create or replace the table because we're in full-refresh #}
{{create_table_as(false, target_relation, source_sql)}}
{%- endcall -%}

{%- else -%}
{# -- here is the incremental part #}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}
{%- call statement('main') -%}

{%- if unique_key is none -%}
{# -- if no unique_key is provided run regular insert as Snowflake may complain #}
insert into {{ target_relation }} ({{ dest_cols_csv }})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a really good fix for the on false issue with Snowflake's merge statements. Do you think it makes sense to put this logic here? Or should we move it into the Snowflake implementation of get_merge_sql?

I like the idea of making materializations represent business logic instead of database logic, as they become a lot more generalizable. Curious what you think!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes total sense! I actually was feeling a bit "awkward" about having this logic sit there but didn't think too much about where else it could live and this is very good, so I'm going to go ahead and change this as you suggest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! I think this would be the place to implement it. If unique_key is provided, then we can proceed with common_get_merge_sql, otherwise we should return the insert statement you've built here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, it's exactly what I just started doing!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing, I realised there is no incremental deletes anymore, and the merge statement doesn't call a delete. Would you think we need it here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation of incremental models on Snowflake used delete statements to approximate an upsert. Before we did:

create temp table as (select * from model code)
delete from destination table where destination unique key = temp table unique key
insert into destination table (select * from temp table)

So, records were only deleted if they were going to be immediately re-inserted. We'd actually prefer not to call a delete, and instead use the merge to update these rows in-place. This should be handled by the when matched clause in the merge statement.

I do think there's a conversation to be had about performance. I wonder if there's any difference between:

  1. Deleting existing records and reinserting them (with new values)
  2. Updating existing records in place

An example

Destination table

unique_key value
1 abc
2 def

Temp table (generated from model select)

unique_key value
2 ghi
3 xyz

Desired destination table state

unique_key value
1 abc
2 ghi
3 xyz

So, there are two ways to accomplish this desired end-state. We can either (pseudocode):

1. delete + insert

delete from destination table where id = 2
insert into destination table (id, value) values (2, ghi), (3, xyz)

2. update + insert (via merge)

merge into destination table
from temp table
when matched update -- updates row with id = 2
when not matched insert -- adds rows with id = 3

This does raise an interesting question about edge-case behavior with merge. What happens if there are duplicate unique_ids in either 1) the destination table or 2) the staging table?

Previously, it was straightforward to understand how the delete + insert pattern behaved. While having a duplicated unique_key would probably lead to undesirable results, the insert and delete queries would execute successfully.

With the merge implementation, I think users will see an error about non-deterministic results if their unique_key is not unique! All told, I think this will actually be a good thing, as it should help alert users to bugs in their model code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. From what you say here's what I think. Merge is definitely the preferable option and I think unless there's really a good reason for it, you should be getting an error if you're trying to insert dupes. There is probably something fucked up with the source.

Alternatively we could add support for the ERROR_ON_NONDETERMINISTIC_MERGE session parameter (when FALSE it would pick one of the duplicated rows and insert it) but there doesn't seem to be a clear way on how to select the row and I think this is just bad anyway. I don't really see the point of inserting a dupe row. So I agree with your last point in that comment. So I think the current implementation is cool.

(
select {{ dest_cols_csv }}
from {{ source_sql }}
);
{%- else -%}
{# -- use merge if a unique key is provided #}
{{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns) }}
{%- endif -%}
{% endcall %}

{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,44 +1,34 @@
{% materialization table, adapter='snowflake' %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema,
database=database, type='table') -%}

/*
/* --TODO: Is this still up to date?
bastienboutonnet marked this conversation as resolved.
Show resolved Hide resolved
See ../view/view.sql for more information about this relation.
*/

-- drop the backup relation if it exists, then make a new one that uses the old relation's type
{%- set backup_relation = adapter.get_relation(database=database, schema=schema, identifier=backup_identifier) -%}
{% if backup_relation is not none -%}
{{ adapter.drop_relation(backup_relation) }}
{%- endif %}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema,
database=database,
type=(old_relation.type or 'table')) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{%- set create_as_temporary = (exists_as_table and non_destructive_mode) -%}


  -- drop the temp relations if they exist for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{# {{ adapter.drop_relation(intermediate_relation) }} #}

--- FIXME: Do we want to put this block all together? I think it serves no purpose, but need to check
-- setup: if the target relation already exists, truncate or drop it (if it's a view)
  {# TODO: Would like to check this. New materialisation makes these tests a bit moot. We should
bastienboutonnet marked this conversation as resolved.
Show resolved Hide resolved
  be able to deprecate the non-destructive flag altogether here. #}
{% if non_destructive_mode -%}
bastienboutonnet marked this conversation as resolved.
Show resolved Hide resolved
{% if exists_as_table -%}
{{ adapter.truncate_relation(old_relation) }}
      --noop we can do away with this step altogether since the table can be replaced in Snowflake.
{# {{ adapter.truncate_relation(old_relation) }} #}
{% elif exists_as_view -%}
--noop. I think we should also be able to do away with this and call a replace.
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}
Expand All @@ -49,52 +39,22 @@
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
--build model
{% call statement('main') -%}
{%- if non_destructive_mode -%}
{%- if old_relation is not none -%}
{{ create_table_as(create_as_temporary, intermediate_relation, sql) }}

{% set dest_columns = adapter.get_columns_in_relation(old_relation) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

insert into {{ target_relation }} ({{ dest_cols_csv }}) (
select {{ dest_cols_csv }}
from {{ intermediate_relation.include(database=(not create_as_temporary), schema=(not create_as_temporary)) }}
);
{%- else -%}
{{ create_table_as(create_as_temporary, target_relation, sql) }}
{%- endif -%}
{%- else -%}
{{ create_table_as(create_as_temporary, intermediate_relation, sql) }}
{%- endif -%}
{%- endcall %}

-- cleanup
{% if non_destructive_mode -%}
-- noop
{%- else -%}
{% if old_relation is not none %}
{% if old_relation.type == 'view' %}
{#-- This is the primary difference between Snowflake and Redshift. Renaming this view
-- would cause an error if the view has become invalid due to upstream schema changes #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% else %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
    {# Drop the relation if it was a view to essentially "convert" it into a table. This does lead to
    downtime but I think it makes sense and should happen. Impact will be minimal I suspect. #}
{% if old_relation is not none and old_relation.type == 'view' %}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}
{{ create_table_as(create_as_temporary, target_relation, sql) }}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}