feat: adding CsvTableBadgeExtractor #417

Merged
merged 7 commits on Dec 4, 2020
78 changes: 78 additions & 0 deletions databuilder/extractor/csv_extractor.py
@@ -10,6 +10,7 @@

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata
from databuilder.models.badge import Badge, BadgeMetadata


class CsvExtractor(Extractor):
@@ -65,6 +66,83 @@ def get_scope(self) -> str:
return 'extractor.csv'


class CsvTableBadgeExtractor(Extractor):
    """
    An extractor that reads a table CSV and a badge CSV and joins them
    into one BadgeMetadata record per table.
    """
    # Config keys
    TABLE_FILE_LOCATION = 'table_file_location'
    BADGE_FILE_LOCATION = 'badge_file_location'

def init(self, conf: ConfigTree) -> None:
self.conf = conf
self.table_file_location = conf.get_string(CsvTableBadgeExtractor.TABLE_FILE_LOCATION)
self.badge_file_location = conf.get_string(CsvTableBadgeExtractor.BADGE_FILE_LOCATION)
self._load_csv()

def _get_key(self,
db: str,
cluster: str,
schema: str,
tbl: str
) -> str:
return TableMetadata.TABLE_KEY_FORMAT.format(db=db,
cluster=cluster,
schema=schema,
tbl=tbl)

    def _load_csv(self) -> None:
        with open(self.badge_file_location, 'r') as fin:
            self.badges = [dict(i) for i in csv.DictReader(fin)]

        # Group badges by the key of the table they belong to.
        parsed_badges = defaultdict(list)
for badge_dict in self.badges:
db = badge_dict['database']
cluster = badge_dict['cluster']
schema = badge_dict['schema']
table_name = badge_dict['table_name']
            table_key = self._get_key(db, cluster, schema, table_name)
            badge = Badge(name=badge_dict['name'],
                          category=badge_dict['category'])
            parsed_badges[table_key].append(badge)

with open(self.table_file_location, 'r') as fin:
tables = [dict(i) for i in csv.DictReader(fin)]

results = []
for table_dict in tables:
db = table_dict['database']
cluster = table_dict['cluster']
schema = table_dict['schema']
table_name = table_dict['name']
            table_key = self._get_key(db, cluster, schema, table_name)
            # Use .get() so tables without badges fall back to an empty list
            # instead of inserting new entries into the defaultdict.
            badges = parsed_badges.get(table_key, [])
            badge_metadata = BadgeMetadata(start_label=TableMetadata.TABLE_NODE_LABEL,
                                           start_key=table_key,
                                           badges=badges)
results.append(badge_metadata)
self._iter = iter(results)

    def extract(self) -> Any:
        """
        Return one BadgeMetadata record at a time, or None once exhausted.
        """
        try:
            return next(self._iter)
        except StopIteration:
            return None

def get_scope(self) -> str:
return 'extractor.csvtablebadge'


class CsvTableColumnExtractor(Extractor):
# Config keys
TABLE_FILE_LOCATION = 'table_file_location'
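For reviewers who want to try the new extractor in isolation: a minimal sketch, assuming the sample CSVs added in this PR and the config keys defined on the class (the driver loop is illustrative, not part of the change):

from pyhocon import ConfigFactory

from databuilder.extractor.csv_extractor import CsvTableBadgeExtractor

# Unscoped conf passed straight to init(); inside a DefaultTask the
# framework scopes the conf under 'extractor.csvtablebadge' instead.
conf = ConfigFactory.from_dict({
    CsvTableBadgeExtractor.TABLE_FILE_LOCATION: 'example/sample_data/sample_table.csv',
    CsvTableBadgeExtractor.BADGE_FILE_LOCATION: 'example/sample_data/sample_badges.csv',
})

extractor = CsvTableBadgeExtractor()
extractor.init(conf)

# extract() returns one BadgeMetadata per table, then None once exhausted.
record = extractor.extract()
while record is not None:
    print(record)
    record = extractor.extract()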
2 changes: 2 additions & 0 deletions example/sample_data/sample_badges.csv
@@ -0,0 +1,2 @@
name,category,database,cluster,schema,table_name
beta,table_status,hive,gold,test_schema,test_table1
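
The badge row is joined to its table through the key built by _get_key(); assuming databuilder's stock TableMetadata.TABLE_KEY_FORMAT, the single sample row above resolves as follows (a sketch for illustration):

from databuilder.models.table_metadata import TableMetadata

# With the stock format '{db}://{cluster}.{schema}/{tbl}' this evaluates to
# 'hive://gold.test_schema/test_table1', matching the key the extractor
# builds for the corresponding row in sample_table.csv.
key = TableMetadata.TABLE_KEY_FORMAT.format(db='hive',
                                            cluster='gold',
                                            schema='test_schema',
                                            tbl='test_table1')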
32 changes: 31 additions & 1 deletion example/scripts/sample_data_loader.py
@@ -29,7 +29,7 @@
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.csv_extractor import CsvTableColumnExtractor, CsvExtractor
from databuilder.extractor.csv_extractor import CsvTableBadgeExtractor, CsvTableColumnExtractor, CsvExtractor
from databuilder.extractor.neo4j_es_last_updated_extractor import Neo4jEsLastUpdatedExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.job.job import DefaultJob
@@ -103,6 +103,35 @@ def run_csv_job(file_loc, job_name, model):
publisher=Neo4jCsvPublisher()).launch()


def run_table_badge_job(table_path, badge_path):
tmp_folder = '/var/tmp/amundsen/table_badge'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder)
extractor = CsvTableBadgeExtractor()
csv_loader = FsNeo4jCSVLoader()
task = DefaultTask(extractor=extractor,
loader=csv_loader,
transformer=NoopTransformer())
job_config = ConfigFactory.from_dict({
'extractor.csvtablebadge.table_file_location': table_path,
'extractor.csvtablebadge.badge_file_location': badge_path,
'loader.filesystem_csv_neo4j.node_dir_path': node_files_folder,
'loader.filesystem_csv_neo4j.relationship_dir_path': relationship_files_folder,
'loader.filesystem_csv_neo4j.delete_created_directories': True,
'publisher.neo4j.node_files_directory': node_files_folder,
'publisher.neo4j.relation_files_directory': relationship_files_folder,
'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
'publisher.neo4j.neo4j_user': neo4j_user,
'publisher.neo4j.neo4j_password': neo4j_password,
'publisher.neo4j.neo4j_encrypted': False,
        'publisher.neo4j.job_publish_tag': 'unique_tag_b',  # use a unique tag per run, e.g. {ds}
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()


def run_table_column_job(table_path, column_path):
tmp_folder = '/var/tmp/amundsen/table_column'
node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder)
@@ -268,6 +297,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
# logging.basicConfig(level=logging.INFO)

run_table_column_job('example/sample_data/sample_table.csv', 'example/sample_data/sample_col.csv')
run_table_badge_job('example/sample_data/sample_table.csv', 'example/sample_data/sample_badges.csv')
run_csv_job('example/sample_data/sample_table_column_stats.csv', 'test_table_column_stats',
'databuilder.models.table_stats.TableColumnStats')
run_csv_job('example/sample_data/sample_table_programmatic_source.csv', 'test_programmatic_source',
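
For the single sample badge row, the record run_table_badge_job() hands to FsNeo4jCSVLoader is equivalent to the following (a sketch; the constructor arguments mirror those used in _load_csv(), and the key assumes the stock table key format):

from databuilder.models.badge import Badge, BadgeMetadata
from databuilder.models.table_metadata import TableMetadata

# One BadgeMetadata per table: anchored at the Table node and carrying the
# list of Badge objects parsed from sample_badges.csv.
badge_metadata = BadgeMetadata(start_label=TableMetadata.TABLE_NODE_LABEL,
                               start_key='hive://gold.test_schema/test_table1',
                               badges=[Badge(name='beta', category='table_status')])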