Added debug notebook companion to troubleshoot the installation (#191)
This PR ensures that tests are working correctly and adds a debug notebook to troubleshoot the app on interactive clusters.
nfx authored Sep 13, 2023
1 parent 77cb4c3 commit 0898bd6
Showing 8 changed files with 210 additions and 49 deletions.
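
Before the per-file diffs, a minimal sketch of how the updated installer could be driven from a local Python session. The keyword-only `prefix` and `prompts` options are introduced in install.py below; the authentication setup and the idea of calling it from a script are assumptions for illustration, not part of this commit:

from databricks.sdk import WorkspaceClient
from databricks.labs.ucx.install import Installer

# Authenticate with the default SDK credential chain; the installer refuses to
# run inside a Databricks Runtime and exits early there.
ws = WorkspaceClient()

# `prefix` drives the install folder (/Users/<me>/.<prefix>) and the job tag value;
# `prompts=False` skips the "open in browser" questions during configure/readme steps.
installer = Installer(ws, prefix="ucx", prompts=False)
installer.run()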
149 changes: 113 additions & 36 deletions src/databricks/labs/ucx/install.py
@@ -19,15 +19,52 @@
from databricks.labs.ucx.runtime import main
from databricks.labs.ucx.tasks import _TASKS

TAG_STEP = "step"
TAG_APP = "App"

DEBUG_NOTEBOOK = """
# Databricks notebook source
# MAGIC %md
# MAGIC # Debug companion for UCX installation (see [README]({readme_link}))
# MAGIC
# MAGIC Production runs are supposed to be triggered through the following jobs: {job_links}
# MAGIC
# MAGIC **This notebook is overwritten with each UCX update/(re)install.**
# COMMAND ----------
# MAGIC %pip install /Workspace{remote_wheel}
dbutils.library.restartPython()
# COMMAND ----------
import logging
from pathlib import Path
from databricks.labs.ucx.__about__ import __version__
from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx import logger
from databricks.sdk import WorkspaceClient
logger._install()
logging.getLogger("databricks").setLevel("DEBUG")
cfg = MigrationConfig.from_file(Path("/Workspace{config_file}"))
ws = WorkspaceClient()
print(__version__)
"""

logger = logging.getLogger(__name__)


class Installer:
def __init__(self, ws: WorkspaceClient):
def __init__(self, ws: WorkspaceClient, *, prefix: str = "ucx", prompts: bool = True):
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
msg = "Installer is not supposed to be executed in Databricks Runtime"
raise SystemExit(msg)
self._ws = ws
self._prefix = prefix
self._prompts = prompts

def run(self):
self._configure()
@@ -45,7 +82,7 @@ def _my_username(self):

@property
def _install_folder(self):
return f"/Users/{self._my_username}/.ucx"
return f"/Users/{self._my_username}/.{self._prefix}"

@property
def _config_file(self):
@@ -60,14 +97,13 @@ def _current_config(self):
return self._config

def _configure(self):
config_path = self._config_file
ws_file_url = f"{self._ws.config.host}/#workspace{config_path}"
ws_file_url = self._notebook_link(self._config_file)
try:
self._ws.workspace.get_status(config_path)
self._ws.workspace.get_status(self._config_file)
logger.info(f"UCX is already configured. See {ws_file_url}")
if self._question("Type 'yes' to open config file in the browser") == "yes":
if self._prompts and self._question("Type 'yes' to open config file in the browser") == "yes":
webbrowser.open(ws_file_url)
return config_path
return
except DatabricksError as err:
if err.error_code != "RESOURCE_DOES_NOT_EXIST":
raise err
@@ -84,41 +120,55 @@ def _configure(self):
num_threads=int(self._question("Number of threads", default="8")),
)

config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
self._ws.workspace.upload(config_path, config_bytes, format=ImportFormat.AUTO)
logger.info(f"Created configuration file: {config_path}")
if self._question("Open config file in the browser and continue installing?", default="yes") == "yes":
self._write_config()
msg = "Open config file in the browser and continue installing?"
if self._prompts and self._question(msg, default="yes") == "yes":
webbrowser.open(ws_file_url)

def _write_config(self):
try:
self._ws.workspace.get_status(self._install_folder)
except DatabricksError as err:
if err.error_code != "RESOURCE_DOES_NOT_EXIST":
raise err
logger.debug(f"Creating install folder: {self._install_folder}")
self._ws.workspace.mkdirs(self._install_folder)

config_bytes = yaml.dump(self._config.as_dict()).encode("utf8")
logger.info(f"Creating configuration file: {self._config_file}")
self._ws.workspace.upload(self._config_file, config_bytes, format=ImportFormat.AUTO)

def _create_jobs(self):
logger.debug(f"Creating jobs from tasks in {main.__name__}")
dbfs_path = self._upload_wheel()
deployed_steps = self._deployed_steps()
remote_wheel = self._upload_wheel()
self._deployed_steps = self._deployed_steps()
desired_steps = {t.workflow for t in _TASKS.values()}
for step_name in desired_steps:
settings = self._job_settings(step_name, dbfs_path)
if step_name in deployed_steps:
job_id = deployed_steps[step_name]
settings = self._job_settings(step_name, remote_wheel)
if step_name in self._deployed_steps:
job_id = self._deployed_steps[step_name]
logger.info(f"Updating configuration for step={step_name} job_id={job_id}")
self._ws.jobs.reset(job_id, jobs.JobSettings(**settings))
else:
logger.info(f"Creating new job configuration for step={step_name}")
deployed_steps[step_name] = self._ws.jobs.create(**settings).job_id
self._deployed_steps[step_name] = self._ws.jobs.create(**settings).job_id

for step_name, job_id in deployed_steps.items():
for step_name, job_id in self._deployed_steps.items():
if step_name not in desired_steps:
logger.info(f"Removing job_id={job_id}, as it is no longer needed")
self._ws.jobs.delete(job_id)

self._create_readme(deployed_steps)
self._create_readme()
self._create_debug(remote_wheel)

def _create_readme(self, deployed_steps):
def _create_readme(self):
md = [
"# UCX - The Unity Catalog Migration Assistant",
"Here are the descriptions of jobs that trigger various stages of migration.",
f'To troubleshoot, see [debug notebook]({self._notebook_link(f"{self._install_folder}/DEBUG.py")}).',
]
for step_name, job_id in deployed_steps.items():
md.append(f"## [[UCX] {step_name}]({self._ws.config.host}#job/{job_id})\n")
for step_name, job_id in self._deployed_steps.items():
md.append(f"## [[{self._prefix.upper()}] {step_name}]({self._ws.config.host}#job/{job_id})\n")
for t in _TASKS.values():
if t.workflow != step_name:
continue
@@ -129,12 +179,31 @@ def _create_readme(self, deployed_steps):
intro = "\n".join(preamble + [f"# MAGIC {line}" for line in md])
path = f"{self._install_folder}/README.py"
self._ws.workspace.upload(path, intro.encode("utf8"), overwrite=True)
url = f"{self._ws.config.host}/#workspace{path}"
logger.info(f"Created notebook with job overview: {url}")
url = self._notebook_link(path)
logger.info(f"Created README notebook with job overview: {url}")
msg = "Type 'yes' to open job overview in README notebook in your home directory"
if self._question(msg) == "yes":
if self._prompts and self._question(msg) == "yes":
webbrowser.open(url)

def _create_debug(self, remote_wheel: str):
readme_link = self._notebook_link(f"{self._install_folder}/README.py")
job_links = ", ".join(
f"[[{self._prefix.upper()}] {step_name}]({self._ws.config.host}#job/{job_id})"
for step_name, job_id in self._deployed_steps.items()
)
path = f"{self._install_folder}/DEBUG.py"
logger.debug(f"Created debug notebook: {self._notebook_link(path)}")
self._ws.workspace.upload(
path,
DEBUG_NOTEBOOK.format(
remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
).encode("utf8"),
overwrite=True,
)

def _notebook_link(self, path: str) -> str:
return f"{self._ws.config.host}/#workspace{path}"

@staticmethod
def _question(text: str, *, default: str | None = None) -> str:
default_help = "" if default is None else f"\033[36m (default: {default})\033[0m"
@@ -146,14 +215,20 @@ def _question(text: str, *, default: str | None = None) -> str:
return default
return res

def _upload_wheel(self):
def _upload_wheel(self) -> str:
with tempfile.TemporaryDirectory() as tmp_dir:
wheel = self._build_wheel(tmp_dir)
dbfs_path = f"{self._install_folder}/wheels/{wheel.name}"
with wheel.open("rb") as f:
logger.info(f"Uploading wheel to dbfs:{dbfs_path}")
self._ws.dbfs.upload(dbfs_path, f, overwrite=True)
return dbfs_path
local_wheel = self._build_wheel(tmp_dir)
remote_wheel = f"{self._install_folder}/wheels/{local_wheel.name}"
remote_dirname = os.path.dirname(remote_wheel)
with local_wheel.open("rb") as f:
self._ws.dbfs.mkdirs(remote_dirname)
logger.info(f"Uploading wheel to dbfs:{remote_wheel}")
self._ws.dbfs.upload(remote_wheel, f, overwrite=True)
with local_wheel.open("rb") as f:
self._ws.workspace.mkdirs(remote_dirname)
logger.info(f"Uploading wheel to /Workspace{remote_wheel}")
self._ws.workspace.upload(remote_wheel, f, overwrite=True, format=ImportFormat.AUTO)
return remote_wheel

def _job_settings(self, step_name, dbfs_path):
config_file = f"/Workspace/{self._install_folder}/config.yml"
@@ -164,8 +239,8 @@ def _job_settings(self, step_name, dbfs_path):
)
tasks = sorted([t for t in _TASKS.values() if t.workflow == step_name], key=lambda _: _.name)
return {
"name": f"[UCX] {step_name}",
"tags": {"App": "ucx", "step": step_name},
"name": f"[{self._prefix.upper()}] {step_name}",
"tags": {TAG_APP: self._prefix, TAG_STEP: step_name},
"job_clusters": self._job_clusters({t.job_cluster for t in tasks}),
"email_notifications": email_notifications,
"tasks": [
@@ -210,6 +285,7 @@ def _job_clusters(self, names: set[str]):
spec,
data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
spark_conf={"spark.databricks.acl.sqlOnly": "true"},
num_workers=1, # ShowPermissionsCommand needs a worker
custom_tags={},
),
)
@@ -270,13 +346,14 @@ def _cluster_node_type(self, spec: compute.ClusterSpec) -> compute.ClusterSpec:

def _deployed_steps(self):
deployed_steps = {}
logger.debug(f"Fetching all jobs to determine already deployed steps for app={self._prefix}")
for j in self._ws.jobs.list():
tags = j.settings.tags
if tags is None:
continue
if tags.get("App", None) != "ucx":
if tags.get(TAG_APP, None) != self._prefix:
continue
deployed_steps[tags.get("step", "_")] = j.job_id
deployed_steps[tags.get(TAG_STEP, "_")] = j.job_id
return deployed_steps


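As a standalone illustration of the tag-based job discovery that `_deployed_steps` now performs, here is the same lookup written as a free function. The names mirror the constants from install.py above; this is a reading aid, not code from the commit:

from databricks.sdk import WorkspaceClient

TAG_STEP = "step"
TAG_APP = "App"

def deployed_steps(ws: WorkspaceClient, prefix: str = "ucx") -> dict[str, int]:
    """Map each workflow step name to its job_id for jobs tagged with this app prefix."""
    steps: dict[str, int] = {}
    for job in ws.jobs.list():
        tags = job.settings.tags
        if not tags:
            continue
        if tags.get(TAG_APP) != prefix:
            continue
        steps[tags.get(TAG_STEP, "_")] = job.job_id
    return steps
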
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/providers/mixins/fixtures.py
@@ -354,6 +354,7 @@ def create(
kwargs["roles"] = _scim_values(roles)
if entitlements is not None:
kwargs["entitlements"] = _scim_values(entitlements)
# TODO: REQUEST_LIMIT_EXCEEDED: GetUserPermissionsRequest RPC token bucket limit has been exceeded.
return interface.create(**kwargs)

yield from factory(name, create, lambda item: interface.delete(item.id))
5 changes: 0 additions & 5 deletions src/databricks/labs/ucx/runtime.py
@@ -5,15 +5,10 @@
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.logger import _install
from databricks.labs.ucx.tasks import task, trigger
from databricks.labs.ucx.toolkits.group_migration import GroupMigrationToolkit
from databricks.labs.ucx.toolkits.table_acls import TaclToolkit

_install()

logging.root.setLevel("INFO")

logger = logging.getLogger(__name__)


9 changes: 7 additions & 2 deletions src/databricks/labs/ucx/tacl/_internal.py
@@ -27,9 +27,11 @@ def __init__(self, ws: WorkspaceClient, warehouse_id):
self._warehouse_id = warehouse_id

def execute(self, sql):
logger.debug(f"[api][execute] {sql}")
self._sql.execute(self._warehouse_id, sql)

def fetch(self, sql) -> Iterator[any]:
logger.debug(f"[api][fetch] {sql}")
return self._sql.execute_fetch_all(self._warehouse_id, sql)


@@ -43,9 +45,11 @@ def __init__(self):
self._spark = SparkSession.builder.getOrCreate()

def execute(self, sql):
logger.debug(f"[spark][execute] {sql}")
self._spark.sql(sql)

def fetch(self, sql) -> Iterator[any]:
logger.debug(f"[spark][fetch] {sql}")
return self._spark.sql(sql).collect()


@@ -160,6 +164,7 @@ def _snapshot(self, klass, fetcher, loader) -> list[any]:
logger.debug(f"[{self._full_name}] crawling new batch for {self._table}")
loaded_records = list(loader())
if len(loaded_records) > 0:
logger.debug(f"[{self._full_name}] found {len(loaded_records)} new records for {self._table}")
self._append_records(klass, loaded_records)
loaded = True

@@ -230,10 +235,10 @@ def _append_records(self, klass, records: Iterator[any]):
logger.debug(f"[{self._full_name}] not found. creating")
schema = ", ".join(f"{f.name} {self._field_type(f)}" for f in fields)
try:
ddl = f"CREATE TABLE {self._full_name} ({schema}) USING DELTA"
self._exec(ddl)
self._exec(f"CREATE TABLE {self._full_name} ({schema}) USING DELTA")
except Exception as e:
schema_not_found = "SCHEMA_NOT_FOUND" in str(e)
if not schema_not_found:
raise e
logger.debug(f"[{self._catalog}.{self._schema}] not found. creating")
self._exec(f"CREATE SCHEMA {self._catalog}.{self._schema}")
6 changes: 5 additions & 1 deletion src/databricks/labs/ucx/tacl/grants.py
@@ -166,7 +166,11 @@ def _crawl(self, catalog: str, database: str) -> list[Grant]:
tasks.append(partial(fn, view=table.name))
else:
tasks.append(partial(fn, table=table.name))
return [grant for grants in ThreadedExecution.gather("listing grants", tasks) for grant in grants]
return [
grant
for grants in ThreadedExecution.gather(f"listing grants for {catalog}.{database}", tasks)
for grant in grants
]

def _grants(
self,
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/tacl/tables.py
@@ -119,7 +119,7 @@ def _crawl(self, catalog: str, database: str) -> list[Table]:
tasks = []
for _, table, _is_tmp in self._fetch(f"SHOW TABLES FROM {catalog}.{database}"):
tasks.append(partial(self._describe, catalog, database, table))
return ThreadedExecution.gather("listing tables", tasks)
return ThreadedExecution.gather(f"listing tables in {catalog}.{database}", tasks)

def _describe(self, catalog: str, database: str, table: str) -> Table:
"""Fetches metadata like table type, data format, external table location,
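Both grants.py and tables.py build a list of `functools.partial` tasks and hand them to `ThreadedExecution.gather` with a label that now names the catalog and database being crawled. Since that helper's import path is not shown in this diff, here is the same fan-out expressed with the standard library only, with a stand-in describe function, purely as a reading aid:

from concurrent.futures import ThreadPoolExecutor
from functools import partial

def describe(catalog: str, database: str, table: str) -> dict:
    # Stand-in for TablesCrawler._describe: fetch metadata for one table.
    return {"catalog": catalog, "database": database, "name": table}

tables = ["events", "users", "orders"]  # hypothetical SHOW TABLES output
tasks = [partial(describe, "hive_metastore", "default", t) for t in tables]

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(t) for t in tasks]
    results = [f.result() for f in futures]
print(results)
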
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/tasks.py
@@ -5,6 +5,7 @@
from pathlib import Path

from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.logger import _install

_TASKS: dict[str, "Task"] = {}

@@ -69,6 +70,8 @@ def trigger(*argv):
current_task = _TASKS[task_name]
print(current_task.doc)

_install()

cfg = MigrationConfig.from_file(Path(args["config"]))
logging.getLogger("databricks").setLevel(cfg.log_level)
