Fix snapshot to always drop the staging table (#96)
### Description

We introduced Delta constraints in #71, and a snapshot query can now fail when it violates those constraints.
When that happens, the staging table is left behind, because snapshot stages into a permanent table and only drops it in a later step that never runs on failure.

We should always drop them.
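
To make the failure mode concrete, here is a minimal Python sketch of the old control flow (all names are illustrative, not the adapter's real API): the drop only ever ran after a successful main statement, so a constraint violation left the staging table behind.

def run_sql(sql: str) -> None:
    # Stand-in for the warehouse; the merge violates a Delta constraint.
    if sql.startswith("merge"):
        raise RuntimeError("CHECK constraint id_greater_than_zero violated")
    print(f"ok: {sql}")

def old_snapshot_flow() -> None:
    run_sql("create table snap__dbt_tmp as select ...")
    run_sql("merge into snap using snap__dbt_tmp ...")  # raises here
    run_sql("drop table snap__dbt_tmp")  # never reached, so the table leaks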
ueshin authored May 20, 2022
1 parent 7701a2e commit e649410
Showing 4 changed files with 56 additions and 9 deletions.
20 changes: 19 additions & 1 deletion dbt/adapters/databricks/impl.py
@@ -1,7 +1,11 @@
 from dataclasses import dataclass
-from typing import Optional, List, Dict, Union
+from typing import Optional, List, Dict, Tuple, Union
 
+from agate import Table
+
+from dbt.contracts.connection import AdapterResponse
 from dbt.adapters.base import AdapterConfig
+from dbt.adapters.base.relation import BaseRelation
 from dbt.adapters.databricks import DatabricksConnectionManager
 from dbt.adapters.databricks.relation import DatabricksRelation
 from dbt.adapters.databricks.column import DatabricksColumn
@@ -40,3 +44,17 @@ def check_schema_exists(self, database: Optional[str], schema: str) -> bool:
         """Check if a schema exists."""
         results = self.connections.list_schemas(database=database, schema=schema)
         return schema.lower() in [row[0].lower() for row in results]
+
+    def execute(
+        self,
+        sql: str,
+        auto_begin: bool = False,
+        fetch: bool = False,
+        *,
+        staging_table: Optional[BaseRelation] = None,
+    ) -> Tuple[AdapterResponse, Table]:
+        try:
+            return super().execute(sql=sql, auto_begin=auto_begin, fetch=fetch)
+        finally:
+            if staging_table is not None:
+                self.drop_relation(staging_table)
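
Two details worth noting in this override (my reading of the diff, not commentary from the author): staging_table is keyword-only, so existing positional execute(sql, auto_begin, fetch) call sites keep working unchanged, and the drop sits in a finally block, so it runs on the success path and the failure path alike. A stripped-down sketch with stand-in classes:

from typing import Optional

class BaseAdapter:
    def execute(self, sql: str, auto_begin: bool = False, fetch: bool = False):
        if "invalid" in sql:
            raise RuntimeError("NOT NULL constraint violated for column: name")
        return "OK", []

class DatabricksLikeAdapter(BaseAdapter):
    def execute(self, sql, auto_begin=False, fetch=False, *,
                staging_table: Optional[str] = None):
        try:
            return super().execute(sql, auto_begin=auto_begin, fetch=fetch)
        finally:
            if staging_table is not None:
                print(f"dropped {staging_table}")  # fires on success and failure

adapter = DatabricksLikeAdapter()
adapter.execute("select 1", False, True)  # old positional call sites are unaffected
try:
    adapter.execute("merge ... invalid ...", staging_table="snap__dbt_tmp")
except RuntimeError:
    pass  # the staging table was still dropped before the error propagated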
16 changes: 8 additions & 8 deletions dbt/include/databricks/macros/materializations/snapshot.sql
@@ -50,6 +50,10 @@
       {% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
       {% set final_sql = create_table_as(False, target_relation, build_sql) %}
 
+      {% call statement('main') %}
+          {{ final_sql }}
+      {% endcall %}
+
   {% else %}
 
       {{ adapter.valid_snapshot_target(target_relation) }}
@@ -88,11 +92,11 @@
           )
       %}
 
-  {% endif %}
+      {% call statement_with_staging_table('main', staging_table) %}
+          {{ final_sql }}
+      {% endcall %}
 
-  {% call statement('main') %}
-      {{ final_sql }}
-  {% endcall %}
+  {% endif %}
 
   {% do persist_docs(target_relation, model) %}

@@ -102,10 +106,6 @@
 
   {{ adapter.commit() }}
 
-  {% if staging_table is defined %}
-    {% do post_snapshot(staging_table) %}
-  {% endif %}
-
   {{ run_hooks(post_hooks, inside_transaction=False) }}
 
   {{ return({'relations': [target_relation]}) }}
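
With this restructuring, each branch now owns its main statement: the first-run branch keeps the plain statement('main'), since it creates no staging table, while the existing-target branch routes through the new wrapper, and the standalone post_snapshot cleanup (which only ran on success) is removed above. Roughly, in Python terms (illustrative, not the Jinja itself):

def statement(name: str, sql: str) -> None:
    print(f"run {name}: {sql}")

def statement_with_staging_table(name: str, sql: str, staging_table: str) -> None:
    try:
        print(f"run {name}: {sql}")
    finally:
        print(f"drop {staging_table}")  # cleanup regardless of outcome

def materialize_snapshot(target_exists: bool, final_sql: str) -> None:
    if not target_exists:
        statement("main", final_sql)  # first run: nothing to clean up
    else:
        statement_with_staging_table("main", final_sql, "snap__dbt_tmp")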
17 changes: 17 additions & 0 deletions dbt/include/databricks/macros/statement.sql
@@ -0,0 +1,17 @@
+{# executes a query and explicitly drops the staging table. #}
+{% macro statement_with_staging_table(name=None, staging_table=None, fetch_result=False, auto_begin=True) -%}
+  {%- if execute: -%}
+    {%- set sql = caller() -%}
+
+    {%- if name == 'main' -%}
+      {{ log('Writing runtime SQL for node "{}"'.format(model['unique_id'])) }}
+      {{ write(sql) }}
+    {%- endif -%}
+
+    {%- set res, table = adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result, staging_table=staging_table) -%}
+    {%- if name is not none -%}
+      {{ store_result(name, response=res, agate_table=table) }}
+    {%- endif -%}
+
+  {%- endif -%}
+{%- endmacro %}
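
This macro appears to mirror dbt's built-in statement macro (log, write, execute, store_result), with one difference: it forwards staging_table to adapter.execute. Note that on the failure path store_result is never reached, yet the table is already gone, because the drop happens inside execute itself. A hypothetical trace:

events = []

def adapter_execute(sql, *, staging_table=None):
    try:
        raise RuntimeError("constraint violated")  # the main statement fails
    finally:
        if staging_table is not None:
            events.append(f"drop {staging_table}")

def statement_with_staging_table(sql, staging_table):
    res = adapter_execute(sql, staging_table=staging_table)
    events.append("store_result")  # skipped when execute raises
    return res

try:
    statement_with_staging_table("merge ...", "snap__dbt_tmp")
except RuntimeError:
    pass
assert events == ["drop snap__dbt_tmp"]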
12 changes: 12 additions & 0 deletions tests/integration/persist_constraints/test_persist_constraints.py
@@ -35,6 +35,12 @@ def run_and_check_failure(self, model_name: str, err_msg: str):
         assert res.status == RunStatus.Error
         assert err_msg in res.message
 
+    def check_staging_table_cleaned(self):
+        tmp_tables = self.run_sql(
+            f"SHOW TABLES IN {self.unique_schema()} LIKE '*__dbt_tmp'", fetch="all"
+        )
+        assert len(tmp_tables) == 0
+
 
 class TestTableConstraints(TestConstraints):
     def test_table_constraints(self):
@@ -53,6 +59,7 @@ def test_table_constraints(self):
         self.run_and_check_failure(
             model_name, err_msg="violate the new NOT NULL constraint on name"
         )
+        self.check_staging_table_cleaned()
 
         # Check the table is still created with the invalid row.
         self.run_dbt(["run", "--select", updated_model_name])
@@ -90,13 +97,15 @@ def test_incremental_constraints(self):
             model_name,
             err_msg="CHECK constraint id_greater_than_zero",
         )
+        self.check_staging_table_cleaned()
         self.run_sql(f"delete from {schema}.seed where id = 0")
 
         # Insert a row into the seed model with an invalid name.
         self.run_sql_file("insert_invalid_name.sql")
         self.run_and_check_failure(
             model_name, err_msg="NOT NULL constraint violated for column: name"
         )
+        self.check_staging_table_cleaned()
         self.run_sql(f"delete from {schema}.seed where id = 3")
 
         # Insert a valid row into the seed model.
@@ -132,15 +141,18 @@ def test_snapshot(self):
         self.run_dbt(["seed"])
         self.run_dbt(["snapshot"])
         self.check_snapshot_results(num_rows=2)
+        self.check_staging_table_cleaned()
 
         self.run_sql_file("insert_invalid_name.sql")
         results = self.run_dbt(["snapshot"], expect_pass=False)
         assert "NOT NULL constraint violated for column: name" in results.results[0].message
+        self.check_staging_table_cleaned()
 
         self.run_dbt(["seed"])
         self.run_sql_file("insert_invalid_id.sql")
         results = self.run_dbt(["snapshot"], expect_pass=False)
         assert "CHECK constraint id_greater_than_zero" in results.results[0].message
+        self.check_staging_table_cleaned()
 
         # Check the snapshot table is not updated.
         self.check_snapshot_results(num_rows=2)
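
The new check_staging_table_cleaned helper relies on SHOW TABLES ... LIKE '*__dbt_tmp' returning nothing once cleanup works; dbt names its staging relations with a __dbt_tmp suffix, so the glob catches any leftover. The same pattern check, mimicked with Python's fnmatch (illustrative only):

from fnmatch import fnmatch

tables_after_failed_snapshot = ["my_snapshot", "seed"]  # staging table was dropped
leftovers = [t for t in tables_after_failed_snapshot if fnmatch(t, "*__dbt_tmp")]
assert leftovers == []  # what the test asserts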