diff --git a/pyproject.toml b/pyproject.toml
index 0228fd89b7..d0c99499fc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,15 +31,11 @@ dependencies = [
     # TODO: remove later
     "typer[all]>=0.9.0,<0.10.0",
-    "pandas>=2.0.3,<3.0.0",
     "ratelimit>=2.2.1,<3.0.0",
     "tenacity>=8.2.2,<9.0.0",
 ]
 
 [project.optional-dependencies]
-dbconnect = [
-    "databricks-connect>=13.2.0,<=14.0.0"
-]
 test = [
     "coverage[toml]>=6.5",
     "pytest",
@@ -62,9 +58,7 @@ path = "src/databricks/labs/ucx/__about__.py"
 
 [tool.hatch.envs.unit]
 dependencies = [
-    "databricks-labs-ucx[test]",
-    "pyspark>=3.4.0,<=3.5.0",
-    "delta-spark>=2.4.0,<3.0.0"
+    "databricks-labs-ucx[test]"
 ]
 
 [tool.hatch.envs.unit.scripts]
@@ -74,8 +68,6 @@ test-cov-report = "pytest --cov src tests/unit --cov-report=html"
 [tool.hatch.envs.integration]
 dependencies = [
     "databricks-labs-ucx[test]",
-    "databricks-labs-ucx[dbconnect]",
-    "delta-spark>=2.4.0,<3.0.0"
 ]
 
 [tool.hatch.envs.integration.scripts]
@@ -108,10 +100,6 @@ profile = "black"
 
 [tool.pytest.ini_options]
 addopts = "-s -p no:warnings -vv --cache-clear"
-filterwarnings = [
-    "ignore:::.*pyspark.broadcast*",
-    "ignore:::.*pyspark.sql.pandas.utils*"
-]
 
 [tool.black]
 target-version = ["py310"]
diff --git a/src/databricks/labs/ucx/inventory/permissions.py b/src/databricks/labs/ucx/inventory/permissions.py
index ba6e5a826d..257edd24e5 100644
--- a/src/databricks/labs/ucx/inventory/permissions.py
+++ b/src/databricks/labs/ucx/inventory/permissions.py
@@ -7,7 +7,6 @@
 from databricks.labs.ucx.inventory.permissions_inventory import (
     PermissionsInventoryTable,
 )
-from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
 from databricks.labs.ucx.providers.groups_info import GroupMigrationState
 from databricks.labs.ucx.support.impl import SupportsProvider
 from databricks.labs.ucx.utils import ThreadedExecution
@@ -28,8 +27,7 @@ def inventorize_permissions(self):
         crawler_tasks = list(self._supports_provider.get_crawler_tasks())
         logger.info(f"Total crawler tasks: {len(crawler_tasks)}")
         logger.info("Starting the permissions inventorization")
-        execution = ThreadedExecution[PermissionsInventoryItem | None](crawler_tasks)
-        results = execution.run()
+        results = ThreadedExecution.gather("crawl permissions", crawler_tasks)
         items = [item for item in results if item is not None]
         logger.info(f"Total inventorized items: {len(items)}")
         self._permissions_inventory.save(items)
@@ -62,6 +60,5 @@ def apply_group_permissions(self, migration_state: GroupMigrationState, destinat
 
         logger.info(f"Total applier tasks: {len(applier_tasks)}")
         logger.info("Starting the permissions application")
-        execution = ThreadedExecution(applier_tasks)
-        execution.run()
+        ThreadedExecution.gather("apply permissions", applier_tasks)
         logger.info("Permissions were applied")
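
Note: ThreadedExecution.gather() replaces the generic ThreadedExecution[...](tasks).run()
pattern in both call sites above. A minimal sketch of the contract this diff assumes;
the real helper lives in databricks.labs.ucx.utils, so names and defaults here are
illustrative, not the actual implementation:

# sketch_gather.py -- hypothetical, for illustration only
import logging
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")

def gather(name: str, tasks: list[Callable[[], T]], num_threads: int = 4) -> list[T]:
    # Run every no-argument task on a thread pool; `name` only labels the batch in logs.
    logger.info(f"Running {len(tasks)} tasks in '{name}'")
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(lambda task: task(), tasks))
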
diff --git a/src/databricks/labs/ucx/inventory/permissions_inventory.py b/src/databricks/labs/ucx/inventory/permissions_inventory.py
index aba8216a04..52ff447595 100644
--- a/src/databricks/labs/ucx/inventory/permissions_inventory.py
+++ b/src/databricks/labs/ucx/inventory/permissions_inventory.py
@@ -1,50 +1,31 @@
 import logging
 
-from databricks.sdk import WorkspaceClient
-
 from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
-from databricks.labs.ucx.providers.spark import SparkMixin
+from databricks.labs.ucx.tacl._internal import CrawlerBase, SqlBackend
 
 logger = logging.getLogger(__name__)
 
 
-class PermissionsInventoryTable(SparkMixin):
-    def __init__(self, inventory_database: str, ws: WorkspaceClient):
-        super().__init__(ws)
-        self._table = f"hive_metastore.{inventory_database}.permissions"
-
-    @property
-    def _table_schema(self):
-        from pyspark.sql.types import StringType, StructField, StructType
-
-        return StructType(
-            [
-                StructField("object_id", StringType(), True),
-                StructField("support", StringType(), True),
-                StructField("raw_object_permissions", StringType(), True),
-            ]
-        )
-
-    @property
-    def _df(self):
-        return self.spark.table(self._table)
+class PermissionsInventoryTable(CrawlerBase):
+    def __init__(self, backend: SqlBackend, inventory_database: str):
+        super().__init__(backend, "hive_metastore", inventory_database, "permissions")
 
     def cleanup(self):
-        logger.info(f"Cleaning up inventory table {self._table}")
-        self.spark.sql(f"DROP TABLE IF EXISTS {self._table}")
+        logger.info(f"Cleaning up inventory table {self._full_name}")
+        self._exec(f"DROP TABLE IF EXISTS {self._full_name}")
         logger.info("Inventory table cleanup complete")
 
     def save(self, items: list[PermissionsInventoryItem]):
         # TODO: update instead of append
-        logger.info(f"Saving {len(items)} items to inventory table {self._table}")
-        serialized_items = [item.as_dict() for item in items]
-        df = self.spark.createDataFrame(serialized_items, schema=self._table_schema)
-        df.write.mode("append").format("delta").saveAsTable(self._table)
+        logger.info(f"Saving {len(items)} items to inventory table {self._full_name}")
+        self._append_records(PermissionsInventoryItem, items)
         logger.info("Successfully saved the items to inventory table")
 
     def load_all(self) -> list[PermissionsInventoryItem]:
-        logger.info(f"Loading inventory table {self._table}")
-        df = self._df.toPandas()
-
-        logger.info("Successfully loaded the inventory table")
-        return PermissionsInventoryItem.from_pandas(df)
+        logger.info(f"Loading inventory table {self._full_name}")
+        return [
+            PermissionsInventoryItem(object_id, support, raw_object_permissions)
+            for object_id, support, raw_object_permissions in self._fetch(
+                f"SELECT object_id, support, raw_object_permissions FROM {self._full_name}"
+            )
+        ]
diff --git a/src/databricks/labs/ucx/inventory/types.py b/src/databricks/labs/ucx/inventory/types.py
index cea7bfad72..276db209c3 100644
--- a/src/databricks/labs/ucx/inventory/types.py
+++ b/src/databricks/labs/ucx/inventory/types.py
@@ -1,8 +1,6 @@
-from dataclasses import asdict, dataclass
+from dataclasses import dataclass
 from typing import Literal
 
-import pandas as pd
-
 from databricks.labs.ucx.generic import StrEnum
 
 Destination = Literal["backup", "account"]
@@ -32,19 +30,3 @@ class PermissionsInventoryItem:
     object_id: str
     support: str  # shall be taken from CRAWLERS dict
     raw_object_permissions: str
-
-    @staticmethod
-    def from_pandas(source: pd.DataFrame) -> list["PermissionsInventoryItem"]:
-        items = source.to_dict(orient="records")
-        return [PermissionsInventoryItem.from_dict(item) for item in items]
-
-    def as_dict(self) -> dict:
-        return asdict(self)
-
-    @classmethod
-    def from_dict(cls, raw: dict) -> "PermissionsInventoryItem":
-        return cls(
-            object_id=raw["object_id"],
-            raw_object_permissions=raw["raw_object_permissions"],
-            support=raw["support"],
-        )
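
Note: with the pandas helpers gone, serialization is delegated to the tacl backend.
CrawlerBase._append_records is assumed to derive column names from the dataclass
fields and emit a plain INSERT. A hedged sketch of that mapping, consistent with the
SQL asserted in tests/unit/test_permissions_inventory.py further down; the function
name append_records_sql is hypothetical, not the real API:

# sketch_append_records.py -- hypothetical, for illustration only
import dataclasses

from databricks.labs.ucx.inventory.types import PermissionsInventoryItem

def append_records_sql(klass, records, full_name: str) -> str:
    # Column order comes from the dataclass definition; values are quoted as SQL strings.
    fields = [f.name for f in dataclasses.fields(klass)]
    rows = ", ".join("(" + ", ".join(f"'{getattr(r, f)}'" for f in fields) + ")" for r in records)
    return f"INSERT INTO {full_name} ({', '.join(fields)}) VALUES {rows}"

item = PermissionsInventoryItem("object1", "clusters", "test acl")
print(append_records_sql(PermissionsInventoryItem, [item], "hive_metastore.test_database.permissions"))
# INSERT INTO hive_metastore.test_database.permissions (object_id, support,
# raw_object_permissions) VALUES ('object1', 'clusters', 'test acl')
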
diff --git a/src/databricks/labs/ucx/providers/spark.py b/src/databricks/labs/ucx/providers/spark.py
deleted file mode 100644
index 7d7b2dc07b..0000000000
--- a/src/databricks/labs/ucx/providers/spark.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import logging
-
-from databricks.sdk import WorkspaceClient
-
-logger = logging.getLogger(__name__)
-
-
-class SparkMixin:
-    def __init__(self, ws: WorkspaceClient):
-        self._spark = self._initialize_spark(ws)
-
-    @staticmethod
-    def _initialize_spark(ws: WorkspaceClient):
-        logger.info("Initializing Spark session")
-        try:
-            from databricks.sdk.runtime import spark
-
-            return spark
-        except ImportError:
-            logger.info("Using DB Connect")
-            from databricks.connect import DatabricksSession
-
-            if ws.config.cluster_id is None:
-                msg = "DATABRICKS_CLUSTER_ID environment variable is not set, cannot use DB Connect"
-                raise RuntimeError(msg) from None
-
-            cluster_id = ws.config.cluster_id
-            cluster_info = ws.clusters.get(cluster_id)
-
-            logger.info(f"Ensuring that cluster {cluster_id} ({cluster_info.cluster_name}) is running")
-            ws.clusters.ensure_cluster_is_running(cluster_id)
-
-            logger.info("Cluster is ready, creating the DBConnect session")
-            spark = DatabricksSession.builder.sdkConfig(ws.config).getOrCreate()
-            return spark
-
-    @property
-    def spark(self):
-        return self._spark
diff --git a/src/databricks/labs/ucx/toolkits/group_migration.py b/src/databricks/labs/ucx/toolkits/group_migration.py
index 4fec89fe02..2796d213a5 100644
--- a/src/databricks/labs/ucx/toolkits/group_migration.py
+++ b/src/databricks/labs/ucx/toolkits/group_migration.py
@@ -10,10 +10,15 @@
 from databricks.labs.ucx.inventory.verification import VerificationManager
 from databricks.labs.ucx.managers.group import GroupManager
 from databricks.labs.ucx.support.impl import SupportsProvider
+from databricks.labs.ucx.tacl._internal import (
+    RuntimeBackend,
+    SqlBackend,
+    StatementExecutionBackend,
+)
 
 
 class GroupMigrationToolkit:
-    def __init__(self, config: MigrationConfig):
+    def __init__(self, config: MigrationConfig, *, warehouse_id=None):
         self._num_threads = config.num_threads
         self._workspace_start_path = config.workspace_start_path
 
@@ -27,13 +32,20 @@ def __init__(self, config: MigrationConfig):
         self._verify_ws_client(self._ws)
 
         self._group_manager = GroupManager(self._ws, config.groups)
-        self._permissions_inventory = PermissionsInventoryTable(config.inventory_database, self._ws)
+        sql_backend = self._backend(self._ws, warehouse_id)
+        self._permissions_inventory = PermissionsInventoryTable(sql_backend, config.inventory_database)
         self._supports_provider = SupportsProvider(self._ws, self._num_threads, self._workspace_start_path)
         self._permissions_manager = PermissionManager(
             self._ws, self._permissions_inventory, supports_provider=self._supports_provider
         )
         self._verification_manager = VerificationManager(self._ws, self._supports_provider.supports["secrets"])
 
+    @staticmethod
+    def _backend(ws: WorkspaceClient, warehouse_id: str | None = None) -> SqlBackend:
+        if warehouse_id is None:
+            return RuntimeBackend()
+        return StatementExecutionBackend(ws, warehouse_id)
+
     @staticmethod
     def _verify_ws_client(w: WorkspaceClient):
         _me = w.current_user.me()
diff --git a/src/databricks/labs/ucx/toolkits/table_acls.py b/src/databricks/labs/ucx/toolkits/table_acls.py
index 32f15714ed..097c17313a 100644
--- a/src/databricks/labs/ucx/toolkits/table_acls.py
+++ b/src/databricks/labs/ucx/toolkits/table_acls.py
@@ -25,9 +25,7 @@
         self._tc = TablesCrawler(self._backend(ws, warehouse_id), inventory_catalog, inventory_schema)
         self._gc = GrantsCrawler(self._tc)
 
-        self._databases = (
-            databases if databases else [database["databaseName"] for database in self._tc._all_databases()]
-        )
+        self._databases = databases if databases else [database for (database,) in self._tc._all_databases()]
 
     def database_snapshot(self):
         tables = []
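
Note: the keyword-only warehouse_id now decides which SqlBackend the toolkit uses:
RuntimeBackend (SQL on the current cluster runtime) when running inside a workspace,
StatementExecutionBackend (SQL Statement Execution API against a warehouse) otherwise.
A small self-contained sketch of that selection logic, with stand-in classes rather
than the real backends:

# sketch_backend_selection.py -- hypothetical stand-ins, for illustration only
class RuntimeBackend:
    """Stand-in: executes SQL via the runtime's active Spark session."""

class StatementExecutionBackend:
    """Stand-in: executes SQL via a warehouse using the Statement Execution API."""
    def __init__(self, ws, warehouse_id):
        self.ws, self.warehouse_id = ws, warehouse_id

def pick_backend(ws, warehouse_id=None):
    # Same branch shape as GroupMigrationToolkit._backend above.
    if warehouse_id is None:
        return RuntimeBackend()
    return StatementExecutionBackend(ws, warehouse_id)

assert isinstance(pick_backend(ws=None), RuntimeBackend)
assert isinstance(pick_backend(ws=None, warehouse_id="abc"), StatementExecutionBackend)
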
diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py
index da2118eacc..0cc1a0e74b 100644
--- a/tests/integration/test_e2e.py
+++ b/tests/integration/test_e2e.py
@@ -166,7 +166,9 @@ def test_e2e(
         tacl=TaclConfig(auto=True),
         log_level="DEBUG",
     )
-    toolkit = GroupMigrationToolkit(config)
+
+    warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
+    toolkit = GroupMigrationToolkit(config, warehouse_id=warehouse_id)
     toolkit.prepare_environment()
 
     group_migration_state = toolkit._group_manager.migration_groups_provider
diff --git a/tests/integration/test_permissions.py b/tests/integration/test_permissions.py
new file mode 100644
index 0000000000..8d1692b49f
--- /dev/null
+++ b/tests/integration/test_permissions.py
@@ -0,0 +1,23 @@
+import os
+
+from databricks.labs.ucx.inventory.permissions_inventory import (
+    PermissionsInventoryTable,
+)
+from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
+from databricks.labs.ucx.tacl._internal import StatementExecutionBackend
+
+
+def test_permissions_save_and_load(ws, make_schema):
+    schema = make_schema().split(".")[-1]
+    backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
+    pi = PermissionsInventoryTable(backend, schema)
+
+    saved = [
+        PermissionsInventoryItem(object_id="abc", support="bcd", raw_object_permissions="def"),
+        PermissionsInventoryItem(object_id="efg", support="fgh", raw_object_permissions="ghi"),
+    ]
+
+    pi.save(saved)
+    loaded = pi.load_all()
+
+    assert saved == loaded
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
deleted file mode 100644
index 27fe1310fd..0000000000
--- a/tests/unit/conftest.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import logging
-import shutil
-import tempfile
-from pathlib import Path
-
-import pytest
-from delta import configure_spark_with_delta_pip
-from pyspark.sql import SparkSession
-
-logger = logging.getLogger(__name__)
-
-
-@pytest.fixture(scope="session")
-def spark() -> SparkSession:
-    """
-    This fixture provides preconfigured SparkSession with Hive and Delta support.
-    After the test session, temporary warehouse directory is deleted.
-    :return: SparkSession
-    """
-    logger.info("Configuring Spark session for testing environment")
-    warehouse_dir = tempfile.TemporaryDirectory().name
-    _builder = (
-        SparkSession.builder.master("local[1]")
-        .config("spark.hive.metastore.warehouse.dir", Path(warehouse_dir).as_uri())
-        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
-        .config(
-            "spark.sql.catalog.spark_catalog",
-            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
-        )
-    )
-    spark: SparkSession = configure_spark_with_delta_pip(_builder).getOrCreate()
-    logger.info("Spark session configured")
-    yield spark
-    logger.info("Shutting down Spark session")
-    spark.stop()
-    if Path(warehouse_dir).exists():
-        shutil.rmtree(warehouse_dir)
diff --git a/tests/unit/test_permissions_inventory.py b/tests/unit/test_permissions_inventory.py
index e5ee4a1bea..e5554d6ec2 100644
--- a/tests/unit/test_permissions_inventory.py
+++ b/tests/unit/test_permissions_inventory.py
@@ -1,65 +1,55 @@
-from unittest.mock import Mock
-
-import pandas as pd
-import pytest
-from pyspark.sql.types import StringType, StructField, StructType
-
 from databricks.labs.ucx.inventory.permissions_inventory import (
     PermissionsInventoryTable,
 )
 from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
+from databricks.labs.ucx.providers.mixins.sql import Row
+
+from .mocks import MockBackend
 
 
-@pytest.fixture
-def workspace_client():
-    client = Mock()
-    return client
+def test_inventory_table_manager_init():
+    b = MockBackend()
+    pi = PermissionsInventoryTable(b, "test_database")
 
-@pytest.fixture
-def permissions_inventory(workspace_client, mocker):
-    mocker.patch("databricks.labs.ucx.providers.spark.SparkMixin._initialize_spark", Mock())
-    return PermissionsInventoryTable("test_database", workspace_client)
+    assert pi._full_name == "hive_metastore.test_database.permissions"
 
 
-def test_inventory_table_manager_init(permissions_inventory):
-    assert str(permissions_inventory._table) == "hive_metastore.test_database.permissions"
+def test_cleanup():
+    b = MockBackend()
+    pi = PermissionsInventoryTable(b, "test_database")
+    pi.cleanup()
 
-def test_table_schema(permissions_inventory):
-    schema = StructType(
-        [
-            StructField("object_id", StringType(), True),
-            StructField("support", StringType(), True),
-            StructField("raw_object_permissions", StringType(), True),
-        ]
-    )
-    assert permissions_inventory._table_schema == schema
+    assert "DROP TABLE IF EXISTS hive_metastore.test_database.permissions" == b.queries[0]
 
 
-def test_table(permissions_inventory):
-    assert permissions_inventory._df == permissions_inventory.spark.table("test_catalog.test_database.permissions")
+def test_save():
+    b = MockBackend()
+    pi = PermissionsInventoryTable(b, "test_database")
+    pi.save([PermissionsInventoryItem("object1", "clusters", "test acl")])
 
-def test_cleanup(permissions_inventory):
-    permissions_inventory.cleanup()
-    permissions_inventory.spark.sql.assert_called_with("DROP TABLE IF EXISTS hive_metastore.test_database.permissions")
+    assert (
+        "INSERT INTO hive_metastore.test_database.permissions (object_id, support, "
+        "raw_object_permissions) VALUES ('object1', 'clusters', 'test acl')"
+    ) == b.queries[0]
 
 
-def test_save(permissions_inventory):
-    perm_items = [PermissionsInventoryItem("object1", "clusters", "test acl")]
-    permissions_inventory.save(perm_items)
-    permissions_inventory.spark.createDataFrame.assert_called_once()
+def make_row(data, columns):
+    row = Row(data)
+    row.__columns__ = columns
+    return row
 
 
-def test_load_all(permissions_inventory):
-    items = pd.DataFrame(
-        {
-            "object_id": ["object1"],
-            "support": ["clusters"],
-            "raw_object_permissions": ["test acl"],
+def test_load_all():
+    b = MockBackend(
+        rows={
+            "SELECT": [
+                make_row(("object1", "clusters", "test acl"), ["object_id", "support", "raw_object_permissions"]),
+            ]
         }
     )
-    permissions_inventory._df.toPandas.return_value = items
-    output = permissions_inventory.load_all()
+    pi = PermissionsInventoryTable(b, "test_database")
+
+    output = pi.load_all()
 
     assert output[0] == PermissionsInventoryItem("object1", support="clusters", raw_object_permissions="test acl")
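
Note: tests/unit/mocks.py is imported above ("from .mocks import MockBackend") but is
not part of this diff. A minimal fake consistent with how these tests use it: queries
records every executed statement, and rows maps a query prefix to canned results.
This is an assumed shape, not the actual file:

# sketch_mock_backend.py -- hypothetical, for illustration only
class MockBackend:
    def __init__(self, rows: dict | None = None):
        self.rows = rows or {}        # query prefix -> canned result rows
        self.queries: list[str] = []  # every statement passed to execute()/fetch()

    def execute(self, sql: str) -> None:
        # Record DDL/DML so tests can assert on the exact SQL text, e.g. b.queries[0].
        self.queries.append(sql)

    def fetch(self, sql: str) -> list:
        self.queries.append(sql)
        # Return canned rows for the first matching prefix, e.g. "SELECT".
        for prefix, rows in self.rows.items():
            if sql.startswith(prefix):
                return rows
        return []
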
diff --git a/tests/unit/test_permissions_manager.py b/tests/unit/test_permissions_manager.py
index 5bef9ae5d0..8d0bf9700f 100644
--- a/tests/unit/test_permissions_manager.py
+++ b/tests/unit/test_permissions_manager.py
@@ -13,16 +13,10 @@
 from databricks.labs.ucx.support.impl import SupportsProvider
 
 
-@pytest.fixture(scope="function")
-def spark_mixin():
-    with mock.patch("databricks.labs.ucx.providers.spark.SparkMixin._initialize_spark", MagicMock()):
-        yield
-
-
-def test_manager_inventorize(spark_mixin):
+def test_manager_inventorize():
     sup = SupportsProvider(ws=MagicMock(), num_threads=1, workspace_start_path="/")
     pm = PermissionManager(
-        ws=MagicMock(), permissions_inventory=PermissionsInventoryTable("test", MagicMock()), supports_provider=sup
+        ws=MagicMock(), permissions_inventory=PermissionsInventoryTable(MagicMock(), "test"), supports_provider=sup
     )
 
     with mock.patch("databricks.labs.ucx.inventory.permissions.ThreadedExecution.run", MagicMock()) as run_mock:
@@ -30,7 +24,7 @@
     run_mock.assert_called_once()
 
 
-def test_manager_apply(spark_mixin):
+def test_manager_apply():
     sup = SupportsProvider(ws=MagicMock(), num_threads=1, workspace_start_path="/")
     inventory = MagicMock(spec=PermissionsInventoryTable)
     inventory.load_all.return_value = [