Implement relations api (#727)
cmcarthur authored and drewbanin committed Apr 26, 2018
1 parent 7d7b557 commit 5344f54
Showing 115 changed files with 1,757 additions and 735 deletions.
9 changes: 4 additions & 5 deletions Makefile
@@ -17,8 +17,7 @@ test-integration:
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py27,integration-postgres-py36,integration-snowflake-py27,integration-snowflake-py36,integration-bigquery-py27,integration-bigquery-py36

test-new:
@echo "Test run starting..."
@echo "Changed test files:"
@echo "${changed_tests}"
@docker-compose run test /usr/src/app/test/runner.sh ${changed_tests}

test-quick:
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py36 -- -x
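
Note (annotation, not part of the diff): the removed test-new target relied on a changed_tests variable defined elsewhere in the Makefile, while the new test-quick target runs only the integration-postgres-py36 tox environment and forwards -x to the underlying test runner so the run stops at the first failure.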
7 changes: 7 additions & 0 deletions dbt/adapters/bigquery/__init__.py
@@ -0,0 +1,7 @@
from dbt.adapters.bigquery.impl import BigQueryAdapter
from dbt.adapters.bigquery.relation import BigQueryRelation

__all__ = [
BigQueryAdapter,
BigQueryRelation,
]
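
As a quick orientation (annotation, not part of the diff): the new package exposes both classes from a single import path. Note that the committed __all__ lists the objects themselves rather than name strings, so a star-import would raise a TypeError at import time; explicit imports like this minimal sketch are unaffected:

# Minimal sketch: explicit imports from the new bigquery adapter package.
from dbt.adapters.bigquery import BigQueryAdapter, BigQueryRelation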
148 changes: 87 additions & 61 deletions dbt/adapters/bigquery.py → dbt/adapters/bigquery/impl.py
@@ -3,13 +3,15 @@
from contextlib import contextmanager

import dbt.compat
import dbt.deprecations
import dbt.exceptions
import dbt.schema
import dbt.flags as flags
import dbt.clients.gcloud
import dbt.clients.agate_helper

from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger

@@ -25,6 +27,7 @@
class BigQueryAdapter(PostgresAdapter):

context_functions = [
# deprecated -- use versions that take relations instead
"query_for_existing",
"execute_model",
"drop",
@@ -35,17 +38,24 @@ class BigQueryAdapter(PostgresAdapter):
"expand_target_column_types",
"load_dataframe",

# versions of adapter functions that take / return Relations
"list_relations",
"get_relation",
"drop_relation",
"rename_relation",

"get_columns_in_table"
]

Relation = BigQueryRelation
Column = dbt.schema.BigQueryColumn

SCOPE = ('https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/drive')

QUERY_TIMEOUT = 300

Column = dbt.schema.BigQueryColumn
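
Annotation (not part of the diff): the new class-level Relation hook lets shared adapter code construct BigQuery-flavored relations via cls.Relation.create(...). A hedged sketch of the call that list_relations makes further down — the project id and table name are hypothetical, and the rendered form assumes the backtick quoting defined by quote() below:

# Sketch only: build a relation the way list_relations does below.
relation = BigQueryRelation.create(
    project='my-gcp-project',      # hypothetical GCP project id
    schema='analytics',            # BigQuery dataset
    identifier='events',           # table name
    quote_policy={'schema': True, 'identifier': True},
    type='table')
# With this quote policy the schema and identifier render inside backticks,
# i.e. something like: my-gcp-project.`analytics`.`events`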

@classmethod
def handle_error(cls, error, message, sql):
logger.debug(message.format(sql=sql))
@@ -165,60 +175,65 @@ def close(cls, connection):
return connection

@classmethod
def query_for_existing(cls, profile, schemas, model_name=None):
if not isinstance(schemas, (list, tuple)):
schemas = [schemas]

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')
def list_relations(cls, profile, project_cfg, schema, model_name=None):
connection = cls.get_connection(profile, model_name)
credentials = connection.get('credentials', {})
client = connection.get('handle')

all_tables = []
for schema in schemas:
dataset = cls.get_dataset(profile, schema, model_name)
all_tables.extend(client.list_tables(dataset))
bigquery_dataset = cls.get_dataset(
profile, project_cfg, schema, model_name)
all_tables = client.list_tables(bigquery_dataset)

relation_types = {
'TABLE': 'table',
'VIEW': 'view',
'EXTERNAL': 'external'
}

existing = [(table.table_id, relation_types.get(table.table_type))
for table in all_tables]

return dict(existing)
return [cls.Relation.create(
project=credentials.get('project'),
schema=schema,
identifier=table.table_id,
quote_policy={
'schema': True,
'identifier': True
},
type=relation_types.get(table.table_type))
for table in all_tables]

@classmethod
def table_exists(cls, profile, schema, table, model_name=None):
tables = cls.query_for_existing(profile, schema, model_name)
exists = tables.get(table) is not None
return exists

@classmethod
def drop(cls, profile, schema, relation, relation_type, model_name=None):
def drop_relation(cls, profile, project_cfg, relation, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
relation_object = dataset.table(relation)
dataset = cls.get_dataset(
profile, project_cfg, relation.schema, model_name)
relation_object = dataset.table(relation.identifier)
client.delete_table(relation_object)
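
Annotation (not part of the diff): list_relations and drop_relation above give a relation-object round trip that replaces the deprecated string-based query_for_existing/drop pair. A hedged usage sketch, with signatures taken from the diff:

# Sketch only: find and drop a table by Relation object rather than by name.
# profile and project_cfg come from dbt's runtime (placeholders here).
for relation in BigQueryAdapter.list_relations(profile, project_cfg, 'analytics'):
    if relation.identifier == 'tmp_events':   # hypothetical table
        BigQueryAdapter.drop_relation(profile, project_cfg, relation)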

@classmethod
def rename(cls, profile, schema, from_name, to_name, model_name=None):
def rename(cls, profile, project_cfg, schema,
from_name, to_name, model_name=None):
raise dbt.exceptions.NotImplementedException(
'`rename` is not implemented for this adapter!')

@classmethod
def rename_relation(cls, profile, project_cfg, from_relation, to_relation,
model_name=None):
raise dbt.exceptions.NotImplementedException(
'`rename_relation` is not implemented for this adapter!')

@classmethod
def get_timeout(cls, conn):
credentials = conn['credentials']
return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT)

@classmethod
def materialize_as_view(cls, profile, dataset, model):
def materialize_as_view(cls, profile, project_cfg, dataset, model):
model_name = model.get('name')
model_sql = model.get('injected_sql')

conn = cls.get_connection(profile, model_name)
conn = cls.get_connection(profile, project_cfg, model_name)
client = conn.get('handle')

view_ref = dataset.table(model_name)
@@ -249,21 +264,22 @@ def poll_until_job_completes(cls, job, timeout):
raise job.exception()

@classmethod
def make_date_partitioned_table(cls, profile, dataset_name, identifier,
model_name=None):
def make_date_partitioned_table(cls, profile, project_cfg, dataset_name,
identifier, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, dataset_name, identifier)
dataset = cls.get_dataset(profile, project_cfg,
dataset_name, identifier)
table_ref = dataset.table(identifier)
table = google.cloud.bigquery.Table(table_ref)
table.partitioning_type = 'DAY'

return client.create_table(table)
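
Annotation (not part of the diff): make_date_partitioned_table drives the google-cloud-bigquery client directly. A standalone sketch of the equivalent calls against the client-library version in use at this commit (project id and names are hypothetical):

# Sketch only: create a day-partitioned BigQuery table, as
# make_date_partitioned_table does via the adapter's connection.
import google.cloud.bigquery

client = google.cloud.bigquery.Client(project='my-gcp-project')
table_ref = client.dataset('analytics').table('events_by_day')
table = google.cloud.bigquery.Table(table_ref)
table.partitioning_type = 'DAY'   # same setting the adapter applies
client.create_table(table)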

@classmethod
def materialize_as_table(cls, profile, dataset, model, model_sql,
decorator=None):
def materialize_as_table(cls, profile, project_cfg, dataset,
model, model_sql, decorator=None):
model_name = model.get('name')

conn = cls.get_connection(profile, model_name)
@@ -289,7 +305,8 @@ def materialize_as_table(cls, profile, dataset, model, model_sql,
return "CREATE TABLE"

@classmethod
def execute_model(cls, profile, model, materialization, sql_override=None,
def execute_model(cls, profile, project_cfg, model,
materialization, sql_override=None,
decorator=None, model_name=None):

if sql_override is None:
@@ -302,13 +319,15 @@ def execute_model(cls, profile, model, materialization, sql_override=None,
model_name = model.get('name')
model_schema = model.get('schema')

dataset = cls.get_dataset(profile, model_schema, model_name)
dataset = cls.get_dataset(profile, project_cfg,
model_schema, model_name)

if materialization == 'view':
res = cls.materialize_as_view(profile, dataset, model)
res = cls.materialize_as_view(profile, project_cfg, dataset, model)
elif materialization == 'table':
res = cls.materialize_as_table(profile, dataset, model,
sql_override, decorator)
res = cls.materialize_as_table(
profile, project_cfg, dataset, model,
sql_override, decorator)
else:
msg = "Invalid relation type: '{}'".format(materialization)
raise dbt.exceptions.RuntimeException(msg, model)
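
Annotation (not part of the diff): execute_model is the single dispatch point — 'view' routes to materialize_as_view, 'table' to materialize_as_table, anything else raises. A hedged caller's sketch, placeholders throughout:

# Sketch only: build a model as a view through the adapter's dispatcher.
# profile, project_cfg and model come from dbt's runtime (placeholders here).
status = BigQueryAdapter.execute_model(
    profile, project_cfg, model,   # model is a parsed-node dict
    materialization='view')        # or 'table', optionally with a decorator
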
@@ -356,41 +375,42 @@ def add_begin_query(cls, profile, name):
'`add_begin_query` is not implemented for this adapter!')

@classmethod
def create_schema(cls, profile, schema, model_name=None):
def create_schema(cls, profile, project_cfg, schema, model_name=None):
logger.debug('Creating schema "%s".', schema)

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
with cls.exception_handler(profile, 'create dataset', model_name):
client.create_dataset(dataset)

@classmethod
def drop_tables_in_schema(cls, profile, dataset):
def drop_tables_in_schema(cls, profile, project_cfg, dataset):
conn = cls.get_connection(profile)
client = conn.get('handle')

for table in client.list_tables(dataset):
client.delete_table(table.reference)

@classmethod
def drop_schema(cls, profile, schema, model_name=None):
def drop_schema(cls, profile, project_cfg, schema, model_name=None):
logger.debug('Dropping schema "%s".', schema)

if not cls.check_schema_exists(profile, schema, model_name):
if not cls.check_schema_exists(profile, project_cfg,
schema, model_name):
return

conn = cls.get_connection(profile)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
with cls.exception_handler(profile, 'drop dataset', model_name):
cls.drop_tables_in_schema(profile, dataset)
cls.drop_tables_in_schema(profile, project_cfg, dataset)
client.delete_dataset(dataset)

@classmethod
def get_existing_schemas(cls, profile, model_name=None):
def get_existing_schemas(cls, profile, project_cfg, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

@@ -399,7 +419,8 @@ def get_existing_schemas(cls, profile, model_name=None):
return [ds.dataset_id for ds in all_datasets]

@classmethod
def get_columns_in_table(cls, profile, schema_name, table_name,
def get_columns_in_table(cls, profile, project_cfg,
schema_name, table_name,
database=None, model_name=None):

# BigQuery does not have databases -- the database parameter is here
@@ -419,16 +440,15 @@ def get_columns_in_table(cls, profile, schema_name, table_name,

columns = []
for col in table_schema:
name = col.name
data_type = col.field_type

column = cls.Column(col.name, col.field_type, col.fields, col.mode)
column = cls.Column(
col.name, col.field_type, col.fields, col.mode)
columns.append(column)

return columns
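
Annotation (not part of the diff): the loop in get_columns_in_table maps each BigQuery SchemaField onto dbt's column class one-for-one. A small sketch of that mapping, with hypothetical field values:

# Sketch only: one SchemaField becomes one BigQueryColumn.
import google.cloud.bigquery
import dbt.schema

field = google.cloud.bigquery.SchemaField('user_id', 'STRING', mode='NULLABLE')
column = dbt.schema.BigQueryColumn(
    field.name, field.field_type, field.fields, field.mode)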

@classmethod
def check_schema_exists(cls, profile, schema, model_name=None):
def check_schema_exists(cls, profile, project_cfg,
schema, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

@@ -437,7 +457,7 @@ def check_schema_exists(cls, profile, schema, model_name=None):
return any([ds.dataset_id == schema for ds in all_datasets])

@classmethod
def get_dataset(cls, profile, dataset_name, model_name=None):
def get_dataset(cls, profile, project_cfg, dataset_name, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

@@ -468,13 +488,18 @@ def quote(cls, identifier):
return '`{}`'.format(identifier)

@classmethod
def quote_schema_and_table(cls, profile, schema, table, model_name=None):
def quote_schema_and_table(cls, profile, project_cfg, schema,
table, model_name=None):
return cls.render_relation(profile, project_cfg,
cls.quote(schema),
cls.quote(table))

@classmethod
def render_relation(cls, profile, project_cfg, schema, table):
connection = cls.get_connection(profile)
credentials = connection.get('credentials', {})
project = credentials.get('project')
return '{}.{}.{}'.format(cls.quote(project),
cls.quote(schema),
cls.quote(table))
return '{}.{}.{}'.format(cls.quote(project), schema, table)
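
Annotation (not part of the diff): quote_schema_and_table now pre-quotes the schema and table parts and delegates to render_relation, which quotes only the project before assembling the three-part name. A self-contained sketch of how the pieces compose (project id is hypothetical):

# Sketch only: the quoting composition used by the two methods above.
def quote(identifier):
    return '`{}`'.format(identifier)          # as in BigQueryAdapter.quote

def render_relation(project, schema, table):
    # schema and table arrive pre-quoted from quote_schema_and_table
    return '{}.{}.{}'.format(quote(project), schema, table)

print(render_relation('my-gcp-project', quote('analytics'), quote('events')))
# -> `my-gcp-project`.`analytics`.`events`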

@classmethod
def convert_text_type(cls, agate_table, col_idx):
@@ -504,10 +529,11 @@ def _agate_to_schema(cls, agate_table, column_override):
return bq_schema

@classmethod
def load_dataframe(cls, profile, schema, table_name, agate_table,
def load_dataframe(cls, profile, project_cfg, schema,
table_name, agate_table,
column_override, model_name=None):
bq_schema = cls._agate_to_schema(agate_table, column_override)
dataset = cls.get_dataset(profile, schema, None)
dataset = cls.get_dataset(profile, project_cfg, schema, None)
table = dataset.table(table_name)
conn = cls.get_connection(profile, None)
client = conn.get('handle')
Expand All @@ -524,7 +550,7 @@ def load_dataframe(cls, profile, schema, table_name, agate_table,
cls.poll_until_job_completes(job, cls.get_timeout(conn))
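
Annotation (not part of the diff): load_dataframe builds a BigQuery schema from an agate table (via _agate_to_schema) and loads the rows through a load job it then polls to completion. A hedged caller's sketch — the file path is hypothetical and column_override (a dict of column-type overrides) is assumed empty:

# Sketch only: seed-style load of a CSV into BigQuery through the adapter.
# profile and project_cfg come from dbt's runtime (placeholders here).
import agate

seed = agate.Table.from_csv('data/seed.csv')
BigQueryAdapter.load_dataframe(profile, project_cfg, 'analytics',
                               'seed_data', seed, column_override={})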

@classmethod
def expand_target_column_types(cls, profile, temp_table, to_schema,
to_table, model_name=None):
def expand_target_column_types(cls, profile, project_cfg, temp_table,
to_schema, to_table, model_name=None):
# This is a no-op on BigQuery
pass
(Diff truncated: the remaining changed files in this commit are not shown here.)