Skip to content

Commit

Permalink
fix(tableau): fix tableau db error, add more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Jul 19, 2022
1 parent a6dc669 commit 2e148ce
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,27 @@ def _create_upstream_table_lineage(
return upstream_tables

for table in datasource.get("upstreamTables", []):
# skip upstream tables when there is no column info when retrieving embedded datasource
# and when table name is None
# Schema details for these will be taken care in self.emit_custom_sql_ds()
# skip upstream tables when there is no column info when retrieving datasource
# Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources()
if not is_custom_sql and not table.get("columns"):
logger.debug(
f"Skipping upstream table with id {table['id']}, no columns"
)
continue
elif table["name"] is None:
logger.warning(
f"Skipping upstream table {table['id']} from lineage since its name is none"
)
continue

schema = table.get("schema", "")
table_name = table.get("name", "")
full_name = table.get("fullName", "")
upstream_db = table.get("database", {}).get("name", "")
upstream_db = (
table.get("database", {}).get("name", "")
if table.get("database") is not None
else ""
)
logger.debug(
"Processing Table with Connection Type: {0} and id {1}".format(
table.get("connectionType", ""), table.get("id", "")
Expand All @@ -406,6 +415,9 @@ def _create_upstream_table_lineage(
and table_name == full_name
and schema in table_name
):
logger.debug(
f"Omitting schema for upstream table {table['id']}, schema included in table name"
)
schema = ""
table_urn = make_table_urn(
self.config.env,
Expand Down Expand Up @@ -555,6 +567,9 @@ def get_schema_metadata_for_custom_sql(
# Datasource fields

if field.get("name") is None:
logger.warning(
f"Skipping field {field['id']} from schema since its name is none"
)
continue
nativeDataType = field.get("remoteType", "UNKNOWN")
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
Expand Down Expand Up @@ -635,6 +650,9 @@ def _get_schema_metadata_for_datasource(
# check datasource - custom sql relations from a field being referenced
self._track_custom_sql_ids(field)
if field.get("name") is None:
logger.warning(
f"Skipping field {field['id']} from schema since its name is none"
)
continue

nativeDataType = field.get("dataType", "UNKNOWN")
Expand Down Expand Up @@ -843,7 +861,7 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
for (table_urn, (columns, path, is_embedded)) in self.upstream_tables.items():
if not is_embedded and not self.config.ingest_tables_external:
logger.error(
logger.debug(
f"Skipping external table {table_urn} as ingest_tables_external is set to False"
)
continue
Expand All @@ -865,6 +883,9 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
fields = []
for field in columns:
if field.get("name") is None:
logger.warning(
f"Skipping field {field['id']} from schema since its name is none"
)
continue
nativeDataType = field.get("remoteType", "UNKNOWN")
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
Expand Down Expand Up @@ -907,7 +928,7 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
aspects=[],
)

creator = workbook.get("owner", {}).get("username", "")
creator: Optional[str] = workbook["owner"].get("username")
created_at = sheet.get("createdAt", datetime.now())
updated_at = sheet.get("updatedAt", datetime.now())
last_modified = self.get_last_modified(creator, created_at, updated_at)
Expand Down Expand Up @@ -940,8 +961,6 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
data_sources = self.get_sheetwise_upstream_datasources(sheet)

for ds_id in data_sources:
if ds_id is None or not ds_id:
continue
ds_urn = builder.make_dataset_urn(self.platform, ds_id, self.config.env)
datasource_urn.append(ds_urn)
if ds_id not in self.datasource_ids_being_used:
Expand Down Expand Up @@ -1136,7 +1155,7 @@ def _extract_schema_from_fullName(self, fullName: str) -> str:

@lru_cache(maxsize=None)
def get_last_modified(
self, creator: str, created_at: bytes, updated_at: bytes
self, creator: Optional[str], created_at: bytes, updated_at: bytes
) -> ChangeAuditStamps:
last_modified = ChangeAuditStamps()
if creator:
Expand Down

0 comments on commit 2e148ce

Please sign in to comment.