From e86bbe08167dfd682122c77ed19863820c22246f Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 16 Jul 2018 07:13:15 -0600 Subject: [PATCH] Add bigquery hooks (#779) (#836) --- .circleci/config.yml | 2 + CHANGELOG.md | 1 + dbt/adapters/bigquery/impl.py | 11 +- .../materializations/table/bigquery_table.sql | 3 + .../materializations/view/bigquery_view.sql | 4 + dbt/node_runners.py | 8 +- .../{example_seed.sql => example_seed.csv} | 0 .../macros/before-and-after-bq.sql | 30 ++++ .../014_hook_tests/seed-models-bq/schema.yml | 5 + .../014_hook_tests/seed_model_bigquery.sql | 18 ++ .../014_hook_tests/seed_run_bigquery.sql | 18 ++ .../014_hook_tests/test_model_hooks.py | 8 +- .../014_hook_tests/test_model_hooks_bq.py | 156 ++++++++++++++++++ .../014_hook_tests/test_run_hooks.py | 13 +- .../014_hook_tests/test_run_hooks_bq.py | 99 +++++++++++ test/integration/base.py | 37 +++-- 16 files changed, 386 insertions(+), 27 deletions(-) rename test/integration/014_hook_tests/data/{example_seed.sql => example_seed.csv} (100%) create mode 100644 test/integration/014_hook_tests/macros/before-and-after-bq.sql create mode 100644 test/integration/014_hook_tests/seed-models-bq/schema.yml create mode 100644 test/integration/014_hook_tests/seed_model_bigquery.sql create mode 100644 test/integration/014_hook_tests/seed_run_bigquery.sql create mode 100644 test/integration/014_hook_tests/test_model_hooks_bq.py create mode 100644 test/integration/014_hook_tests/test_run_hooks_bq.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 47fa209a7ad..11186999dec 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,6 +43,7 @@ jobs: - run: name: Run tests command: tox -e integration-snowflake-py36 + no_output_timeout: 1h integration-redshift-py36: docker: *py36 steps: @@ -87,6 +88,7 @@ jobs: - run: name: Run tests command: tox -e integration-snowflake-py27 + no_output_timeout: 1h integration-redshift-py27: docker: *py27 steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index ea3fddb9d09..4f9a6dae09a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Changes + - Add pre/post hook support for bigquery ([#836](https://github.com/fishtown-analytics/dbt/pull/836)) - Improve consistency of BigQuery list_relations, create shortcut for most materializations ([#835](https://github.com/fishtown-analytics/dbt/pull/835)) - Support external BigQuery relations ([#828](https://github.com/fishtown-analytics/dbt/pull/828)) - Added tox environments that have the user specify what tests should be run ([#837](https://github.com/fishtown-analytics/dbt/pull/837)) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index a2e36b18e54..d36da44365f 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -369,7 +369,7 @@ def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs): query_job = client.query(sql, job_config) # this blocks until the query has completed - with cls.exception_handler(profile, 'create dataset', model_name): + with cls.exception_handler(profile, sql, model_name): iterator = query_job.result() if fetch: @@ -392,10 +392,15 @@ def get_table_from_response(cls, resp): rows = [dict(row.items()) for row in resp] return dbt.clients.agate_helper.table_from_data(rows, column_names) + # BigQuery doesn't support BEGIN/COMMIT, so stub these out. + @classmethod def add_begin_query(cls, profile, name): - raise dbt.exceptions.NotImplementedException( - '`add_begin_query` is not implemented for this adapter!') + pass + + @classmethod + def add_commit_query(cls, profile, name): + pass @classmethod def create_schema(cls, profile, project_cfg, schema, model_name=None): diff --git a/dbt/include/global_project/macros/materializations/table/bigquery_table.sql b/dbt/include/global_project/macros/materializations/table/bigquery_table.sql index fb827db1733..b140e6107f2 100644 --- a/dbt/include/global_project/macros/materializations/table/bigquery_table.sql +++ b/dbt/include/global_project/macros/materializations/table/bigquery_table.sql @@ -51,6 +51,8 @@ {% endif %} {% endif %} + {{ run_hooks(pre_hooks) }} + {# Since dbt uses WRITE_TRUNCATE mode for tables, we only need to drop this thing if it is not a table. If it _is_ already a table, then we can overwrite it without downtime @@ -70,5 +72,6 @@ {% endcall -%} {% endif %} + {{ run_hooks(post_hooks) }} {% endmaterialization %} diff --git a/dbt/include/global_project/macros/materializations/view/bigquery_view.sql b/dbt/include/global_project/macros/materializations/view/bigquery_view.sql index aecda7a8794..225d6e61e84 100644 --- a/dbt/include/global_project/macros/materializations/view/bigquery_view.sql +++ b/dbt/include/global_project/macros/materializations/view/bigquery_view.sql @@ -12,6 +12,8 @@ identifier=identifier, schema=schema, type='view') -%} + {{ run_hooks(pre_hooks) }} + -- drop if exists {%- if old_relation is not none -%} {%- if old_relation.is_table and not flags.FULL_REFRESH -%} @@ -33,4 +35,6 @@ {%- endcall %} {%- endif %} + {{ run_hooks(post_hooks) }} + {%- endmaterialization %} diff --git a/dbt/node_runners.py b/dbt/node_runners.py index 00dbb162202..47a599c4496 100644 --- a/dbt/node_runners.py +++ b/dbt/node_runners.py @@ -326,8 +326,8 @@ def run_hooks(cls, project, adapter, flat_graph, hook_type): sql = hook_dict.get('sql', '') if len(sql.strip()) > 0: - adapter.execute_one(profile, sql, model_name=model_name, - auto_begin=False) + adapter.execute(profile, sql, model_name=model_name, + auto_begin=False, fetch=False) adapter.release_connection(profile, model_name) @@ -500,10 +500,6 @@ def describe_node(self): schema_name = self.node.get('schema') return "seed file {}.{}".format(schema_name, self.node['alias']) - @classmethod - def before_run(cls, project, adapter, flat_graph): - cls.create_schemas(project, adapter, flat_graph) - def before_execute(self): description = self.describe_node() dbt.ui.printer.print_start_line(description, self.node_index, diff --git a/test/integration/014_hook_tests/data/example_seed.sql b/test/integration/014_hook_tests/data/example_seed.csv similarity index 100% rename from test/integration/014_hook_tests/data/example_seed.sql rename to test/integration/014_hook_tests/data/example_seed.csv diff --git a/test/integration/014_hook_tests/macros/before-and-after-bq.sql b/test/integration/014_hook_tests/macros/before-and-after-bq.sql new file mode 100644 index 00000000000..81958efc91c --- /dev/null +++ b/test/integration/014_hook_tests/macros/before-and-after-bq.sql @@ -0,0 +1,30 @@ + +{% macro custom_run_hook_bq(state, target, run_started_at, invocation_id) %} + + insert into {{ target.schema }}.on_run_hook ( + state, + target_dbname, + target_host, + target_name, + target_schema, + target_type, + target_user, + target_pass, + target_threads, + run_started_at, + invocation_id + ) VALUES ( + '{{ state }}', + '{{ target.dbname }}', + '{{ target.host }}', + '{{ target.name }}', + '{{ target.schema }}', + '{{ target.type }}', + '{{ target.user }}', + '{{ target.pass }}', + {{ target.threads }}, + '{{ run_started_at }}', + '{{ invocation_id }}' + ) + +{% endmacro %} diff --git a/test/integration/014_hook_tests/seed-models-bq/schema.yml b/test/integration/014_hook_tests/seed-models-bq/schema.yml new file mode 100644 index 00000000000..c112d2d3ebf --- /dev/null +++ b/test/integration/014_hook_tests/seed-models-bq/schema.yml @@ -0,0 +1,5 @@ + +example_seed: + constraints: + not_null: + - a diff --git a/test/integration/014_hook_tests/seed_model_bigquery.sql b/test/integration/014_hook_tests/seed_model_bigquery.sql new file mode 100644 index 00000000000..7093a47e898 --- /dev/null +++ b/test/integration/014_hook_tests/seed_model_bigquery.sql @@ -0,0 +1,18 @@ + +drop table if exists {schema}.on_model_hook; + +create table {schema}.on_model_hook ( + state STRING, -- start|end + + target_dbname STRING, + target_host STRING, + target_name STRING, + target_schema STRING, + target_type STRING, + target_user STRING, + target_pass STRING, + target_threads INT64, + + run_started_at STRING, + invocation_id STRING +); diff --git a/test/integration/014_hook_tests/seed_run_bigquery.sql b/test/integration/014_hook_tests/seed_run_bigquery.sql new file mode 100644 index 00000000000..d9d5212ef5f --- /dev/null +++ b/test/integration/014_hook_tests/seed_run_bigquery.sql @@ -0,0 +1,18 @@ + +drop table if exists {schema}.on_run_hook; + +create table {schema}.on_run_hook ( + state STRING, -- start|end + + target_dbname STRING, + target_host STRING, + target_name STRING, + target_schema STRING, + target_type STRING, + target_user STRING, + target_pass STRING, + target_threads INT64, + + run_started_at STRING, + invocation_id STRING +); \ No newline at end of file diff --git a/test/integration/014_hook_tests/test_model_hooks.py b/test/integration/014_hook_tests/test_model_hooks.py index eb2433bb9ac..7ee721fec58 100644 --- a/test/integration/014_hook_tests/test_model_hooks.py +++ b/test/integration/014_hook_tests/test_model_hooks.py @@ -64,8 +64,8 @@ class TestPrePostModelHooks(DBTIntegrationTest): - def setUp(self): + self.adapter_type = 'bigquery' DBTIntegrationTest.setUp(self) self.run_sql_file("test/integration/014_hook_tests/seed_model.sql") @@ -178,6 +178,8 @@ def project_config(self): @attr(type='postgres') def test_hooks_on_seeds(self): - self.run_dbt(['seed']) - self.run_dbt(['test']) + res = self.run_dbt(['seed']) + self.assertEqual(len(res), 1, 'Expected exactly one item') + res = self.run_dbt(['test']) + self.assertEqual(len(res), 1, 'Expected exactly one item') diff --git a/test/integration/014_hook_tests/test_model_hooks_bq.py b/test/integration/014_hook_tests/test_model_hooks_bq.py new file mode 100644 index 00000000000..25577515d51 --- /dev/null +++ b/test/integration/014_hook_tests/test_model_hooks_bq.py @@ -0,0 +1,156 @@ +from nose.plugins.attrib import attr +from test.integration.base import DBTIntegrationTest + +MODEL_PRE_HOOK = """ + insert into {{this.schema}}.on_model_hook ( + state, + target_name, + target_schema, + target_type, + target_threads, + run_started_at, + invocation_id + ) VALUES ( + 'start', + '{{ target.name }}', + '{{ target.schema }}', + '{{ target.type }}', + {{ target.threads }}, + '{{ run_started_at }}', + '{{ invocation_id }}' + ) +""" + + +MODEL_POST_HOOK = """ + insert into {{this.schema}}.on_model_hook ( + state, + target_name, + target_schema, + target_type, + target_threads, + run_started_at, + invocation_id + ) VALUES ( + 'end', + '{{ target.name }}', + '{{ target.schema }}', + '{{ target.type }}', + {{ target.threads }}, + '{{ run_started_at }}', + '{{ invocation_id }}' + ) +""" + +class TestBigqueryPrePostModelHooks(DBTIntegrationTest): + def setUp(self): + DBTIntegrationTest.setUp(self) + self.use_profile('bigquery') + self.use_default_project() + self.run_sql_file("test/integration/014_hook_tests/seed_model_bigquery.sql") + + self.fields = [ + 'state', + 'target_name', + 'target_schema', + 'target_threads', + 'target_type', + 'run_started_at', + 'invocation_id' + ] + + @property + def schema(self): + return "model_hooks_014" + + @property + def profile_config(self): + profile = self.bigquery_profile() + profile['test']['outputs']['default2']['threads'] = 3 + return profile + + @property + def project_config(self): + return { + 'macro-paths': ['test/integration/014_hook_tests/macros'], + 'models': { + 'test': { + 'pre-hook': [MODEL_PRE_HOOK], + + 'post-hook':[MODEL_POST_HOOK] + } + } + } + + @property + def models(self): + return "test/integration/014_hook_tests/models" + + def get_ctx_vars(self, state): + field_list = ", ".join(self.fields) + query = "select {field_list} from `{schema}.on_model_hook` where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state) + + vals = self.run_sql(query, fetch='all') + self.assertFalse(len(vals) == 0, 'nothing inserted into hooks table') + self.assertFalse(len(vals) > 1, 'too many rows in hooks table') + ctx = dict(zip(self.fields, vals[0])) + + return ctx + + def check_hooks(self, state): + ctx = self.get_ctx_vars(state) + + self.assertEqual(ctx['state'], state) + self.assertEqual(ctx['target_name'], 'default2') + self.assertEqual(ctx['target_schema'], self.unique_schema()) + self.assertEqual(ctx['target_threads'], 3) + self.assertEqual(ctx['target_type'], 'bigquery') + self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set') + self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set') + + @attr(type='bigquery') + def test_pre_and_post_model_hooks(self): + self.run_dbt(['run']) + + self.check_hooks('start') + self.check_hooks('end') + + +class TestBigqueryPrePostModelHooksOnSeeds(DBTIntegrationTest): + def setUp(self): + DBTIntegrationTest.setUp(self) + self.use_profile('bigquery') + self.use_default_project() + + @property + def schema(self): + return "model_hooks_014" + + @property + def models(self): + return "test/integration/014_hook_tests/seed-models-bq" + + @property + def project_config(self): + return { + 'data-paths': ['test/integration/014_hook_tests/data'], + 'models': {}, + 'seeds': { + 'post-hook': [ + 'insert into {{ this }} (a, b, c) VALUES (10, 11, 12)', + ] + } + } + + @attr(type='bigquery') + def test_hooks_on_seeds(self): + res = self.run_dbt(['seed']) + self.assertEqual(len(res), 1, 'Expected exactly one item') + res = self.run_dbt(['test']) + self.assertEqual(len(res), 1, 'Expected exactly one item') + result = self.run_sql( + 'select a, b, c from `{schema}`.`example_seed` where a = 10', + fetch='all' + ) + self.assertFalse(len(result) == 0, 'nothing inserted into table by hook') + self.assertFalse(len(result) > 1, 'too many rows in table') diff --git a/test/integration/014_hook_tests/test_run_hooks.py b/test/integration/014_hook_tests/test_run_hooks.py index b249eb3d709..304f3b82b61 100644 --- a/test/integration/014_hook_tests/test_run_hooks.py +++ b/test/integration/014_hook_tests/test_run_hooks.py @@ -1,8 +1,6 @@ from nose.plugins.attrib import attr from test.integration.base import DBTIntegrationTest -import os.path - class TestPrePostRunHooks(DBTIntegrationTest): def setUp(self): @@ -33,6 +31,7 @@ def schema(self): def project_config(self): return { 'macro-paths': ['test/integration/014_hook_tests/macros'], + 'data-paths': ['test/integration/014_hook_tests/data'], # The create and drop table statements here validate that these hooks run # in the same order that they are defined. Drop before create is an error. @@ -88,6 +87,16 @@ def test_pre_and_post_run_hooks(self): self.check_hooks('start') self.check_hooks('end') + self.assertTableDoesNotExist("start_hook_order_test") + self.assertTableDoesNotExist("end_hook_order_test") + + @attr(type='postgres') + def test_pre_and_post_seed_hooks(self): + self.run_dbt(['seed']) + + self.check_hooks('start') + self.check_hooks('end') self.assertTableDoesNotExist("start_hook_order_test") self.assertTableDoesNotExist("end_hook_order_test") + diff --git a/test/integration/014_hook_tests/test_run_hooks_bq.py b/test/integration/014_hook_tests/test_run_hooks_bq.py new file mode 100644 index 00000000000..c8699f0e97a --- /dev/null +++ b/test/integration/014_hook_tests/test_run_hooks_bq.py @@ -0,0 +1,99 @@ +from nose.plugins.attrib import attr +from test.integration.base import DBTIntegrationTest + +class TestBigqueryPrePostRunHooks(DBTIntegrationTest): + + def setUp(self): + DBTIntegrationTest.setUp(self) + self.use_profile('bigquery') + self.use_default_project() + self.run_sql_file("test/integration/014_hook_tests/seed_run_bigquery.sql") + + self.fields = [ + 'state', + 'target_name', + 'target_schema', + 'target_threads', + 'target_type', + 'run_started_at', + 'invocation_id' + ] + + @property + def schema(self): + return "run_hooks_014" + + @property + def profile_config(self): + profile = self.bigquery_profile() + profile['test']['outputs']['default2']['threads'] = 3 + return profile + + + @property + def project_config(self): + return { + 'macro-paths': ['test/integration/014_hook_tests/macros'], + 'data-paths': ['test/integration/014_hook_tests/data'], + + # The create and drop table statements here validate that these hooks run + # in the same order that they are defined. Drop before create is an error. + # Also check that the table does not exist below. + "on-run-start": [ + "{{ custom_run_hook_bq('start', target, run_started_at, invocation_id) }}", + "create table {{ target.schema }}.start_hook_order_test ( id INT64 )", + "drop table {{ target.schema }}.start_hook_order_test", + ], + "on-run-end": [ + "{{ custom_run_hook_bq('end', target, run_started_at, invocation_id) }}", + "create table {{ target.schema }}.end_hook_order_test ( id INT64 )", + "drop table {{ target.schema }}.end_hook_order_test", + ] + } + + @property + def models(self): + return "test/integration/014_hook_tests/models" + + def get_ctx_vars(self, state): + field_list = ", ".join(self.fields) + query = "select {field_list} from `{schema}.on_run_hook` where state = '{state}'".format(field_list=field_list, schema=self.unique_schema(), state=state) + + vals = self.run_sql(query, fetch='all') + self.assertFalse(len(vals) == 0, 'nothing inserted into on_run_hook table') + self.assertFalse(len(vals) > 1, 'too many rows in hooks table') + ctx = dict(zip(self.fields, vals[0])) + + return ctx + + def check_hooks(self, state): + ctx = self.get_ctx_vars(state) + + self.assertEqual(ctx['state'], state) + self.assertEqual(ctx['target_name'], 'default2') + self.assertEqual(ctx['target_schema'], self.unique_schema()) + self.assertEqual(ctx['target_threads'], 3) + self.assertEqual(ctx['target_type'], 'bigquery') + + self.assertTrue(ctx['run_started_at'] is not None and len(ctx['run_started_at']) > 0, 'run_started_at was not set') + self.assertTrue(ctx['invocation_id'] is not None and len(ctx['invocation_id']) > 0, 'invocation_id was not set') + + @attr(type='bigquery') + def test_bigquery_pre_and_post_run_hooks(self): + self.run_dbt(['run']) + + self.check_hooks('start') + self.check_hooks('end') + + self.assertTableDoesNotExist("start_hook_order_test") + self.assertTableDoesNotExist("end_hook_order_test") + + @attr(type='bigquery') + def test_pre_and_post_seed_hooks(self): + self.run_dbt(['seed']) + + self.check_hooks('start') + self.check_hooks('end') + + self.assertTableDoesNotExist("start_hook_order_test") + self.assertTableDoesNotExist("end_hook_order_test") diff --git a/test/integration/base.py b/test/integration/base.py index 5aa990f3207..188aeb04802 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -2,6 +2,7 @@ import dbt.main as dbt import os, shutil import yaml +import random import time import json @@ -30,7 +31,7 @@ def __init__(self): class DBTIntegrationTest(unittest.TestCase): - prefix = "test{}".format(int(time.time())) + prefix = "test{}{:04}".format(int(time.time()), random.randint(0, 9999)) def postgres_profile(self): return { @@ -359,13 +360,26 @@ def transform_sql(self, query): return to_return + def run_sql_bigquery(self, sql, fetch): + """Run an SQL query on a bigquery adapter. No cursors, transactions, + etc. to worry about. If fetch is not 'None', all records are fetched. + """ + adapter = get_adapter(self._profile) + fetch = fetch != 'None' + _, res = adapter.execute(self._profile, sql, fetch=fetch) + return res + def run_sql(self, query, fetch='None'): if query.strip() == "": return + sql = self.transform_sql(query) + if self.adapter_type == 'bigquery': + return self.run_sql_bigquery(sql, fetch) + with self.handle.cursor() as cursor: try: - cursor.execute(self.transform_sql(query)) + cursor.execute(sql) self.handle.commit() if fetch == 'one': return cursor.fetchone() @@ -381,17 +395,14 @@ def run_sql(self, query, fetch='None'): def get_table_columns(self, table, schema=None): schema = self.unique_schema() if schema is None else schema - sql = """ - select column_name, data_type, character_maximum_length - from information_schema.columns - where table_name ilike '{}' - and table_schema ilike '{}' - order by column_name asc""" - - result = self.run_sql(sql.format(table.replace('"', ''), schema), - fetch='all') - - return result + columns = self.adapter.get_columns_in_table( + self._profile, + self.project_config, + schema, + table + ) + return sorted(((c.name, c.dtype, c.char_size) for c in columns), + key=lambda x: x[0]) def get_models_in_schema(self, schema=None): schema = self.unique_schema() if schema is None else schema