diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
index a00a52ae54207..f4579376a3b3a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
@@ -12,8 +12,10 @@ class UnityCatalogUsagePerfReport(Report):
     get_queries_timer: PerfTimer = field(default_factory=PerfTimer)
     sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
+    spark_sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
     aggregator_add_event_timer: PerfTimer = field(default_factory=PerfTimer)
     gen_operation_timer: PerfTimer = field(default_factory=PerfTimer)
+    query_fingerprinting_timer: PerfTimer = field(default_factory=PerfTimer)
 
 
 @dataclass
@@ -32,6 +34,7 @@ class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport):
     num_external_upstreams_unsupported: int = 0
 
     num_queries: int = 0
+    num_unique_queries: int = 0
     num_queries_dropped_parse_failure: int = 0
     num_queries_missing_table: int = 0  # Can be due to pattern filter
     num_queries_duplicate_table: int = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
index 08482c9d2fa3b..2e3385ae59dc2 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
@@ -22,6 +22,7 @@
 from datahub.ingestion.source.unity.report import UnityCatalogReport
 from datahub.ingestion.source.usage.usage_common import UsageAggregator
 from datahub.metadata.schema_classes import OperationClass
+from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
 
 logger = logging.getLogger(__name__)
 
@@ -76,6 +77,7 @@ def _get_workunits_internal(
         self, table_refs: Set[TableReference]
     ) -> Iterable[MetadataWorkUnit]:
         table_map = defaultdict(list)
+        query_hashes = set()
         for ref in table_refs:
             table_map[ref.table].append(ref)
             table_map[f"{ref.schema}.{ref.table}"].append(ref)
@@ -85,6 +87,12 @@ def _get_workunits_internal(
             for query in self._get_queries():
                 self.report.num_queries += 1
                 with current_timer.pause():
+                    with self.report.usage_perf_report.query_fingerprinting_timer:
+                        query_hashes.add(
+                            get_query_fingerprint(
+                                query.query_text, "databricks", fast=True
+                            )
+                        )
                     table_info = self._parse_query(query, table_map)
                     if table_info is not None:
                         if self.config.include_operational_stats:
@@ -100,6 +108,7 @@ def _get_workunits_internal(
                                     user=query.user_name,
                                     fields=[],
                                 )
+        self.report.num_unique_queries = len(query_hashes)
 
         if not self.report.num_queries:
             logger.warning("No queries found in the given time range.")
@@ -166,7 +175,8 @@ def _parse_query(
         with self.report.usage_perf_report.sql_parsing_timer:
             table_info = self._parse_query_via_lineage_runner(query.query_text)
             if table_info is None and query.statement_type == QueryStatementType.SELECT:
-                table_info = self._parse_query_via_spark_sql_plan(query.query_text)
+                with self.report.usage_perf_report.spark_sql_parsing_timer:
+                    table_info = self._parse_query_via_spark_sql_plan(query.query_text)
             if table_info is None:
                 self.report.num_queries_dropped_parse_failure += 1
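
For context, a minimal standalone sketch of what the new `num_unique_queries` bookkeeping computes, using the same `get_query_fingerprint(..., "databricks", fast=True)` call the patch introduces. The sample query strings and the resulting counts are illustrative assumptions, not taken from the source.

```python
# Sketch (not part of the patch): count total vs. unique queries by fingerprint.
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint

# Hypothetical query log; the table names here are made up.
query_texts = [
    "SELECT id FROM main.sales.orders",
    "SELECT id FROM main.sales.orders",       # repeated query text
    "SELECT name FROM main.sales.customers",
]

query_hashes = set()
for text in query_texts:
    # Same call as in the diff above; fast=True selects the cheaper fingerprint path.
    query_hashes.add(get_query_fingerprint(text, "databricks", fast=True))

num_queries = len(query_texts)          # 3, mirrors report.num_queries
num_unique_queries = len(query_hashes)  # 2 here, assuming the two distinct texts fingerprint differently
```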