From 2e148ced66b9047657a116b7ae13f79303f230d0 Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Mon, 18 Jul 2022 15:24:30 +0530 Subject: [PATCH] fix(tableau): fix tableau db error, add more logs --- .../src/datahub/ingestion/source/tableau.py | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index b9d0dcc121fd2..e6e261a8fdd46 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -380,18 +380,27 @@ def _create_upstream_table_lineage( return upstream_tables for table in datasource.get("upstreamTables", []): - # skip upstream tables when there is no column info when retrieving embedded datasource - # and when table name is None - # Schema details for these will be taken care in self.emit_custom_sql_ds() + # skip upstream tables when there is no column info when retrieving datasource + # Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources() if not is_custom_sql and not table.get("columns"): + logger.debug( + f"Skipping upstream table with id {table['id']}, no columns" + ) continue elif table["name"] is None: + logger.warning( + f"Skipping upstream table {table['id']} from lineage since its name is none" + ) continue schema = table.get("schema", "") table_name = table.get("name", "") full_name = table.get("fullName", "") - upstream_db = table.get("database", {}).get("name", "") + upstream_db = ( + table.get("database", {}).get("name", "") + if table.get("database") is not None + else "" + ) logger.debug( "Processing Table with Connection Type: {0} and id {1}".format( table.get("connectionType", ""), table.get("id", "") @@ -406,6 +415,9 @@ def _create_upstream_table_lineage( and table_name == full_name and schema in table_name ): + logger.debug( + f"Omitting schema for upstream table {table['id']}, schema included in table name" + ) schema = "" table_urn = make_table_urn( self.config.env, @@ -555,6 +567,9 @@ def get_schema_metadata_for_custom_sql( # Datasource fields if field.get("name") is None: + logger.warning( + f"Skipping field {field['id']} from schema since its name is none" + ) continue nativeDataType = field.get("remoteType", "UNKNOWN") TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass) @@ -635,6 +650,9 @@ def _get_schema_metadata_for_datasource( # check datasource - custom sql relations from a field being referenced self._track_custom_sql_ids(field) if field.get("name") is None: + logger.warning( + f"Skipping field {field['id']} from schema since its name is none" + ) continue nativeDataType = field.get("dataType", "UNKNOWN") @@ -843,7 +861,7 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]: def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]: for (table_urn, (columns, path, is_embedded)) in self.upstream_tables.items(): if not is_embedded and not self.config.ingest_tables_external: - logger.error( + logger.debug( f"Skipping external table {table_urn} as ingest_tables_external is set to False" ) continue @@ -865,6 +883,9 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]: fields = [] for field in columns: if field.get("name") is None: + logger.warning( + f"Skipping field {field['id']} from schema since its name is none" + ) continue nativeDataType = field.get("remoteType", "UNKNOWN") TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass) @@ -907,7 +928,7 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]: aspects=[], ) - creator = workbook.get("owner", {}).get("username", "") + creator: Optional[str] = workbook["owner"].get("username") created_at = sheet.get("createdAt", datetime.now()) updated_at = sheet.get("updatedAt", datetime.now()) last_modified = self.get_last_modified(creator, created_at, updated_at) @@ -940,8 +961,6 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]: data_sources = self.get_sheetwise_upstream_datasources(sheet) for ds_id in data_sources: - if ds_id is None or not ds_id: - continue ds_urn = builder.make_dataset_urn(self.platform, ds_id, self.config.env) datasource_urn.append(ds_urn) if ds_id not in self.datasource_ids_being_used: @@ -1136,7 +1155,7 @@ def _extract_schema_from_fullName(self, fullName: str) -> str: @lru_cache(maxsize=None) def get_last_modified( - self, creator: str, created_at: bytes, updated_at: bytes + self, creator: Optional[str], created_at: bytes, updated_at: bytes ) -> ChangeAuditStamps: last_modified = ChangeAuditStamps() if creator: