dbt Constraints / model contracts (#574)
* Add support for constraints in Spark

* Add tests for constraints

* Update requirements for CI to pass

* Update dispatched macro with argument

* Use spark decorator for tests

* Update test to remove unsupported constraints

* Allow multiple queries to be sent

* Revert change on splitting statements in `execute`

* Add `call statement` for table with constraints

* Add checks when the split by `;` is empty

* Fix typo in Jinja variable name

* Rename `constraints` to `constraints_check`

* Support constraints with `alter` statements

* Changie entry

* Fix missing `endif`

* Remove get_columns_spec_ddl as we use alter

* Remove unused dispatch macro

* Update dispatched macro

* Update tests to work with `alter` approach

* Make tests valid for databricks only for delta

* Try other way to call tests

* Add schema info

* Remove wrong argument to test

* Use new testing framework

* Add check on column names and order

* Check only when constraints enabled

* Remove config nesting

* constraint_check is not a list

* Fix CI/CD

* Typo

* Only allow not null

* Update expected SQL to the Spark one

* Make file_format delta

* Try this

* Check for earlier part of error message

* Check for any rather than all error messages

* Reset to dbt-core main

---------

Co-authored-by: Sung Won Chung <[email protected]>
Co-authored-by: Jeremy Cohen <[email protected]>
Co-authored-by: Michelle Ark <[email protected]>
4 people authored Feb 17, 2023
1 parent 6f7307b commit b3f6558
Showing 4 changed files with 120 additions and 0 deletions.
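
For context, here is a rough sketch of the DDL sequence these macros produce for a table model with constraints enabled on Delta (the relation name, columns, and check expression are illustrative only, and the constraint name is the md5 hash generated by spark__alter_table_add_constraints below):

    create or replace table my_schema.my_model
    using delta
    as
    select 1 as id, 'blue' as color;

    alter table my_schema.my_model add constraint <md5_hash> check (id > 0);

    alter table my_schema.my_model change column id set not null;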
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20230130-125855.yaml
@@ -0,0 +1,8 @@
kind: Features
body: 'Support for data type constraints in Spark, following the dbt Core feature
  #6271'
time: 2023-01-30T12:58:55.972992+01:00
custom:
  Author: b-per
  Issue: "558"
  PR: "574"
52 changes: 52 additions & 0 deletions dbt/include/spark/macros/adapters.sql
@@ -138,6 +138,9 @@
{% else %}
create table {{ relation }}
{% endif %}
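{#-- When constraints are enabled, check that the columns returned by the model SQL
     match the names and order declared in the model yml (via dbt-core's
     get_assert_columns_equivalent) before the table is created. --#}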
{% if config.get('constraints_enabled', False) %}
{{ get_assert_columns_equivalent(sql) }}
{% endif %}
{{ file_format_clause() }}
{{ options_clause() }}
{{ partition_cols(label="partitioned by") }}
@@ -160,6 +163,55 @@
{%- endmacro -%}


{% macro persist_constraints(relation, model) %}
{{ return(adapter.dispatch('persist_constraints', 'dbt')(relation, model)) }}
{% endmacro %}

{% macro spark__persist_constraints(relation, model) %}
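{#-- Constraints are only persisted when `constraints_enabled` is set and the model
     is stored as Delta; other file formats are skipped. --#}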
{% if config.get('constraints_enabled', False) and config.get('file_format', 'delta') == 'delta' %}
{% do alter_table_add_constraints(relation, model.columns) %}
{% do alter_column_set_constraints(relation, model.columns) %}
{% endif %}
{% endmacro %}

{% macro alter_table_add_constraints(relation, constraints) %}
{{ return(adapter.dispatch('alter_table_add_constraints', 'dbt')(relation, constraints)) }}
{% endmacro %}

{% macro spark__alter_table_add_constraints(relation, column_dict) %}
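{#-- Each check constraint gets a deterministic name: an md5 hash of the column name
     and its check expression. Check constraints are only added on non-incremental
     runs, since incremental runs reuse the existing table. --#}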

{% for column_name in column_dict %}
{% set constraints_check = column_dict[column_name]['constraints_check'] %}
{% if constraints_check and not is_incremental() %}
{%- set constraint_hash = local_md5(column_name ~ ";" ~ constraints_check) -%}
{% call statement() %}
alter table {{ relation }} add constraint {{ constraint_hash }} check {{ constraints_check }};
{% endcall %}
{% endif %}
{% endfor %}
{% endmacro %}

{% macro alter_column_set_constraints(relation, column_dict) %}
{{ return(adapter.dispatch('alter_column_set_constraints', 'dbt')(relation, column_dict)) }}
{% endmacro %}

{% macro spark__alter_column_set_constraints(relation, column_dict) %}
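{#-- Only `not null` is supported as a column-level constraint here; any other
     constraint triggers a warning and is skipped. --#}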
{% for column_name in column_dict %}
{% set constraints = column_dict[column_name]['constraints'] %}
{% for constraint in constraints %}
{% if constraint != 'not null' %}
{{ exceptions.warn('Invalid constraint for column ' ~ column_name ~ '. Only `not null` is supported.') }}
{% else %}
{% set quoted_name = adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name %}
{% call statement() %}
alter table {{ relation }} change column {{ quoted_name }} set {{ constraint }};
{% endcall %}
{% endif %}
{% endfor %}
{% endfor %}
{% endmacro %}


{% macro spark__create_view_as(relation, sql) -%}
create or replace view {{ relation }}
{{ comment_clause() }}
2 changes: 2 additions & 0 deletions dbt/include/spark/macros/materializations/table.sql
@@ -29,6 +29,8 @@

{% do persist_docs(target_relation, model) %}

{% do persist_constraints(target_relation, model) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]})}}
58 changes: 58 additions & 0 deletions tests/functional/adapter/test_constraints.py
@@ -0,0 +1,58 @@
import pytest
from dbt.tests.util import relation_from_name
from dbt.tests.adapter.constraints.test_constraints import (
BaseConstraintsColumnsEqual,
BaseConstraintsRuntimeEnforcement
)

# constraints are enforced via 'alter' statements that run after table creation
_expected_sql_spark = """
create or replace table {0}
using delta
as
select
1 as id,
'blue' as color,
cast('2019-01-01' as date) as date_day
"""

@pytest.mark.skip_profile('spark_session', 'apache_spark')
class TestSparkConstraintsColumnsEqual(BaseConstraintsColumnsEqual):
    pass

@pytest.mark.skip_profile('spark_session', 'apache_spark')
class TestSparkConstraintsRuntimeEnforcement(BaseConstraintsRuntimeEnforcement):
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "models": {
                "+file_format": "delta",
            }
        }

    @pytest.fixture(scope="class")
    def expected_sql(self, project):
        relation = relation_from_name(project.adapter, "my_model")
        return _expected_sql_spark.format(relation)

    # On Spark/Databricks, constraints are applied *after* the table is replaced.
    # We don't have any way to "rollback" the table to its previous happy state.
    # So the 'color' column will be updated to 'red', instead of 'blue'.
    @pytest.fixture(scope="class")
    def expected_color(self):
        return "red"

    @pytest.fixture(scope="class")
    def expected_error_messages(self):
        return [
            "violate the new CHECK constraint",
            "DELTA_NEW_CHECK_CONSTRAINT_VIOLATION",
            "violate the new NOT NULL constraint",
        ]

    def assert_expected_error_messages(self, error_message, expected_error_messages):
        # This needs to be ANY instead of ALL
        # The CHECK constraint is added before the NOT NULL constraint
        # and different connection types display/truncate the error message in different ways...
        assert any(msg in error_message for msg in expected_error_messages)
