feat: Column lineage implementation & sample ingest scripts (#470)
* Slight refactor to table lineage interface, added csv extract to import table lineage

Signed-off-by: Grant Seward <[email protected]>

* Removed whitespace

Signed-off-by: Grant Seward <[email protected]>

* Local fork test

Signed-off-by: Grant Seward <[email protected]>

* Fixed linting

Signed-off-by: Grant Seward <[email protected]>

* Fixed isort

Signed-off-by: Grant Seward <[email protected]>

* Additional test data

Signed-off-by: Grant Seward <[email protected]>

* Created generic lineage interface, changed upstream/downstream wording to be explicit

Signed-off-by: Grant Seward <[email protected]>

* Removed whitespace

Signed-off-by: Grant Seward <[email protected]>

* Fixed static typing

Signed-off-by: Grant Seward <[email protected]>

* Fixed test from upstream merge

Signed-off-by: Grant Seward <[email protected]>
sewardgw authored Apr 25, 2021
1 parent 04f1e27 commit a2f8a1b
Showing 7 changed files with 286 additions and 70 deletions.
106 changes: 105 additions & 1 deletion databuilder/databuilder/extractor/csv_extractor.py
@@ -10,6 +10,7 @@
 
 from databuilder.extractor.base_extractor import Extractor
 from databuilder.models.badge import Badge, BadgeMetadata
+from databuilder.models.table_lineage import ColumnLineage, TableLineage
 from databuilder.models.table_metadata import ColumnMetadata, TableMetadata


@@ -192,7 +193,6 @@ def _load_csv(self) -> None:
         """
         Load the CSV file and create an iterator over its parsed rows.
         """
-
         with open(self.column_file_location, 'r') as fin:
             self.columns = [dict(i) for i in csv.DictReader(fin)]

@@ -256,3 +256,107 @@ def extract(self) -> Any:
 
     def get_scope(self) -> str:
         return 'extractor.csvtablecolumn'


+class CsvTableLineageExtractor(Extractor):
+    """
+    An Extractor that reads table-to-table lineage pairs from a CSV file
+    and creates a TableLineage record for each row.
+    """
+    # Config keys
+    TABLE_LINEAGE_FILE_LOCATION = 'table_lineage_file_location'
+
+    def init(self, conf: ConfigTree) -> None:
+        """
+        :param conf:
+        """
+        self.conf = conf
+        self.table_lineage_file_location = conf.get_string(CsvTableLineageExtractor.TABLE_LINEAGE_FILE_LOCATION)
+        self._load_csv()
+
+    def _load_csv(self) -> None:
+        """
+        Load the lineage CSV and create an iterator of TableLineage models.
+        """
+        with open(self.table_lineage_file_location, 'r') as fin:
+            self.table_lineage = [dict(i) for i in csv.DictReader(fin)]
+
+        results = []
+        for lineage_dict in self.table_lineage:
+            source_table_key = lineage_dict['source_table_key']
+            target_table_key = lineage_dict['target_table_key']
+            lineage = TableLineage(
+                table_key=source_table_key,
+                downstream_deps=[target_table_key]
+            )
+            results.append(lineage)
+
+        self._iter = iter(results)
+
+    def extract(self) -> Any:
+        """
+        Return one TableLineage record at a time; None when exhausted.
+        """
+        try:
+            return next(self._iter)
+        except StopIteration:
+            return None
+
+    def get_scope(self) -> str:
+        return 'extractor.csvtablelineage'
+
+
+class CsvColumnLineageExtractor(Extractor):
+    """
+    An Extractor that reads column-to-column lineage pairs from a CSV file
+    and creates a ColumnLineage record for each row.
+    """
+    # Config keys
+    COLUMN_LINEAGE_FILE_LOCATION = 'column_lineage_file_location'
+
+    def init(self, conf: ConfigTree) -> None:
+        """
+        :param conf:
+        """
+        self.conf = conf
+        self.column_lineage_file_location = conf.get_string(CsvColumnLineageExtractor.COLUMN_LINEAGE_FILE_LOCATION)
+        self._load_csv()
+
+    def _load_csv(self) -> None:
+        """
+        Load the lineage CSV and create an iterator of ColumnLineage models.
+        """
+        with open(self.column_lineage_file_location, 'r') as fin:
+            self.column_lineage = [dict(i) for i in csv.DictReader(fin)]
+
+        results = []
+        for lineage_dict in self.column_lineage:
+            source_column_key = lineage_dict['source_column_key']
+            target_column_key = lineage_dict['target_column_key']
+            lineage = ColumnLineage(
+                column_key=source_column_key,
+                downstream_deps=[target_column_key]
+            )
+            results.append(lineage)
+
+        self._iter = iter(results)
+
+    def extract(self) -> Any:
+        """
+        Return one ColumnLineage record at a time; None when exhausted.
+        """
+        try:
+            return next(self._iter)
+        except StopIteration:
+            return None
+
+    def get_scope(self) -> str:
+        return 'extractor.csvcolumnlineage'
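
Usage note: the new extractors follow the same init()/extract() contract as the other CSV extractors in this module, so they can be exercised standalone. A minimal sketch, assuming the sample CSV added in this commit; the config key is passed unscoped because DefaultTask normally hands each extractor an already-scoped config:

from pyhocon import ConfigFactory

from databuilder.extractor.csv_extractor import CsvTableLineageExtractor

conf = ConfigFactory.from_dict({
    'table_lineage_file_location': 'example/sample_data/sample_table_lineage.csv',
})

extractor = CsvTableLineageExtractor()
extractor.init(conf)

record = extractor.extract()
while record:
    print(record)  # e.g. TableLineage('hive://gold.test_schema/test_table1')
    record = extractor.extract()

CsvColumnLineageExtractor works the same way with a column_lineage_file_location key.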
130 changes: 74 additions & 56 deletions databuilder/databuilder/models/table_lineage.py
@@ -1,40 +1,26 @@
 # Copyright Contributors to the Amundsen project.
 # SPDX-License-Identifier: Apache-2.0
 
-import re
+from abc import abstractmethod
 from typing import (
     Iterator, List, Union,
 )
 
 from databuilder.models.graph_node import GraphNode
 from databuilder.models.graph_relationship import GraphRelationship
 from databuilder.models.graph_serializable import GraphSerializable
-from databuilder.models.table_metadata import TableMetadata
+from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
 
 
-class TableLineage(GraphSerializable):
+class BaseLineage(GraphSerializable):
     """
-    Table Lineage Model. It won't create nodes but create upstream/downstream rels.
+    Generic Lineage Interface
     """
     LABEL = 'Lineage'
-    KEY_FORMAT = '{db}://{cluster}.{schema}/{tbl}/'
-    ORIGIN_DEPENDENCY_RELATION_TYPE = 'UPSTREAM'
-    DEPENDENCY_ORIGIN_RELATION_TYPE = 'DOWNSTREAM'
+    ORIGIN_DEPENDENCY_RELATION_TYPE = 'HAS_DOWNSTREAM'
+    DEPENDENCY_ORIGIN_RELATION_TYPE = 'HAS_UPSTREAM'
 
-    def __init__(self,
-                 db_name: str,
-                 schema: str,
-                 table_name: str,
-                 cluster: str,
-                 downstream_deps: List = None,
-                 ) -> None:
-        self.db = db_name
-        self.schema = schema
-        self.table = table_name
-        self.cluster = cluster if cluster else 'gold'
-        # a list of downstream dependencies, each of which will follow
-        # the same key
-        self.downstream_deps = downstream_deps or []
+    def __init__(self) -> None:
         self._node_iter = self._create_node_iterator()
         self._relation_iter = self._create_rel_iterator()

@@ -51,14 +37,6 @@ def create_next_relation(self) -> Union[GraphRelationship, None]:
         except StopIteration:
             return None
 
-    def get_table_model_key(self,
-                            db: str,
-                            cluster: str,
-                            schema: str,
-                            table: str
-                            ) -> str:
-        return f'{db}://{cluster}.{schema}/{table}'
-
     def _create_node_iterator(self) -> Iterator[GraphNode]:
         """
         It won't create any node for this model
Expand All @@ -67,37 +45,77 @@ def _create_node_iterator(self) -> Iterator[GraphNode]:
return
yield

@abstractmethod
def _create_rel_iterator(self) -> Iterator[GraphRelationship]:
pass


class TableLineage(BaseLineage):
"""
Table Lineage Model. It won't create nodes but create upstream/downstream rels.
"""

def __init__(self,
table_key: str,
downstream_deps: List = None, # List of table keys
) -> None:
self.table_key = table_key
# a list of downstream dependencies, each of which will follow
# the same key
self.downstream_deps = downstream_deps or []
super().__init__()

def _create_rel_iterator(self) -> Iterator[GraphRelationship]:
"""
Create relations between source table and all the downstream tables
:return:
"""
for downstream_tab in self.downstream_deps:
# every deps should follow '{db}://{cluster}.{schema}/{table}'
# todo: if we change the table uri, we should change here.
m = re.match('(\w+)://(\w+)\.(\w+)\/(\w+)', downstream_tab)
if m:
# if not match, skip those records
relationship = GraphRelationship(
start_key=self.get_table_model_key(
db=self.db,
cluster=self.cluster,
schema=self.schema,
table=self.table
),
start_label=TableMetadata.TABLE_NODE_LABEL,
end_label=TableMetadata.TABLE_NODE_LABEL,
end_key=self.get_table_model_key(
db=m.group(1),
cluster=m.group(2),
schema=m.group(3),
table=m.group(4)
),
type=TableLineage.ORIGIN_DEPENDENCY_RELATION_TYPE,
reverse_type=TableLineage.DEPENDENCY_ORIGIN_RELATION_TYPE,
attributes={}
)
yield relationship
for downstream_key in self.downstream_deps:
relationship = GraphRelationship(
start_key=self.table_key,
start_label=TableMetadata.TABLE_NODE_LABEL,
end_label=TableMetadata.TABLE_NODE_LABEL,
end_key=downstream_key,
type=TableLineage.ORIGIN_DEPENDENCY_RELATION_TYPE,
reverse_type=TableLineage.DEPENDENCY_ORIGIN_RELATION_TYPE,
attributes={}
)
yield relationship

def __repr__(self) -> str:
return f'TableLineage({self.table_key!r})'


class ColumnLineage(BaseLineage):
"""
Column Lineage Model. It won't create nodes but create upstream/downstream rels.
"""
def __init__(self,
column_key: str,
downstream_deps: List = None, # List of column keys
) -> None:
self.column_key = column_key
# a list of downstream dependencies, each of which will follow
# the same key
self.downstream_deps = downstream_deps or []
super().__init__()

def _create_rel_iterator(self) -> Iterator[GraphRelationship]:
"""
Create relations between source column and all the downstream columns
:return:
"""
for downstream_key in self.downstream_deps:
relationship = GraphRelationship(
start_key=self.column_key,
start_label=ColumnMetadata.COLUMN_NODE_LABEL,
end_label=ColumnMetadata.COLUMN_NODE_LABEL,
end_key=downstream_key,
type=ColumnLineage.ORIGIN_DEPENDENCY_RELATION_TYPE,
reverse_type=ColumnLineage.DEPENDENCY_ORIGIN_RELATION_TYPE,
attributes={}
)
yield relationship

def __repr__(self) -> str:
return f'TableLineage({self.db!r}, {self.cluster!r}, {self.schema!r}, {self.table!r})'
return f'ColumnLineage({self.column_key!r})'
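
Because BaseLineage emits no nodes, these models serialize purely to relationships. A small sketch of what a single TableLineage record yields, using key values from the sample data below:

from databuilder.models.table_lineage import TableLineage

lineage = TableLineage(
    table_key='hive://gold.test_schema/test_table1',
    downstream_deps=['dynamo://gold.test_schema/test_table2'],
)

assert lineage.create_next_node() is None  # no nodes, only relationships
rel = lineage.create_next_relation()
# Table -[HAS_DOWNSTREAM]-> Table, with HAS_UPSTREAM as the reverse type
print(rel.start_key, rel.type, rel.end_key)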
4 changes: 4 additions & 0 deletions databuilder/example/sample_data/sample_column_lineage.csv
@@ -0,0 +1,4 @@
+source_column_key,target_column_key
+hive://gold.test_schema/test_table1/col1,dynamo://gold.test_schema/test_table2/col1
+hive://gold.test_schema/test_table1/col1,hive://gold.test_schema/test_view1/col1
+dynamo://gold.test_schema/test_table2/col2,hive://gold.test_schema/test_table3/col1
7 changes: 7 additions & 0 deletions databuilder/example/sample_data/sample_table_lineage.csv
@@ -0,0 +1,7 @@
+source_table_key,target_table_key
+hive://gold.test_schema/test_table1,dynamo://gold.test_schema/test_table2
+hive://gold.test_schema/test_table1,hive://gold.test_schema/test_view1
+dynamo://gold.test_schema/test_table2,hive://gold.test_schema/test_table3
+hive://gold.test_schema/test_view1,hive://gold.test_schema/test's_table4
+dynamo://gold.test_schema/test_table2,hive://gold.test_schema/test's_table4
+hive://gold.test_schema/test_view1,hive://gold.test_schema/test's_table4
60 changes: 59 additions & 1 deletion databuilder/example/scripts/sample_data_loader.py
@@ -30,7 +30,7 @@
 from sqlalchemy.ext.declarative import declarative_base
 
 from databuilder.extractor.csv_extractor import (
-    CsvExtractor, CsvTableBadgeExtractor, CsvTableColumnExtractor,
+    CsvColumnLineageExtractor, CsvExtractor, CsvTableBadgeExtractor, CsvTableColumnExtractor, CsvTableLineageExtractor,
 )
 from databuilder.extractor.es_last_updated_extractor import EsLastUpdatedExtractor
 from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
@@ -165,6 +165,62 @@ def run_table_column_job(table_path, column_path):
     job.launch()
 
 
+def run_table_lineage_job(table_lineage_path):
+    tmp_folder = '/var/tmp/amundsen/table_column'
+    node_files_folder = f'{tmp_folder}/nodes'
+    relationship_files_folder = f'{tmp_folder}/relationships'
+    extractor = CsvTableLineageExtractor()
+    csv_loader = FsNeo4jCSVLoader()
+    task = DefaultTask(extractor,
+                       loader=csv_loader,
+                       transformer=NoopTransformer())
+    job_config = ConfigFactory.from_dict({
+        'extractor.csvtablelineage.table_lineage_file_location': table_lineage_path,
+        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
+        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
+        'loader.filesystem_csv_neo4j.delete_created_directories': True,
+        'publisher.neo4j.node_files_directory': node_files_folder,
+        'publisher.neo4j.relation_files_directory': relationship_files_folder,
+        'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
+        'publisher.neo4j.neo4j_user': neo4j_user,
+        'publisher.neo4j.neo4j_password': neo4j_password,
+        'publisher.neo4j.neo4j_encrypted': False,
+        'publisher.neo4j.job_publish_tag': 'lineage_unique_tag',  # should use unique tag here like {ds}
+    })
+    job = DefaultJob(conf=job_config,
+                     task=task,
+                     publisher=Neo4jCsvPublisher())
+    job.launch()
+
+
+def run_column_lineage_job(column_lineage_path):
+    tmp_folder = '/var/tmp/amundsen/table_column'
+    node_files_folder = f'{tmp_folder}/nodes'
+    relationship_files_folder = f'{tmp_folder}/relationships'
+    extractor = CsvColumnLineageExtractor()
+    csv_loader = FsNeo4jCSVLoader()
+    task = DefaultTask(extractor,
+                       loader=csv_loader,
+                       transformer=NoopTransformer())
+    job_config = ConfigFactory.from_dict({
+        'extractor.csvcolumnlineage.column_lineage_file_location': column_lineage_path,
+        'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
+        'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
+        'loader.filesystem_csv_neo4j.delete_created_directories': True,
+        'publisher.neo4j.node_files_directory': node_files_folder,
+        'publisher.neo4j.relation_files_directory': relationship_files_folder,
+        'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
+        'publisher.neo4j.neo4j_user': neo4j_user,
+        'publisher.neo4j.neo4j_password': neo4j_password,
+        'publisher.neo4j.neo4j_encrypted': False,
+        'publisher.neo4j.job_publish_tag': 'lineage_unique_tag',  # should use unique tag here like {ds}
+    })
+    job = DefaultJob(conf=job_config,
+                     task=task,
+                     publisher=Neo4jCsvPublisher())
+    job.launch()
+
+
 def create_last_updated_job():
     # loader saves data to these folders and publisher reads it from here
     tmp_folder = '/var/tmp/amundsen/last_updated_data'
@@ -302,6 +358,8 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
 
     run_table_column_job('example/sample_data/sample_table.csv', 'example/sample_data/sample_col.csv')
     run_table_badge_job('example/sample_data/sample_table.csv', 'example/sample_data/sample_badges.csv')
+    run_table_lineage_job('example/sample_data/sample_table_lineage.csv')
+    run_column_lineage_job('example/sample_data/sample_column_lineage.csv')
     run_csv_job('example/sample_data/sample_table_column_stats.csv', 'test_table_column_stats',
                 'databuilder.models.table_stats.TableColumnStats')
     run_csv_job('example/sample_data/sample_table_programmatic_source.csv', 'test_programmatic_source',
(Diffs for the remaining 2 changed files are not shown.)

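After sample_data_loader.py runs, the new edges can be spot-checked in Neo4j. A hedged verification sketch using the official neo4j Python driver, assuming the local endpoint and default credentials the sample script targets (adjust to your deployment):

from neo4j import GraphDatabase

driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'test'))
with driver.session() as session:
    result = session.run(
        'MATCH (t:Table {key: $key})-[:HAS_DOWNSTREAM]->(d:Table) '
        'RETURN d.key AS downstream',
        key='hive://gold.test_schema/test_table1',
    )
    for record in result:
        print(record['downstream'])
driver.close()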