From a43d5b2ea1a7260a1476852e913ceb4eaa142b16 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Thu, 20 Jun 2024 10:40:23 +0200 Subject: [PATCH 1/2] fix(ingestion/unity-catalog): fixed issue with profiling with GE turned on --- .../datahub/ingestion/source/unity/source.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 42ca9af7e8459..d8c0e22d85d86 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -262,7 +262,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.report_ingestion_stage_start("Ingestion Setup") wait_on_warehouse = None - if self.config.is_profiling_enabled() or self.config.include_hive_metastore: + if self.config.include_hive_metastore: self.report.report_ingestion_stage_start("Start warehouse") # Can take several minutes, so start now and wait later wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() @@ -309,9 +309,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start("Wait on warehouse") - assert wait_on_warehouse - wait_on_warehouse.result() + self.report.report_ingestion_stage_start("Start warehouse") + # Can take several minutes, so start now and wait later + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() + self.report.report_ingestion_stage_start("Warehouse started") self.report.report_ingestion_stage_start("Profiling") if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): From 6ea1d29b8848b37e905a9a882b6f7eac5cc4b3a6 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Thu, 20 Jun 2024 14:27:40 +0200 Subject: [PATCH 2/2] fix(ingest/unity-catalog) fixed review comments --- .../src/datahub/ingestion/source/unity/source.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index d8c0e22d85d86..b29170cb2d705 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -310,7 +310,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if self.config.is_profiling_enabled(): self.report.report_ingestion_stage_start("Start warehouse") - # Can take several minutes, so start now and wait later + # Need to start the warehouse again for profiling, + # as it may have been stopped after ingestion might take + # longer time to complete wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() if wait_on_warehouse is None: self.report.report_failure( @@ -321,7 +323,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: else: # wait until warehouse is started wait_on_warehouse.result() - self.report.report_ingestion_stage_start("Warehouse started") self.report.report_ingestion_stage_start("Profiling") if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig):