Skip to content

Commit

Permalink
fix(ingestion/unity-catalog): fixed issue with profiling with GE turned on
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW committed Jun 20, 2024
1 parent ea7b27b commit 903edaf
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_ingestion_stage_start("Ingestion Setup")
wait_on_warehouse = None
if self.config.is_profiling_enabled() or self.config.include_hive_metastore:
if self.config.include_hive_metastore:
self.report.report_ingestion_stage_start("Start warehouse")
# Can take several minutes, so start now and wait later
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
Expand Down Expand Up @@ -309,9 +309,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
)

if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start("Wait on warehouse")
assert wait_on_warehouse
wait_on_warehouse.result()
self.report.report_ingestion_stage_start("Start warehouse")
# Can take several minutes, so start now and wait later
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None:
self.report.report_failure(
"initialization",
f"SQL warehouse {self.config.profiling.warehouse_id} not found",
)
return
else:
# wait until warehouse is started
wait_on_warehouse.result()
self.report.report_ingestion_stage_start("Warehouse started")

self.report.report_ingestion_stage_start("Profiling")
if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig):
Expand Down

0 comments on commit 903edaf

Please sign in to comment.