Skip to content

Commit

Permalink
feat(ingest): fix bugs in SqlParsingAggregator (#9926)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Feb 28, 2024
1 parent 92b1cfa commit 1736edf
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def get_urns(self) -> Set[str]:
def schema_count(self) -> int:
return int(
self._schema_cache.sql_query(
f"SELECT COUNT(*) FROM {self._schema_cache.tablename} WHERE is_missing"
f"SELECT COUNT(*) FROM {self._schema_cache.tablename} WHERE NOT is_missing"
)[0][0]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class SqlAggregatorReport(Report):
_aggregator: "SqlParsingAggregator"
query_log_path: Optional[str] = None

# Observed queries.
num_observed_queries: int = 0
num_observed_queries_failed: int = 0
num_observed_queries_column_timeout: int = 0
Expand All @@ -123,6 +124,7 @@ class SqlAggregatorReport(Report):
default_factory=LossyList
)

# Views.
num_view_definitions: int = 0
num_views_failed: int = 0
num_views_column_timeout: int = 0
Expand All @@ -131,28 +133,30 @@ class SqlAggregatorReport(Report):
default_factory=LossyDict
)

# Other lineage loading metrics.
num_known_query_lineage: int = 0
num_known_mapping_lineage: int = 0
num_table_renames: int = 0

num_queries_with_temp_tables_in_session: int = 0

num_unique_query_fingerprints: Optional[int] = None

# Lineage-related.
num_urns_with_lineage: Optional[int] = None
# Temp tables.
num_temp_sessions: Optional[int] = None
num_inferred_temp_schemas: Optional[int] = None
num_queries_with_temp_tables_in_session: int = 0
queries_with_temp_upstreams: LossyDict[QueryId, LossyList] = dataclasses.field(
default_factory=LossyDict
)

# Lineage-related.
schema_resolver_count: Optional[int] = None
num_unique_query_fingerprints: Optional[int] = None
num_urns_with_lineage: Optional[int] = None
num_queries_entities_generated: int = 0

# Usage-related.
usage_skipped_missing_timestamp: int = 0

def compute_stats(self) -> None:
self.schema_resolver_count = self._aggregator._schema_resolver.schema_count()
self.num_unique_query_fingerprints = len(self._aggregator._query_map)

self.num_urns_with_lineage = len(self._aggregator._lineage_map)
Expand Down Expand Up @@ -865,6 +869,9 @@ def _gen_lineage_for_downstream(
confidenceScore=queries_map[query_id].confidence_score,
)
)
upstream_aspect.fineGrainedLineages = (
upstream_aspect.fineGrainedLineages or None
)

yield MetadataChangeProposalWrapper(
entityUrn=downstream_urn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
]
}
}
},
Expand All @@ -44,8 +43,7 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
]
}
}
},
Expand All @@ -69,8 +67,7 @@
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@
"type": "TRANSFORMED",
"query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71"
}
],
"fineGrainedLineages": []
]
}
}
},
Expand Down
25 changes: 25 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver, _TableName


def test_basic_schema_resolver():
schema_resolver = SchemaResolver(
platform="redshift",
env="PROD",
graph=None,
)

schema_resolver.add_raw_schema_info(
urn="urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)",
schema_info={"name": "STRING"},
)

urn, schema = schema_resolver.resolve_table(
_TableName(database="my_db", db_schema="public", table="test_table")
)
assert (
urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)"
)
assert schema
assert schema["name"]

assert schema_resolver.schema_count() == 1


def test_get_urn_for_table_lowercase():
schema_resolver = SchemaResolver(
platform="mssql",
Expand Down

0 comments on commit 1736edf

Please sign in to comment.