Skip to content

Commit

Permalink
Merge pull request #1983 from fishtown-analytics/fix/restore-drop-schema
Browse files Browse the repository at this point in the history
Restore drop_schema to adapter in jinja context (#1980)
  • Loading branch information
beckjake authored Dec 5, 2019
2 parents a993d9c + 143402d commit d14d250
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 20 deletions.
1 change: 1 addition & 0 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ def create_schema(self, database: str, schema: str):
)

@abc.abstractmethod
@available.parse_none
def drop_schema(self, database: str, schema: str):
"""Drop the given schema (and everything in it) if it exists."""
raise dbt.exceptions.NotImplementedException(
Expand Down
60 changes: 45 additions & 15 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from collections import namedtuple
import threading
from copy import deepcopy
from typing import List, Iterable
import threading

from dbt.logger import CACHE_LOGGER as logger
import dbt.exceptions

Expand Down Expand Up @@ -181,29 +183,36 @@ def __init__(self):
self.lock = threading.RLock()
self.schemas = set()

def add_schema(self, database, schema):
def add_schema(self, database: str, schema: str):
"""Add a schema to the set of known schemas (case-insensitive)
:param str database: The database name to add.
:param str schema: The schema name to add.
:param database: The database name to add.
:param schema: The schema name to add.
"""
self.schemas.add((_lower(database), _lower(schema)))

def remove_schema(self, database, schema):
"""Remove a schema from the set of known schemas (case-insensitive)
def drop_schema(self, database: str, schema: str):
"""Drop the given schema and remove it from the set of known schemas.
If the schema does not exist, it will be ignored - it could just be a
temporary table.
:param str database: The database name to remove.
:param str schema: The schema name to remove.
Then remove all its contents (and their dependents, etc) as well.
"""
self.schemas.discard((_lower(database), _lower(schema)))
key = (_lower(database), _lower(schema))
if key not in self.schemas:
return

def update_schemas(self, schemas):
# avoid iterating over self.relations while removing things by
# collecting the list first.

with self.lock:
to_remove = self._list_relations_in_schema(database, schema)
self._remove_all(to_remove)
# handle a drop_schema race by using discard() over remove()
self.schemas.discard(key)

def update_schemas(self, schemas: Iterable[str]):
"""Add multiple schemas to the set of known schemas (case-insensitive)
:param Iterable[str] schemas: An iterable of the schema names to add.
:param schemas: An iterable of the schema names to add.
"""
self.schemas.update((_lower(d), _lower(s)) for (d, s) in schemas)

Expand Down Expand Up @@ -402,7 +411,6 @@ def _rename_relation(self, old_key, new_relation):

self.relations[new_key] = relation
# also fixup the schemas!
self.remove_schema(old_key.database, old_key.schema)
self.add_schema(new_key.database, new_key.schema)

return True
Expand Down Expand Up @@ -489,3 +497,25 @@ def clear(self):
with self.lock:
self.relations.clear()
self.schemas.clear()

def _list_relations_in_schema(
self, database: str, schema: str
) -> List[_CachedRelation]:
"""Get the relations in a schema. Callers should hold the lock."""
key = (_lower(database), _lower(schema))

to_remove: List[_CachedRelation] = []
for cachekey, relation in self.relations.items():
if (cachekey.database, cachekey.schema) == key:
to_remove.append(relation)
return to_remove

def _remove_all(self, to_remove: List[_CachedRelation]):
"""Remove all the listed relations. Ignore relations that have been
cascaded out.
"""
for relation in to_remove:
# it may have been cascaded out already
drop_key = _make_key(relation)
if drop_key in self.relations:
self.drop(drop_key)
7 changes: 5 additions & 2 deletions core/dbt/adapters/sql/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,18 @@ def create_schema(self, database, schema):
}
self.execute_macro(CREATE_SCHEMA_MACRO_NAME, kwargs=kwargs)
self.commit_if_has_connection()
# we can't update the cache here, as if the schema already existed we
# don't want to (incorrectly) say that it's empty

def drop_schema(self, database, schema):
logger.debug('Dropping schema "{}"."{}".', database, schema)
kwargs = {
'database_name': self.quote_as_configured(database, 'database'),
'schema_name': self.quote_as_configured(schema, 'schema'),
}
self.execute_macro(DROP_SCHEMA_MACRO_NAME,
kwargs=kwargs)
self.execute_macro(DROP_SCHEMA_MACRO_NAME, kwargs=kwargs)
# we can update the cache here
self.cache.drop_schema(database, schema)

def list_relations_without_caching(self, information_schema, schema):
kwargs = {'information_schema': information_schema, 'schema': schema}
Expand Down
1 change: 1 addition & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def create_schema(self, database: str, schema: str) -> None:
def drop_schema(self, database: str, schema: str) -> None:
logger.debug('Dropping schema "{}.{}".', database, schema)
self.connections.drop_dataset(database, schema)
self.cache.drop_schema(database, schema)

@classmethod
def quote(cls, identifier: str) -> str:
Expand Down
3 changes: 3 additions & 0 deletions test/integration/054_adapter_methods_test/models/expected.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- make sure this runs after 'model'
-- {{ ref('model') }}
select 2 as id
19 changes: 19 additions & 0 deletions test/integration/054_adapter_methods_test/models/model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

{% set upstream = ref('upstream') %}

{% if execute %}
{# don't ever do any of this #}
{%- do adapter.drop_schema(upstream.database, upstream.schema) -%}
{% set existing = adapter.get_relation(upstream.database, upstream.schema, upstream.identifier) %}
{% if existing is not none %}
{% do exceptions.raise_compiler_error('expected ' ~ ' to not exist, but it did') %}
{% endif %}

{%- do adapter.create_schema(upstream.database, upstream.schema) -%}

{% set sql = create_view_as(upstream, 'select 2 as id') %}
{% do run_query(sql) %}
{% endif %}


select * from {{ upstream }}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1 as id
31 changes: 31 additions & 0 deletions test/integration/054_adapter_methods_test/test_adapter_methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from test.integration.base import DBTIntegrationTest, use_profile


class TestBaseCaching(DBTIntegrationTest):
@property
def schema(self):
return "caching_038"

@property
def models(self):
return "models"

@use_profile('postgres')
def test_postgres_adapter_methods(self):
self.run_dbt()
self.assertTablesEqual('model', 'expected')

@use_profile('redshift')
def test_redshift_adapter_methods(self):
self.run_dbt()
self.assertTablesEqual('model', 'expected')

@use_profile('snowflake')
def test_snowflake_adapter_methods(self):
self.run_dbt()
self.assertTablesEqual('MODEL', 'EXPECTED')

@use_profile('bigquery')
def test_bigquery_adapter_methods(self):
self.run_dbt()
self.assertTablesEqual('model', 'expected')
9 changes: 6 additions & 3 deletions test/unit/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def test_dest_different_db(self):
make_relation('DBT_2', 'schema', 'foo'))
self.assert_relations_exist('DBT_2', 'schema', 'foo')
self.assert_relations_do_not_exist('DBT', 'schema', 'foo')
self.assertEqual(self.cache.schemas, {('dbt_2', 'schema')})
# we know about both schemas: dbt has nothing, dbt_2 has something.
self.assertEqual(self.cache.schemas, {('dbt_2', 'schema'), ('dbt', 'schema')})
self.assertEqual(len(self.cache.relations), 1)

def test_rename_identifier(self):
Expand All @@ -156,7 +157,8 @@ def test_rename_db(self):
self.assertEqual(len(self.cache.get_relations('DBT_2', 'schema')), 1)
self.assert_relations_exist('DBT_2', 'schema', 'foo')
self.assert_relations_do_not_exist('DBT', 'schema', 'foo')
self.assertEqual(self.cache.schemas, {('dbt_2', 'schema')})
# we know about both schemas: dbt has nothing, dbt_2 has something.
self.assertEqual(self.cache.schemas, {('dbt_2', 'schema'), ('dbt', 'schema')})

relation = self.cache.relations[('dbt_2', 'schema', 'foo')]
self.assertEqual(relation.inner.database, 'DBT_2')
Expand All @@ -174,7 +176,8 @@ def test_rename_schema(self):
self.assertEqual(len(self.cache.get_relations('DBT', 'schema_2')), 1)
self.assert_relations_exist('DBT', 'schema_2', 'foo')
self.assert_relations_do_not_exist('DBT', 'schema', 'foo')
self.assertEqual(self.cache.schemas, {('dbt', 'schema_2')})
# we know about both schemas: schema has nothing, schema_2 has something.
self.assertEqual(self.cache.schemas, {('dbt', 'schema_2'), ('dbt', 'schema')})

relation = self.cache.relations[('dbt', 'schema_2', 'foo')]
self.assertEqual(relation.inner.database, 'DBT')
Expand Down

0 comments on commit d14d250

Please sign in to comment.