diff --git a/CHANGELOG.md b/CHANGELOG.md index 0512e4a6..e21a802b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## dbt-databricks 1.1.0 (Release TBD) +### Features +- Add support for [Delta constraints](https://docs.databricks.com/delta/delta-constraints.html) ([#71](https://github.com/databricks/dbt-databricks/pull/71)) + ### Under the hood - Port testing framework changes from [dbt-labs/dbt-spark#299](https://github.com/dbt-labs/dbt-spark/pull/299) and [dbt-labs/dbt-spark#314](https://github.com/dbt-labs/dbt-spark/pull/314) ([#70](https://github.com/databricks/dbt-databricks/pull/70)) diff --git a/README.md b/README.md index 743f6b3d..07781c0a 100644 --- a/README.md +++ b/README.md @@ -51,5 +51,5 @@ your_profile_name: ### Compatibility -The `dbt-databricks` adapter has been tested against `Databricks SQL` and `Databricks runtime releases 7.3 LTS` and later. +The `dbt-databricks` adapter has been tested against `Databricks SQL` and `Databricks runtime releases 9.1 LTS` and later. diff --git a/dbt/include/databricks/macros/adapters.sql b/dbt/include/databricks/macros/adapters.sql index 330d931f..65fb031b 100644 --- a/dbt/include/databricks/macros/adapters.sql +++ b/dbt/include/databricks/macros/adapters.sql @@ -87,3 +87,59 @@ {% endfor %} {% endif %} {% endmacro %} + +{# Persist table-level and column-level constraints. #} +{% macro persist_constraints(relation, model) %} + {{ return(adapter.dispatch('persist_constraints', 'dbt')(relation, model)) }} +{% endmacro %} + +{% macro databricks__persist_constraints(relation, model) %} + {% if config.get('persist_constraints', False) and config.get('file_format', 'delta') == 'delta' %} + {% do alter_table_add_constraints(relation, model.meta.constraints) %} + {% do alter_column_set_constraints(relation, model.columns) %} + {% endif %} +{% endmacro %} + +{% macro alter_table_add_constraints(relation, constraints) %} + {{ return(adapter.dispatch('alter_table_add_constraints', 'dbt')(relation, constraints)) }} +{% endmacro %} + +{% macro databricks__alter_table_add_constraints(relation, constraints) %} + {% if constraints is sequence %} + {% for constraint in constraints %} + {% set name = constraint['name'] %} + {% if not name %} + {{ exceptions.raise_compiler_error('Invalid check constraint name: ' ~ name) }} + {% endif %} + {% set condition = constraint['condition'] %} + {% if not condition %} + {{ exceptions.raise_compiler_error('Invalid check constraint condition: ' ~ condition) }} + {% endif %} + {# Skip if the update is incremental. #} + {% if not is_incremental() %} + {% call statement() %} + alter table {{ relation }} add constraint {{ name }} check ({{ condition }}); + {% endcall %} + {% endif %} + {% endfor %} + {% endif %} +{% endmacro %} + +{% macro alter_column_set_constraints(relation, column_dict) %} + {{ return(adapter.dispatch('alter_column_set_constraints', 'dbt')(relation, column_dict)) }} +{% endmacro %} + +{% macro databricks__alter_column_set_constraints(relation, column_dict) %} + {% for column_name in column_dict %} + {% set constraint = column_dict[column_name]['meta']['constraint'] %} + {% if constraint %} + {% if constraint != 'not_null' %} + {{ exceptions.raise_compiler_error('Invalid constraint for column ' ~ column_name ~ '. Only `not_null` is supported.') }} + {% endif %} + {% set quoted_name = adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name %} + {% call statement() %} + alter table {{ relation }} change column {{ quoted_name }} set not null + {% endcall %} + {% endif %} + {% endfor %} +{% endmacro %} diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index 4eb28432..af7ca506 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -43,6 +43,8 @@ {% do persist_docs(target_relation, model) %} + {% do persist_constraints(target_relation, model) %} + {{ run_hooks(post_hooks) }} {{ return({'relations': [target_relation]}) }} diff --git a/dbt/include/databricks/macros/materializations/snapshot.sql b/dbt/include/databricks/macros/materializations/snapshot.sql index 40685e37..8a4cb50c 100644 --- a/dbt/include/databricks/macros/materializations/snapshot.sql +++ b/dbt/include/databricks/macros/materializations/snapshot.sql @@ -96,6 +96,8 @@ {% do persist_docs(target_relation, model) %} + {% do persist_constraints(target_relation, model) %} + {{ run_hooks(post_hooks, inside_transaction=True) }} {{ adapter.commit() }} diff --git a/dbt/include/databricks/macros/materializations/table.sql b/dbt/include/databricks/macros/materializations/table.sql index b6cf839b..8d583452 100644 --- a/dbt/include/databricks/macros/materializations/table.sql +++ b/dbt/include/databricks/macros/materializations/table.sql @@ -24,6 +24,8 @@ {% do persist_docs(target_relation, model) %} + {% do persist_constraints(target_relation, model) %} + {{ run_hooks(post_hooks) }} {{ return({'relations': [target_relation]})}} diff --git a/tests/integration/persist_constraints/insert_invalid_id.sql b/tests/integration/persist_constraints/insert_invalid_id.sql new file mode 100644 index 00000000..24c0ad03 --- /dev/null +++ b/tests/integration/persist_constraints/insert_invalid_id.sql @@ -0,0 +1 @@ +insert into {schema}.seed values (0, 'Cathy', '2022-03-01'); diff --git a/tests/integration/persist_constraints/insert_invalid_name.sql b/tests/integration/persist_constraints/insert_invalid_name.sql new file mode 100644 index 00000000..88c00660 --- /dev/null +++ b/tests/integration/persist_constraints/insert_invalid_name.sql @@ -0,0 +1 @@ +insert into {schema}.seed values (3, null, '2022-03-01'); diff --git a/tests/integration/persist_constraints/models/expected/expected_incremental_model.sql b/tests/integration/persist_constraints/models/expected/expected_incremental_model.sql new file mode 100644 index 00000000..47389f3f --- /dev/null +++ b/tests/integration/persist_constraints/models/expected/expected_incremental_model.sql @@ -0,0 +1,7 @@ +{{ config(materialized='table') }} + +select * from values + (1, 'Alice', '2022-01-01'), + (2, 'Bob', '2022-02-01'), + (3, 'Cathy', '2022-03-01') +t(id, name, date); diff --git a/tests/integration/persist_constraints/models/expected/expected_model.sql b/tests/integration/persist_constraints/models/expected/expected_model.sql new file mode 100644 index 00000000..88482338 --- /dev/null +++ b/tests/integration/persist_constraints/models/expected/expected_model.sql @@ -0,0 +1,6 @@ +{{ config(materialized='table') }} + +select * from values + (1, 'Alice', '2022-01-01'), + (2, 'Bob', '2022-02-01') +t(id, name, date); diff --git a/tests/integration/persist_constraints/models/expected/expected_model_with_invalid_name.sql b/tests/integration/persist_constraints/models/expected/expected_model_with_invalid_name.sql new file mode 100644 index 00000000..048ae5d8 --- /dev/null +++ b/tests/integration/persist_constraints/models/expected/expected_model_with_invalid_name.sql @@ -0,0 +1,7 @@ +{{ config(materialized='table') }} + +select * from values + (1, 'Alice', '2022-01-01'), + (2, 'Bob', '2022-02-01'), + (3, null, '2022-03-01') +t(id, name, date); diff --git a/tests/integration/persist_constraints/models/incremental_model.sql b/tests/integration/persist_constraints/models/incremental_model.sql new file mode 100644 index 00000000..9f8fc2f0 --- /dev/null +++ b/tests/integration/persist_constraints/models/incremental_model.sql @@ -0,0 +1,7 @@ +{{config(materialized='incremental')}} + +select * from {{ ref('seed') }} + +{% if is_incremental() %} + where date > (select max(date) from {{ this }}) +{% endif %} diff --git a/tests/integration/persist_constraints/models/invalid_check_constraint.sql b/tests/integration/persist_constraints/models/invalid_check_constraint.sql new file mode 100644 index 00000000..5f8e9141 --- /dev/null +++ b/tests/integration/persist_constraints/models/invalid_check_constraint.sql @@ -0,0 +1,3 @@ +{{config(materialized='table')}} + +select * from {{ ref('seed') }} diff --git a/tests/integration/persist_constraints/models/invalid_column_constraint.sql b/tests/integration/persist_constraints/models/invalid_column_constraint.sql new file mode 100644 index 00000000..989d5b08 --- /dev/null +++ b/tests/integration/persist_constraints/models/invalid_column_constraint.sql @@ -0,0 +1,4 @@ +{{config(materialized='table')}} + +select * from {{ ref('seed') }} + diff --git a/tests/integration/persist_constraints/models/schema.yml b/tests/integration/persist_constraints/models/schema.yml new file mode 100644 index 00000000..8c22905a --- /dev/null +++ b/tests/integration/persist_constraints/models/schema.yml @@ -0,0 +1,50 @@ +version: 2 + +models: + - name: table_model + meta: + constraints: + - name: id_greater_than_zero + condition: id > 0 + columns: + - name: id + - name: name + meta: + constraint: not_null + - name: date + + - name: incremental_model + meta: + constraints: + - name: id_greater_than_zero + condition: id > 0 + columns: + - name: id + - name: name + meta: + constraint: not_null + - name: date + + - name: invalid_check_constraint + meta: + constraints: + - name: invalid_constraint + condition: + + - name: invalid_column_constraint + columns: + - name: id + meta: + constraint: invalid + + - name: table_model_disable_constraints + meta: + constraints: + - name: id_greater_than_zero + condition: id > 0 + columns: + - name: id + - name: name + meta: + constraint: not_null + - name: date diff --git a/tests/integration/persist_constraints/models/table_model.sql b/tests/integration/persist_constraints/models/table_model.sql new file mode 100644 index 00000000..5f8e9141 --- /dev/null +++ b/tests/integration/persist_constraints/models/table_model.sql @@ -0,0 +1,3 @@ +{{config(materialized='table')}} + +select * from {{ ref('seed') }} diff --git a/tests/integration/persist_constraints/models/table_model_disable_constraints.sql b/tests/integration/persist_constraints/models/table_model_disable_constraints.sql new file mode 100644 index 00000000..c6ca50ce --- /dev/null +++ b/tests/integration/persist_constraints/models/table_model_disable_constraints.sql @@ -0,0 +1,3 @@ +{{config(materialized='table', persist_constraints=False)}} + +select * from {{ ref('seed') }} diff --git a/tests/integration/persist_constraints/seeds/seed.csv b/tests/integration/persist_constraints/seeds/seed.csv new file mode 100644 index 00000000..83303d5a --- /dev/null +++ b/tests/integration/persist_constraints/seeds/seed.csv @@ -0,0 +1,3 @@ +id,name,date +1,Alice,2022-01-01 +2,Bob,2022-02-01 diff --git a/tests/integration/persist_constraints/seeds/seed.yml b/tests/integration/persist_constraints/seeds/seed.yml new file mode 100644 index 00000000..661660da --- /dev/null +++ b/tests/integration/persist_constraints/seeds/seed.yml @@ -0,0 +1,9 @@ +version: 2 + +seeds: + - name: seed + config: + column_types: + id: int + name: string + date: string diff --git a/tests/integration/persist_constraints/snapshots/snapshot.sql b/tests/integration/persist_constraints/snapshots/snapshot.sql new file mode 100644 index 00000000..36af7049 --- /dev/null +++ b/tests/integration/persist_constraints/snapshots/snapshot.sql @@ -0,0 +1,15 @@ +{% snapshot my_snapshot %} + + {{ + config( + check_cols=["name", "date"], + unique_key="id", + strategy="check", + target_schema=schema + ) + + }} + + select * from {{ ref('seed') }} + +{% endsnapshot %} diff --git a/tests/integration/persist_constraints/snapshots/snapshot.yml b/tests/integration/persist_constraints/snapshots/snapshot.yml new file mode 100644 index 00000000..f2b0923c --- /dev/null +++ b/tests/integration/persist_constraints/snapshots/snapshot.yml @@ -0,0 +1,12 @@ +version: 2 + +snapshots: + - name: my_snapshot + meta: + constraints: + - name: id_greater_than_zero + condition: id > 0 + columns: + - name: name + meta: + constraint: not_null diff --git a/tests/integration/persist_constraints/test_persist_constraints.py b/tests/integration/persist_constraints/test_persist_constraints.py new file mode 100644 index 00000000..2cbc51d9 --- /dev/null +++ b/tests/integration/persist_constraints/test_persist_constraints.py @@ -0,0 +1,252 @@ +from tests.integration.base import DBTIntegrationTest, use_profile +from typing import Dict +from dbt.contracts.results import RunResult, RunStatus + + +class TestConstraints(DBTIntegrationTest): + @property + def schema(self): + return "constraints" + + @property + def models(self): + return "models" + + @property + def project_config(self): + return { + "config-version": 2, + "models": {"test": {"+persist_constraints": True}}, + "snapshots": {"test": {"+persist_constraints": True}}, + } + + def check_constraints(self, model_name: str, expected: Dict[str, str]): + rows = self.run_sql(f"show tblproperties {self.unique_schema()}.{model_name}", fetch="all") + constraints = { + row.key: row.value for row in rows if row.key.startswith("delta.constraints") + } + assert len(constraints) == len(expected) + self.assertDictEqual(constraints, expected) + + def run_and_check_failure(self, model_name: str, err_msg: str): + result = self.run_dbt(["run", "--select", model_name], expect_pass=False) + assert len(result.results) == 1 + res: RunResult = result.results[0] + assert res.status == RunStatus.Error + assert err_msg in res.message + + +class TestTableConstraints(TestConstraints): + def test_table_constraints(self): + self.run_dbt(["seed"]) + model_name = "table_model" + expected_model_name = "expected_model" + updated_model_name = "expected_model_with_invalid_name" + self.run_dbt(["run", "--select", model_name]) + self.run_dbt(["run", "--select", expected_model_name]) + self.assertTablesEqual(model_name, expected_model_name) + + self.check_constraints(model_name, {"delta.constraints.id_greater_than_zero": "id > 0"}) + + # Insert a row into the seed model that violates the NOT NULL constraint on name. + self.run_sql_file("insert_invalid_name.sql") + self.run_and_check_failure( + model_name, err_msg="violate the new NOT NULL constraint on name" + ) + + # Check the table is still created with the invalid row. + self.run_dbt(["run", "--select", updated_model_name]) + self.assertTablesEqual(model_name, updated_model_name) + + @use_profile("databricks_cluster") + def test_databricks_cluster(self): + self.test_table_constraints() + + @use_profile("databricks_uc_cluster") + def test_databricks_uc_cluster(self): + self.test_table_constraints() + + @use_profile("databricks_sql_endpoint") + def test_databricks_sql_endpoint(self): + self.test_table_constraints() + + @use_profile("databricks_uc_sql_endpoint") + def test_databricks_uc_sql_endpoint(self): + self.test_table_constraints() + + +class TestIncrementalConstraints(TestConstraints): + def test_incremental_constraints(self): + self.run_dbt(["seed"]) + model_name = "incremental_model" + self.run_dbt(["run", "--select", model_name, "--full-refresh"]) + self.check_constraints(model_name, {"delta.constraints.id_greater_than_zero": "id > 0"}) + + schema = self.unique_schema() + + # Insert a row into the seed model with an invalid id. + self.run_sql_file("insert_invalid_id.sql") + self.run_and_check_failure( + model_name, + err_msg="CHECK constraint id_greater_than_zero (id > 0) violated", + ) + self.run_sql(f"delete from {schema}.seed where id = 0") + + # Insert a row into the seed model with an invalid name. + self.run_sql_file("insert_invalid_name.sql") + self.run_and_check_failure( + model_name, err_msg="NOT NULL constraint violated for column: name" + ) + self.run_sql(f"delete from {schema}.seed where id = 3") + + # Insert a valid row into the seed model. + self.run_sql(f"insert into {schema}.seed values (3, 'Cathy', '2022-03-01')") + self.run_dbt(["run", "--select", model_name]) + expected_model_name = "expected_incremental_model" + self.run_dbt(["run", "--select", expected_model_name]) + self.assertTablesEqual(model_name, expected_model_name) + + @use_profile("databricks_cluster") + def test_databricks_cluster(self): + self.test_incremental_constraints() + + @use_profile("databricks_uc_cluster") + def test_databricks_uc_cluster(self): + self.test_incremental_constraints() + + @use_profile("databricks_sql_endpoint") + def test_databricks_sql_endpoint(self): + self.test_incremental_constraints() + + @use_profile("databricks_uc_sql_endpoint") + def test_databricks_uc_sql_endpoint(self): + self.test_incremental_constraints() + + +class TestSnapshotConstraints(TestConstraints): + def check_snapshot_results(self, num_rows: int): + results = self.run_sql(f"select * from {self.unique_schema()}.my_snapshot", fetch="all") + self.assertEqual(len(results), num_rows) + + def test_snapshot(self): + self.run_dbt(["seed"]) + self.run_dbt(["snapshot"]) + self.check_snapshot_results(num_rows=2) + + self.run_sql_file("insert_invalid_name.sql") + results = self.run_dbt(["snapshot"], expect_pass=False) + assert "NOT NULL constraint violated for column: name" in results.results[0].message + + self.run_dbt(["seed"]) + self.run_sql_file("insert_invalid_id.sql") + results = self.run_dbt(["snapshot"], expect_pass=False) + assert ( + "CHECK constraint id_greater_than_zero (id > 0) violated by row with values" + in results.results[0].message + ) + + # Check the snapshot table is not updated. + self.check_snapshot_results(num_rows=2) + + @use_profile("databricks_cluster") + def test_databricks_cluster(self): + self.test_snapshot() + + @use_profile("databricks_uc_cluster") + def test_databricks_uc_cluster(self): + self.test_snapshot() + + @use_profile("databricks_sql_endpoint") + def test_databricks_sql_endpoint(self): + self.test_snapshot() + + @use_profile("databricks_uc_sql_endpoint") + def test_databricks_uc_sql_endpoint(self): + self.test_snapshot() + + +class TestInvalidCheckConstraints(TestConstraints): + def test_invalid_check_constraints(self): + model_name = "invalid_check_constraint" + self.run_dbt(["seed"]) + self.run_and_check_failure(model_name, err_msg="Invalid check constraint condition") + + @use_profile("databricks_cluster") + def test_databricks_cluster(self): + self.test_invalid_check_constraints() + + @use_profile("databricks_uc_cluster") + def test_databricks_uc_cluster(self): + self.test_invalid_check_constraints() + + @use_profile("databricks_sql_endpoint") + def test_databricks_sql_endpoint(self): + self.test_invalid_check_constraints() + + @use_profile("databricks_uc_sql_endpoint") + def test_databricks_uc_sql_endpoint(self): + self.test_invalid_check_constraints() + + +class TestInvalidColumnConstraints(TestConstraints): + def test_invalid_column_constraints(self): + model_name = "invalid_column_constraint" + self.run_dbt(["seed"]) + self.run_and_check_failure( + model_name, + err_msg="Invalid constraint for column id. Only `not_null` is supported.", + ) + + @use_profile("databricks_cluster") + def test_databricks_cluster(self): + self.test_invalid_column_constraints() + + @use_profile("databricks_uc_cluster") + def test_databricks_uc_cluster(self): + self.test_invalid_column_constraints() + + @use_profile("databricks_sql_endpoint") + def test_databricks_sql_endpoint(self): + self.test_invalid_column_constraints() + + @use_profile("databricks_uc_sql_endpoint") + def test_databricks_uc_sql_endpoint(self): + self.test_invalid_column_constraints() + + +class TestTableWithConstraintsDisabled(TestConstraints): + def test_delta_constraints_disabled(self): + self.run_dbt(["seed"]) + model_name = "table_model_disable_constraints" + expected_model_name = "expected_model" + updated_model_name = "expected_model_with_invalid_name" + self.run_dbt(["run", "--select", model_name]) + self.run_dbt(["run", "--select", expected_model_name]) + self.assertTablesEqual(model_name, expected_model_name) + + # No check constraint should be added. + self.check_constraints(model_name, {}) + + # Insert a row into the seed model with the name being null. + self.run_sql(f"insert into {self.unique_schema()}.seed values (3, null, '2022-03-01')") + + # Check the table can be created without failure. + self.run_dbt(["run", "--select", model_name]) + self.run_dbt(["run", "--select", updated_model_name]) + self.assertTablesEqual(model_name, updated_model_name) + + @use_profile("databricks_cluster") + def test_databricks_cluster(self): + self.test_delta_constraints_disabled() + + @use_profile("databricks_uc_cluster") + def test_databricks_uc_cluster(self): + self.test_delta_constraints_disabled() + + @use_profile("databricks_sql_endpoint") + def test_databricks_sql_endpoint(self): + self.test_delta_constraints_disabled() + + @use_profile("databricks_uc_sql_endpoint") + def test_databricks_uc_sql_endpoint(self): + self.test_delta_constraints_disabled()