Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with migrating MANAGED hive_metastore table to UC for CONVERT_TO_EXTERNAL scenario #3020

Merged
merged 29 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
52905f5
adding CONVERT_TO_EXTERNAL option for table migration
HariGS-DB Oct 10, 2024
427af7d
Merge branch 'main' into feat/convertexternal
HariGS-DB Oct 14, 2024
d669b06
adding CONVERT_TO_EXTERNAL option for table migration
HariGS-DB Oct 15, 2024
67b7f83
merging
HariGS-DB Oct 15, 2024
bb4ca90
unit test fixes
HariGS-DB Oct 15, 2024
ec8b896
unit test fixes
HariGS-DB Oct 15, 2024
7670a18
int test fixes
HariGS-DB Oct 16, 2024
fd9c3bd
int test fixes
HariGS-DB Oct 16, 2024
4809e51
Merge branch 'main' into feat/convertexternal
HariGS-DB Oct 16, 2024
3fb222a
removing wrong int test
HariGS-DB Oct 16, 2024
c2d37c2
Merge branch 'main' into feat/convertexternal
HariGS-DB Oct 16, 2024
2472d12
condition check
HariGS-DB Oct 16, 2024
b38df81
int test
HariGS-DB Oct 18, 2024
e8ed97b
int test
HariGS-DB Oct 19, 2024
c27c55c
int test
HariGS-DB Oct 19, 2024
9be756a
updating managed conversion to another task in workflow
HariGS-DB Oct 19, 2024
3d6c58a
int test fix
HariGS-DB Oct 20, 2024
da8b80c
workflow task order change
HariGS-DB Oct 20, 2024
d2f4c87
adding single user cluster logic
HariGS-DB Oct 20, 2024
37aba78
fixing table update sql
HariGS-DB Oct 20, 2024
aaf7b7a
Merge branch 'main' into feat/managedconvert
HariGS-DB Oct 23, 2024
e6efccf
change in approach to use default non uc cluster
HariGS-DB Oct 23, 2024
0492026
int test fixes
HariGS-DB Oct 23, 2024
971f649
review comments
HariGS-DB Oct 25, 2024
76b816c
merge from main
HariGS-DB Oct 25, 2024
42a382c
review comments
HariGS-DB Oct 25, 2024
3d5e19e
review comments
HariGS-DB Oct 25, 2024
e06874e
Merge branch 'main' into feat/managedconvert
HariGS-DB Oct 26, 2024
dde0772
Merge branch 'main' into feat/managedconvert
HariGS-DB Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/databricks/labs/ucx/hive_metastore/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ def unskip_schema(self, schema: str) -> None:
except (NotFound, BadRequest) as e:
logger.error(f"Failed to remove skip marker from schema: {schema}.", exc_info=e)

def get_tables_to_migrate(self, tables_crawler: TablesCrawler) -> Collection[TableToMigrate]:
def get_tables_to_migrate(
self, tables_crawler: TablesCrawler, check_uc_table: bool = True
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
) -> Collection[TableToMigrate]:
    # the check_uc_table flag is added specifically for the convert_managed_hms_to_external method
    # so that it doesn't invoke any UC APIs, which are not supported on non-UC clusters
rules = self.load()
# Getting all the source tables from the rules
databases_in_scope = self._get_databases_in_scope({rule.src_schema for rule in rules})
Expand All @@ -212,7 +216,14 @@ def get_tables_to_migrate(self, tables_crawler: TablesCrawler) -> Collection[Tab
logger.info(f"Table {rule.as_hms_table_key} is a db demo dataset and will not be migrated")
continue
tasks.append(
partial(self._get_table_in_scope_task, TableToMigrate(crawled_tables_keys[rule.as_hms_table_key], rule))
partial(
self._get_table_in_scope_task,
TableToMigrate(
crawled_tables_keys[rule.as_hms_table_key],
rule,
),
check_uc_table,
)
)

return Threads.strict("checking all database properties", tasks)
Expand Down Expand Up @@ -243,11 +254,11 @@ def _get_database_in_scope_task(self, database: str) -> str | None:
return None
return database

def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToMigrate | None:
def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate, check_uc_table: bool) -> TableToMigrate | None:
table = table_to_migrate.src
rule = table_to_migrate.rule

if self.exists_in_uc(table, rule.as_uc_table_key):
if check_uc_table and self.exists_in_uc(table, rule.as_uc_table_key):
logger.info(f"The intended target for {table.key}, {rule.as_uc_table_key}, already exists.")
return None
properties = self._get_table_properties(table)
Expand All @@ -260,7 +271,7 @@ def _get_table_in_scope_task(self, table_to_migrate: TableToMigrate) -> TableToM
return None
if value["key"] == "upgraded_to":
logger.info(f"{table.key} is set as upgraded to {value['value']}")
if self.exists_in_uc(table, value["value"]):
if check_uc_table and self.exists_in_uc(table, value["value"]):
logger.info(
f"The table {table.key} was previously migrated to {value['value']}. "
f"To revert the table and allow it to be migrated again use the CLI command:"
Expand Down
59 changes: 48 additions & 11 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,41 @@ def get_remaining_tables(self) -> list[Table]:
def index(self, *, force_refresh: bool = False):
return self._migration_status_refresher.index(force_refresh=force_refresh)

def convert_managed_hms_to_external(
self,
managed_table_external_storage: str = "CLONE",
inventory_table: str | None = None,
):
    # This method contains some of the steps of migrate_tables. This was done to separate out the
    # code for converting managed HMS tables to external: since this needs to run on a non-UC cluster,
    # the functionality that calls the UC APIs has been removed

if managed_table_external_storage != "CONVERT_TO_EXTERNAL":
logger.info("Not required to convert managed hms table to external, Skipping this task...")
return None
self._spark = self._spark_session
tables_to_migrate = self._tm.get_tables_to_migrate(self._tc, False)
tables_in_scope = filter(lambda t: t.src.what == What.EXTERNAL_SYNC, tables_to_migrate)
tasks = []
for table in tables_in_scope:
tasks.append(
partial(
self._convert_hms_table_to_external,
table.src,
inventory_table,
)
)
Threads.strict("convert tables", tasks)
if not tasks:
logger.info("No managed hms table found to convert to external")
return tasks

def migrate_tables(
self,
what: What,
hiveserde_in_place_migrate: bool = False,
managed_table_external_storage: str = "CLONE",
check_uc_table: bool = True,
):
if managed_table_external_storage == "CONVERT_TO_EXTERNAL":
self._spark = self._spark_session
Expand All @@ -82,18 +112,17 @@ def migrate_tables(
if what == What.VIEW:
return self._migrate_views()
return self._migrate_tables(
what,
managed_table_external_storage.upper(),
hiveserde_in_place_migrate,
what, managed_table_external_storage.upper(), hiveserde_in_place_migrate, check_uc_table
)

def _migrate_tables(
self,
what: What,
managed_table_external_storage: str,
hiveserde_in_place_migrate: bool = False,
check_uc_table: bool = True,
):
tables_to_migrate = self._tm.get_tables_to_migrate(self._tc)
tables_to_migrate = self._tm.get_tables_to_migrate(self._tc, check_uc_table)
tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
tasks = []
for table in tables_in_scope:
Expand Down Expand Up @@ -134,12 +163,15 @@ def _spark_session(self):

return SparkSession.builder.getOrCreate()

def _migrate_managed_table(self, managed_table_external_storage: str, src_table: TableToMigrate):
def _migrate_managed_table(
self,
managed_table_external_storage: str,
src_table: TableToMigrate,
):
if managed_table_external_storage == 'CONVERT_TO_EXTERNAL':
if self._convert_hms_table_to_external(src_table.src):
return self._migrate_external_table(
src_table.src, src_table.rule
) # _migrate_external_table remains unchanged
return self._migrate_external_table(
src_table.src, src_table.rule
) # _migrate_external_table remains unchanged
if managed_table_external_storage == 'SYNC_AS_EXTERNAL':
return self._migrate_managed_as_external_table(src_table.src, src_table.rule) # new method
if managed_table_external_storage == 'CLONE':
Expand Down Expand Up @@ -261,9 +293,9 @@ def _catalog_type(self):
def _catalog_table(self):
return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable # pylint: disable=protected-access

def _convert_hms_table_to_external(self, src_table: Table):
def _convert_hms_table_to_external(self, src_table: Table, inventory_table: str):
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
try:
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
table_identifier = self._table_identifier(src_table.name, database)
old_table = self._catalog.getTableMetadata(table_identifier)
Expand All @@ -290,12 +322,17 @@ def _convert_hms_table_to_external(self, src_table: Table):
old_table.viewOriginalText(),
)
self._catalog.alterTable(new_table)
self._update_table_status(src_table, inventory_table)
logger.info(f"Converted {src_table.name} to External Table type.")
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning(f"Error converting HMS table {src_table.name} to external: {e}", exc_info=True)
return False
return True

def _update_table_status(self, src_table: Table, inventory_table: str):
update_sql = f"update {escape_sql_identifier(inventory_table)} set object_type = 'EXTERNAL' where catalog='hive_metastore' and database='{src_table.database}' and name='{src_table.name}';"
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
self._backend.execute(update_sql)

def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
target_table_key = rule.as_uc_table_key
table_migrate_sql = src_table.sql_migrate_as_external(target_table_key)
Expand Down
10 changes: 9 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@ class TableMigration(Workflow):
def __init__(self):
super().__init__('migrate-tables')

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
@job_task(job_cluster="main", depends_on=[Assessment.crawl_tables])
def convert_managed_table(self, ctx: RuntimeContext):
"""This workflow task converts managed HMS tables to external table."""
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
ctx.tables_migrator.convert_managed_hms_to_external(
managed_table_external_storage=ctx.config.managed_table_external_storage,
inventory_table=ctx.tables_crawler.full_name,
)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
def migrate_external_tables_sync(self, ctx: RuntimeContext):
"""This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
to the Unity Catalog.
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def _config_table_migration(self, spark_conf_dict) -> tuple[int, int, dict, str]
managed_table_migration_choices = {
"Migrate MANAGED HMS table as EXTERNAL UC table. This option would require you to convert MANAGED HMS tables to EXTERNAL HMS tables once UC migration is complete, otherwise deleting HMS MANAGED table would delete the migrated UC table": 'SYNC_AS_EXTERNAL',
"Copy data from MANAGED HMS to MANAGED UC table": 'CLONE',
"Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. This risks data leakage, as once the relevant HMS tables are deleted, the underlying data won't get deleted anymore.": 'CONVERT_TO_EXTERNAL',
"Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table.Once the relevant HMS tables are deleted, the underlying data won't get deleted anymore, consider the impact of this change on your data workloads": 'CONVERT_TO_EXTERNAL',
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
}
managed_table_migration_choice = self.prompts.choice_from_dict(
"If hive_metastore contains managed table with external"
Expand Down
14 changes: 11 additions & 3 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,14 +611,17 @@ def save_tables(self, is_hiveserde: bool = False):
continue
table_type = table.table_type.value if table.table_type else ""
table_format = table.data_source_format.value if table.data_source_format else default_table_format
storage_location = table.storage_location
if table_type == "MANAGED":
storage_location = ""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically, even managed tables have a location

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logic in the fixture considers: if the type is MANAGED, it attaches a DBFS location instead of the one being passed. Hence I had to overwrite it.

tables_to_save.append(
Table(
catalog=table.catalog_name,
database=table.schema_name,
name=table.name,
object_type=table_type,
table_format=table_format,
location=str(table.storage_location or ""),
location=str(storage_location or ""),
view_text=table.view_definition,
)
)
Expand Down Expand Up @@ -1205,13 +1208,18 @@ def prepare_tables_for_migration(
is_hiveserde = scenario == "hiveserde"
random = make_random(5).lower()
# create external and managed tables to be migrated
if is_hiveserde:
if scenario == "hiveserde":
schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"hiveserde_in_place_{random}")
table_base_dir = make_storage_dir(
path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/hiveserde_in_place_{random}'
)
tables = prepare_hiveserde_tables(installation_ctx, random, schema, table_base_dir)
else:
elif scenario == "regularmanaged":
schema_name = f"managed_{random}"
schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/managed_{random}'
schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=schema_name, location=schema_location)
tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)
elif scenario == "regular":
schema = installation_ctx.make_schema(catalog_name="hive_metastore", name=f"migrate_{random}")
tables = prepare_regular_tables(installation_ctx, make_mounted_location, schema)

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def test_migrate_managed_table_to_external_table_without_conversion(
ws, sql_backend, runtime_ctx, make_catalog, make_mounted_location, make_random, env_or_skip
):
src_schema_name = f"dummy_s{make_random(4)}".lower()
src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
src_schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/{src_schema_name}'
src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
src_external_table = runtime_ctx.make_table(
schema_name=src_schema.name,
Expand Down Expand Up @@ -213,7 +213,7 @@ def test_migrate_managed_table_to_external_table_with_clone(
ws, sql_backend, runtime_ctx, make_catalog, make_mounted_location, make_random, env_or_skip
):
src_schema_name = f"dummy_s{make_random(4)}".lower()
src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
src_schema_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/{src_schema_name}'
src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
src_external_table = runtime_ctx.make_table(
schema_name=src_schema.name,
Expand Down
34 changes: 34 additions & 0 deletions tests/integration/hive_metastore/test_workflows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.tables import Table


Expand Down Expand Up @@ -60,6 +62,38 @@ def test_table_migration_job_refreshes_migration_status(
assert len(asserts) == 0, assert_message


@pytest.mark.parametrize(
"prepare_tables_for_migration,workflow",
[
("regularmanaged", "migrate-tables"),
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
],
indirect=("prepare_tables_for_migration",),
)
def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_for_migration, workflow, sql_backend):
tables, dst_schema = prepare_tables_for_migration
ctx = installation_ctx.replace(
extend_prompts={
r"If hive_metastore contains managed table with external.*": "0",
r".*Do you want to update the existing installation?.*": 'yes',
},
)

ctx.workspace_installation.run()
ctx.deployed_workflows.run_workflow(workflow)

for table in tables.values():
try:
assert ws.tables.get(f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}").name
except NotFound:
assert False, f"{table.name} not found in {dst_schema.catalog_name}.{dst_schema.name}"
managed_table = tables["src_managed_table"]

for key, value, _ in sql_backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"):
if key == "Type":
assert value == "EXTERNAL"
break


@pytest.mark.parametrize('prepare_tables_for_migration', [('hiveserde')], indirect=True)
def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_tables_for_migration):
tables, dst_schema = prepare_tables_for_migration
Expand Down
15 changes: 6 additions & 9 deletions tests/unit/hive_metastore/test_table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,15 @@ def test_migrate_managed_table_as_external_tables_with_conversion(ws, mock_pyspa
table_migrate = TablesMigrator(
table_crawler, ws, backend, table_mapping, migration_status_refresher, migrate_grants, external_locations
)
table_migrate.migrate_tables(what=What.EXTERNAL_SYNC, managed_table_external_storage="CONVERT_TO_EXTERNAL")
table_migrate.convert_managed_hms_to_external(
managed_table_external_storage="CONVERT_TO_EXTERNAL",
inventory_table="hive_metastore.inventory_database.tables",
)

migrate_grants.apply.assert_called()
external_locations.resolve_mount.assert_not_called()

migrate_grants.apply.assert_not_called()
assert backend.queries == [
"SYNC TABLE `ucx_default`.`db1_dst`.`managed_other` FROM `hive_metastore`.`db1_src`.`managed_other`;",
(
f"ALTER TABLE `ucx_default`.`db1_dst`.`managed_other` "
f"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_other' , "
f"'{Table.UPGRADED_FROM_WS_PARAM}' = '123');"
),
"update `hive_metastore`.`inventory_database`.`tables` set object_type = 'EXTERNAL' where catalog='hive_metastore' and database='db1_src' and name='managed_other';"
]


Expand Down
Loading