Skip to content

Commit

Permalink
Add support for on_schema_change
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Oct 14, 2021
1 parent f39169e commit dc0ce42
Show file tree
Hide file tree
Showing 12 changed files with 377 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ test/integration/.user.yml
.DS_Store
.vscode
*.log
logs/
47 changes: 46 additions & 1 deletion dbt/include/spark/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@

{% macro spark__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
describe extended {{ relation }}
describe extended {{ relation.include(schema=(schema is not none)) }}
{% endcall %}
{% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}
Expand Down Expand Up @@ -194,3 +194,48 @@
{% endfor %}
{% endif %}
{% endmacro %}


{% macro spark__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_relation = base_relation.incorporate(path = {
"identifier": tmp_identifier,
"schema": None
}) -%}

{% do return(tmp_relation) %}
{% endmacro %}


{% macro spark__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} alter column {{ column_name }} type {{ new_column_type }};
{% endcall %}
{% endmacro %}


{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if remove_columns %}
{% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
{{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
{% endif %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% if add_columns %} add columns {% endif %}
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{% do run_query(sql) %}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
Expand All @@ -31,6 +33,7 @@
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{{
config(
materialized='incremental',
on_schema_change='append_new_columns'
)
}}

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
FROM source_data where id <= 3

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{{
config(materialized='table')
}}

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

with source_data as (

select * from {{ ref('model_a') }}

)

select id
,cast(field1 as {{string_type}}) as field1
,cast(field2 as {{string_type}}) as field2
,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3
,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4

from source_data
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{
config(
materialized='incremental',
on_schema_change='fail'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id, field1, field2 FROM source_data

{% else %}

SELECT id, field1, field3 FROm source_data

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{
config(
materialized='incremental',
on_schema_change='ignore'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id, field1, field2 FROM source_data LIMIT 3

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

select id
,field1
,field2

from source_data
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{
config(
materialized='incremental',
on_schema_change='sync_all_columns'

)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field3 as {{string_type}}) as field3, -- to validate new fields
cast(field4 as {{string_type}}) AS field4 -- to validate new fields

FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

select id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2

from source_data where id <= 3

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

select id
,cast(field1 as {{string_type}}) as field1
--,field2
,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3
,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4

from source_data
order by id
22 changes: 22 additions & 0 deletions test/custom/incremental_on_schema_change/models/model_a.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{{
config(materialized='table')
}}

with source_data as (

select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4
union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4
union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4
union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4
union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4
union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4

)

select id
,field1
,field2
,field3
,field4

from source_data
Loading

0 comments on commit dc0ce42

Please sign in to comment.