Skip to content

Commit

Permalink
show output for archival jobs (#325)
Browse files Browse the repository at this point in the history
* support composite UKs for archival

* add composite uk note to changelog

* show output for archival jobs

* typo

* integration test fix for snowflake archival

this broke because of uppercasing of identifiers
  • Loading branch information
drewbanin committed Mar 17, 2017
1 parent a817c9f commit 7e3b361
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bugfixes

- Fix ephemeral load order bug ([#292](https://github.com/fishtown-analytics/dbt/pull/292), [#285](https://github.com/fishtown-analytics/dbt/pull/285))
- Support composite unique key in archivals ([#324](https://github.com/fishtown-analytics/dbt/pull/324))
- Fix target paths ([#331](https://github.com/fishtown-analytics/dbt/pull/331), [#329](https://github.com/fishtown-analytics/dbt/issues/329))
- Ignore commented-out schema tests ([#330](https://github.com/fishtown-analytics/dbt/pull/330), [#328](https://github.com/fishtown-analytics/dbt/issues/328))

Expand Down
36 changes: 34 additions & 2 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def print_start_line(node, schema_name, index, total):
print_model_start_line(node, schema_name, index, total)
if node.get('resource_type') == NodeType.Test:
print_test_start_line(node, schema_name, index, total)
if node.get('resource_type') == NodeType.Archive:
print_archive_start_line(node, index, total)


def print_test_start_line(model, schema_name, index, total):
Expand All @@ -114,13 +116,23 @@ def print_model_start_line(model, schema_name, index, total):
print_fancy_output_line(msg, 'RUN', index, total)


def print_archive_start_line(model, index, total):
    """Print the "START archive" progress line for an archive node.

    The source/target schema and table names are taken from the node's
    ``config`` mapping and interpolated into the message, which is then
    passed to ``print_fancy_output_line`` with the ``RUN`` status.

    Raises ``KeyError`` (from ``str.format``) when the config is missing
    any of ``source_schema``, ``source_table``, ``target_schema`` or
    ``target_table``.
    """
    template = ("START archive {source_schema}.{source_table} --> "
                "{target_schema}.{target_table}")
    archive_config = model.get('config', {})

    print_fancy_output_line(
        template.format(**archive_config), 'RUN', index, total)


def print_result_line(result, schema_name, index, total):
    """Route a node's result to the printer for its resource type.

    Model and test results are printed with the schema name; archive
    results are printed without it. A result whose node has any other
    resource type produces no output.
    """
    resource_type = result.node.get('resource_type')

    if resource_type == NodeType.Model:
        print_model_result_line(result, schema_name, index, total)
    elif resource_type == NodeType.Test:
        print_test_result_line(result, schema_name, index, total)
    elif resource_type == NodeType.Archive:
        # Archive lines carry their own schemas (from the node config),
        # so schema_name is not forwarded here.
        print_archive_result_line(result, index, total)


def print_test_result_line(result, schema_name, index, total):
Expand All @@ -146,6 +158,24 @@ def print_test_result_line(result, schema_name, index, total):
result.execution_time)


def print_archive_result_line(result, index, total):
    """Print the outcome line for a finished archive node.

    The message starts with ``OK archived`` or, when ``result.errored``
    is truthy, ``ERROR archiving``, followed by the source and target
    relation names read from the node's ``config`` mapping. The line is
    emitted through ``print_fancy_output_line`` together with the
    result's status and execution time.

    Raises ``KeyError`` (from ``str.format``) when the config is missing
    any of the four schema/table keys.
    """
    node = result.node
    outcome = 'ERROR archiving' if result.errored else 'OK archived'
    archive_config = node.get('config', {})

    template = ("{info} {source_schema}.{source_table} --> "
                "{target_schema}.{target_table}")
    line = template.format(info=outcome, **archive_config)

    print_fancy_output_line(
        line, result.status, index, total, result.execution_time)


def execute_test(profile, test):
adapter = get_adapter(profile)
_, cursor = adapter.execute_one(
Expand Down Expand Up @@ -288,6 +318,8 @@ def execute_archive(profile, node, context):
profile, node_cfg.get('source_schema'), node_cfg.get('source_table'))

if len(source_columns) == 0:
source_schema = node_cfg.get('source_schema')
source_table = node_cfg.get('source_table')
raise RuntimeError(
'Source table "{}"."{}" does not '
'exist'.format(source_schema, source_table))
Expand All @@ -304,8 +336,8 @@ def execute_archive(profile, node, context):
schema=node_cfg.get('target_schema'),
table=node_cfg.get('target_table'),
columns=dest_columns,
sort=node_cfg.get('updated_at'),
dist=node_cfg.get('unique_key'))
sort='dbt_updated_at',
dist='scd_id')

# TODO move this to inject_runtime_config, generate archive SQL
# in wrap step. can't do this right now because we actually need
Expand Down
10 changes: 5 additions & 5 deletions dbt/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ def wrap(self, opts):
{% for col in get_columns_in_table(source_schema, source_table) %}
"{{ col.name }}" {% if not loop.last %},{% endif %}
{% endfor %},
"{{ updated_at }}" as "dbt_updated_at",
"{{ unique_key }}" as "dbt_pk",
"{{ updated_at }}" as "valid_from",
{{ updated_at }} as "dbt_updated_at",
{{ unique_key }} as "dbt_pk",
{{ updated_at }} as "valid_from",
null::timestamp as "tmp_valid_to"
from "{{ source_schema }}"."{{ source_table }}"
Expand All @@ -134,8 +134,8 @@ def wrap(self, opts):
{% for col in get_columns_in_table(source_schema, source_table) %}
"{{ col.name }}" {% if not loop.last %},{% endif %}
{% endfor %},
"{{ updated_at }}" as "dbt_updated_at",
"{{ unique_key }}" as "dbt_pk",
{{ updated_at }} as "dbt_updated_at",
{{ unique_key }} as "dbt_pk",
"valid_from",
"valid_to" as "tmp_valid_to"
from "{{ target_schema }}"."{{ target_table }}"
Expand Down
2 changes: 1 addition & 1 deletion test/integration/004_simple_archive_test/seed.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,5 @@ select
"updated_at" as valid_from,
null::timestamp as valid_to,
"updated_at" as dbt_updated_at,
md5("id" || '|' || "updated_at"::text) as scd_id
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as scd_id
from "simple_archive_004"."seed";
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def project_config(self):
{
"source_table": "seed",
"target_table": "archive_actual",
"updated_at": "updated_at",
"unique_key": "id"
"updated_at": '"updated_at"',
"unique_key": '''"id" || '-' || "first_name"'''
}
]
}
Expand Down
4 changes: 2 additions & 2 deletions test/integration/004_simple_archive_test/update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ select
"updated_at" as "valid_from",
null::timestamp as "valid_to",
"updated_at" as "dbt_updated_at",
md5("id" || '|' || "updated_at"::text) as "scd_id"
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "scd_id"
from "simple_archive_004"."seed"
where "id" >= 10 and "id" <= 20;

Expand Down Expand Up @@ -72,6 +72,6 @@ select
"updated_at" as "valid_from",
null::timestamp as "valid_to",
"updated_at" as "dbt_updated_at",
md5("id" || '|' || "updated_at"::text) as "scd_id"
md5("id" || '-' || "first_name" || '|' || "updated_at"::text) as "scd_id"
from "simple_archive_004"."seed"
where "id" > 20;

0 comments on commit 7e3b361

Please sign in to comment.