Skip to content

Commit

Permalink
Add bigquery hooks (#779) (#836)
Browse files Browse the repository at this point in the history
  • Loading branch information
beckjake committed Jul 16, 2018
1 parent 8548b6d commit e86bbe0
Show file tree
Hide file tree
Showing 16 changed files with 386 additions and 27 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ jobs:
- run:
name: Run tests
command: tox -e integration-snowflake-py36
no_output_timeout: 1h
integration-redshift-py36:
docker: *py36
steps:
Expand Down Expand Up @@ -87,6 +88,7 @@ jobs:
- run:
name: Run tests
command: tox -e integration-snowflake-py27
no_output_timeout: 1h
integration-redshift-py27:
docker: *py27
steps:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Changes

- Add pre/post hook support for BigQuery ([#836](https://github.com/fishtown-analytics/dbt/pull/836))
- Improve consistency of BigQuery list_relations, create shortcut for most materializations ([#835](https://github.com/fishtown-analytics/dbt/pull/835))
- Support external BigQuery relations ([#828](https://github.com/fishtown-analytics/dbt/pull/828))
- Added tox environments that have the user specify what tests should be run ([#837](https://github.com/fishtown-analytics/dbt/pull/837))
Expand Down
11 changes: 8 additions & 3 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
query_job = client.query(sql, job_config)

# this blocks until the query has completed
with cls.exception_handler(profile, 'create dataset', model_name):
with cls.exception_handler(profile, sql, model_name):
iterator = query_job.result()

if fetch:
Expand All @@ -392,10 +392,15 @@ def get_table_from_response(cls, resp):
rows = [dict(row.items()) for row in resp]
return dbt.clients.agate_helper.table_from_data(rows, column_names)

# BigQuery doesn't support BEGIN/COMMIT, so stub these out.

@classmethod
def add_begin_query(cls, profile, name):
raise dbt.exceptions.NotImplementedException(
'`add_begin_query` is not implemented for this adapter!')
pass

@classmethod
def add_commit_query(cls, profile, name):
pass

@classmethod
def create_schema(cls, profile, project_cfg, schema, model_name=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
{% endif %}
{% endif %}

{{ run_hooks(pre_hooks) }}

{#
Since dbt uses WRITE_TRUNCATE mode for tables, we only need to drop this thing
if it is not a table. If it _is_ already a table, then we can overwrite it without downtime
Expand All @@ -70,5 +72,6 @@
{% endcall -%}
{% endif %}

{{ run_hooks(post_hooks) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
identifier=identifier, schema=schema,
type='view') -%}

{{ run_hooks(pre_hooks) }}

-- drop if exists
{%- if old_relation is not none -%}
{%- if old_relation.is_table and not flags.FULL_REFRESH -%}
Expand All @@ -33,4 +35,6 @@
{%- endcall %}
{%- endif %}

{{ run_hooks(post_hooks) }}

{%- endmaterialization %}
8 changes: 2 additions & 6 deletions dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ def run_hooks(cls, project, adapter, flat_graph, hook_type):
sql = hook_dict.get('sql', '')

if len(sql.strip()) > 0:
adapter.execute_one(profile, sql, model_name=model_name,
auto_begin=False)
adapter.execute(profile, sql, model_name=model_name,
auto_begin=False, fetch=False)

adapter.release_connection(profile, model_name)

Expand Down Expand Up @@ -500,10 +500,6 @@ def describe_node(self):
schema_name = self.node.get('schema')
return "seed file {}.{}".format(schema_name, self.node['alias'])

@classmethod
def before_run(cls, project, adapter, flat_graph):
cls.create_schemas(project, adapter, flat_graph)

def before_execute(self):
description = self.describe_node()
dbt.ui.printer.print_start_line(description, self.node_index,
Expand Down
30 changes: 30 additions & 0 deletions test/integration/014_hook_tests/macros/before-and-after-bq.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

{#
  Run-hook macro used by the 014 hook integration tests on BigQuery.
  Inserts one audit row into <target.schema>.on_run_hook (table created by
  seed_run_bigquery.sql) recording the hook context at render time.
  All values are rendered as quoted strings except target.threads, which is
  numeric and intentionally unquoted.
  NOTE(review): dbname/host/user/pass may render empty for a BigQuery target
  profile — confirm against the profile used by the test harness.
#}
{% macro custom_run_hook_bq(state, target, run_started_at, invocation_id) %}

insert into {{ target.schema }}.on_run_hook (
    state,
    target_dbname,
    target_host,
    target_name,
    target_schema,
    target_type,
    target_user,
    target_pass,
    target_threads,
    run_started_at,
    invocation_id
) VALUES (
    '{{ state }}',
    '{{ target.dbname }}',
    '{{ target.host }}',
    '{{ target.name }}',
    '{{ target.schema }}',
    '{{ target.type }}',
    '{{ target.user }}',
    '{{ target.pass }}',
    {{ target.threads }},
    '{{ run_started_at }}',
    '{{ invocation_id }}'
)

{% endmacro %}
5 changes: 5 additions & 0 deletions test/integration/014_hook_tests/seed-models-bq/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

# Schema tests for the seeded table used by the BigQuery seed-hook test
# (014): a not_null test on column `a` of example_seed.
example_seed:
  constraints:
    not_null:
      - a
18 changes: 18 additions & 0 deletions test/integration/014_hook_tests/seed_model_bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

-- Reset the model-hook audit table so each test run starts empty.
-- {schema} is substituted by the test harness before execution.
drop table if exists {schema}.on_model_hook;

-- Audit table populated by the model pre/post hooks under test.
-- All columns are STRING except target_threads (INT64), matching the
-- quoting used by the hook INSERT statements.
create table {schema}.on_model_hook (
    state STRING, -- start|end

    target_dbname STRING,
    target_host STRING,
    target_name STRING,
    target_schema STRING,
    target_type STRING,
    target_user STRING,
    target_pass STRING,
    target_threads INT64,

    run_started_at STRING,
    invocation_id STRING
);
18 changes: 18 additions & 0 deletions test/integration/014_hook_tests/seed_run_bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

-- Reset the run-hook audit table so each test run starts empty.
-- {schema} is substituted by the test harness before execution.
drop table if exists {schema}.on_run_hook;

-- Audit table populated by the on-run-start/on-run-end hooks under test
-- (see the custom_run_hook_bq macro). All columns are STRING except
-- target_threads (INT64), matching the quoting used by the macro's INSERT.
create table {schema}.on_run_hook (
    state STRING, -- start|end

    target_dbname STRING,
    target_host STRING,
    target_name STRING,
    target_schema STRING,
    target_type STRING,
    target_user STRING,
    target_pass STRING,
    target_threads INT64,

    run_started_at STRING,
    invocation_id STRING
);
8 changes: 5 additions & 3 deletions test/integration/014_hook_tests/test_model_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@


class TestPrePostModelHooks(DBTIntegrationTest):

def setUp(self):
self.adapter_type = 'bigquery'
DBTIntegrationTest.setUp(self)

self.run_sql_file("test/integration/014_hook_tests/seed_model.sql")
Expand Down Expand Up @@ -178,6 +178,8 @@ def project_config(self):

@attr(type='postgres')
def test_hooks_on_seeds(self):
self.run_dbt(['seed'])
self.run_dbt(['test'])
res = self.run_dbt(['seed'])
self.assertEqual(len(res), 1, 'Expected exactly one item')
res = self.run_dbt(['test'])
self.assertEqual(len(res), 1, 'Expected exactly one item')

156 changes: 156 additions & 0 deletions test/integration/014_hook_tests/test_model_hooks_bq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from nose.plugins.attrib import attr
from test.integration.base import DBTIntegrationTest

MODEL_PRE_HOOK = """
insert into {{this.schema}}.on_model_hook (
state,
target_name,
target_schema,
target_type,
target_threads,
run_started_at,
invocation_id
) VALUES (
'start',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""


MODEL_POST_HOOK = """
insert into {{this.schema}}.on_model_hook (
state,
target_name,
target_schema,
target_type,
target_threads,
run_started_at,
invocation_id
) VALUES (
'end',
'{{ target.name }}',
'{{ target.schema }}',
'{{ target.type }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
)
"""

class TestBigqueryPrePostModelHooks(DBTIntegrationTest):
    """Verify that model pre/post-hooks run on BigQuery and record context.

    The hooks insert rows into the on_model_hook audit table (created by
    seed_model_bigquery.sql); each test reads them back and checks the
    captured context variables.
    """

    def setUp(self):
        DBTIntegrationTest.setUp(self)
        self.use_profile('bigquery')
        self.use_default_project()
        self.run_sql_file("test/integration/014_hook_tests/seed_model_bigquery.sql")

        # Columns read back from the audit table, in SELECT order.
        self.fields = [
            'state',
            'target_name',
            'target_schema',
            'target_threads',
            'target_type',
            'run_started_at',
            'invocation_id',
        ]

    @property
    def schema(self):
        return "model_hooks_014"

    @property
    def profile_config(self):
        # Pin the thread count so the value recorded by the hooks is
        # predictable for check_hooks().
        config = self.bigquery_profile()
        config['test']['outputs']['default2']['threads'] = 3
        return config

    @property
    def project_config(self):
        return {
            'macro-paths': ['test/integration/014_hook_tests/macros'],
            'models': {
                'test': {
                    'pre-hook': [MODEL_PRE_HOOK],
                    'post-hook': [MODEL_POST_HOOK],
                }
            }
        }

    @property
    def models(self):
        return "test/integration/014_hook_tests/models"

    def get_ctx_vars(self, state):
        """Fetch the single audit row for *state* as a field-name dict."""
        columns = ", ".join(self.fields)
        query = "select {field_list} from `{schema}.on_model_hook` where state = '{state}'".format(
            field_list=columns, schema=self.unique_schema(), state=state)

        rows = self.run_sql(query, fetch='all')
        self.assertFalse(len(rows) == 0, 'nothing inserted into hooks table')
        self.assertFalse(len(rows) > 1, 'too many rows in hooks table')
        return dict(zip(self.fields, rows[0]))

    def check_hooks(self, state):
        """Assert the audit row for *state* captured the expected context."""
        ctx = self.get_ctx_vars(state)

        self.assertEqual(ctx['state'], state)
        self.assertEqual(ctx['target_name'], 'default2')
        self.assertEqual(ctx['target_schema'], self.unique_schema())
        self.assertEqual(ctx['target_threads'], 3)
        self.assertEqual(ctx['target_type'], 'bigquery')
        # run_started_at and invocation_id are opaque; just require that the
        # hook rendered them to non-empty strings.
        for key in ('run_started_at', 'invocation_id'):
            self.assertTrue(
                ctx[key] is not None and len(ctx[key]) > 0,
                '{} was not set'.format(key))

    @attr(type='bigquery')
    def test_pre_and_post_model_hooks(self):
        self.run_dbt(['run'])

        self.check_hooks('start')
        self.check_hooks('end')


class TestBigqueryPrePostModelHooksOnSeeds(DBTIntegrationTest):
    """Verify that seed post-hooks run on BigQuery and can insert rows."""

    def setUp(self):
        DBTIntegrationTest.setUp(self)
        self.use_profile('bigquery')
        self.use_default_project()

    @property
    def schema(self):
        return "model_hooks_014"

    @property
    def models(self):
        return "test/integration/014_hook_tests/seed-models-bq"

    @property
    def project_config(self):
        # The post-hook appends one extra row after the seed file loads.
        hook_sql = 'insert into {{ this }} (a, b, c) VALUES (10, 11, 12)'
        return {
            'data-paths': ['test/integration/014_hook_tests/data'],
            'models': {},
            'seeds': {
                'post-hook': [hook_sql],
            }
        }

    @attr(type='bigquery')
    def test_hooks_on_seeds(self):
        # `seed` loads the file (and fires the post-hook); `test` runs the
        # schema tests in seed-models-bq. Each should report one result.
        for command in (['seed'], ['test']):
            res = self.run_dbt(command)
            self.assertEqual(len(res), 1, 'Expected exactly one item')

        rows = self.run_sql(
            'select a, b, c from `{schema}`.`example_seed` where a = 10',
            fetch='all'
        )
        self.assertFalse(len(rows) == 0, 'nothing inserted into table by hook')
        self.assertFalse(len(rows) > 1, 'too many rows in table')
13 changes: 11 additions & 2 deletions test/integration/014_hook_tests/test_run_hooks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from nose.plugins.attrib import attr
from test.integration.base import DBTIntegrationTest

import os.path

class TestPrePostRunHooks(DBTIntegrationTest):

def setUp(self):
Expand Down Expand Up @@ -33,6 +31,7 @@ def schema(self):
def project_config(self):
return {
'macro-paths': ['test/integration/014_hook_tests/macros'],
'data-paths': ['test/integration/014_hook_tests/data'],

# The create and drop table statements here validate that these hooks run
# in the same order that they are defined. Drop before create is an error.
Expand Down Expand Up @@ -88,6 +87,16 @@ def test_pre_and_post_run_hooks(self):
self.check_hooks('start')
self.check_hooks('end')

self.assertTableDoesNotExist("start_hook_order_test")
self.assertTableDoesNotExist("end_hook_order_test")

@attr(type='postgres')
def test_pre_and_post_seed_hooks(self):
self.run_dbt(['seed'])

self.check_hooks('start')
self.check_hooks('end')

self.assertTableDoesNotExist("start_hook_order_test")
self.assertTableDoesNotExist("end_hook_order_test")

Loading

0 comments on commit e86bbe0

Please sign in to comment.