Implement relations api #727

Merged
merged 74 commits into from
Apr 26, 2018
Commits
b8920b9
wip: remove quotes from schema/table names
drewbanin Mar 14, 2018
d6f58b5
update for new integration project model names
drewbanin Mar 15, 2018
dbcd124
more model name updates
drewbanin Mar 15, 2018
c205dfa
Merge branch 'integration-test-models-reserved-words' into SNOWFLAKE-…
drewbanin Mar 15, 2018
5032d1b
no more quotes!
drewbanin Mar 15, 2018
94502bc
quieter logs
drewbanin Mar 24, 2018
ee3406f
many changes to fix broken code, tests
drewbanin Mar 27, 2018
b27a374
fix for archival
drewbanin Mar 27, 2018
d52058f
fixes for bq
drewbanin Mar 28, 2018
0455ea9
relations api
Apr 1, 2018
75286a0
wip; good progress on Relation obj - implementing in materializations
drewbanin Apr 2, 2018
c706263
hack for ephemerals; dont include type
drewbanin Apr 2, 2018
83b617c
Merge branch 'development' of github.com:fishtown-analytics/dbt into …
Apr 4, 2018
aae580e
wiring up Relation to adapter fns, first pass
Apr 4, 2018
e015338
code moved around, defaultrelation written, starting to wire up
Apr 5, 2018
57981e0
rippin stuff out
Apr 5, 2018
9ecf92d
tests passing(ish)
Apr 5, 2018
370012d
query_for_existing ripped out
Apr 6, 2018
b2ee678
rename > rename_relation
Apr 6, 2018
803f1b7
bigquery, csv
Apr 6, 2018
f253261
remove extraneous dbt_project file
Apr 7, 2018
107f5ff
merged development
Apr 7, 2018
b9c7d92
move types into "api" namespace, fix archival
Apr 7, 2018
29f93b9
rip out table_exists
Apr 7, 2018
01c7cff
include relations
Apr 7, 2018
87adc8a
table / view macros using relations
Apr 7, 2018
79da833
fix some tests
Apr 7, 2018
82db36b
do not include schema if created as temp table
Apr 9, 2018
cadbad5
snowflake fixes
Apr 9, 2018
614d067
fix some tests
Apr 9, 2018
b7f948a
fix bq relations
Apr 9, 2018
a8db16a
bq getting closer
Apr 9, 2018
58c2712
archive macros
Apr 9, 2018
d4b2516
fixing postgres tests
Apr 9, 2018
cea3a43
fix postgres tests
Apr 9, 2018
5ce7aa7
Implement relations api dp fix (#729)
drewbanin Apr 9, 2018
1acde56
merged development
Apr 9, 2018
a466b81
address code review comments
Apr 9, 2018
2f4dcb9
deprecations
Apr 10, 2018
cd8df15
fix tests
Apr 10, 2018
c1cfc31
make relations hashable for compatibility with existing dbt code
Apr 10, 2018
9273e22
Merge branch 'development' of github.com:fishtown-analytics/dbt into …
Apr 10, 2018
596e433
rip out deprecations, make api more consistent
Apr 10, 2018
f62e47f
merge development
Apr 10, 2018
533f5c3
actually fix merge conflict
Apr 11, 2018
9fe9ca8
global quoting config
Apr 22, 2018
a0dedfa
fix default quoting config in project
Apr 22, 2018
909be11
merge development
Apr 24, 2018
9a635ca
merged implement-relations-api
Apr 24, 2018
4a550e5
relations, fix another merge conflict
Apr 24, 2018
ba58e7e
seed + relations
Apr 24, 2018
2f54e52
fix unit tests
Apr 24, 2018
1c61341
add snowflake quoting off tests
Apr 24, 2018
c7f83fd
fix bad merge
Apr 24, 2018
396ea65
Merge branch 'implement-relations-api' of github.com:fishtown-analyti…
Apr 24, 2018
8694d3f
fix for view --> table model switch
drewbanin Apr 24, 2018
852a0eb
Merge branch 'implement-relations-api' into quote-config
drewbanin Apr 24, 2018
5bafcd6
fix exception code for seed onto view
drewbanin Apr 24, 2018
ac62a4d
projects everywhere
Apr 24, 2018
8ee8e76
test fixes
Apr 24, 2018
dd707c8
switch test, fix schema creation / dropping
Apr 24, 2018
e269c25
quote tests properly
Apr 24, 2018
8cc3696
spruce up snowflake warning, add link
drewbanin Apr 25, 2018
a661512
fix snowflake tests
Apr 25, 2018
41aa908
Merge branch 'quote-config' of github.com:fishtown-analytics/dbt into…
Apr 25, 2018
126ab23
add already_exists tests
drewbanin Apr 25, 2018
e564dee
fix for incremental models, plus test
drewbanin Apr 26, 2018
d33ec80
fix unit tests, pep8
Apr 26, 2018
da5b82c
Merge pull request #742 from fishtown-analytics/quote-config
cmcarthur Apr 26, 2018
0338a48
merged development
Apr 26, 2018
124e14b
self code review
Apr 26, 2018
23d2a55
fix project var name from merge conflict
Apr 26, 2018
692cc55
pep8
Apr 26, 2018
c37ded2
fix relation schemas
Apr 26, 2018
9 changes: 4 additions & 5 deletions Makefile
@@ -17,8 +17,7 @@ test-integration:
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py27,integration-postgres-py36,integration-snowflake-py27,integration-snowflake-py36,integration-bigquery-py27,integration-bigquery-py36

test-new:
@echo "Test run starting..."
@echo "Changed test files:"
@echo "${changed_tests}"
@docker-compose run test /usr/src/app/test/runner.sh ${changed_tests}

Contributor

this is a good change :)

test-quick:
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py36 -- -x
7 changes: 7 additions & 0 deletions dbt/adapters/bigquery/__init__.py
@@ -0,0 +1,7 @@
from dbt.adapters.bigquery.impl import BigQueryAdapter
from dbt.adapters.bigquery.relation import BigQueryRelation

__all__ = [
'BigQueryAdapter',
'BigQueryRelation',
]
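
With the adapter split into a package, both the adapter class and its relation type are importable from the package root:

from dbt.adapters.bigquery import BigQueryAdapter, BigQueryRelation
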
148 changes: 87 additions & 61 deletions dbt/adapters/bigquery.py → dbt/adapters/bigquery/impl.py
@@ -3,13 +3,15 @@
from contextlib import contextmanager

import dbt.compat
import dbt.deprecations
import dbt.exceptions
import dbt.schema
import dbt.flags as flags
import dbt.clients.gcloud
import dbt.clients.agate_helper

from dbt.adapters.postgres import PostgresAdapter
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger

@@ -25,6 +27,7 @@
class BigQueryAdapter(PostgresAdapter):

context_functions = [
# deprecated -- use versions that take relations instead
"query_for_existing",
"execute_model",
"drop",
@@ -35,17 +38,24 @@ class BigQueryAdapter(PostgresAdapter):
"expand_target_column_types",
"load_dataframe",

# versions of adapter functions that take / return Relations
"list_relations",
"get_relation",
"drop_relation",
"rename_relation",

"get_columns_in_table"
]

Relation = BigQueryRelation
Column = dbt.schema.BigQueryColumn

SCOPE = ('https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/drive')

QUERY_TIMEOUT = 300

Column = dbt.schema.BigQueryColumn
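
The context_functions list above is the heart of this change: the string-based functions are kept for compatibility but flagged as deprecated, while new variants accept and return Relation objects (hence the `Relation = BigQueryRelation` class attribute). A rough sketch of the new calling convention follows; `profile` and `project_cfg` stand in for the runtime config objects that dbt's context normally supplies, and the schema and table names are hypothetical:

# enumerate Relations in a schema, then drop one by identifier
for relation in BigQueryAdapter.list_relations(
        profile, project_cfg, schema='analytics'):
    if relation.identifier == 'events':
        # drop_relation takes the Relation itself, not (schema, name) strings
        BigQueryAdapter.drop_relation(profile, project_cfg, relation)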

@classmethod
def handle_error(cls, error, message, sql):
logger.debug(message.format(sql=sql))
@@ -165,60 +175,65 @@ def close(cls, connection):
return connection

@classmethod
def query_for_existing(cls, profile, schemas, model_name=None):
if not isinstance(schemas, (list, tuple)):
schemas = [schemas]

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')
def list_relations(cls, profile, project_cfg, schema, model_name=None):
connection = cls.get_connection(profile, model_name)
credentials = connection.get('credentials', {})
client = connection.get('handle')

all_tables = []
for schema in schemas:
dataset = cls.get_dataset(profile, schema, model_name)
all_tables.extend(client.list_tables(dataset))
bigquery_dataset = cls.get_dataset(
profile, project_cfg, schema, model_name)
all_tables = client.list_tables(bigquery_dataset)

relation_types = {
'TABLE': 'table',
'VIEW': 'view',
'EXTERNAL': 'external'
}

existing = [(table.table_id, relation_types.get(table.table_type))
for table in all_tables]

return dict(existing)
return [cls.Relation.create(
project=credentials.get('project'),
schema=schema,
identifier=table.table_id,
quote_policy={
'schema': True,
'identifier': True
},
type=relation_types.get(table.table_type))
for table in all_tables]
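
The `cls.Relation.create(...)` factory above is the same constructor used elsewhere in dbt for building relations by hand. A minimal standalone sketch, with a hypothetical project id and table name, assuming the relation renders itself according to its quote policy:

relation = BigQueryRelation.create(
    project='my-gcp-project',  # hypothetical
    schema='analytics',
    identifier='events',
    quote_policy={'schema': True, 'identifier': True},
    type='table')
# with quoting enabled this should render along the lines of:
#   `my-gcp-project`.`analytics`.`events`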

@classmethod
def table_exists(cls, profile, schema, table, model_name=None):
tables = cls.query_for_existing(profile, schema, model_name)
exists = tables.get(table) is not None
return exists

@classmethod
def drop(cls, profile, schema, relation, relation_type, model_name=None):
def drop_relation(cls, profile, project_cfg, relation, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
relation_object = dataset.table(relation)
dataset = cls.get_dataset(
profile, project_cfg, relation.schema, model_name)
relation_object = dataset.table(relation.identifier)
client.delete_table(relation_object)

@classmethod
def rename(cls, profile, schema, from_name, to_name, model_name=None):
def rename(cls, profile, project_cfg, schema,
from_name, to_name, model_name=None):
raise dbt.exceptions.NotImplementedException(
'`rename` is not implemented for this adapter!')

@classmethod
def rename_relation(cls, profile, project_cfg, from_relation, to_relation,
model_name=None):
raise dbt.exceptions.NotImplementedException(
'`rename_relation` is not implemented for this adapter!')

@classmethod
def get_timeout(cls, conn):
credentials = conn['credentials']
return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT)

@classmethod
def materialize_as_view(cls, profile, dataset, model):
def materialize_as_view(cls, profile, project_cfg, dataset, model):
model_name = model.get('name')
model_sql = model.get('injected_sql')

conn = cls.get_connection(profile, model_name)
conn = cls.get_connection(profile, project_cfg, model_name)
client = conn.get('handle')

view_ref = dataset.table(model_name)
@@ -249,21 +264,22 @@ def poll_until_job_completes(cls, job, timeout):
raise job.exception()

@classmethod
def make_date_partitioned_table(cls, profile, dataset_name, identifier,
model_name=None):
def make_date_partitioned_table(cls, profile, project_cfg, dataset_name,
identifier, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, dataset_name, identifier)
dataset = cls.get_dataset(profile, project_cfg,
dataset_name, identifier)
table_ref = dataset.table(identifier)
table = google.cloud.bigquery.Table(table_ref)
table.partitioning_type = 'DAY'

return client.create_table(table)

@classmethod
def materialize_as_table(cls, profile, dataset, model, model_sql,
decorator=None):
def materialize_as_table(cls, profile, project_cfg, dataset,
model, model_sql, decorator=None):
model_name = model.get('name')

conn = cls.get_connection(profile, model_name)
@@ -289,7 +305,8 @@ def materialize_as_table(cls, profile, dataset, model, model_sql,
return "CREATE TABLE"

@classmethod
def execute_model(cls, profile, model, materialization, sql_override=None,
def execute_model(cls, profile, project_cfg, model,
materialization, sql_override=None,
decorator=None, model_name=None):

if sql_override is None:
@@ -302,13 +319,15 @@ def execute_model(cls, profile, model, materialization, sql_override=None,
model_name = model.get('name')
model_schema = model.get('schema')

dataset = cls.get_dataset(profile, model_schema, model_name)
dataset = cls.get_dataset(profile, project_cfg,
model_schema, model_name)

if materialization == 'view':
res = cls.materialize_as_view(profile, dataset, model)
res = cls.materialize_as_view(profile, project_cfg, dataset, model)
elif materialization == 'table':
res = cls.materialize_as_table(profile, dataset, model,
sql_override, decorator)
res = cls.materialize_as_table(
profile, project_cfg, dataset, model,
sql_override, decorator)
else:
msg = "Invalid relation type: '{}'".format(materialization)
raise dbt.exceptions.RuntimeException(msg, model)
@@ -356,41 +375,42 @@ def add_begin_query(cls, profile, name):
'`add_begin_query` is not implemented for this adapter!')

@classmethod
def create_schema(cls, profile, schema, model_name=None):
def create_schema(cls, profile, project_cfg, schema, model_name=None):
logger.debug('Creating schema "%s".', schema)

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
with cls.exception_handler(profile, 'create dataset', model_name):
client.create_dataset(dataset)

@classmethod
def drop_tables_in_schema(cls, profile, dataset):
def drop_tables_in_schema(cls, profile, project_cfg, dataset):
conn = cls.get_connection(profile)
client = conn.get('handle')

for table in client.list_tables(dataset):
client.delete_table(table.reference)

@classmethod
def drop_schema(cls, profile, schema, model_name=None):
def drop_schema(cls, profile, project_cfg, schema, model_name=None):
logger.debug('Dropping schema "%s".', schema)

if not cls.check_schema_exists(profile, schema, model_name):
if not cls.check_schema_exists(profile, project_cfg,
schema, model_name):
return

conn = cls.get_connection(profile)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
with cls.exception_handler(profile, 'drop dataset', model_name):
cls.drop_tables_in_schema(profile, dataset)
cls.drop_tables_in_schema(profile, project_cfg, dataset)
client.delete_dataset(dataset)

@classmethod
def get_existing_schemas(cls, profile, model_name=None):
def get_existing_schemas(cls, profile, project_cfg, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

@@ -399,7 +419,8 @@ def get_existing_schemas(cls, profile, model_name=None):
return [ds.dataset_id for ds in all_datasets]

@classmethod
def get_columns_in_table(cls, profile, schema_name, table_name,
def get_columns_in_table(cls, profile, project_cfg,
schema_name, table_name,
database=None, model_name=None):

# BigQuery does not have databases -- the database parameter is here
@@ -419,16 +440,15 @@ def get_columns_in_table(cls, profile, schema_name, table_name,

columns = []
for col in table_schema:
name = col.name
data_type = col.field_type

column = cls.Column(col.name, col.field_type, col.fields, col.mode)
column = cls.Column(
col.name, col.field_type, col.fields, col.mode)
columns.append(column)

return columns
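
Callers pass schema and table names as before; only the `project_cfg` argument is new, and `database` is accepted but ignored because BigQuery has no separate database concept. A hedged usage sketch with hypothetical names:

# returns a list of BigQueryColumn objects built from the table's schema
columns = BigQueryAdapter.get_columns_in_table(
    profile, project_cfg, 'analytics', 'events')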

@classmethod
def check_schema_exists(cls, profile, schema, model_name=None):
def check_schema_exists(cls, profile, project_cfg,
schema, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

@@ -437,7 +457,7 @@ def check_schema_exists(cls, profile, schema, model_name=None):
return any([ds.dataset_id == schema for ds in all_datasets])

@classmethod
def get_dataset(cls, profile, dataset_name, model_name=None):
def get_dataset(cls, profile, project_cfg, dataset_name, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

Expand Down Expand Up @@ -468,13 +488,18 @@ def quote(cls, identifier):
return '`{}`'.format(identifier)

@classmethod
def quote_schema_and_table(cls, profile, schema, table, model_name=None):
def quote_schema_and_table(cls, profile, project_cfg, schema,
table, model_name=None):
return cls.render_relation(profile, project_cfg,
cls.quote(schema),
cls.quote(table))

@classmethod
def render_relation(cls, profile, project_cfg, schema, table):
connection = cls.get_connection(profile)
credentials = connection.get('credentials', {})
project = credentials.get('project')
return '{}.{}.{}'.format(cls.quote(project),
cls.quote(schema),
cls.quote(table))
return '{}.{}.{}'.format(cls.quote(project), schema, table)
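
Note the division of labor here: `render_relation` quotes only the project, so a caller like `quote_schema_and_table` above pre-quotes the schema and table before passing them in. Concretely, with a hypothetical project id in the profile credentials:

rendered = BigQueryAdapter.quote_schema_and_table(
    profile, project_cfg, 'analytics', 'events')
# with credentials project='my-gcp-project', rendered is:
#   `my-gcp-project`.`analytics`.`events`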

@classmethod
def convert_text_type(cls, agate_table, col_idx):
@@ -504,10 +529,11 @@ def _agate_to_schema(cls, agate_table, column_override):
return bq_schema

@classmethod
def load_dataframe(cls, profile, schema, table_name, agate_table,
def load_dataframe(cls, profile, project_cfg, schema,
table_name, agate_table,
column_override, model_name=None):
bq_schema = cls._agate_to_schema(agate_table, column_override)
dataset = cls.get_dataset(profile, schema, None)
dataset = cls.get_dataset(profile, project_cfg, schema, None)
table = dataset.table(table_name)
conn = cls.get_connection(profile, None)
client = conn.get('handle')
@@ -524,7 +550,7 @@ def load_dataframe(cls, profile, schema, table_name, agate_table,
cls.poll_until_job_completes(job, cls.get_timeout(conn))
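
This is the code path csv seeds use on BigQuery: an agate table is converted to a BigQuery schema and loaded via a load job. A rough sketch of driving it directly, with a hypothetical seed file and no per-column type overrides:

import agate

seed_table = agate.Table.from_csv('data/country_codes.csv')  # hypothetical path
BigQueryAdapter.load_dataframe(
    profile, project_cfg, 'analytics', 'country_codes',
    seed_table, column_override={})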

@classmethod
def expand_target_column_types(cls, profile, temp_table, to_schema,
to_table, model_name=None):
def expand_target_column_types(cls, profile, project_cfg, temp_table,
to_schema, to_table, model_name=None):
# This is a no-op on BigQuery
pass