diff --git a/src/databricks/labs/ucx/hive_metastore/mapping.py b/src/databricks/labs/ucx/hive_metastore/mapping.py
index 48f3c30b22..bef402d73d 100644
--- a/src/databricks/labs/ucx/hive_metastore/mapping.py
+++ b/src/databricks/labs/ucx/hive_metastore/mapping.py
@@ -195,7 +195,13 @@ def unskip_schema(self, schema: str) -> None:
         except (NotFound, BadRequest) as e:
             logger.error(f"Failed to remove skip marker from schema: {schema}.", exc_info=e)

-    def get_tables_to_migrate(self, tables_crawler: TablesCrawler) -> Collection[TableToMigrate]:
+    def get_tables_to_migrate(
+        self,
+        tables_crawler: TablesCrawler,
+        check_uc_table: bool = True,
+    ) -> Collection[TableToMigrate]:
+        # check_uc_table is added specifically for the convert_managed_hms_to_external method,
+        # so that it does not invoke any UC APIs, which are not supported on a non-UC cluster.
         rules = self.load()
         # Getting all the source tables from the rules
         databases_in_scope = self._get_databases_in_scope({rule.src_schema for rule in rules})
@@ -212,7 +218,14 @@ def get_tables_to_migrate(self, tables_crawler: TablesCrawler) -> Collection[Tab
                 logger.info(f"Table {rule.as_hms_table_key} is a db demo dataset and will not be migrated")
                 continue
             tasks.append(
-                partial(self._get_table_in_scope_task, TableToMigrate(crawled_tables_keys[rule.as_hms_table_key], rule))
+                partial(
+                    self._get_table_in_scope_task,
+                    TableToMigrate(
+                        crawled_tables_keys[rule.as_hms_table_key],
+                        rule,
+                    ),
+                    check_uc_table,
+                )
             )
         return Threads.strict("checking all database properties", tasks)

@@ -243,11 +256,11 @@ def _get_database_in_scope_task(self, database: str) -> str | None:
             return None
         return database

-    def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToMigrate | None:
+    def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate, check_uc_table: bool) -> TableToMigrate | None:
         table = table_to_migrate.src
         rule = table_to_migrate.rule

-        if self.exists_in_uc(table, rule.as_uc_table_key):
+        if check_uc_table and self.exists_in_uc(table, rule.as_uc_table_key):
             logger.info(f"The intended target for {table.key}, {rule.as_uc_table_key}, already exists.")
             return None
         properties = self._get_table_properties(table)
@@ -260,7 +273,7 @@ def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToM
                 return None
             if value["key"] == "upgraded_to":
                 logger.info(f"{table.key} is set as upgraded to {value['value']}")
-                if self.exists_in_uc(table, value["value"]):
+                if check_uc_table and self.exists_in_uc(table, value["value"]):
                     logger.info(
                         f"The table {table.key} was previously migrated to {value['value']}. "
                         f"To revert the table and allow it to be migrated again use the CLI command:"
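
A brief illustration of the guard this flag introduces (plain Python, not UCX code): the UC existence check only runs when `check_uc_table` is true, so a cluster without UC access never issues the lookup.

```python
# Minimal sketch of the check_uc_table guard, assuming a stand-in exists_in_uc callable.
def should_skip_migration(check_uc_table: bool, exists_in_uc, table_key: str, uc_key: str) -> bool:
    # Only consult Unity Catalog when the caller allows it (i.e. the cluster can reach UC APIs).
    return check_uc_table and exists_in_uc(table_key, uc_key)

# With check_uc_table=False the UC lookup is short-circuited, mirroring the change above.
assert should_skip_migration(False, lambda *_: True, "hive_metastore.db1.t1", "main.db1.t1") is False
```
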
diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
index 51c369a692..ca80030258 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -67,11 +67,39 @@ def get_remaining_tables(self) -> list[Table]:
     def index(self, *, force_refresh: bool = False):
         return self._migration_status_refresher.index(force_refresh=force_refresh)

+    def convert_managed_hms_to_external(
+        self,
+        managed_table_external_storage: str = "CLONE",
+    ):
+        # This method repeats some of the steps of migrate_tables. It was split out so that
+        # converting managed HMS tables to external can run on a non-UC cluster;
+        # the calls to the UC APIs have been removed.
+
+        if managed_table_external_storage != "CONVERT_TO_EXTERNAL":
+            logger.info("Not required to convert managed HMS table to external, skipping this task...")
+            return None
+        self._spark = self._spark_session
+        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, False)
+        tables_in_scope = filter(lambda t: t.src.what == What.EXTERNAL_SYNC, tables_to_migrate)
+        tasks = []
+        for table in tables_in_scope:
+            tasks.append(
+                partial(
+                    self._convert_hms_table_to_external,
+                    table.src,
+                )
+            )
+        Threads.strict("convert tables", tasks)
+        if not tasks:
+            logger.info("No managed HMS table found to convert to external")
+        return tasks
+
     def migrate_tables(
         self,
         what: What,
         hiveserde_in_place_migrate: bool = False,
         managed_table_external_storage: str = "CLONE",
+        check_uc_table: bool = True,
     ):
         if managed_table_external_storage == "CONVERT_TO_EXTERNAL":
             self._spark = self._spark_session
@@ -82,9 +110,7 @@ def migrate_tables(
         if what == What.VIEW:
             return self._migrate_views()
         return self._migrate_tables(
-            what,
-            managed_table_external_storage.upper(),
-            hiveserde_in_place_migrate,
+            what, managed_table_external_storage.upper(), hiveserde_in_place_migrate, check_uc_table
         )

     def _migrate_tables(
@@ -92,8 +118,9 @@
         what: What,
         managed_table_external_storage: str,
         hiveserde_in_place_migrate: bool = False,
+        check_uc_table: bool = True,
     ):
-        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler)
+        tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler, check_uc_table)
         tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
         tasks = []
         for table in tables_in_scope:
@@ -134,12 +161,15 @@ def _spark_session(self):
         return SparkSession.builder.getOrCreate()

-    def _migrate_managed_table(self, managed_table_external_storage: str, src_table: TableToMigrate):
+    def _migrate_managed_table(
+        self,
+        managed_table_external_storage: str,
+        src_table: TableToMigrate,
+    ):
         if managed_table_external_storage == 'CONVERT_TO_EXTERNAL':
-            if self._convert_hms_table_to_external(src_table.src):
-                return self._migrate_external_table(
-                    src_table.src, src_table.rule
-                )  # _migrate_external_table remains unchanged
+            return self._migrate_external_table(
+                src_table.src, src_table.rule
+            )  # _migrate_external_table remains unchanged
         if managed_table_external_storage == 'SYNC_AS_EXTERNAL':
             return self._migrate_managed_as_external_table(src_table.src, src_table.rule)  # new method
         if managed_table_external_storage == 'CLONE':
@@ -262,8 +292,9 @@ def _catalog_table(self):
         return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable  # pylint: disable=protected-access

     def _convert_hms_table_to_external(self, src_table: Table):
+        logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
+        inventory_table = self._tables_crawler.full_name
         try:
-            logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
             database = self._spark._jvm.scala.Some(src_table.database)  # pylint: disable=protected-access
             table_identifier = self._table_identifier(src_table.name, database)
             old_table = self._catalog.getTableMetadata(table_identifier)
@@ -290,12 +321,17 @@ def _convert_hms_table_to_external(self, src_table: Table):
                 old_table.viewOriginalText(),
             )
             self._catalog.alterTable(new_table)
+            self._update_table_status(src_table, inventory_table)
             logger.info(f"Converted {src_table.name} to External Table type.")
         except Exception as e:  # pylint: disable=broad-exception-caught
             logger.warning(f"Error converting HMS table {src_table.name} to external: {e}", exc_info=True)
             return False
         return True

+    def _update_table_status(self, src_table: Table, inventory_table: str):
+        update_sql = f"UPDATE {escape_sql_identifier(inventory_table)} SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='{src_table.database}' AND name='{src_table.name}';"
+        self._sql_backend.execute(update_sql)
+
     def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
         target_table_key = rule.as_uc_table_key
         table_migrate_sql = src_table.sql_migrate_as_external(target_table_key)
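
To make the new entry point concrete, here is a simplified, self-contained sketch of the selection and fan-out that `convert_managed_hms_to_external` performs (names and data are illustrative; UCX's `Threads.strict` is replaced by a plain loop):

```python
from functools import partial

def convert(table_key: str) -> None:
    # Stand-in for _convert_hms_table_to_external, which flips the table type via the Spark catalog.
    print(f"converting {table_key} to EXTERNAL")

# Only EXTERNAL_SYNC candidates are converted; each conversion is queued as a task and run in bulk.
candidates = [("hive_metastore.db1.managed_a", "EXTERNAL_SYNC"), ("hive_metastore.db1.view_b", "VIEW")]
tasks = [partial(convert, key) for key, what in candidates if what == "EXTERNAL_SYNC"]
for task in tasks:
    task()
```
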
diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py
index 4fd0add564..a07f8b6746 100644
--- a/src/databricks/labs/ucx/hive_metastore/workflows.py
+++ b/src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -8,7 +8,15 @@ class TableMigration(Workflow):
     def __init__(self):
         super().__init__('migrate-tables')

-    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
+    @job_task(job_cluster="main", depends_on=[Assessment.crawl_tables])
+    def convert_managed_table(self, ctx: RuntimeContext):
+        """This workflow task converts managed HMS tables to external tables if `managed_table_external_storage`
+        is set to `CONVERT_TO_EXTERNAL`. See the documentation for more detail."""
+        ctx.tables_migrator.convert_managed_hms_to_external(
+            managed_table_external_storage=ctx.config.managed_table_external_storage
+        )
+
+    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
     def migrate_external_tables_sync(self, ctx: RuntimeContext):
         """This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
         to the Unity Catalog.
@@ -17,15 +25,18 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext):
             what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage
         )

-    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
+    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
     def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):
         """This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
         using deep clone.
         """
         ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA)

-    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
-    def migrate_dbfs_root_non_delta_tables(self, ctx: RuntimeContext):
+    @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
+    def migrate_dbfs_root_non_delta_tables(
+        self,
+        ctx: RuntimeContext,
+    ):
         """This workflow task migrates non delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
         using CTAS.
         """
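
The resulting task ordering can be summarised as follows (task names match the workflow above; the mapping itself is only an illustration, not UCX code). `convert_managed_table` runs first on the non-UC `main` cluster, and every table-migration task now waits for it:

```python
# Illustrative dependency map for the migrate-tables workflow after this change.
dependencies = {
    "convert_managed_table": ["crawl_tables"],
    "migrate_external_tables_sync": ["crawl_tables", "convert_managed_table"],
    "migrate_dbfs_root_delta_tables": ["crawl_tables", "convert_managed_table"],
    "migrate_dbfs_root_non_delta_tables": ["crawl_tables", "convert_managed_table"],
}
```
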
diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py
index e349ad3432..00d6815b1a 100644
--- a/src/databricks/labs/ucx/install.py
+++ b/src/databricks/labs/ucx/install.py
@@ -380,7 +380,7 @@ def _config_table_migration(self, spark_conf_dict) -> tuple[int, int, dict, str]
         managed_table_migration_choices = {
             "Migrate MANAGED HMS table as EXTERNAL UC table. This option would require you to convert MANAGED HMS tables to EXTERNAL HMS tables once UC migration is complete, otherwise deleting HMS MANAGED table would delete the migrated UC table": 'SYNC_AS_EXTERNAL',
             "Copy data from MANAGED HMS to MANAGED UC table": 'CLONE',
-            "Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. This risks data leakage, as once the relevant HMS tables are deleted, the underlying data won't get deleted anymore.": 'CONVERT_TO_EXTERNAL',
+            "Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. Once the relevant HMS tables are deleted, the underlying data won't get deleted anymore; consider the impact of this change on your data workloads.": 'CONVERT_TO_EXTERNAL',
         }
         managed_table_migration_choice = self.prompts.choice_from_dict(
             "If hive_metastore contains managed table with external"
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 1edef6d88f..63971a2940 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -611,6 +611,9 @@ def save_tables(self, is_hiveserde: bool = False):
                 continue
             table_type = table.table_type.value if table.table_type else ""
             table_format = table.data_source_format.value if table.data_source_format else default_table_format
+            storage_location = table.storage_location
+            if table_type == "MANAGED":
+                storage_location = ""
             tables_to_save.append(
                 Table(
                     catalog=table.catalog_name,
@@ -618,7 +621,7 @@
                     name=table.name,
                     object_type=table_type,
                     table_format=table_format,
-                    location=str(table.storage_location or ""),
+                    location=str(storage_location or ""),
                     view_text=table.view_definition,
                 )
             )
@@ -1205,13 +1208,18 @@ def prepare_tables_for_migration(
     is_hiveserde = scenario == "hiveserde"
     random = make_random(5).lower()
     # create external and managed tables to be migrated
-    if is_hiveserde:
+    if scenario == "hiveserde":
         schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}")
         table_base_dir = make_storage_dir(
             path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}'
         )
         tables = prepare_hiveserde_tables(installation_ctx, random, schema, table_base_dir)
-    else:
+    elif scenario == "managed":
+        schema_name = f"managed_{random}"
+        schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}'
+        schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location)
+        tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)
+    elif scenario == "regular":
         schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}")
         tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)
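
For reference, the fixture change above means managed tables are snapshotted with an empty `location`, while external tables keep their storage path. A hypothetical pair of inventory rows (made-up names and paths, using the same `Table` constructor arguments as the fixture) would look like:

```python
from databricks.labs.ucx.hive_metastore.tables import Table

# Hypothetical rows produced by save_tables after this change.
managed = Table(
    catalog="hive_metastore", database="db1", name="managed_t",
    object_type="MANAGED", table_format="DELTA", location="", view_text=None,
)
external = Table(
    catalog="hive_metastore", database="db1", name="external_t",
    object_type="EXTERNAL", table_format="DELTA",
    location="dbfs:/mnt/example/a/db1/external_t", view_text=None,
)
```
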
diff --git a/tests/integration/hive_metastore/test_migrate.py b/tests/integration/hive_metastore/test_migrate.py
index 386606068c..99aafdd4bf 100644
--- a/tests/integration/hive_metastore/test_migrate.py
+++ b/tests/integration/hive_metastore/test_migrate.py
@@ -186,7 +186,7 @@ def test_migrate_managed_table_to_external_table_without_conversion(
     env_or_skip,
 ):
     src_schema_name = f"dummy_s{make_random(4)}".lower()
-    src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
+    src_schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/{src_schema_name}'
     src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
     src_external_table = runtime_ctx.make_table(
         schema_name=src_schema.name,
@@ -230,7 +230,7 @@ def test_migrate_managed_table_to_external_table_with_clone(
     env_or_skip,
 ):
     src_schema_name = f"dummy_s{make_random(4)}".lower()
-    src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
+    src_schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/{src_schema_name}'
     src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
     src_external_table = runtime_ctx.make_table(
         schema_name=src_schema.name,
diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py
index 9e85e7364a..ef124ace22 100644
--- a/tests/integration/hive_metastore/test_workflows.py
+++ b/tests/integration/hive_metastore/test_workflows.py
@@ -1,5 +1,7 @@
 import pytest
 from databricks.sdk.errors import NotFound
+
+from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore.tables import Table

@@ -60,6 +62,39 @@ def test_table_migration_job_refreshes_migration_status(
     assert len(asserts) == 0, assert_message

+
+@pytest.mark.parametrize(
+    "prepare_tables_for_migration,workflow",
+    [
+        ("managed", "migrate-tables"),
+    ],
+    indirect=("prepare_tables_for_migration",),
+)
+def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_for_migration, workflow, sql_backend):
+    # This test case tests the CONVERT_TO_EXTERNAL scenario.
+    tables, dst_schema = prepare_tables_for_migration
+    ctx = installation_ctx.replace(
+        extend_prompts={
+            r"If hive_metastore contains managed table with external.*": "0",
+            r".*Do you want to update the existing installation?.*": 'yes',
+        },
+    )
+
+    ctx.workspace_installation.run()
+    ctx.deployed_workflows.run_workflow(workflow)
+
+    for table in tables.values():
+        try:
+            assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
+        except NotFound:
+            assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
+    managed_table = tables["src_managed_table"]
+
+    for key, value, _ in sql_backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"):
+        if key == "Type":
+            assert value == "EXTERNAL"
+            break
+
+
 @pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True)
 def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_tables_for_migration):
     tables, dst_schema = prepare_tables_for_migration
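
The `DESCRIBE TABLE EXTENDED` loop in the new integration test relies on Spark returning rows of `(col_name, data_type, comment)`, with a `Type` metadata row in the detailed section; a small illustration of that parsing (rows are made up):

```python
# Sketch of how the "Type" row is picked out of DESCRIBE TABLE EXTENDED output.
rows = [
    ("id", "int", None),
    ("# Detailed Table Information", "", ""),
    ("Type", "EXTERNAL", ""),
]
table_type = next((value for key, value, *_ in rows if key == "Type"), None)
assert table_type == "EXTERNAL"
```
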
diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py
index 946a57b4f7..5b7e7a66a4 100644
--- a/tests/unit/hive_metastore/test_table_migrate.py
+++ b/tests/unit/hive_metastore/test_table_migrate.py
@@ -217,18 +217,12 @@ def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspa
     table_migrate = TablesMigrator(
         table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
     )
-    table_migrate.migrate_tables(what=What.EXTERNAL_SYNC, managed_table_external_storage="CONVERT_TO_EXTERNAL")
+    table_migrate.convert_managed_hms_to_external(managed_table_external_storage="CONVERT_TO_EXTERNAL")

-    migrate_grants.apply.assert_called()
     external_locations.resolve_mount.assert_not_called()
-
+    migrate_grants.apply.assert_not_called()
     assert backend.queries == [
-        "SYNC TABLE `ucx_default`.`db1_dst`.`managed_other` FROM `hive_metastore`.`db1_src`.`managed_other`;",
-        (
-            f"ALTER TABLE `ucx_default`.`db1_dst`.`managed_other` "
-            f"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_other' , "
-            f"'{Table.UPGRADED_FROM_WS_PARAM}' = '123');"
-        ),
+        "UPDATE `hive_metastore`.`inventory_database`.`tables` SET object_type = 'EXTERNAL' WHERE catalog='hive_metastore' AND database='db1_src' AND name='managed_other';"
     ]