Skip to content

Commit

Permalink
dbt test --store-failures (#3316)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed May 27, 2021
1 parent 0f018ea commit c0d757a
Show file tree
Hide file tree
Showing 33 changed files with 365 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
- Support optional `updated_at` config parameter with `check` strategy snapshots. If not supplied, will use current timestamp (default). ([#1844](https://github.com/fishtown-analytics/dbt/issues/1844), [#3376](https://github.com/fishtown-analytics/dbt/pull/3376))
- Add the opt-in `--use-experimental-parser` flag ([#3307](https://github.com/fishtown-analytics/dbt/issues/3307))
- Store test failures in the database ([#517](https://github.com/fishtown-analytics/dbt/issues/517), [#903](https://github.com/fishtown-analytics/dbt/issues/903), [#2593](https://github.com/fishtown-analytics/dbt/issues/2593), [#3316](https://github.com/fishtown-analytics/dbt/issues/3316))

### Fixes
- Fix compiled sql for ephemeral models ([#3317](https://github.com/fishtown-analytics/dbt/issues/3317), [#3318](https://github.com/fishtown-analytics/dbt/pull/3318))
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ def add_ephemeral_prefix(self, name: str):

def _get_relation_name(self, node: ParsedNode):
relation_name = None
if (node.resource_type in NodeType.refable() and
not node.is_ephemeral_model):
if node.is_relational and not node.is_ephemeral_model:
adapter = get_adapter(self.config)
relation_cls = adapter.Relation
relation_name = str(relation_cls.create_from(self.config, node))
Expand Down
12 changes: 8 additions & 4 deletions core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ class CompiledSchemaTestNode(CompiledNode, HasTestMetadata):
config: TestConfig = field(default_factory=TestConfig)

def same_config(self, other) -> bool:
return (
self.unrendered_config.get('severity') ==
other.unrendered_config.get('severity')
)
comparisons = [
self.unrendered_config.get(modifier) == other.unrendered_config.get(modifier) or (
self.unrendered_config.get(modifier) is None and
other.unrendered_config.get(modifier) is None
)
for modifier in ('severity', 'store_failures')
]
return all(comparisons)

def same_column_name(self, other) -> bool:
return self.column_name == other.column_name
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,13 @@ class SeedConfig(NodeConfig):

@dataclass
class TestConfig(NodeConfig):
schema: Optional[str] = field(
default='dbt_test__audit',
metadata=CompareBehavior.Exclude.meta(),
)
materialized: str = 'test'
severity: Severity = Severity('ERROR')
store_failures: Optional[bool] = None


@dataclass
Expand Down
27 changes: 23 additions & 4 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ class ParsedNodeMixins(dbtClassMixin):
def is_refable(self):
return self.resource_type in NodeType.refable()

@property
def should_store_failures(self):
return self.resource_type == NodeType.Test and (
self.config.store_failures if self.config.store_failures is not None
else flags.STORE_FAILURES
)

# will this node map to an object in the database?
@property
def is_relational(self):
return (
self.resource_type in NodeType.refable() or
self.should_store_failures
)

@property
def is_ephemeral(self):
return self.config.materialized == 'ephemeral'
Expand Down Expand Up @@ -370,10 +385,14 @@ class ParsedSchemaTestNode(ParsedNode, HasTestMetadata):
config: TestConfig = field(default_factory=TestConfig)

def same_config(self, other) -> bool:
return (
self.unrendered_config.get('severity') ==
other.unrendered_config.get('severity')
)
comparisons = [
self.unrendered_config.get(modifier) == other.unrendered_config.get(modifier) or (
self.unrendered_config.get(modifier) is None and
other.unrendered_config.get(modifier) is None
)
for modifier in ('severity', 'store_failures')
]
return all(comparisons)

def same_column_name(self, other) -> bool:
return self.column_name == other.column_name
Expand Down
10 changes: 8 additions & 2 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
WRITE_JSON = None
PARTIAL_PARSE = None
USE_COLORS = None
STORE_FAILURES = None


def env_set_truthy(key: str) -> Optional[str]:
Expand Down Expand Up @@ -54,7 +55,8 @@ def _get_context():

def reset():
global STRICT_MODE, FULL_REFRESH, USE_CACHE, WARN_ERROR, TEST_NEW_PARSER, \
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS, \
STORE_FAILURES

STRICT_MODE = False
FULL_REFRESH = False
Expand All @@ -66,11 +68,13 @@ def reset():
PARTIAL_PARSE = False
MP_CONTEXT = _get_context()
USE_COLORS = True
STORE_FAILURES = False


def set_from_args(args):
global STRICT_MODE, FULL_REFRESH, USE_CACHE, WARN_ERROR, TEST_NEW_PARSER, \
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS, \
STORE_FAILURES

USE_CACHE = getattr(args, 'use_cache', USE_CACHE)

Expand All @@ -94,6 +98,8 @@ def set_from_args(args):
if use_colors_override is not None:
USE_COLORS = use_colors_override

STORE_FAILURES = getattr(args, 'store_failures', STORE_FAILURES)


# initialize everything to the defaults on module load
reset()
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@
{% endif %}
{% do return(config_full_refresh) %}
{% endmacro %}


{% macro should_store_failures() %}
{% set config_store_failures = config.get('store_failures') %}
{% if config_store_failures is none %}
{% set config_store_failures = flags.STORE_FAILURES %}
{% endif %}
{% do return(config_store_failures) %}
{% endmacro %}
44 changes: 40 additions & 4 deletions core/dbt/include/global_project/macros/materializations/test.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,46 @@
{%- materialization test, default -%}

{% set relations = [] %}

{% if should_store_failures() %}

{% set identifier = model['alias'] %}
{% set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %}
{% set target_relation = api.Relation.create(
identifier=identifier, schema=schema, database=database, type='table') -%} %}

{% if old_relation %}
{% do adapter.drop_relation(old_relation) %}
{% endif %}

{% call statement(auto_begin=True) %}
{{ create_table_as(False, target_relation, sql) }}
{% endcall %}

{% do relations.append(target_relation) %}

{% set main_sql %}
select count(*) as validation_errors
from {{ target_relation }}
{% endset %}

{{ adapter.commit() }}

{% else %}

{% set main_sql %}
select count(*) as validation_errors
from (
{{ sql }}
) _dbt_internal_test
{% endset %}

{% endif %}

{% call statement('main', fetch_result=True) -%}
select count(*) as validation_errors
from (
{{ sql }}
) _dbt_internal_test
{{ main_sql }}
{%- endcall %}

{{ return({'relations': relations}) }}

{%- endmaterialization -%}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

with all_values as (

select distinct
{{ column_name }} as value_field
select
{{ column_name }} as value_field,
count(*) as n_records

from {{ model }}
group by 1

)

select
value_field

select *
from all_values
where value_field not in (
{% for value in values -%}
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,13 @@ def _build_test_subparser(subparsers, base_subparser):
Stop execution upon a first test failure.
'''
)
sub.add_argument(
'--store-failures',
action='store_true',
help='''
Store test results (failing rows) in the database
'''
)

sub.set_defaults(cls=test_task.TestTask, which='test', rpc_method='test')
return sub
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ def _check_resource_uniqueness(
alias_resources: Dict[str, ManifestNode] = {}

for resource, node in manifest.nodes.items():
if node.resource_type not in NodeType.refable():
if not node.is_relational:
continue
# appease mypy - sources aren't refable!
assert not isinstance(node, ParsedSourceDefinition)
Expand Down
33 changes: 24 additions & 9 deletions core/dbt/parser/schema_test_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,21 @@ def get_nice_schema_test_name(
clean_flat_args = [re.sub('[^0-9a-zA-Z_]+', '_', arg) for arg in flat_args]
unique = "__".join(clean_flat_args)

cutoff = 32
if len(unique) <= cutoff:
label = unique
else:
label = hashlib.md5(unique.encode('utf-8')).hexdigest()
# for the file path + alias, the name must be <64 characters
# if the full name is too long, include the first 30 identifying chars plus
# a 32-character hash of the full contents

test_identifier = '{}_{}'.format(test_type, test_name)
full_name = '{}_{}'.format(test_identifier, unique)

filename = '{}_{}_{}'.format(test_type, test_name, label)
name = '{}_{}_{}'.format(test_type, test_name, unique)
if len(full_name) >= 64:
test_trunc_identifier = test_identifier[:30]
label = hashlib.md5(full_name.encode('utf-8')).hexdigest()
short_name = '{}_{}'.format(test_trunc_identifier, label)
else:
short_name = full_name

return filename, name
return short_name, full_name


@dataclass
Expand Down Expand Up @@ -185,7 +190,7 @@ class TestBuilder(Generic[Testable]):
r'(?P<test_name>([a-zA-Z_][0-9a-zA-Z_]*))'
)
# kwargs representing test configs
MODIFIER_ARGS = ('severity', 'tags', 'enabled')
MODIFIER_ARGS = ('severity', 'tags', 'enabled', 'store_failures')

def __init__(
self,
Expand Down Expand Up @@ -231,6 +236,10 @@ def __init__(
self.compiled_name: str = compiled_name
self.fqn_name: str = fqn_name

# use hashed name as alias if too long
if compiled_name != fqn_name:
self.modifiers['alias'] = compiled_name

def _bad_type(self) -> TypeError:
return TypeError('invalid target type "{}"'.format(type(self.target)))

Expand Down Expand Up @@ -271,13 +280,19 @@ def extract_test_args(test, name=None) -> Tuple[str, Dict[str, Any]]:
def enabled(self) -> Optional[bool]:
return self.modifiers.get('enabled')

def alias(self) -> Optional[str]:
return self.modifiers.get('alias')

def severity(self) -> Optional[str]:
sev = self.modifiers.get('severity')
if sev:
return sev.upper()
else:
return None

def store_failures(self) -> Optional[bool]:
return self.modifiers.get('store_failures')

def tags(self) -> List[str]:
tags = self.modifiers.get('tags', [])
if isinstance(tags, str):
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ def render_test_update(self, node, config, builder):
node.config['severity'] = builder.severity()
if builder.enabled() is not None:
node.config['enabled'] = builder.enabled()
# note: this does not respect generate_alias_name() macro
if builder.alias() is not None:
node.unrendered_config['alias'] = builder.alias()
node.config['alias'] = builder.alias()
node.alias = builder.alias()
# source node tests are processed at patch_source time
if isinstance(builder.target, UnpatchedSourceDefinition):
sources = [builder.target.fqn[-2], builder.target.fqn[-1]]
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ def print_run_result_error(
logger.info(" compiled SQL at {}".format(
result.node.compiled_path))

if result.node.should_store_failures:
with TextOnly():
logger.info("")
msg = f"select * from {result.node.relation_name}"
border = '-' * len(msg)
logger.info(f" See test failures:\n {border}\n {msg}\n {border}")

elif result.message is not None:
first = True
for line in result.message.split("\n"):
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def get_model_schemas(
for node in self.manifest.nodes.values():
if node.unique_id not in selected_uids:
continue
if node.is_refable and not node.is_ephemeral:
if node.is_relational and not node.is_ephemeral:
relation = adapter.Relation.create_from(self.config, node)
result.add(relation.without_identifier())

Expand Down Expand Up @@ -525,7 +525,6 @@ def create_schema(relation: BaseRelation) -> None:
db_schema = (db_lower, schema.lower())
if db_schema not in existing_schemas_lowered:
existing_schemas_lowered.add(db_schema)

fut = tpe.submit_connected(
adapter, f'create_{info.database or ""}_{info.schema}',
create_schema, info
Expand Down
Loading

0 comments on commit c0d757a

Please sign in to comment.