Rationalize incremental materialization #141

Merged: 16 commits, Feb 19, 2021
24 changes: 19 additions & 5 deletions .circleci/config.yml
@@ -60,7 +60,7 @@ jobs:
      - store_artifacts:
          path: ./logs

  integration-spark-databricks-odbc-cluster: &databricks-odbc
    environment:
      DBT_INVOCATION_ENV: circle
      ODBC_DRIVER: Simba # TODO: move env var to Docker image
@@ -74,7 +74,18 @@ jobs:
      - checkout
      - run:
          name: Run integration tests
          command: tox -e integration-spark-databricks-odbc-cluster
          no_output_timeout: 1h
      - store_artifacts:
          path: ./logs

  integration-spark-databricks-odbc-endpoint:
    <<: *databricks-odbc
    steps:
      - checkout
      - run:
          name: Run integration tests
          command: tox -e integration-spark-databricks-odbc-sql-endpoint
          no_output_timeout: 1h
      - store_artifacts:
          path: ./logs
@@ -89,7 +100,10 @@ workflows:
            - unit
      - integration-spark-databricks-http:
          requires:
            - integration-spark-thrift
      - integration-spark-databricks-odbc-cluster:
          requires:
            - integration-spark-thrift
      - integration-spark-databricks-odbc-endpoint:
          requires:
            - integration-spark-thrift
1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ dbt-integration-tests
test/integration/.user.yml
.DS_Store
.vscode
*.log
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## dbt-spark 0.19.0 (Release TBD)

### Breaking changes
- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))
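  For example (a sketch; the model body and column names are illustrative), a project that relied on the old default can pin the previous strategy explicitly:

  ```sql
  {{ config(
      materialized='incremental',
      incremental_strategy='insert_overwrite',
      partition_by=['date_day'],
      file_format='parquet'
  ) }}

  -- illustrative model body
  select date_day, count(*) as users
  from {{ ref('events') }}
  group by 1
  ```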

### Fixes
- Capture hard-deleted records in snapshot merge, when `invalidate_hard_deletes` config is set ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/143), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/144))

47 changes: 33 additions & 14 deletions README.md
@@ -161,48 +161,67 @@ The following configurations can be supplied to models run with the dbt-spark plugin:
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` |
| persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` |


**Incremental Models**

dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations:
- `append` (default): Insert new records without updating or overwriting any existing data.
- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connecting via Databricks SQL Endpoints. For dynamic partition replacement with `method: odbc` + Databricks `cluster`, you **must** include `set spark.sql.sources.partitionOverwriteMode DYNAMIC` in the [cluster SparkConfig](https://docs.databricks.com/clusters/configure.html#spark-config). For atomic replacement of Delta tables, use the `table` materialization instead.]
- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]

Examples:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='append',
) }}


-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['date_day'],
    file_format='parquet'
) }}

-- Every partition returned by this query will overwrite existing partitions

select
    date_day,
    count(*) as users

from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
group by 1
```

The `merge` strategy is only supported when using `file_format: delta` (available on Databricks). It also requires you to specify a `unique_key` to match existing records.

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    partition_by=['date_day'],
    unique_key='event_id',
    file_format='delta'
) }}

-- Existing events, matched on `event_id`, will be updated
-- New events will be appended

select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
```
133 changes: 0 additions & 133 deletions dbt/include/spark/macros/materializations/incremental.sql

This file was deleted.

@@ -0,0 +1,45 @@
{% materialization incremental, adapter='spark' -%}

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='append') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'insert_overwrite' and partition_by %}
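{#-- switch to dynamic partition overwrite so only the partitions selected by the model are replaced #}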
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
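{#-- first run: nothing exists yet, so build the table directly from the model SQL #}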
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
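{#-- existing view, or --full-refresh: drop whatever is there and rebuild #}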
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
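{#-- incremental run: stage new rows in a temporary view, then apply the configured strategy #}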
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
@@ -0,0 +1,58 @@
{% macro get_insert_overwrite_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}


{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into table {{ target_relation }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}


{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}

{% set merge_condition %}
{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% else %}
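{#-- without a unique_key, nothing ever matches: every source row is inserted, like append #}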
on false
{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
{{ merge_condition }}
when matched then update set *
when not matched then insert *
{% endmacro %}


{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
{%- endset %}
{%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%}
{%- endif -%}

{% endmacro %}
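
As a rough illustration (the schema, relation, and column names here are hypothetical), the strategies above render SQL along these lines: `insert_overwrite` for a model partitioned by `date_day`, and `merge` for a model with `unique_key='event_id'`:

```sql
-- insert_overwrite: replaces only the partitions present in the staged temp view
insert overwrite table my_schema.users_by_day
partition (date_day)
select `date_day`, `users` from users_by_day__dbt_tmp

-- merge: updates rows matched on the unique_key, inserts the rest
merge into my_schema.events as DBT_INTERNAL_DEST
using events__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
when matched then update set *
when not matched then insert *
```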