diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py
index 300051c7e8..0d04270ac5 100644
--- a/src/databricks/labs/ucx/contexts/application.py
+++ b/src/databricks/labs/ucx/contexts/application.py
@@ -28,7 +28,7 @@
 from databricks.labs.ucx.assessment.export import AssessmentExporter
 from databricks.labs.ucx.aws.credentials import CredentialManager
 from databricks.labs.ucx.config import WorkspaceConfig
-from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
+from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
 from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
 from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
 from databricks.labs.ucx.hive_metastore.grants import (
@@ -43,13 +43,13 @@
     PrincipalACL,
 )
 from databricks.labs.ucx.hive_metastore.mapping import TableMapping
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership
+from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
+from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
 from databricks.labs.ucx.hive_metastore.table_migrate import (
     TableMigrationStatusRefresher,
     TablesMigrator,
 )
 from databricks.labs.ucx.hive_metastore.table_move import TableMove
-from databricks.labs.ucx.hive_metastore.tables import TableOwnership
 from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
 from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
 from databricks.labs.ucx.installer.workflows import DeployedWorkflows
@@ -263,17 +263,29 @@ def tables_crawler(self) -> TablesCrawler:
 
     @cached_property
     def table_ownership(self) -> TableOwnership:
-        return TableOwnership(self.administrator_locator)
+        return TableOwnership(
+            self.administrator_locator,
+            self.grants_crawler,
+            self.used_tables_crawler_for_paths,
+            self.used_tables_crawler_for_queries,
+            self.legacy_query_ownership,
+            self.workspace_path_ownership,
+        )
 
     @cached_property
     def workspace_path_ownership(self) -> WorkspacePathOwnership:
         return WorkspacePathOwnership(self.administrator_locator, self.workspace_client)
 
+    @cached_property
+    def legacy_query_ownership(self) -> LegacyQueryOwnership:
+        return LegacyQueryOwnership(self.administrator_locator, self.workspace_client)
+
     @cached_property
     def directfs_access_ownership(self) -> DirectFsAccessOwnership:
         return DirectFsAccessOwnership(
             self.administrator_locator,
             self.workspace_path_ownership,
+            self.legacy_query_ownership,
             self.workspace_client,
         )
 
diff --git a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py
index 0350ecfd60..55a1ddac98 100644
--- a/src/databricks/labs/ucx/framework/owners.py
+++ b/src/databricks/labs/ucx/framework/owners.py
@@ -201,6 +201,9 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceClient):
         super().__init__(administrator_locator)
         self._ws = ws
 
+    def owner_of_path(self, path: str) -> str:
+        return self.owner_of(WorkspacePath(self._ws, path))
+
     @retried(on=[InternalError], timeout=timedelta(minutes=1))
     def _maybe_direct_owner(self, record: WorkspacePath) -> str | None:
         maybe_type_and_id = self._maybe_type_and_id(record)
@@ -237,3 +240,18 @@ def _infer_from_first_can_manage(object_permissions):
                 return acl.group_name
             return acl.service_principal_name
         return None
+
+
+class LegacyQueryOwnership(Ownership[str]):
+    def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None:
+        super().__init__(administrator_locator)
+        self._workspace_client = workspace_client
+
+    def _maybe_direct_owner(self, record: str) -> str | None:
+        try:
+            legacy_query = self._workspace_client.queries.get(record)
+            return legacy_query.owner_user_name
+        except NotFound:
+            return None
+        except InternalError:  # redash is very naughty and throws 500s instead of proper 404s
+            return None
diff --git a/src/databricks/labs/ucx/hive_metastore/ownership.py b/src/databricks/labs/ucx/hive_metastore/ownership.py
new file mode 100644
index 0000000000..b11f5f6e81
--- /dev/null
+++ b/src/databricks/labs/ucx/hive_metastore/ownership.py
@@ -0,0 +1,111 @@
+import logging
+from functools import cached_property
+
+from databricks.labs.ucx.framework.owners import (
+    Ownership,
+    AdministratorLocator,
+    LegacyQueryOwnership,
+    WorkspacePathOwnership,
+)
+from databricks.labs.ucx.hive_metastore import TablesCrawler
+from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
+from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
+from databricks.labs.ucx.hive_metastore.tables import Table
+from databricks.labs.ucx.source_code.base import UsedTable
+from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
+
+logger = logging.getLogger(__name__)
+
+
+class TableOwnership(Ownership[Table]):
+    """Determine ownership of tables in the inventory based on the following rules:
+    - If a table is owned by a principal in the grants table, then that principal is the owner.
+    - If a table is written to by a query, then the owner of that query is the owner of the table.
+    - If a table is written to by a notebook or file, then the owner of the path is the owner of the table.
+    """
+
+    def __init__(
+        self,
+        administrator_locator: AdministratorLocator,
+        grants_crawler: GrantsCrawler,
+        used_tables_in_paths: UsedTablesCrawler,
+        used_tables_in_queries: UsedTablesCrawler,
+        legacy_query_ownership: LegacyQueryOwnership,
+        workspace_path_ownership: WorkspacePathOwnership,
+    ) -> None:
+        super().__init__(administrator_locator)
+        self._grants_crawler = grants_crawler
+        self._used_tables_in_paths = used_tables_in_paths
+        self._used_tables_in_queries = used_tables_in_queries
+        self._legacy_query_ownership = legacy_query_ownership
+        self._workspace_path_ownership = workspace_path_ownership
+
+    def _maybe_direct_owner(self, record: Table) -> str | None:
+        owner = self._maybe_from_grants(record)
+        if owner:
+            return owner
+        return self._maybe_from_sources(record)
+
+    def _maybe_from_sources(self, record: Table) -> str | None:
+        used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name))
+        if not used_table:
+            return None
+        # If something writes to a table, then it's an owner of it
+        if not used_table.is_write:
+            return None
+        if used_table.source_type == 'QUERY' and used_table.query_id:
+            return self._legacy_query_ownership.owner_of(used_table.query_id)
+        if used_table.source_type in {'NOTEBOOK', 'FILE'}:
+            return self._workspace_path_ownership.owner_of_path(used_table.source_id)
+        logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}")
+        return None
+
+    @cached_property
+    def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]:
+        index = {}
+        for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()):
+            for used_table in collection:
+                key = used_table.catalog_name, used_table.schema_name, used_table.table_name
+                index[key] = used_table
+        return index
+
+    def _maybe_from_grants(self, record: Table) -> str | None:
+        for grant in self._grants_snapshot:
+            if not grant.action_type == 'OWN':
+                continue
+            object_type, full_name = grant.this_type_and_key()
+            if object_type == 'TABLE' and full_name == record.key:
+                return grant.principal
+            if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}":
+                return grant.principal
+        return None
+
+    @cached_property
+    def _grants_snapshot(self):
+        return self._grants_crawler.snapshot()
+
+
+class TableMigrationOwnership(Ownership[TableMigrationStatus]):
+    """Determine ownership of table migration records in the inventory.
+
+    This is the owner of the source table, if (and only if) the source table is present in the inventory.
+    """
+
+    def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None:
+        super().__init__(table_ownership._administrator_locator)  # TODO: Fix this
+        self._tables_crawler = tables_crawler
+        self._table_ownership = table_ownership
+        self._indexed_tables: dict[tuple[str, str], Table] | None = None
+
+    def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:
+        index = self._indexed_tables
+        if index is None or reindex:
+            snapshot = self._tables_crawler.snapshot()
+            index = {(table.database, table.name): table for table in snapshot}
+            self._indexed_tables = index
+        return index
+
+    def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None:
+        index = self._tables_snapshot_index()
+        source_table = index.get((record.src_schema, record.src_table), None)
+        return self._table_ownership.owner_of(source_table) if source_table is not None else None
diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
index 9f697f9f32..dde5f17790 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
@@ -9,10 +9,8 @@
 from databricks.sdk.errors import NotFound
 
 from databricks.labs.ucx.framework.crawlers import CrawlerBase
-from databricks.labs.ucx.framework.owners import Ownership
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore import TablesCrawler
-from databricks.labs.ucx.hive_metastore.tables import Table, TableOwnership
 
 logger = logging.getLogger(__name__)
 
@@ -162,29 +160,3 @@ def _iter_schemas(self):
             except NotFound:
                 logger.warning(f"Catalog {catalog.name} no longer exists. Skipping checking its migration status.")
                 continue
-
-
-class TableMigrationOwnership(Ownership[TableMigrationStatus]):
-    """Determine ownership of table migration records in the inventory.
-
-    This is the owner of the source table, if (and only if) the source table is present in the inventory.
-    """
-
-    def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None:
-        super().__init__(table_ownership._administrator_locator)
-        self._tables_crawler = tables_crawler
-        self._table_ownership = table_ownership
-        self._indexed_tables: dict[tuple[str, str], Table] | None = None
-
-    def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:
-        index = self._indexed_tables
-        if index is None or reindex:
-            snapshot = self._tables_crawler.snapshot()
-            index = {(table.database, table.name): table for table in snapshot}
-            self._indexed_tables = index
-        return index
-
-    def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None:
-        index = self._tables_snapshot_index()
-        source_table = index.get((record.src_schema, record.src_table), None)
-        return self._table_ownership.owner_of(source_table) if source_table is not None else None
diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py
index 7a1f0e90a9..08f1864586 100644
--- a/src/databricks/labs/ucx/hive_metastore/tables.py
+++ b/src/databricks/labs/ucx/hive_metastore/tables.py
@@ -16,7 +16,6 @@
 from databricks.sdk.errors import NotFound
 
 from databricks.labs.ucx.framework.crawlers import CrawlerBase
-from databricks.labs.ucx.framework.owners import Ownership
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 
 logger = logging.getLogger(__name__)
@@ -660,13 +659,3 @@ def _create_describe_tasks(self, catalog: str, database: str, table_names: list[
         for table in table_names:
             tasks.append(partial(self._describe, catalog, database, table))
         return tasks
-
-
-class TableOwnership(Ownership[Table]):
-    """Determine ownership of tables in the inventory.
-
-    At the present we don't determine a specific owner for tables.
-    """
-
-    def _maybe_direct_owner(self, record: Table) -> None:
-        return None
diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py
index 6d46e9d600..659b38b2b7 100644
--- a/src/databricks/labs/ucx/source_code/base.py
+++ b/src/databricks/labs/ucx/source_code/base.py
@@ -231,6 +231,16 @@ def source_type(self) -> str | None:
         last = self.source_lineage[-1]
         return last.object_type
 
+    @property
+    def query_id(self) -> str | None:
+        if self.source_type != 'QUERY':
+            return None
+        last = self.source_lineage[-1]
+        parts = last.object_id.split('/')
+        if len(parts) < 2:
+            return None
+        return parts[1]
+
 
 @dataclass
 class UsedTable(SourceInfo):
diff --git a/src/databricks/labs/ucx/source_code/directfs_access.py b/src/databricks/labs/ucx/source_code/directfs_access.py
index 33972f9e22..b0b449dd1a 100644
--- a/src/databricks/labs/ucx/source_code/directfs_access.py
+++ b/src/databricks/labs/ucx/source_code/directfs_access.py
@@ -10,7 +10,12 @@
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk.errors import DatabricksError, NotFound
 
-from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator, WorkspacePathOwnership
+from databricks.labs.ucx.framework.owners import (
+    Ownership,
+    AdministratorLocator,
+    WorkspacePathOwnership,
+    LegacyQueryOwnership,
+)
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.source_code.base import DirectFsAccess
 
@@ -73,15 +78,17 @@ def __init__(
         self,
         administrator_locator: AdministratorLocator,
         workspace_path_ownership: WorkspacePathOwnership,
+        legacy_query_ownership: LegacyQueryOwnership,
         workspace_client: WorkspaceClient,
     ) -> None:
         super().__init__(administrator_locator)
         self._workspace_path_ownership = workspace_path_ownership
+        self._legacy_query_ownership = legacy_query_ownership
         self._workspace_client = workspace_client
 
     def _maybe_direct_owner(self, record: DirectFsAccess) -> str | None:
-        if record.source_type == 'QUERY':
-            return self._query_owner(record)
+        if record.source_type == 'QUERY' and record.query_id:
+            return self._legacy_query_ownership.owner_of(record.query_id)
         if record.source_type in {'NOTEBOOK', 'FILE'}:
             return self._notebook_owner(record)
         logger.warning(f"Unknown source type {record.source_type} for {record.source_id}")
@@ -94,8 +101,3 @@ def _notebook_owner(self, record):
             return owner
         except NotFound:
             return None
-
-    def _query_owner(self, record):
-        query_id = record.source_lineage[-1].object_id.split('/')[1]
-        legacy_query = self._workspace_client.queries.get(query_id)
-        return legacy_query.owner_user_name
diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py
index e9ba362a86..61f87ac8b2 100644
--- a/tests/integration/hive_metastore/test_table_migrate.py
+++ b/tests/integration/hive_metastore/test_table_migrate.py
@@ -2,11 +2,10 @@
 
 from databricks.labs.ucx.hive_metastore import TablesCrawler
 from databricks.labs.ucx.hive_metastore.table_migration_status import (
-    TableMigrationOwnership,
     TableMigrationStatus,
     TableMigrationStatusRefresher,
 )
-from databricks.labs.ucx.hive_metastore.tables import TableOwnership
+from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership
 
 
 def test_table_migration_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None:
@@ -32,7 +31,7 @@ def is_migration_record_for_table(record: TableMigrationStatus) -> bool:
     synthetic_record = dataclasses.replace(table_migration_record, src_table="does_not_exist")
 
     # Verify for the table that the table owner and the migration status are a match.
-    table_ownership = TableOwnership(runtime_ctx.administrator_locator)
+    table_ownership = runtime_ctx.table_ownership
     table_migration_ownership = TableMigrationOwnership(tables_crawler, table_ownership)
     assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record)
 
diff --git a/tests/integration/hive_metastore/test_tables.py b/tests/integration/hive_metastore/test_tables.py
index efd554591a..6041e53904 100644
--- a/tests/integration/hive_metastore/test_tables.py
+++ b/tests/integration/hive_metastore/test_tables.py
@@ -5,7 +5,7 @@
 from databricks.sdk.retries import retried
 
 from databricks.labs.ucx.hive_metastore import TablesCrawler
-from databricks.labs.ucx.hive_metastore.tables import What, TableOwnership
+from databricks.labs.ucx.hive_metastore.tables import What
 
 logger = logging.getLogger(__name__)
 
@@ -90,7 +90,7 @@ def test_partitioned_tables(ws, sql_backend, make_schema, make_table):
 
 def test_table_ownership(runtime_ctx, inventory_schema, sql_backend) -> None:
     """Verify the ownership can be determined for crawled tables."""
-    # This currently isn't very useful: we don't currently locate specific owners for tables.
 
     # A table for which we'll determine the owner.
     table = runtime_ctx.make_table()
@@ -103,5 +102,6 @@
     table_record = next(record for record in records if record.full_name == table.full_name)
 
     # Verify ownership can be made.
-    ownership = TableOwnership(runtime_ctx.administrator_locator)
-    assert ownership.owner_of(table_record) == runtime_ctx.administrator_locator.get_workspace_administrator()
+    my_user = runtime_ctx.workspace_client.current_user.me()
+    owner = runtime_ctx.table_ownership.owner_of(table_record)
+    assert owner == my_user.user_name
diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py
index 84f726433d..946a57b4f7 100644
--- a/tests/unit/hive_metastore/test_table_migrate.py
+++ b/tests/unit/hive_metastore/test_table_migrate.py
@@ -28,13 +28,12 @@
 from databricks.labs.ucx.hive_metastore.table_migration_status import (
     TableMigrationStatusRefresher,
     TableMigrationIndex,
-    TableMigrationOwnership,
     TableMigrationStatus,
     TableView,
 )
+from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
 from databricks.labs.ucx.hive_metastore.tables import (
     Table,
-    TableOwnership,
     TablesCrawler,
     What,
 )
diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py
index 716844806f..057389d49b 100644
--- a/tests/unit/hive_metastore/test_tables.py
+++ b/tests/unit/hive_metastore/test_tables.py
@@ -6,19 +6,22 @@
 from databricks.labs.lsql.backends import MockBackend
 from databricks.labs.lsql.core import Row
 from databricks.labs.ucx.progress.history import ProgressEncoder
+
+from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
 from databricks.sdk import WorkspaceClient
 
 from databricks.labs.ucx.__about__ import __version__ as ucx_version
-from databricks.labs.ucx.framework.owners import AdministratorLocator
+from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership
 from databricks.labs.ucx.hive_metastore.locations import Mount, ExternalLocations, MountsCrawler
 from databricks.labs.ucx.hive_metastore.tables import (
     FasterTableScanCrawler,
     HiveSerdeType,
     Table,
-    TableOwnership,
     TablesCrawler,
     What,
 )
+from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
+from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
 
 
 def test_is_delta_true():
@@ -676,12 +679,30 @@ def test_table_owner() -> None:
     admin_locator = create_autospec(AdministratorLocator)
     admin_locator.get_workspace_administrator.return_value = "an_admin"
-    ownership = TableOwnership(admin_locator)
+    grants_crawler = create_autospec(GrantsCrawler)
+    grants_crawler.snapshot.return_value = []
+    used_tables_in_paths = create_autospec(UsedTablesCrawler)
+    used_tables_in_paths.snapshot.return_value = []
+    used_tables_in_queries = create_autospec(UsedTablesCrawler)
+    used_tables_in_queries.snapshot.return_value = []
+    legacy_query_ownership = create_autospec(LegacyQueryOwnership)
+    workspace_path_ownership = create_autospec(WorkspacePathOwnership)
+
+    ownership = TableOwnership(
+        admin_locator,
+        grants_crawler,
+        used_tables_in_paths,
+        used_tables_in_queries,
+        legacy_query_ownership,
+        workspace_path_ownership,
+    )
     table = Table(catalog="main", database="foo", name="bar", object_type="TABLE", table_format="DELTA")
 
     owner = ownership.owner_of(table)
 
     assert owner == "an_admin"
     admin_locator.get_workspace_administrator.assert_called_once()
+    legacy_query_ownership.owner_of.assert_not_called()
+    workspace_path_ownership.owner_of.assert_not_called()
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit/source_code/test_directfs_access.py b/tests/unit/source_code/test_directfs_access.py
index 5a9aadee80..2b381df95f 100644
--- a/tests/unit/source_code/test_directfs_access.py
+++ b/tests/unit/source_code/test_directfs_access.py
@@ -4,7 +4,7 @@
 from databricks.labs.lsql.backends import MockBackend
 from databricks.sdk import WorkspaceClient
 
-from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
+from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
 from databricks.labs.ucx.source_code.base import LineageAtom
 from databricks.labs.ucx.source_code.directfs_access import (
     DirectFsAccessCrawler,
@@ -43,11 +43,13 @@ def test_directfs_access_ownership() -> None:
     admin_locator = create_autospec(AdministratorLocator)
     workspace_path_ownership = create_autospec(WorkspacePathOwnership)
     workspace_path_ownership.owner_of.return_value = "other_admin"
+    legacy_query_ownership = create_autospec(LegacyQueryOwnership)
 
-    ownership = DirectFsAccessOwnership(admin_locator, workspace_path_ownership, ws)
+    ownership = DirectFsAccessOwnership(admin_locator, workspace_path_ownership, legacy_query_ownership, ws)
     dfsa = DirectFsAccess(source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="/x/y/z")])
     owner = ownership.owner_of(dfsa)
 
     assert owner == "other_admin"
     ws.queries.get.assert_not_called()
+    legacy_query_ownership.owner_of.assert_not_called()
     admin_locator.get_workspace_administrator.assert_not_called()
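
Reviewer note: the new TableOwnership resolves an owner from OWN grants first, then from recorded writes (query, notebook or file), and only then falls back to the workspace administrator. A minimal sketch of that fallback path, adapted from the updated test_table_owner unit test above (mocked crawlers only, no real workspace involved):

    from unittest.mock import create_autospec

    from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership
    from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
    from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
    from databricks.labs.ucx.hive_metastore.tables import Table
    from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler

    admin_locator = create_autospec(AdministratorLocator)
    admin_locator.get_workspace_administrator.return_value = "an_admin"

    # Empty snapshots: no OWN grants and no recorded writes, so no direct owner can be found.
    grants_crawler = create_autospec(GrantsCrawler)
    grants_crawler.snapshot.return_value = []
    used_tables_in_paths = create_autospec(UsedTablesCrawler)
    used_tables_in_paths.snapshot.return_value = []
    used_tables_in_queries = create_autospec(UsedTablesCrawler)
    used_tables_in_queries.snapshot.return_value = []

    ownership = TableOwnership(
        admin_locator,
        grants_crawler,
        used_tables_in_paths,
        used_tables_in_queries,
        create_autospec(LegacyQueryOwnership),
        create_autospec(WorkspacePathOwnership),
    )
    table = Table(catalog="main", database="foo", name="bar", object_type="TABLE", table_format="DELTA")
    assert ownership.owner_of(table) == "an_admin"

With empty snapshots neither the grants lookup nor the used-tables lookup yields an owner, so owner_of() returns whatever AdministratorLocator reports, exactly as the unit test asserts.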
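
The query_id property added in source_code/base.py is what lets both DirectFsAccessOwnership and the new TableOwnership hand a query over to LegacyQueryOwnership. A small illustration of the parsing rule; the "parent-id/query-id" object_id below is a made-up lineage value, only the split-on-'/' behaviour comes from the patch:

    from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom

    # Hypothetical QUERY lineage; the real object_id layout is whatever the linters record.
    record = DirectFsAccess(source_lineage=[LineageAtom(object_type="QUERY", object_id="parent-id/query-id")])
    assert record.source_type == "QUERY"
    assert record.query_id == "query-id"  # second '/'-separated segment, per the new property

    # Anything that is not QUERY lineage has no query id.
    notebook = DirectFsAccess(source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="/x/y/z")])
    assert notebook.query_id is None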