forked from amundsen-io/amundsen
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Create a RedshiftMetadataExtractor that supports late binding views (amundsen-io#356)
Signed-off-by: Nathan Lawrence <[email protected]> Co-authored-by: Nathan Lawrence <[email protected]>
- Loading branch information
1 parent
871a176
commit 4113cfd
Showing
5 changed files
with
355 additions
and
131 deletions.
There are no files selected for viewing
116 changes: 116 additions & 0 deletions
116
databuilder/extractor/base_postgres_metadata_extractor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# Copyright Contributors to the Amundsen project. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import abc | ||
import logging | ||
from collections import namedtuple | ||
|
||
from pyhocon import ConfigFactory, ConfigTree | ||
from typing import Iterator, Union, Dict, Any | ||
|
||
from databuilder import Scoped | ||
from databuilder.extractor.base_extractor import Extractor | ||
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor | ||
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata | ||
from itertools import groupby | ||
|
||
|
||
TableKey = namedtuple('TableKey', ['schema', 'table_name']) | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class BasePostgresMetadataExtractor(Extractor):
    """
    Extracts Postgres table and column metadata from underlying meta store database
    using SQLAlchemyExtractor.

    Subclasses supply the dialect-specific SQL via :meth:`get_sql_statement`;
    this base class handles configuration, row iteration, and grouping rows
    into TableMetadata records.
    """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    DATABASE_KEY = 'database_key'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
    )

    @abc.abstractmethod
    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> Any:
        """
        Build the dialect-specific metadata SQL statement.

        :param use_catalog_as_cluster_name: when True, the SQL should source the
            cluster name from the database catalog instead of ``self._cluster``
        :param where_clause_suffix: extra SQL appended as a filter clause
        :return: the SQL statement string (``None`` in this abstract base)
        """
        return None

    def init(self, conf: ConfigTree) -> None:
        """
        Read configuration, build the SQL statement, and initialize the
        underlying SQLAlchemyExtractor that actually runs the query.
        """
        conf = conf.with_fallback(BasePostgresMetadataExtractor.DEFAULT_CONFIG)
        self._cluster = '{}'.format(conf.get_string(BasePostgresMetadataExtractor.CLUSTER_KEY))

        self._database = conf.get_string(BasePostgresMetadataExtractor.DATABASE_KEY, default='postgres')

        self.sql_stmt = self.get_sql_statement(
            use_catalog_as_cluster_name=conf.get_bool(BasePostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME),
            where_clause_suffix=conf.get_string(BasePostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
        )

        self._alchemy_extractor = SQLAlchemyExtractor()
        # Scope the child extractor's config, falling back to the SQL we built
        # so an explicit EXTRACT_SQL in conf still wins.
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

        # Lazy %-style args: the message is only formatted if INFO is enabled.
        LOGGER.info('SQL for postgres metadata: %s', self.sql_stmt)

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        """
        :return: the next TableMetadata record, or None when exhausted
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
        :return:
        """
        # NOTE: groupby requires rows pre-sorted by the grouping key; the SQL's
        # ORDER BY (cluster, schema, name, ...) provides that ordering.
        for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []

            for row in group:
                last_row = row
                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
                                              row['col_type'], row['col_sort_order']))

            yield TableMetadata(self._database, last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                last_row['description'],
                                columns)

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Provides iterator of result row from SQLAlchemy extractor
        :return:
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        """
        Table key consists of schema and table name
        :param row:
        :return:
        """
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])

        return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,132 +1,43 @@ | ||
# Copyright Contributors to the Amundsen project. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import logging | ||
from collections import namedtuple | ||
from pyhocon import ConfigFactory, ConfigTree # noqa: F401 | ||
from typing import Iterator, Union, Dict, Any # noqa: F401 | ||
|
||
from pyhocon import ConfigFactory, ConfigTree | ||
from typing import Iterator, Union, Dict, Any | ||
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor | ||
|
||
from databuilder import Scoped | ||
from databuilder.extractor.base_extractor import Extractor | ||
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor | ||
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata | ||
from itertools import groupby | ||
|
||
|
||
TableKey = namedtuple('TableKey', ['schema', 'table_name']) | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class PostgresMetadataExtractor(BasePostgresMetadataExtractor):
    """
    Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor
    """

    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> str:
        """
        Build the Postgres information_schema query for table/column metadata.

        :param use_catalog_as_cluster_name: when True, use ``c.table_catalog``
            as the cluster column; otherwise use the configured cluster name
        :param where_clause_suffix: extra SQL appended as a filter clause
        :return: the fully formatted SQL statement
        """
        if use_catalog_as_cluster_name:
            cluster_source = "c.table_catalog"
        else:
            # Inline the configured cluster name as a SQL string literal.
            cluster_source = "'{}'".format(self._cluster)

        return """
        SELECT
        {cluster_source} as cluster, c.table_schema as schema, c.table_name as name, pgtd.description as description
        ,c.column_name as col_name, c.data_type as col_type
        , pgcd.description as col_description, ordinal_position as col_sort_order
        FROM INFORMATION_SCHEMA.COLUMNS c
        INNER JOIN
        pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
        LEFT JOIN
        pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
        LEFT JOIN
        pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
        {where_clause_suffix}
        ORDER by cluster, schema, name, col_sort_order ;
        """.format(
            cluster_source=cluster_source,
            where_clause_suffix=where_clause_suffix,
        )

    def get_scope(self) -> str:
        """
        :return: the config scope key for this extractor
        """
        return 'extractor.postgres_metadata'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# Copyright Contributors to the Amundsen project. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from pyhocon import ConfigFactory, ConfigTree # noqa: F401 | ||
from typing import Iterator, Union, Dict, Any # noqa: F401 | ||
|
||
from databuilder.extractor.base_postgres_metadata_extractor import BasePostgresMetadataExtractor | ||
|
||
|
||
class RedshiftMetadataExtractor(BasePostgresMetadataExtractor):
    """
    Extracts Redshift table and column metadata from underlying meta store database using SQLAlchemyExtractor

    This differs from the PostgresMetadataExtractor because in order to support Redshift's late binding views,
    we need to join the INFORMATION_SCHEMA data against the function PG_GET_LATE_BINDING_VIEW_COLS().
    """

    def get_sql_statement(self, use_catalog_as_cluster_name: bool, where_clause_suffix: str) -> str:
        """
        Build the Redshift metadata query: regular tables from
        INFORMATION_SCHEMA unioned with late-binding view columns from
        PG_GET_LATE_BINDING_VIEW_COLS().

        :param use_catalog_as_cluster_name: when True, use ``CURRENT_DATABASE()``
            as the cluster value; otherwise use the configured cluster name
        :param where_clause_suffix: extra SQL appended as a filter clause
        :return: the fully formatted SQL statement
        """
        if use_catalog_as_cluster_name:
            cluster_source = "CURRENT_DATABASE()"
        else:
            # Inline the configured cluster name as a SQL string literal.
            cluster_source = "'{}'".format(self._cluster)

        return """
        SELECT
            *
        FROM (
            SELECT
              {cluster_source} as cluster,
              c.table_schema as schema,
              c.table_name as name,
              pgtd.description as description,
              c.column_name as col_name,
              c.data_type as col_type,
              pgcd.description as col_description,
              ordinal_position as col_sort_order
            FROM INFORMATION_SCHEMA.COLUMNS c
            INNER JOIN
              pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
            LEFT JOIN
              pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
            LEFT JOIN
              pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
            UNION
            SELECT
              {cluster_source} as cluster,
              view_schema as schema,
              view_name as name,
              NULL as description,
              column_name as col_name,
              data_type as col_type,
              NULL as col_description,
              ordinal_position as col_sort_order
            FROM
                PG_GET_LATE_BINDING_VIEW_COLS()
                COLS(view_schema NAME, view_name NAME, column_name NAME, data_type VARCHAR, ordinal_position INT)
        )
        {where_clause_suffix}
        ORDER by cluster, schema, name, col_sort_order ;
        """.format(
            cluster_source=cluster_source,
            where_clause_suffix=where_clause_suffix,
        )

    def get_scope(self) -> str:
        """
        :return: the config scope key for this extractor
        """
        return 'extractor.redshift_metadata'
Oops, something went wrong.