diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py
index 386ede3d0a..8d6ac54f91 100644
--- a/src/databricks/labs/ucx/install.py
+++ b/src/databricks/labs/ucx/install.py
@@ -19,15 +19,52 @@
 from databricks.labs.ucx.runtime import main
 from databricks.labs.ucx.tasks import _TASKS
 
+TAG_STEP = "step"
+TAG_APP = "App"
+
+DEBUG_NOTEBOOK = """
+# Databricks notebook source
+# MAGIC %md
+# MAGIC # Debug companion for UCX installation (see [README]({readme_link}))
+# MAGIC
+# MAGIC Production runs are supposed to be triggered through the following jobs: {job_links}
+# MAGIC
+# MAGIC **This notebook is overwritten with each UCX update/(re)install.**
+
+# COMMAND ----------
+
+# MAGIC %pip install /Workspace{remote_wheel}
+dbutils.library.restartPython()
+
+# COMMAND ----------
+
+import logging
+from pathlib import Path
+from databricks.labs.ucx.__about__ import __version__
+from databricks.labs.ucx.config import MigrationConfig
+from databricks.labs.ucx import logger
+from databricks.sdk import WorkspaceClient
+
+logger._install()
+logging.getLogger("databricks").setLevel("DEBUG")
+
+cfg = MigrationConfig.from_file(Path("/Workspace{config_file}"))
+ws = WorkspaceClient()
+
+print(__version__)
+"""
+
 logger = logging.getLogger(__name__)
 
 
 class Installer:
-    def __init__(self, ws: WorkspaceClient):
+    def __init__(self, ws: WorkspaceClient, *, prefix: str = "ucx", prompts: bool = True):
         if "DATABRICKS_RUNTIME_VERSION" in os.environ:
             msg = "Installer is not supposed to be executed in Databricks Runtime"
             raise SystemExit(msg)
         self._ws = ws
+        self._prefix = prefix
+        self._prompts = prompts
 
     def run(self):
         self._configure()
@@ -45,7 +82,7 @@ def _my_username(self):
 
     @property
     def _install_folder(self):
-        return f"/Users/{self._my_username}/.ucx"
+        return f"/Users/{self._my_username}/.{self._prefix}"
 
     @property
     def _config_file(self):
@@ -60,14 +97,13 @@ def _current_config(self):
         return self._config
 
     def _configure(self):
-        config_path = self._config_file
-        ws_file_url = f"{self._ws.config.host}/#workspace{config_path}"
+        ws_file_url = self._notebook_link(self._config_file)
         try:
-            self._ws.workspace.get_status(config_path)
+            self._ws.workspace.get_status(self._config_file)
             logger.info(f"UCX is already configured. See {ws_file_url}")
-            if self._question("Type 'yes' to open config file in the browser") == "yes":
+            if self._prompts and self._question("Type 'yes' to open config file in the browser") == "yes":
                 webbrowser.open(ws_file_url)
-            return config_path
+            return
         except DatabricksError as err:
             if err.error_code != "RESOURCE_DOES_NOT_EXIST":
                 raise err
@@ -84,41 +120,55 @@ def _configure(self):
             num_threads=int(self._question("Number of threads", default="8")),
         )
 
-        config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
-        self._ws.workspace.upload(config_path, config_bytes, format=ImportFormat.AUTO)
-        logger.info(f"Created configuration file: {config_path}")
-        if self._question("Open config file in the browser and continue installing?", default="yes") == "yes":
+        self._write_config()
+        msg = "Open config file in the browser and continue installing?"
+        if self._prompts and self._question(msg, default="yes") == "yes":
             webbrowser.open(ws_file_url)
 
+    def _write_config(self):
+        try:
+            self._ws.workspace.get_status(self._install_folder)
+        except DatabricksError as err:
+            if err.error_code != "RESOURCE_DOES_NOT_EXIST":
+                raise err
+            logger.debug(f"Creating install folder: {self._install_folder}")
+            self._ws.workspace.mkdirs(self._install_folder)
+
+        config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
+        logger.info(f"Creating configuration file: {self._config_file}")
+        self._ws.workspace.upload(self._config_file, config_bytes, format=ImportFormat.AUTO)
+
     def _create_jobs(self):
         logger.debug(f"Creating jobs from tasks in {main.__name__}")
-        dbfs_path = self._upload_wheel()
-        deployed_steps = self._deployed_steps()
+        remote_wheel = self._upload_wheel()
+        self._deployed_steps = self._deployed_steps()
         desired_steps = {t.workflow for t in _TASKS.values()}
         for step_name in desired_steps:
-            settings = self._job_settings(step_name, dbfs_path)
-            if step_name in deployed_steps:
-                job_id = deployed_steps[step_name]
+            settings = self._job_settings(step_name, remote_wheel)
+            if step_name in self._deployed_steps:
+                job_id = self._deployed_steps[step_name]
                 logger.info(f"Updating configuration for step={step_name} job_id={job_id}")
                 self._ws.jobs.reset(job_id, jobs.JobSettings(**settings))
             else:
                 logger.info(f"Creating new job configuration for step={step_name}")
-                deployed_steps[step_name] = self._ws.jobs.create(**settings).job_id
+                self._deployed_steps[step_name] = self._ws.jobs.create(**settings).job_id
 
-        for step_name, job_id in deployed_steps.items():
+        for step_name, job_id in self._deployed_steps.items():
             if step_name not in desired_steps:
                 logger.info(f"Removing job_id={job_id}, as it is no longer needed")
                 self._ws.jobs.delete(job_id)
 
-        self._create_readme(deployed_steps)
+        self._create_readme()
+        self._create_debug(remote_wheel)
 
-    def _create_readme(self, deployed_steps):
+    def _create_readme(self):
         md = [
             "# UCX - The Unity Catalog Migration Assistant",
             "Here are the descriptions of jobs that trigger various stages of migration.",
+            f'To troubleshoot, see [debug notebook]({self._notebook_link(f"{self._install_folder}/DEBUG.py")}).',
         ]
-        for step_name, job_id in deployed_steps.items():
-            md.append(f"## [[UCX] {step_name}]({self._ws.config.host}#job/{job_id})\n")
+        for step_name, job_id in self._deployed_steps.items():
+            md.append(f"## [[{self._prefix.upper()}] {step_name}]({self._ws.config.host}#job/{job_id})\n")
             for t in _TASKS.values():
                 if t.workflow != step_name:
                     continue
@@ -129,12 +179,31 @@ def _create_readme(self, deployed_steps):
         intro = "\n".join(preamble + [f"# MAGIC {line}" for line in md])
         path = f"{self._install_folder}/README.py"
         self._ws.workspace.upload(path, intro.encode("utf8"), overwrite=True)
-        url = f"{self._ws.config.host}/#workspace{path}"
-        logger.info(f"Created notebook with job overview: {url}")
+        url = self._notebook_link(path)
+        logger.info(f"Created README notebook with job overview: {url}")
         msg = "Type 'yes' to open job overview in README notebook in your home directory"
-        if self._question(msg) == "yes":
+        if self._prompts and self._question(msg) == "yes":
             webbrowser.open(url)
 
+    def _create_debug(self, remote_wheel: str):
+        readme_link = self._notebook_link(f"{self._install_folder}/README.py")
+        job_links = ", ".join(
+            f"[[{self._prefix.upper()}] {step_name}]({self._ws.config.host}#job/{job_id})"
+            for step_name, job_id in self._deployed_steps.items()
+        )
+        path = f"{self._install_folder}/DEBUG.py"
+        logger.debug(f"Created debug notebook: {self._notebook_link(path)}")
+        self._ws.workspace.upload(
+            path,
+            DEBUG_NOTEBOOK.format(
+                remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
+            ).encode("utf8"),
+            overwrite=True,
+        )
+
+    def _notebook_link(self, path: str) -> str:
+        return f"{self._ws.config.host}/#workspace{path}"
+
     @staticmethod
     def _question(text: str, *, default: str | None = None) -> str:
         default_help = "" if default is None else f"\033[36m (default: {default})\033[0m"
@@ -146,14 +215,20 @@ def _question(text: str, *, default: str | None = None) -> str:
             return default
         return res
 
-    def _upload_wheel(self):
+    def _upload_wheel(self) -> str:
         with tempfile.TemporaryDirectory() as tmp_dir:
-            wheel = self._build_wheel(tmp_dir)
-            dbfs_path = f"{self._install_folder}/wheels/{wheel.name}"
-            with wheel.open("rb") as f:
-                logger.info(f"Uploading wheel to dbfs:{dbfs_path}")
-                self._ws.dbfs.upload(dbfs_path, f, overwrite=True)
-            return dbfs_path
+            local_wheel = self._build_wheel(tmp_dir)
+            remote_wheel = f"{self._install_folder}/wheels/{local_wheel.name}"
+            remote_dirname = os.path.dirname(remote_wheel)
+            with local_wheel.open("rb") as f:
+                self._ws.dbfs.mkdirs(remote_dirname)
+                logger.info(f"Uploading wheel to dbfs:{remote_wheel}")
+                self._ws.dbfs.upload(remote_wheel, f, overwrite=True)
+            with local_wheel.open("rb") as f:
+                self._ws.workspace.mkdirs(remote_dirname)
+                logger.info(f"Uploading wheel to /Workspace{remote_wheel}")
+                self._ws.workspace.upload(remote_wheel, f, overwrite=True, format=ImportFormat.AUTO)
+            return remote_wheel
 
     def _job_settings(self, step_name, dbfs_path):
         config_file = f"/Workspace/{self._install_folder}/config.yml"
@@ -164,8 +239,8 @@ def _job_settings(self, step_name, dbfs_path):
         )
         tasks = sorted([t for t in _TASKS.values() if t.workflow == step_name], key=lambda _: _.name)
         return {
-            "name": f"[UCX] {step_name}",
-            "tags": {"App": "ucx", "step": step_name},
+            "name": f"[{self._prefix.upper()}] {step_name}",
+            "tags": {TAG_APP: self._prefix, TAG_STEP: step_name},
             "job_clusters": self._job_clusters({t.job_cluster for t in tasks}),
             "email_notifications": email_notifications,
             "tasks": [
@@ -210,6 +285,7 @@ def _job_clusters(self, names: set[str]):
                         spec,
                         data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
                         spark_conf={"spark.databricks.acl.sqlOnly": "true"},
+                        num_workers=1,  # ShowPermissionsCommand needs a worker
                         custom_tags={},
                     ),
                 )
@@ -270,13 +346,14 @@ def _cluster_node_type(self, spec: compute.ClusterSpec) -> compute.ClusterSpec:
 
     def _deployed_steps(self):
         deployed_steps = {}
+        logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._prefix}")
         for j in self._ws.jobs.list():
             tags = j.settings.tags
             if tags is None:
                 continue
-            if tags.get("App", None) != "ucx":
+            if tags.get(TAG_APP, None) != self._prefix:
                 continue
-            deployed_steps[tags.get("step", "_")] = j.job_id
+            deployed_steps[tags.get(TAG_STEP, "_")] = j.job_id
         return deployed_steps
 
 
diff --git a/src/databricks/labs/ucx/providers/mixins/fixtures.py b/src/databricks/labs/ucx/providers/mixins/fixtures.py
index dbe3f725b2..15663df1ca 100644
--- a/src/databricks/labs/ucx/providers/mixins/fixtures.py
+++ b/src/databricks/labs/ucx/providers/mixins/fixtures.py
@@ -354,6 +354,7 @@ def create(
             kwargs["roles"] = _scim_values(roles)
         if entitlements is not None:
             kwargs["entitlements"] = _scim_values(entitlements)
+        # TODO: REQUEST_LIMIT_EXCEEDED: GetUserPermissionsRequest RPC token bucket limit has been exceeded.
         return interface.create(**kwargs)
 
     yield from factory(name, create, lambda item: interface.delete(item.id))
diff --git a/src/databricks/labs/ucx/runtime.py b/src/databricks/labs/ucx/runtime.py
index 919ccb2063..af5f5c502e 100644
--- a/src/databricks/labs/ucx/runtime.py
+++ b/src/databricks/labs/ucx/runtime.py
@@ -5,15 +5,10 @@
 from databricks.sdk import WorkspaceClient
 
 from databricks.labs.ucx.config import MigrationConfig
-from databricks.labs.ucx.logger import _install
 from databricks.labs.ucx.tasks import task, trigger
 from databricks.labs.ucx.toolkits.group_migration import GroupMigrationToolkit
 from databricks.labs.ucx.toolkits.table_acls import TaclToolkit
 
-_install()
-
-logging.root.setLevel("INFO")
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/src/databricks/labs/ucx/tacl/_internal.py b/src/databricks/labs/ucx/tacl/_internal.py
index 9440818549..6fb95ae799 100644
--- a/src/databricks/labs/ucx/tacl/_internal.py
+++ b/src/databricks/labs/ucx/tacl/_internal.py
@@ -27,9 +27,11 @@ def __init__(self, ws: WorkspaceClient, warehouse_id):
         self._warehouse_id = warehouse_id
 
     def execute(self, sql):
+        logger.debug(f"[api][execute] {sql}")
         self._sql.execute(self._warehouse_id, sql)
 
     def fetch(self, sql) -> Iterator[any]:
+        logger.debug(f"[api][fetch] {sql}")
         return self._sql.execute_fetch_all(self._warehouse_id, sql)
 
 
@@ -43,9 +45,11 @@ def __init__(self):
         self._spark = SparkSession.builder.getOrCreate()
 
     def execute(self, sql):
+        logger.debug(f"[spark][execute] {sql}")
         self._spark.sql(sql)
 
     def fetch(self, sql) -> Iterator[any]:
+        logger.debug(f"[spark][fetch] {sql}")
         return self._spark.sql(sql).collect()
 
 
@@ -160,6 +164,7 @@ def _snapshot(self, klass, fetcher, loader) -> list[any]:
         logger.debug(f"[{self._full_name}] crawling new batch for {self._table}")
         loaded_records = list(loader())
         if len(loaded_records) > 0:
+            logger.debug(f"[{self._full_name}] found {len(loaded_records)} new records for {self._table}")
             self._append_records(klass, loaded_records)
             loaded = True
 
@@ -230,10 +235,10 @@ def _append_records(self, klass, records: Iterator[any]):
                 logger.debug(f"[{self._full_name}] not found. creating")
                 schema = ", ".join(f"{f.name} {self._field_type(f)}" for f in fields)
                 try:
-                    ddl = f"CREATE TABLE {self._full_name} ({schema}) USING DELTA"
-                    self._exec(ddl)
+                    self._exec(f"CREATE TABLE {self._full_name} ({schema}) USING DELTA")
                 except Exception as e:
                     schema_not_found = "SCHEMA_NOT_FOUND" in str(e)
                     if not schema_not_found:
                         raise e
+                    logger.debug(f"[{self._catalog}.{self._schema}] not found. creating")
                     self._exec(f"CREATE SCHEMA {self._catalog}.{self._schema}")
diff --git a/src/databricks/labs/ucx/tacl/grants.py b/src/databricks/labs/ucx/tacl/grants.py
index 96d3212cd0..8d8639bcf3 100644
--- a/src/databricks/labs/ucx/tacl/grants.py
+++ b/src/databricks/labs/ucx/tacl/grants.py
@@ -166,7 +166,11 @@ def _crawl(self, catalog: str, database: str) -> list[Grant]:
                 tasks.append(partial(fn, view=table.name))
             else:
                 tasks.append(partial(fn, table=table.name))
-        return [grant for grants in ThreadedExecution.gather("listing grants", tasks) for grant in grants]
+        return [
+            grant
+            for grants in ThreadedExecution.gather(f"listing grants for {catalog}.{database}", tasks)
+            for grant in grants
+        ]
 
     def _grants(
         self,
diff --git a/src/databricks/labs/ucx/tacl/tables.py b/src/databricks/labs/ucx/tacl/tables.py
index 6a4267a019..d7ac40929c 100644
--- a/src/databricks/labs/ucx/tacl/tables.py
+++ b/src/databricks/labs/ucx/tacl/tables.py
@@ -119,7 +119,7 @@ def _crawl(self, catalog: str, database: str) -> list[Table]:
         tasks = []
         for _, table, _is_tmp in self._fetch(f"SHOW TABLES FROM {catalog}.{database}"):
             tasks.append(partial(self._describe, catalog, database, table))
-        return ThreadedExecution.gather("listing tables", tasks)
+        return ThreadedExecution.gather(f"listing tables in {catalog}.{database}", tasks)
 
     def _describe(self, catalog: str, database: str, table: str) -> Table:
         """Fetches metadata like table type, data format, external table location,
diff --git a/src/databricks/labs/ucx/tasks.py b/src/databricks/labs/ucx/tasks.py
index b93c338c48..56a0d2f45e 100644
--- a/src/databricks/labs/ucx/tasks.py
+++ b/src/databricks/labs/ucx/tasks.py
@@ -5,6 +5,7 @@
 from pathlib import Path
 
 from databricks.labs.ucx.config import MigrationConfig
+from databricks.labs.ucx.logger import _install
 
 _TASKS: dict[str, "Task"] = {}
 
@@ -69,6 +70,8 @@ def trigger(*argv):
     current_task = _TASKS[task_name]
     print(current_task.doc)
 
+    _install()
+
     cfg = MigrationConfig.from_file(Path(args["config"]))
     logging.getLogger("databricks").setLevel(cfg.log_level)
 
diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py
index 69b5222469..31137b003f 100644
--- a/tests/integration/test_installation.py
+++ b/tests/integration/test_installation.py
@@ -12,6 +12,8 @@
 from databricks.sdk.service.iam import PermissionLevel
 from databricks.sdk.service.workspace import ImportFormat
 
+from databricks.labs.ucx.config import GroupsConfig, MigrationConfig, TaclConfig
+from databricks.labs.ucx.install import Installer
 from databricks.labs.ucx.inventory.types import RequestObjectType
 from databricks.labs.ucx.providers.mixins.compute import CommandExecutor
 from databricks.labs.ucx.tacl.grants import Grant
@@ -98,12 +100,86 @@ def test_sql_backend_works(ws, wsfs_wheel):
     assert len(database_names) > 0
 
 
-def test_creating_workflows(ws):
-    from databricks.labs.ucx.install import Installer
+def test_assessment_job_with_no_inventory_database(
+    request,
+    ws,
+    sql_exec,
+    sql_fetch_all,
+    make_cluster_policy,
+    make_cluster_policy_permissions,
+    make_ucx_group,
+    make_job,
+    make_job_permissions,
+    make_random,
+    make_schema,
+    make_table,
+):
+    ws_group_a, acc_group_a = make_ucx_group()
+    ws_group_b, acc_group_b = make_ucx_group()
+    ws_group_c, acc_group_c = make_ucx_group()
+
+    schema_a = make_schema()
+    schema_b = make_schema()
+    schema_c = make_schema()
+    table_a = make_table(schema=schema_a)
+    table_b = make_table(schema=schema_b)
+
+    sql_exec(f"GRANT USAGE ON SCHEMA default TO `{ws_group_a.display_name}`")
+    sql_exec(f"GRANT USAGE ON SCHEMA default TO `{ws_group_b.display_name}`")
+    sql_exec(f"GRANT SELECT ON TABLE {table_a} TO `{ws_group_a.display_name}`")
+    sql_exec(f"GRANT SELECT ON TABLE {table_b} TO `{ws_group_b.display_name}`")
+    sql_exec(f"GRANT MODIFY ON SCHEMA {schema_b} TO `{ws_group_b.display_name}`")
+
+    cluster_policy = make_cluster_policy()
+    make_cluster_policy_permissions(
+        object_id=cluster_policy.policy_id,
+        permission_level=random.choice([PermissionLevel.CAN_USE]),
+        group_name=ws_group_a.display_name,
+    )
+
+    job = make_job()
+    make_job_permissions(
+        object_id=job.job_id,
+        permission_level=random.choice(
+            [PermissionLevel.CAN_VIEW, PermissionLevel.CAN_MANAGE_RUN, PermissionLevel.CAN_MANAGE]
+        ),
+        group_name=ws_group_b.display_name,
+    )
+
+    install = Installer(ws, prefix=make_random(4), prompts=False)
+    install._config = MigrationConfig(
+        inventory_database=f"ucx_{make_random(4)}",
+        instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"],
+        groups=GroupsConfig(selected=[ws_group_a.display_name, ws_group_b.display_name, ws_group_c.display_name]),
+        tacl=TaclConfig(databases=[schema_a.split(".")[-1], schema_b.split(".")[-1], schema_c.split(".")[-1]]),
+        log_level="DEBUG",
+    )
+    install._write_config()
+    install._create_jobs()
+
+    def cleanup_created_resources():
+        logger.debug(f"cleaning up install folder: {install._install_folder}")
+        ws.workspace.delete(install._install_folder, recursive=True)
+
+        for step, job_id in install._deployed_steps.items():
+            logger.debug(f"cleaning up {step} job_id={job_id}")
+            ws.jobs.delete(job_id)
+
+        logger.debug(f"cleaning up inventory_database={install._config.inventory_database}")
+        sql_exec(f"DROP SCHEMA IF EXISTS `{install._config.inventory_database}` CASCADE")
+
+    request.addfinalizer(cleanup_created_resources)
+
+    logger.debug(f'starting job: {ws.config.host}#job/{install._deployed_steps["assessment"]}')
+    ws.jobs.run_now(install._deployed_steps["assessment"]).result()
 
-    inst = Installer(ws)
+    permissions = list(sql_fetch_all(f"SELECT * FROM hive_metastore.{install._config.inventory_database}.permissions"))
+    tables = list(sql_fetch_all(f"SELECT * FROM hive_metastore.{install._config.inventory_database}.tables"))
+    grants = list(sql_fetch_all(f"SELECT * FROM hive_metastore.{install._config.inventory_database}.grants"))
 
-    inst._create_jobs()
+    assert len(permissions) > 0
+    assert len(tables) == 2
+    assert len(grants) >= 5
 
 
 def test_toolkit_notebook(