Skip to content

Commit

Permalink
Merge pull request #1833 from fishtown-analytics/fix/postgres-externa…
Browse files Browse the repository at this point in the history
…l-materialized-views

postgres: gracefully handle materialized views (#1698)
  • Loading branch information
beckjake authored Oct 15, 2019
2 parents e83aab2 + fee9382 commit d5824d9
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 49 deletions.
12 changes: 8 additions & 4 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,21 @@ def is_view(self) -> bool:
return self.type == RelationType.View

@classproperty
def Table(self) -> str:
def Table(cls) -> str:
return str(RelationType.Table)

@classproperty
def CTE(self) -> str:
def CTE(cls) -> str:
return str(RelationType.CTE)

@classproperty
def View(self) -> str:
def View(cls) -> str:
return str(RelationType.View)

@classproperty
def External(self) -> str:
def External(cls) -> str:
return str(RelationType.External)

@classproperty
def RelationType(cls) -> Type[RelationType]:
return RelationType
31 changes: 23 additions & 8 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ def _add_link(self, referenced_key, dependent_key):
:raises InternalError: If either entry does not exist.
"""
referenced = self.relations.get(referenced_key)
if referenced is None:
return
if referenced is None:
dbt.exceptions.raise_cache_inconsistent(
'in add_link, referenced link key {} not in cache!'
Expand All @@ -268,8 +270,8 @@ def _add_link(self, referenced_key, dependent_key):
referenced.add_reference(dependent)

def add_link(self, referenced, dependent):
"""Add a link between two relations to the database. Both the old and
new entries must already exist in the database.
"""Add a link between two relations to the database. If either relation
does not exist, it will be added as an "external" relation.
The dependent model refers _to_ the referenced model. So, given
arguments of (jake_test, bar, jake_test, foo):
Expand All @@ -281,23 +283,36 @@ def add_link(self, referenced, dependent):
:param BaseRelation dependent: The dependent model.
:raises InternalError: If either entry does not exist.
"""
referenced = _make_key(referenced)
if (referenced.database, referenced.schema) not in self:
ref_key = _make_key(referenced)
if (ref_key.database, ref_key.schema) not in self:
# if we have not cached the referenced schema at all, we must be
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
logger.debug(
'{dep!s} references {ref!s} but {ref.database}.{ref.schema} '
'is not in the cache, skipping assumed external relation'
.format(dep=dependent, ref=referenced)
.format(dep=dependent, ref=ref_key)
)
return
dependent = _make_key(dependent)
if ref_key not in self.relations:
# Insert a dummy "external" relation.
referenced = referenced.replace(
type=referenced.RelationType.External
)
self.add(referenced)

dep_key = _make_key(dependent)
if dep_key not in self.relations:
# Insert a dummy "external" relation.
dependent = dependent.replace(
type=referenced.RelationType.External
)
self.add(dependent)
logger.debug(
'adding link, {!s} references {!s}'.format(dependent, referenced)
'adding link, {!s} references {!s}'.format(dep_key, ref_key)
)
with self.lock:
self._add_link(referenced, dependent)
self._add_link(ref_key, dep_key)

def add(self, relation):
"""Add the relation inner to the cache, under the schema schema and
Expand Down
14 changes: 7 additions & 7 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ def _link_cached_database_relations(self, schemas):
database = self.config.credentials.database
table = self.execute_macro(GET_RELATIONS_MACRO_NAME)

for (refed_schema, refed_name, dep_schema, dep_name) in table:
referenced = self.Relation.create(
database=database,
schema=refed_schema,
identifier=refed_name
)
for (dep_schema, dep_name, refed_schema, refed_name) in table:
dependent = self.Relation.create(
database=database,
schema=dep_schema,
identifier=dep_name
)
referenced = self.Relation.create(
database=database,
schema=refed_schema,
identifier=refed_name
)

# don't record in cache if this relation isn't in a relevant
# schema
if refed_schema.lower() in schemas:
self.cache.add_link(dependent, referenced)
self.cache.add_link(referenced, dependent)

def _get_cache_schemas(self, manifest, exec_only=False):
# postgres/redshift only allow one database (the main one)
Expand Down
36 changes: 26 additions & 10 deletions test/integration/001_simple_copy_test/test_simple_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ def models(self):

class TestSimpleCopy(BaseTestSimpleCopy):

@property
def project_config(self):
return {"data-paths": [self.dir("seed-initial")]}

@use_profile("postgres")
def test__postgres__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand All @@ -36,10 +38,30 @@ def test__postgres__simple_copy(self):

self.assertManyTablesEqual(["seed", "view_model", "incremental", "materialized", "get_and_ref"])

@use_profile('postgres')
def test__postgres__simple_copy_with_materialized_views(self):
self.run_sql('''
create table {schema}.unrelated_table (id int)
'''.format(schema=self.unique_schema())
)
self.run_sql('''
create materialized view {schema}.unrelated_materialized_view as (
select * from {schema}.unrelated_table
)
'''.format(schema=self.unique_schema()))
self.run_sql('''
create view {schema}.unrelated_view as (
select * from {schema}.unrelated_materialized_view
)
'''.format(schema=self.unique_schema()))

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
self.assertEqual(len(results), 7)

@use_profile("postgres")
def test__postgres__dbt_doesnt_run_empty_models(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand All @@ -66,8 +88,6 @@ def test__presto__simple_copy(self):

@use_profile("snowflake")
def test__snowflake__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand All @@ -92,7 +112,6 @@ def test__snowflake__simple_copy(self):
@use_profile("snowflake")
def test__snowflake__simple_copy__quoting_off(self):
self.use_default_project({
"data-paths": [self.dir("seed-initial")],
"quoting": {"identifier": False},
})

Expand Down Expand Up @@ -124,7 +143,6 @@ def test__snowflake__simple_copy__quoting_off(self):
@use_profile("snowflake")
def test__snowflake__seed__quoting_switch(self):
self.use_default_project({
"data-paths": [self.dir("seed-initial")],
"quoting": {"identifier": False},
})

Expand All @@ -145,8 +163,6 @@ def test__snowflake__seed__quoting_switch(self):

@use_profile("bigquery")
def test__bigquery__simple_copy(self):
self.use_default_project({"data-paths": [self.dir("seed-initial")]})

results = self.run_dbt(["seed"])
self.assertEqual(len(results), 1)
results = self.run_dbt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ def project_config(self):
}
}

def setUp(self):
super().setUp()
# self.use_default_config()
self.run_sql_file("seed.sql")

@use_profile('postgres')
def test__postgres__simple_reference(self):
self.use_default_project()
self.run_sql_file("seed.sql")

results = self.run_dbt()
# ephemeral_copy doesn't show up in results
Expand Down Expand Up @@ -59,8 +62,6 @@ def test__postgres__simple_reference(self):

@use_profile('snowflake')
def test__snowflake__simple_reference(self):
self.use_default_project()
self.run_sql_file("seed.sql")

results = self.run_dbt()
self.assertEqual(len(results), 8)
Expand All @@ -83,8 +84,6 @@ def test__snowflake__simple_reference(self):

@use_profile('postgres')
def test__postgres__simple_reference_with_models(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand All @@ -101,8 +100,6 @@ def test__postgres__simple_reference_with_models(self):

@use_profile('postgres')
def test__postgres__simple_reference_with_models_and_children(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand Down Expand Up @@ -139,8 +136,6 @@ def test__postgres__simple_reference_with_models_and_children(self):

@use_profile('snowflake')
def test__snowflake__simple_reference_with_models(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy & ephemeral_copy
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand All @@ -157,8 +152,6 @@ def test__snowflake__simple_reference_with_models(self):

@use_profile('snowflake')
def test__snowflake__simple_reference_with_models_and_children(self):
self.use_default_project()
self.run_sql_file("seed.sql")

# Run materialized_copy, ephemeral_copy, and their dependents
# ephemeral_copy should not actually be materialized b/c it is ephemeral
Expand Down
22 changes: 14 additions & 8 deletions test/unit/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,22 @@ def setUp(self):
self.cache.add(make_relation('dbt', 'schema_2', 'bar'))

def test_no_src(self):
# src does not exist (but similar names do)
with self.assertRaises(dbt.exceptions.InternalException):
self.cache.add_link(make_relation('dbt', 'schema', 'bar'),
make_relation('dbt', 'schema', 'foo'))
self.assert_relations_exist('dbt', 'schema', 'foo')
self.assert_relations_do_not_exist('dbt', 'schema', 'bar')

self.cache.add_link(make_relation('dbt', 'schema', 'bar'),
make_relation('dbt', 'schema', 'foo'))

self.assert_relations_exist('dbt', 'schema', 'foo', 'bar')

def test_no_dst(self):
# dst does not exist (but similar names do)
with self.assertRaises(dbt.exceptions.InternalException):
self.cache.add_link(make_relation('dbt', 'schema', 'foo'),
make_relation('dbt', 'schema', 'bar'))
self.assert_relations_exist('dbt', 'schema', 'foo')
self.assert_relations_do_not_exist('dbt', 'schema', 'bar')

self.cache.add_link(make_relation('dbt', 'schema', 'foo'),
make_relation('dbt', 'schema', 'bar'))

self.assert_relations_exist('dbt', 'schema', 'foo', 'bar')


class TestRename(TestCache):
Expand Down

0 comments on commit d5824d9

Please sign in to comment.