Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert all embedded adapter SQL into macros (#1204) #1207

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ def close(cls, connection):
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
connection.handle.close()
else:
logger.debug('On {}: No close available on handle'
.format(connection.name))

connection.state = 'closed'

Expand All @@ -331,7 +334,7 @@ def commit_if_has_connection(self, name):

:param str name: The name of the connection to use.
"""
connection = self.get_if_exists(name)
connection = self.in_use.get(name)
if connection:
self.commit(connection)

Expand Down
81 changes: 62 additions & 19 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from dbt.compat import abstractclassmethod, classmethod
from dbt.contracts.connection import Connection
from dbt.loader import GraphLoader
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.schema import Column
from dbt.utils import filter_null_values, translate_aliases
Expand All @@ -21,6 +22,7 @@
from dbt.adapters.base import BaseRelation
from dbt.adapters.cache import RelationsCache


GET_CATALOG_MACRO_NAME = 'get_catalog'


Expand Down Expand Up @@ -68,11 +70,14 @@ def test(row):
class BaseAdapter(object):
"""The BaseAdapter provides an abstract base class for adapters.

Adapters must implement the following methods. Some of these methods can be
safely overridden as a noop, where it makes sense (transactions on
databases that don't support them, for instance). Those methods are marked
with a (passable) in their docstrings. Check docstrings for type
information, etc.
Adapters must implement the following methods and macros. Some of the
methods can be safely overridden as a noop, where it makes sense
(transactions on databases that don't support them, for instance). Those
methods are marked with a (passable) in their docstrings. Check docstrings
for type information, etc.

To implement a macro, implement "${adapter_type}__${macro_name}". in the
adapter's internal project.

Methods:
- exception_handler
Expand All @@ -94,6 +99,9 @@ class BaseAdapter(object):
- convert_datetime_type
- convert_date_type
- convert_time_type

Macros:
- get_catalog
"""
requires = {}

Expand All @@ -106,6 +114,7 @@ def __init__(self, config):
self.config = config
self.cache = RelationsCache()
self.connections = self.ConnectionManager(config)
self._internal_manifest_lazy = None

###
# Methods that pass through to the connection manager
Expand Down Expand Up @@ -159,6 +168,19 @@ def type(cls):
"""
return cls.ConnectionManager.TYPE

@property
def _internal_manifest(self):
if self._internal_manifest_lazy is None:
manifest = GraphLoader.load_internal(self.config)
self._internal_manifest_lazy = manifest
return self._internal_manifest_lazy

def check_internal_manifest(self):
"""Return the internal manifest (used for executing macros) if it's
been initialized, otherwise return None.
"""
return self._internal_manifest_lazy

###
# Caching methods
###
Expand Down Expand Up @@ -667,21 +689,38 @@ def convert_agate_type(cls, agate_table, col_idx):
###
# Operations involving the manifest
###
def execute_macro(self, manifest, macro_name, project=None,
context_override=None):
def execute_macro(self, macro_name, manifest=None, project=None,
context_override=None, kwargs=None, release=False,
connection_name=None):
"""Look macro_name up in the manifest and execute its results.

:param Manifest manifest: The manifest to use for generating the base
macro execution context.
:param str macro_name: The name of the macro to execute.
:param Optional[Manifest] manifest: The manifest to use for generating
the base macro execution context. If none is provided, use the
internal manifest.
:param Optional[str] project: The name of the project to search in, or
None for the first match.
:param Optional[dict] context_override: An optional dict to update()
the macro execution context.
:param Optional[dict] kwargs: An optional dict of keyword args used to
pass to the macro.
:param bool release: If True, release the connection after executing.
:param Optional[str] connection_name: The connection name to use, or
use the macro name.

Return an an AttrDict with three attributes: 'table', 'data', and
'status'. 'table' is an agate.Table.
"""
if kwargs is None:
kwargs = {}
if context_override is None:
context_override = {}
if connection_name is None:
connection_name = macro_name

if manifest is None:
manifest = self._internal_manifest

macro = manifest.find_macro_by_name(macro_name, project)
if macro is None:
raise dbt.exceptions.RuntimeException(
Expand All @@ -692,15 +731,21 @@ def execute_macro(self, manifest, macro_name, project=None,
# This causes a reference cycle, as dbt.context.runtime.generate()
# ends up calling get_adapter, so the import has to be here.
import dbt.context.runtime
macro_context = dbt.context.runtime.generate(
macro_context = dbt.context.runtime.generate_macro(
macro,
self.config,
manifest
manifest,
connection_name
)
if context_override:
macro_context.update(context_override)
macro_context.update(context_override)

macro_function = macro.generator(macro_context)

result = macro.generator(macro_context)()
try:
result = macro_function(**kwargs)
finally:
if release:
self.release_connection(connection_name)
return result

@classmethod
Expand All @@ -716,11 +761,9 @@ def get_catalog(self, manifest):
"""
# make it a list so macros can index into it.
context = {'databases': list(manifest.get_used_databases())}
try:
table = self.execute_macro(manifest, GET_CATALOG_MACRO_NAME,
context_override=context)
finally:
self.release_connection(GET_CATALOG_MACRO_NAME)
table = self.execute_macro(GET_CATALOG_MACRO_NAME,
context_override=context,
release=True)

results = self._catalog_filter_table(table, manifest)
return results
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/adapters/base/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def __init__(self, adapter, credentials, include_path, dependencies=None):
self.adapter = adapter
self.credentials = credentials
self.include_path = include_path
project_path = os.path.join(self.include_path, adapter.type())
project = Project.from_project_root(project_path, {})
project = Project.from_project_root(include_path, {})
self.project_name = project.project_name
if dependencies is None:
dependencies = []
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,6 @@ def reset_adapters():
"""Clear the adapters. This is useful for tests, which change configs.
"""
with _ADAPTER_LOCK:
for adapter in _ADAPTERS.values():
adapter.cleanup_connections()
_ADAPTERS.clear()
ADAPTER_TYPES.clear()
Loading