Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tableau): fix tableau db error, add more logs #5423

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,27 @@ def _create_upstream_table_lineage(
return upstream_tables

for table in datasource.get("upstreamTables", []):
# skip upstream tables when there is no column info when retrieving embedded datasource
# and when table name is None
# Schema details for these will be taken care in self.emit_custom_sql_ds()
# skip upstream tables when there is no column info when retrieving datasource
# Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources()
if not is_custom_sql and not table.get("columns"):
logger.debug(
f"Skipping upstream table with id {table['id']}, no columns"
)
continue
elif table["name"] is None:
logger.warning(
f"Skipping upstream table {table['id']} from lineage since its name is none"
)
continue

schema = table.get("schema", "")
table_name = table.get("name", "")
full_name = table.get("fullName", "")
upstream_db = table.get("database", {}).get("name", "")
upstream_db = (
table.get("database", {}).get("name", "")
if table.get("database") is not None
else ""
)
logger.debug(
"Processing Table with Connection Type: {0} and id {1}".format(
table.get("connectionType", ""), table.get("id", "")
Expand All @@ -406,6 +415,9 @@ def _create_upstream_table_lineage(
and table_name == full_name
and schema in table_name
):
logger.debug(
f"Omitting schema for upstream table {table['id']}, schema included in table name"
)
schema = ""
table_urn = make_table_urn(
self.config.env,
Expand Down Expand Up @@ -555,6 +567,9 @@ def get_schema_metadata_for_custom_sql(
# Datasource fields

if field.get("name") is None:
logger.warning(
f"Skipping field {field['id']} from schema since its name is none"
)
continue
nativeDataType = field.get("remoteType", "UNKNOWN")
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
Expand Down Expand Up @@ -635,6 +650,9 @@ def _get_schema_metadata_for_datasource(
# check datasource - custom sql relations from a field being referenced
self._track_custom_sql_ids(field)
if field.get("name") is None:
logger.warning(
f"Skipping field {field['id']} from schema since its name is none"
)
continue

nativeDataType = field.get("dataType", "UNKNOWN")
Expand Down Expand Up @@ -843,7 +861,7 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
for (table_urn, (columns, path, is_embedded)) in self.upstream_tables.items():
if not is_embedded and not self.config.ingest_tables_external:
logger.error(
logger.debug(
f"Skipping external table {table_urn} as ingest_tables_external is set to False"
)
continue
Expand All @@ -865,6 +883,9 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
fields = []
for field in columns:
if field.get("name") is None:
logger.warning(
f"Skipping field {field['id']} from schema since its name is none"
)
continue
nativeDataType = field.get("remoteType", "UNKNOWN")
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
Expand Down Expand Up @@ -907,7 +928,7 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
aspects=[],
)

creator = workbook.get("owner", {}).get("username", "")
creator: Optional[str] = workbook["owner"].get("username")
created_at = sheet.get("createdAt", datetime.now())
updated_at = sheet.get("updatedAt", datetime.now())
last_modified = self.get_last_modified(creator, created_at, updated_at)
Expand Down Expand Up @@ -940,8 +961,6 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
data_sources = self.get_sheetwise_upstream_datasources(sheet)

for ds_id in data_sources:
if ds_id is None or not ds_id:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datasource id is mandatory non-null field in tableau metadata type Datasource

continue
ds_urn = builder.make_dataset_urn(self.platform, ds_id, self.config.env)
datasource_urn.append(ds_urn)
if ds_id not in self.datasource_ids_being_used:
Expand Down Expand Up @@ -1136,7 +1155,7 @@ def _extract_schema_from_fullName(self, fullName: str) -> str:

@lru_cache(maxsize=None)
def get_last_modified(
self, creator: str, created_at: bytes, updated_at: bytes
self, creator: Optional[str], created_at: bytes, updated_at: bytes
) -> ChangeAuditStamps:
last_modified = ChangeAuditStamps()
if creator:
Expand Down