Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore ability to utilize updated_at for check_cols snapshots #5077

Merged
merged 5 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20220415-112927.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Restore ability to utilize `updated_at` for check_cols snapshots
time: 2022-04-15T11:29:27.063462-06:00
custom:
Author: dbeatty10
Issue: "5076"
PR: "5077"
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}
{% set updated_at = snapshot_get_time() %}
{% set updated_at = config.get('updated_at', snapshot_get_time()) %}

{% set column_added = false %}

Expand Down
11 changes: 7 additions & 4 deletions core/dbt/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,15 @@ def check_relation_types(adapter, relation_to_type):
# by doing a separate call for each set of tables/relations.
# Wraps check_relations_equal_with_relations by creating relations
# from the list of names passed in.
def check_relations_equal(adapter, relation_names):
def check_relations_equal(adapter, relation_names, compare_snapshot_cols=False):
if len(relation_names) < 2:
raise TestProcessingException(
"Not enough relations to compare",
)
relations = [relation_from_name(adapter, name) for name in relation_names]
return check_relations_equal_with_relations(adapter, relations)
return check_relations_equal_with_relations(
adapter, relations, compare_snapshot_cols=compare_snapshot_cols
)


# This can be used when checking relations in different schemas, by supplying
Expand All @@ -288,16 +290,17 @@ def check_relations_equal(adapter, relation_names):
# adapter.get_columns_in_relation
# adapter.get_rows_different_sql
# adapter.execute
def check_relations_equal_with_relations(adapter, relations):
def check_relations_equal_with_relations(adapter, relations, compare_snapshot_cols=False):

with get_connection(adapter):
basis, compares = relations[0], relations[1:]
# Skip columns starting with "dbt_" because we don't want to
# compare those, since they are time sensitive
# (unless comparing "dbt_" snapshot columns is explicitly enabled)
column_names = [
c.name
for c in adapter.get_columns_in_relation(basis)
if not c.name.lower().startswith("dbt_")
if not c.name.lower().startswith("dbt_") or compare_snapshot_cols
]

for relation in compares:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import pytest
from dbt.tests.util import run_dbt, check_relations_equal

snapshot_sql = """
{% snapshot snapshot_check_cols_updated_at_actual %}
{{
config(
target_database=database,
target_schema=schema,
unique_key='id',
strategy='check',
check_cols='all',
updated_at="'" ~ var("updated_at") ~ "'::timestamp",
)
}}

{% if var('version') == 1 %}

select 'a' as id, 10 as counter, '2016-01-01T00:00:00Z'::timestamp as timestamp_col union all
select 'b' as id, 20 as counter, '2016-01-01T00:00:00Z'::timestamp as timestamp_col

{% elif var('version') == 2 %}

select 'a' as id, 30 as counter, '2016-01-02T00:00:00Z'::timestamp as timestamp_col union all
select 'b' as id, 20 as counter, '2016-01-01T00:00:00Z'::timestamp as timestamp_col union all
select 'c' as id, 40 as counter, '2016-01-02T00:00:00Z'::timestamp as timestamp_col

{% else %}

select 'a' as id, 30 as counter, '2016-01-02T00:00:00Z'::timestamp as timestamp_col union all
select 'c' as id, 40 as counter, '2016-01-02T00:00:00Z'::timestamp as timestamp_col

{% endif %}

{% endsnapshot %}
"""

expected_csv = """
id,counter,timestamp_col,dbt_scd_id,dbt_updated_at,dbt_valid_from,dbt_valid_to
a,10,2016-01-01 00:00:00.000,927354aa091feffd9437ead0bdae7ae1,2016-07-01 00:00:00.000,2016-07-01 00:00:00.000,2016-07-02 00:00:00.000
b,20,2016-01-01 00:00:00.000,40ace4cbf8629f1720ec8a529ed76f8c,2016-07-01 00:00:00.000,2016-07-01 00:00:00.000,
a,30,2016-01-02 00:00:00.000,e9133f2b302c50e36f43e770944cec9b,2016-07-02 00:00:00.000,2016-07-02 00:00:00.000,
c,40,2016-01-02 00:00:00.000,09d33d35101e788c152f65d0530b6837,2016-07-02 00:00:00.000,2016-07-02 00:00:00.000,
""".lstrip()


@pytest.fixture(scope="class")
def snapshots():
return {"snapshot_check_cols_updated_at_actual.sql": snapshot_sql}


@pytest.fixture(scope="class")
def seeds():
return {"snapshot_check_cols_updated_at_expected.csv": expected_csv}


@pytest.fixture(scope="class")
def project_config_update():
return {
"seeds": {
"quote_columns": False,
"test": {
"snapshot_check_cols_updated_at_expected": {
"+column_types": {
"timestamp_col": "timestamp without time zone",
"dbt_updated_at": "timestamp without time zone",
"dbt_valid_from": "timestamp without time zone",
"dbt_valid_to": "timestamp without time zone",
},
},
},
},
}


def test_simple_snapshot(project):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure I understand the test case correctly here.
dbt seed would create thesnapshot_check_cols_updated_at_expected table. All data in snapshot_check_cols_updated_at_actual is purely generated by the logic in the snapshot.sql.

And for the command run_dbt(["snapshot", "--vars", "{version: 3, updated_at: 2016-07-03}"]) command there's no data in snapshot table got generated because no data got modified.

This test is also designed to make sure the time for dbt_updated_at actually has the time that we passed in through '--vars'. Which is the main change this PR is doing

This is a great test! I would recommend adding a few short comments to document the intention of each command.

Copy link
Contributor Author

@dbeatty10 dbeatty10 Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this excellent feedback @ChenyuLInx!

You are correct in your understanding of the test case. I've updated the description of the pull request to reflect your feedback. Or were you thinking of adding these as comments within the code itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments to explain the testing approach.

"""
Test that the `dbt_updated_at` column reflects the `updated_at` timestamp expression in the config.

Approach:
1. Create a table that represents the expected data after a series of snapshots
- Use dbt seed to create the expected relation (`snapshot_check_cols_updated_at_expected`)
2. Execute a series of snapshots to create the data
- Use a series of (3) dbt snapshot commands to create the actual relation (`snapshot_check_cols_updated_at_actual`)
- The logic can switch between 3 different versions of the data (depending on the `version` number)
- The `updated_at` value is passed in via `--vars` and cast to a timestamp in the snapshot config
3. Compare the two relations for equality
"""

# 1. Create a table that represents the expected data after a series of snapshots
results = run_dbt(["seed", "--show", "--vars", "{version: 1, updated_at: 2016-07-01}"])
assert len(results) == 1

# 2. Execute a series of snapshots to create the data

# Snapshot day 1
results = run_dbt(["snapshot", "--vars", "{version: 1, updated_at: 2016-07-01}"])
assert len(results) == 1

# Snapshot day 2
results = run_dbt(["snapshot", "--vars", "{version: 2, updated_at: 2016-07-02}"])
assert len(results) == 1

# Snapshot day 3
results = run_dbt(["snapshot", "--vars", "{version: 3, updated_at: 2016-07-03}"])
assert len(results) == 1

# 3. Compare the two relations for equality
check_relations_equal(
project.adapter,
["snapshot_check_cols_updated_at_actual", "snapshot_check_cols_updated_at_expected"],
compare_snapshot_cols=True,
)