Merge pull request #1784 from fishtown-analytics/feature/external-source-config

Feature/external source config
jtcohen6 committed Sep 27, 2019
2 parents 812c549 + fcd86e3 commit aceda5e
Showing 7 changed files with 145 additions and 58 deletions.
33 changes: 6 additions & 27 deletions core/dbt/contracts/graph/parsed.py
@@ -5,14 +5,15 @@

from hologram import JsonSchemaMixin
from hologram.helpers import (
StrEnum, register_pattern, ExtensibleJsonSchemaMixin
StrEnum, register_pattern
)

import dbt.clients.jinja
import dbt.flags
from dbt.contracts.graph.unparsed import (
UnparsedNode, UnparsedMacro, UnparsedDocumentationFile, Quoting,
UnparsedBaseNode, FreshnessThreshold
UnparsedBaseNode, FreshnessThreshold, ExternalTable,
AdditionalPropertiesAllowed
)
from dbt.contracts.util import Replaceable
from dbt.logger import GLOBAL_LOGGER as logger # noqa
@@ -50,7 +51,7 @@ def insensitive_patterns(*patterns: str):

@dataclass
class NodeConfig(
ExtensibleJsonSchemaMixin, Replaceable, MutableMapping[str, Any]
AdditionalPropertiesAllowed, Replaceable, MutableMapping[str, Any]
):
enabled: bool = True
materialized: str = 'view'
@@ -61,30 +62,6 @@ class NodeConfig(
quoting: Dict[str, Any] = field(default_factory=dict)
column_types: Dict[str, Any] = field(default_factory=dict)
tags: Union[List[str], str] = field(default_factory=list)
_extra: Dict[str, Any] = field(default_factory=dict)

@property
def extra(self):
return self._extra

@classmethod
def from_dict(cls, data, validate=True):
self = super().from_dict(data=data, validate=validate)
keys = self.to_dict(validate=False, omit_none=False)
for key, value in data.items():
if key not in keys:
self._extra[key] = value
return self

def to_dict(self, omit_none=True, validate=False):
data = super().to_dict(omit_none=omit_none, validate=validate)
data.update(self._extra)
return data

def replace(self, **kwargs):
dct = self.to_dict(omit_none=False, validate=False)
dct.update(kwargs)
return self.from_dict(dct)

@classmethod
def field_mapping(cls):
@@ -133,6 +110,7 @@ def __len__(self):
class ColumnInfo(JsonSchemaMixin, Replaceable):
name: str
description: str = ''
data_type: Optional[str] = None


# Docrefs are not quite like regular references, as they indicate what they
@@ -476,6 +454,7 @@ class ParsedSourceDefinition(
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
external: Optional[ExternalTable] = None
docrefs: List[Docref] = field(default_factory=list)
description: str = ''
columns: Dict[str, ColumnInfo] = field(default_factory=dict)
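This refactor lifts the "keep unknown config keys" behaviour out of `NodeConfig` and into the shared `AdditionalPropertiesAllowed` mixin added to `unparsed.py` below, so other contracts (such as the new external-table ones) can reuse it. A minimal sketch of that behaviour from the caller's side, assuming a dbt-core checkout from around this commit; the `partition_by` key is just a stand-in for any adapter-specific config that is not a declared `NodeConfig` field:

```python
# Sketch only: `partition_by` stands in for any key NodeConfig does not declare.
from dbt.contracts.graph.parsed import NodeConfig

cfg = NodeConfig.from_dict({
    'materialized': 'table',
    'partition_by': 'collector_date',  # unknown key, collected into _extra
})

print(cfg.materialized)                 # 'table' -- a declared field
print(cfg.extra)                        # unknown keys are preserved, e.g. {'partition_by': 'collector_date'}
print('partition_by' in cfg.to_dict())  # True -- extras survive serialization
```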
60 changes: 59 additions & 1 deletion core/dbt/contracts/graph/unparsed.py
@@ -1,8 +1,9 @@
from dbt.node_types import NodeType
from dbt.contracts.util import Replaceable, Mergeable
from dbt.exceptions import CompilationException

from hologram import JsonSchemaMixin
from hologram.helpers import StrEnum
from hologram.helpers import StrEnum, ExtensibleJsonSchemaMixin

from dataclasses import dataclass, field
from datetime import timedelta
@@ -57,6 +58,7 @@ class UnparsedRunHook(UnparsedNode):
class NamedTested(JsonSchemaMixin, Replaceable):
name: str
description: str = ''
data_type: Optional[str] = None
tests: Optional[List[Union[Dict[str, Any], str]]] = None

def __post_init__(self):
@@ -129,6 +131,59 @@ def __bool__(self):
return self.warn_after is not None or self.error_after is not None


@dataclass
class AdditionalPropertiesAllowed(ExtensibleJsonSchemaMixin):
_extra: Dict[str, Any] = field(default_factory=dict)

@property
def extra(self):
return self._extra

@classmethod
def from_dict(cls, data, validate=True):
self = super().from_dict(data=data, validate=validate)
keys = self.to_dict(validate=False, omit_none=False)
for key, value in data.items():
if key not in keys:
self._extra[key] = value
return self

def to_dict(self, omit_none=True, validate=False):
data = super().to_dict(omit_none=omit_none, validate=validate)
data.update(self._extra)
return data

def replace(self, **kwargs):
dct = self.to_dict(omit_none=False, validate=False)
dct.update(kwargs)
return self.from_dict(dct)


@dataclass
class ExternalPartition(AdditionalPropertiesAllowed, Replaceable):
name: str = ''
description: str = ''
data_type: str = ''

def __post_init__(self):
if self.name == '' or self.data_type == '':
raise CompilationException(
'External partition columns must have names and data types'
)


@dataclass
class ExternalTable(AdditionalPropertiesAllowed, Mergeable):
location: Optional[str] = None
file_format: Optional[str] = None
row_format: Optional[str] = None
tbl_properties: Optional[str] = None
partitions: Optional[List[ExternalPartition]] = None

def __bool__(self):
return self.location is not None


@dataclass
class Quoting(JsonSchemaMixin, Mergeable):
database: Optional[bool] = None
@@ -144,6 +199,9 @@ class UnparsedSourceTableDefinition(ColumnDescription, NodeDescription):
freshness: Optional[FreshnessThreshold] = field(
default_factory=FreshnessThreshold
)
external: Optional[ExternalTable] = field(
default_factory=ExternalTable
)

def __post_init__(self):
NodeDescription.__post_init__(self)
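The new `ExternalTable` and `ExternalPartition` contracts back the `external:` property that a source table can now carry; because they inherit `AdditionalPropertiesAllowed`, adapter-specific keys pass through without a schema change. A rough sketch of how the contract is exercised, assuming the same dbt-core checkout; the location, column name, and `some_adapter_option` key are invented for illustration:

```python
# Sketch only: the values below are made up, not taken from the pull request.
from dbt.contracts.graph.unparsed import ExternalTable

ext = ExternalTable.from_dict({
    'location': 's3://my-bucket/raw/events/',   # hypothetical path
    'file_format': 'parquet',
    'partitions': [
        # ExternalPartition requires both a name and a data_type
        {'name': 'collector_date', 'data_type': 'date'},
    ],
    'some_adapter_option': 'value',             # undeclared key
})

print(bool(ext))   # True: __bool__ is keyed off `location` being set
print(ext.extra)   # undeclared keys should land here, e.g. {'some_adapter_option': 'value'}
```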
9 changes: 6 additions & 3 deletions core/dbt/parser/schemas.py
@@ -62,9 +62,10 @@ def __init__(self):
self.column_info: Dict[str, ColumnInfo] = {}
self.docrefs: List[Docref] = []

def add(self, column_name, description):
def add(self, column_name, description, data_type):
self.column_info[column_name] = ColumnInfo(name=column_name,
description=description)
description=description,
data_type=data_type)


def collect_docrefs(
@@ -216,9 +217,10 @@ def parse_column(
) -> None:
column_name = column.name
description = column.description
data_type = column.data_type
collect_docrefs(block.target, refs, column_name, description)

refs.add(column_name, description)
refs.add(column_name, description, data_type)

if not column.tests:
return
@@ -348,6 +350,7 @@ def generate_source_node(
unique_id=unique_id,
name=table.name,
description=description,
external=table.external,
source_name=source.name,
source_description=source_description,
loader=source.loader,
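With `data_type` now threaded through `parse_column`, a column entry that declares a type in a schema file keeps it all the way into the parsed manifest. A tiny sketch of the resulting `ColumnInfo`, using a hypothetical column:

```python
# Sketch only: a column that declares `data_type: date` in schema.yml
# now parses into a ColumnInfo that carries that type.
from dbt.contracts.graph.parsed import ColumnInfo

col = ColumnInfo(name='collector_date', description='Partition column', data_type='date')
print(col.to_dict())
# {'name': 'collector_date', 'description': 'Partition column', 'data_type': 'date'}
```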
(Diffs for the remaining 4 changed files are not shown.)