{#--
  incremental.sql — Spark adapter macros and materialization for dbt
  incremental models (file-format validation, strategy validation,
  insert-overwrite and Delta merge SQL generation).
--#}
{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
{#-- Build an INSERT OVERWRITE statement that replaces the target table's
    partitions with the rows staged in source_relation. The target's own
    column list is selected explicitly so column order always matches the
    destination table, regardless of the staged relation's column order. #}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
{#-- database/schema are stripped from the source — presumably because the
    caller passes a session-scoped temp relation (see the materialization's
    create_table_as(True, tmp_relation, ...) call); confirm #}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}
{% endmacro %}
{% macro dbt_spark_validate_get_file_format() %}
{#-- Resolve the model's file_format config (default: parquet) and raise a
    compiler error before any SQL runs if it is not a format Spark accepts.
    Returns the validated file_format string. #}
{%- set file_format = config.get("file_format", default="parquet") -%}
{#-- Keep the accepted list in one variable so the membership check cannot
    drift from the message below when a format is added. #}
{%- set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] -%}
{% set invalid_file_format_msg -%}
Invalid file format provided: {{ file_format }}
Expected one of: 'text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'
{%- endset %}
{% if file_format not in accepted_formats %}
{% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
{% endif %}
{% do return(file_format) %}
{% endmacro %}
{% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
{#-- Resolve the model's incremental_strategy config (default:
    insert_overwrite) and raise a compiler error if it is unknown or
    incompatible with the chosen file_format. Returns the validated
    strategy string. #}
{%- set strategy = config.get("incremental_strategy", default="insert_overwrite") -%}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'merge', 'insert_overwrite'
{%- endset %}
{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ strategy }}
You can only choose this strategy when file_format is set to 'delta'
{%- endset %}
{% if strategy not in ['merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{#-- 'merge' is generated as a Delta MERGE INTO statement (see
    spark__get_merge_sql), so it requires the delta file format. #}
{% elif strategy == 'merge' and file_format != 'delta' %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% do return(strategy) %}
{% endmacro %}
{% macro dbt_spark_validate_merge(file_format) %}
{#-- Guard used by the incremental materialization: the 'merge' strategy is
    emitted as a Delta MERGE INTO, so any non-delta file_format is rejected
    at compile time. #}
{% if file_format != 'delta' %}
{% set merge_requires_delta_msg -%}
You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
{%- endset %}
{% do exceptions.raise_compiler_error(merge_requires_delta_msg) %}
{% endif %}
{% endmacro %}
{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}
{#-- Spark override of dbt's default get_merge_sql: emits a Delta MERGE
    keyed on unique_key; `update set *` / `insert *` let Delta match
    columns by name. The `predicates` parameter is accepted for interface
    compatibility but not used here.
    NOTE(review): the source only strips schema here, while
    get_insert_overwrite_sql strips both database and schema — confirm the
    asymmetry is intentional for temp-relation sources. #}
merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
when matched then update set *
when not matched then insert *
{% endmacro %}
{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
{#-- Dispatch to the SQL builder for the already-validated incremental
    strategy: insert_overwrite (partition replacement) or merge (Delta).
    unique_key is only consumed by the merge path. #}
{%- if strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- else -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- endif -%}
{% endmacro %}
{% materialization incremental, adapter='spark' -%}
{#-- Spark incremental materialization. First run (or full refresh) builds
    the model as a table; subsequent runs stage the model results in a temp
    relation and apply the configured strategy (insert_overwrite or merge). #}
{#-- Validate early so we don't run SQL if the file_format is invalid --#}
{% set file_format = dbt_spark_validate_get_file_format() -%}
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
{% if strategy == 'merge' %}
{#-- merge requires a join key, and (checked below) the delta file format #}
{%- set unique_key = config.require('unique_key') -%}
{% do dbt_spark_validate_merge(file_format) %}
{% endif %}
{% if config.get('partition_by') %}
{#-- DYNAMIC mode makes INSERT OVERWRITE replace only the partitions
    present in the staged data rather than the whole table #}
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}
{#-- NOTE(review): session-level setting that disables the metastore
    parquet conversion — presumably required for the overwrite to behave
    correctly on Hive-managed parquet tables; confirm #}
{% call statement() %}
set spark.sql.hive.convertMetastoreParquet = false
{% endcall %}
{{ run_hooks(pre_hooks) }}
{% if existing_relation is none %}
{#-- first run: create the table outright from the model SQL #}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{#-- replace an existing view, or rebuild from scratch on --full-refresh #}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{#-- incremental run: stage results in a temp relation, then apply the
    strategy-specific SQL (unique_key is only set on the merge path) #}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}
{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}
{{ run_hooks(post_hooks) }}
{{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}