Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add incremental models #1

Closed
Cabeda opened this issue Sep 8, 2021 · 15 comments · Fixed by #10
Closed

Add incremental models #1

Cabeda opened this issue Sep 8, 2021 · 15 comments · Fixed by #10
Assignees

Comments

@Cabeda
Copy link

Cabeda commented Sep 8, 2021

Opening this task to discuss the best approach to add incremental.

Trino is planning on adding the merge operation trinodb/trino#7708 but I'd say we can start by adding the feature with a delete+insert approach as stated here.

For now, I'd like to focus on adding support to Hive and Iceberg connector.

Any thoughts on this?

@MichelleArk
Copy link
Contributor

The hive.insert-existing-partitions-behavior: OVERWRITE configuration for the Hive connector implementation may be useful. I've only seen it documented in this post but there is an issue to add more documentation in trinodb.

@bachng2017
Copy link
Contributor

bachng2017 commented Sep 20, 2021

we've tried some thing like below (borrowed the code from other driver). Only tested for presto/hive. Do you think this approach will work ?

{% macro get_delete_insert_into_sql(tmp_relation, existing_relation, target_relation, unique_key, sql) %}

    -- 0. cleanup
    drop table if exists {{ tmp_relation }};

    -- 1. create a temp table
    {{ create_table_as(True, tmp_relation, sql) }};

    -- 2. delete old data by unique_key
    {% if unique_key is not none %}
    delete from {{ target_relation }}
    where ({{ unique_key }}) in (
       select ({{ unique_key }})
       from {{ tmp_relation }}
    );
    {% endif %}

    -- 3. insert
    insert into {{ target_relation }}
    select * from {{ tmp_relation }};

    -- 4. clean up the temp table
    drop table if exists {{ tmp_relation }};

{% endmacro %}


{%- materialization incremental, adapter='presto' -%}

  {%- set target_relation = this -%}
  {%- set existing_relation = load_relation(this) -%}
  {%- set tmp_relation = make_temp_relation(this) -%}
  {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
  {%- set unique_key = config.get('unique_key') -%}

  {{ run_hooks(pre_hooks) }}

  {% if existing_relation is none %}
    {% set build_sql = create_table_as(False, target_relation, sql) %}
  {% elif existing_relation.is_view or full_refresh_mode %}
    {% do drop_relation(existing_relation) %}
    {% set build_sql = create_table_as(False, target_relation, sql) %}
  {% else %}
    {% set build_sql = get_delete_insert_into_sql(tmp_relation,existing_relation,target_relation,unique_key,sql) %}
  {% endif %}

  {%- call statement('main') -%}
    {{ build_sql }}
  {%- endcall -%}

  {{ run_hooks(post_hooks) }}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

And sample model could be like this:

{{
    config(
        partition_by = 'id',
        unique_key = 'id',
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite'
    )
}}

select * from {{ ref('seed_data') }}

{% if is_incremental() %}

  -- this filter will only be applied on an incremental run
  where id > (select max(id) from {{ this }})

{% endif %}

@findinpath findinpath self-assigned this Sep 20, 2021
@findinpath
Copy link
Collaborator

findinpath commented Sep 20, 2021

Thanks @bachng2017 for the proof of concept for the insert_overwrite incremental strategy.

Use as additional reference:

Provide incremental model functionality with the following strategies:

  • append
  • insert_overwrite

@hovaesco
Copy link
Contributor

Here is the implementation of append incremental strategy in dbt-athena adapter (I didn't test that): https://github.com/Tomme/dbt-athena/blob/master/dbt/include/athena/macros/materializations/incremental.sql

I am looking to implement something similar here.

findinpath added a commit that referenced this issue Sep 21, 2021
The incremental strategies supported are:

- append
- insert_overwrite

The incremental strategy `append` inserts new records into target table, without updating or overwriting.

The incremental strategy `insert_overwrite` performs
first a DELETE statement on the target table to remove
the entries that exist with the same `unique_key` in
the source table. Afterwards, the records from the
source table are inserted into the target table.

Resolves: #1
@findinpath
Copy link
Collaborator

findinpath commented Sep 27, 2021

@bachng2017 I was following your suggestion for hive from #1 (comment) :

delete from "minio"."tiny"."customer_insert_overwrite"
    where (customer_id) in (
        select (customer_id)
        from "minio"."tiny"."customer_insert_overwrite__dbt_tmp"
        )

and the error I get on Trino is:

io.trino.spi.TrinoException: Deletes must match whole partitions for non-transactional tables
	at io.trino.plugin.hive.HiveMetadata.ensureTableSupportsDelete(HiveMetadata.java:2078)

Test project is available here:

https://github.com/findinpath/dbt-trino-incremental-hive

BTW this is the CREATE TABLE statement corresponding to the target model:

create table "minio"."tiny"."customer_insert_overwrite"
    WITH (format = 'ORC',
  partitioned_by = ARRAY['customer_id'])
  as (
    

with customers as (

    select * from "minio"."tiny"."stg_customer"

)

select
       first_name,
       last_name,
       email,
       customer_id
from customers
  )

Can you give me a hint on which kind of Hive table did you use the macro that you've pasted previously ?

@bachng2017
Copy link
Contributor

bachng2017 commented Sep 28, 2021

this works in my case. Hope it could be a hint.

{{
    config(
        partition_by = 'id',
        unique_key = 'id',
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite'
    )
}}

select * from {{ ref('seed_data') }}

{% if is_incremental() %}

  -- this filter will only be applied on an incremental run
  where id > (select max(id) from {{ this }})

{% endif %}

And one more thing. In our configuration, transactional=true is mandatory. So we enabled it at project level. (But this setting might not be vital)

What is your model ? I think the {%if} block is important.

@findinpath
Copy link
Collaborator

As pointed out here:
dbt-labs/dbt-presto#5 (comment)

i think that in the absence of merge, the option delete+insert can have unexpected results.
I will implement only the append strategy for now and once that MERGE statement trinodb/trino#7708 is available, we can extend the incremental materialization.

@findinpath
Copy link
Collaborator

@bachng2017 here is the project which i've used to test the insert_overwrite functionality for the incremental materialization on dbt-trino.
Unfortunately i didn't get it to work even when using transactional=true. Out of curiosity, do you have any clue what could be the cause of the problem?

@bachng2017
Copy link
Contributor

hello @findinpath
I took a look on your testing project. I might misunderstood something but I don't see any {%if block in your model. Which part that you expect the model is to be increased ?

@bachng2017
Copy link
Contributor

bachng2017 commented Sep 28, 2021

btw, what our code try to do is "simulate" the insert overwrite with the caveat that it is not an atom action. This is the run code

    -- 0. cleanup
    drop table if exists "hive"."lab_dbt_xxx_sample02"."data__dbt_tmp";

    -- 1. create a temp table
    create table "hive"."lab_dbt_xxx_sample02"."data__dbt_tmp"
    WITH (transactional = True)
  as (


select * from "hive"."lab_dbt_xxx_sample02"."seed_data"



  -- this filter will only be applied on an incremental run
  where id > (select max(id) from "hive"."lab_dbt_xxx_sample02"."data")


  );
;

    -- 2. delete old data by unique_key

    delete from "hive"."lab_dbt_xxx_sample02"."data"
    where (id) in (
       select (id)
       from "hive"."lab_dbt_xxx_sample02"."data__dbt_tmp"
    );


    -- 3. insert
    insert into "hive"."lab_dbt_xxx_sample02"."data"
    select * from "hive"."lab_dbt_xxx_sample02"."data__dbt_tmp";

    -- 4. clean up the temp table
    drop table if exists "hive"."lab_dbt_xxx_sample02"."data__dbt_tmp";

@bachng2017
Copy link
Contributor

I think because the lack of specific rows to delete you had that error. In above sample, the deletion happens with specific partitions only

    delete from "hive"."lab_dbt_xxx_sample02"."data"
    where (id) in (
       select (id)
       from "hive"."lab_dbt_xxx_sample02"."data__dbt_tmp"
    );

@bachng2017
Copy link
Contributor

bachng2017 commented Sep 28, 2021

or the error is related to this comment
スクリーンショット 2021-09-28 23 00 27

https://trino.io/docs/current/connector/hive.html

@hovaesco
Copy link
Contributor

the deletion happens with specific partitions only

@bachng2017 do you have any partitions specified in CREATE TABLE, I am wondering what is your setup here?

@bachng2017
Copy link
Contributor

bachng2017 commented Sep 28, 2021

@hovaesco as mentioned above, in our test, we do no have any CREATE table directly. Our model (above code) is created based on a seed (csv file)
this is our test model: #1 (comment)

findinpath added a commit that referenced this issue Sep 30, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
@findinpath
Copy link
Collaborator

@Cabeda / @bachng2017 / @MichelleArk
I've created a PR for supporting append / insert-only incremental strategy.
It is definitely not insert_overwrite / merge as initially requested, but in the absence of transactions/ merge statement in Trino, I'd rather stick to providing a functionality which doesn't leave the system in an inconsistent state in case of outages in the middle of the execution.

Please to review and provide feedback on the PR.

findinpath added a commit that referenced this issue Oct 1, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
findinpath added a commit that referenced this issue Oct 1, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
findinpath added a commit that referenced this issue Oct 1, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
findinpath added a commit that referenced this issue Oct 4, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
findinpath added a commit that referenced this issue Oct 4, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
findinpath added a commit that referenced this issue Oct 4, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
findinpath added a commit that referenced this issue Oct 7, 2021
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: #1
EminUZUN pushed a commit to EminUZUN/dbt-trino that referenced this issue Feb 14, 2023
The incremental strategy supported is to insert new records into target table,
without updating or overwriting.

Resolves: starburstdata#1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants