Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Determine ownership of tables based on grants and source code #3066

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from databricks.labs.ucx.assessment.export import AssessmentExporter
from databricks.labs.ucx.aws.credentials import CredentialManager
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
from databricks.labs.ucx.hive_metastore.grants import (
Expand All @@ -43,13 +43,13 @@
PrincipalACL,
)
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
from databricks.labs.ucx.hive_metastore.table_migrate import (
TableMigrationStatusRefresher,
TablesMigrator,
)
from databricks.labs.ucx.hive_metastore.table_move import TableMove
from databricks.labs.ucx.hive_metastore.tables import TableOwnership
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
Expand Down Expand Up @@ -263,17 +263,29 @@ def tables_crawler(self) -> TablesCrawler:

@cached_property
def table_ownership(self) -> TableOwnership:
return TableOwnership(self.administrator_locator)
return TableOwnership(
self.administrator_locator,
self.grants_crawler,
self.used_tables_crawler_for_paths,
self.used_tables_crawler_for_queries,
self.legacy_query_ownership,
self.workspace_path_ownership,
)

@cached_property
def workspace_path_ownership(self) -> WorkspacePathOwnership:
return WorkspacePathOwnership(self.administrator_locator, self.workspace_client)

@cached_property
def legacy_query_ownership(self) -> LegacyQueryOwnership:
return LegacyQueryOwnership(self.administrator_locator, self.workspace_client)

@cached_property
def directfs_access_ownership(self) -> DirectFsAccessOwnership:
return DirectFsAccessOwnership(
self.administrator_locator,
self.workspace_path_ownership,
self.legacy_query_ownership,
self.workspace_client,
)

Expand Down
18 changes: 18 additions & 0 deletions src/databricks/labs/ucx/framework/owners.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli
super().__init__(administrator_locator)
self._ws = ws

def owner_of_path(self, path: str) -> str:
return self.owner_of(WorkspacePath(self._ws, path))

@retried(on=[InternalError], timeout=timedelta(minutes=1))
def _maybe_direct_owner(self, record: WorkspacePath) -> str | None:
maybe_type_and_id = self._maybe_type_and_id(record)
Expand Down Expand Up @@ -237,3 +240,18 @@ def _infer_from_first_can_manage(object_permissions):
return acl.group_name
return acl.service_principal_name
return None


class LegacyQueryOwnership(Ownership[str]):
def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None:
super().__init__(administrator_locator)
self._workspace_client = workspace_client

def _maybe_direct_owner(self, record: str) -> str | None:
try:
legacy_query = self._workspace_client.queries.get(record)
return legacy_query.owner_user_name
except NotFound:
return None
except InternalError: # redash is very naughty and throws 500s instead of proper 404s
return None
111 changes: 111 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/ownership.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
from functools import cached_property

from databricks.labs.ucx.framework.owners import (
Ownership,
AdministratorLocator,
LegacyQueryOwnership,
WorkspacePathOwnership,
)
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.source_code.base import UsedTable
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler

logger = logging.getLogger(__name__)


class TableOwnership(Ownership[Table]):
"""Determine ownership of tables in the inventory based on the following rules:
- If a table is owned by a principal in the grants table, then that principal is the owner.
- If a table is written to by a query, then the owner of that query is the owner of the table.
- If a table is written to by a notebook or file, then the owner of the path is the owner of the table.
"""

def __init__(
self,
administrator_locator: AdministratorLocator,
grants_crawler: GrantsCrawler,
used_tables_in_paths: UsedTablesCrawler,
used_tables_in_queries: UsedTablesCrawler,
legacy_query_ownership: LegacyQueryOwnership,
workspace_path_ownership: WorkspacePathOwnership,
) -> None:
super().__init__(administrator_locator)
self._grants_crawler = grants_crawler
self._used_tables_in_paths = used_tables_in_paths
self._used_tables_in_queries = used_tables_in_queries
self._legacy_query_ownership = legacy_query_ownership
self._workspace_path_ownership = workspace_path_ownership

def _maybe_direct_owner(self, record: Table) -> str | None:
owner = self._maybe_from_grants(record)
if owner:
return owner
return self._maybe_from_sources(record)

def _maybe_from_sources(self, record: Table) -> str | None:
used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name))
if not used_table:
return None
# If something writes to a table, then it's an owner of it
if not used_table.is_write:
return None
if used_table.source_type == 'QUERY' and used_table.query_id:
return self._legacy_query_ownership.owner_of(used_table.query_id)
if used_table.source_type in {'NOTEBOOK', 'FILE'}:
return self._workspace_path_ownership.owner_of_path(used_table.source_id)
logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}")
return None

@cached_property
def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]:
index = {}
for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()):
for used_table in collection:
key = used_table.catalog_name, used_table.schema_name, used_table.table_name
index[key] = used_table
return index

def _maybe_from_grants(self, record: Table) -> str | None:
for grant in self._grants_snapshot:
if not grant.action_type == 'OWN':
continue
object_type, full_name = grant.this_type_and_key()
if object_type == 'TABLE' and full_name == record.key:
return grant.principal
if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}":
return grant.principal
return None

@cached_property
def _grants_snapshot(self):
return self._grants_crawler.snapshot()


class TableMigrationOwnership(Ownership[TableMigrationStatus]):
"""Determine ownership of table migration records in the inventory.

This is the owner of the source table, if (and only if) the source table is present in the inventory.
"""

def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None:
super().__init__(table_ownership._administrator_locator) # TODO: Fix this
self._tables_crawler = tables_crawler
self._table_ownership = table_ownership
self._indexed_tables: dict[tuple[str, str], Table] | None = None

def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:
index = self._indexed_tables
if index is None or reindex:
snapshot = self._tables_crawler.snapshot()
index = {(table.database, table.name): table for table in snapshot}
self._indexed_tables = index
return index

def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None:
index = self._tables_snapshot_index()
source_table = index.get((record.src_schema, record.src_table), None)
return self._table_ownership.owner_of(source_table) if source_table is not None else None
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.tables import Table, TableOwnership

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -162,29 +160,3 @@ def _iter_schemas(self):
except NotFound:
logger.warning(f"Catalog {catalog.name} no longer exists. Skipping checking its migration status.")
continue


class TableMigrationOwnership(Ownership[TableMigrationStatus]):
"""Determine ownership of table migration records in the inventory.

This is the owner of the source table, if (and only if) the source table is present in the inventory.
"""

def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None:
super().__init__(table_ownership._administrator_locator)
self._tables_crawler = tables_crawler
self._table_ownership = table_ownership
self._indexed_tables: dict[tuple[str, str], Table] | None = None

def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]:
index = self._indexed_tables
if index is None or reindex:
snapshot = self._tables_crawler.snapshot()
index = {(table.database, table.name): table for table in snapshot}
self._indexed_tables = index
return index

def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None:
index = self._tables_snapshot_index()
source_table = index.get((record.src_schema, record.src_table), None)
return self._table_ownership.owner_of(source_table) if source_table is not None else None
11 changes: 0 additions & 11 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -660,13 +659,3 @@ def _create_describe_tasks(self, catalog: str, database: str, table_names: list[
for table in table_names:
tasks.append(partial(self._describe, catalog, database, table))
return tasks


class TableOwnership(Ownership[Table]):
"""Determine ownership of tables in the inventory.

At the present we don't determine a specific owner for tables.
"""

def _maybe_direct_owner(self, record: Table) -> None:
return None
10 changes: 10 additions & 0 deletions src/databricks/labs/ucx/source_code/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ def source_type(self) -> str | None:
last = self.source_lineage[-1]
return last.object_type

@property
def query_id(self) -> str | None:
if self.source_type != 'QUERY':
return None
last = self.source_lineage[-1]
parts = last.object_id.split('/')
if len(parts) < 2:
return None
return parts[1]


@dataclass
class UsedTable(SourceInfo):
Expand Down
18 changes: 10 additions & 8 deletions src/databricks/labs/ucx/source_code/directfs_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import DatabricksError, NotFound

from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator, WorkspacePathOwnership
from databricks.labs.ucx.framework.owners import (
Ownership,
AdministratorLocator,
WorkspacePathOwnership,
LegacyQueryOwnership,
)
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.source_code.base import DirectFsAccess

Expand Down Expand Up @@ -73,15 +78,17 @@ def __init__(
self,
administrator_locator: AdministratorLocator,
workspace_path_ownership: WorkspacePathOwnership,
legacy_query_ownership: LegacyQueryOwnership,
workspace_client: WorkspaceClient,
) -> None:
super().__init__(administrator_locator)
self._workspace_path_ownership = workspace_path_ownership
self._legacy_query_ownership = legacy_query_ownership
self._workspace_client = workspace_client

def _maybe_direct_owner(self, record: DirectFsAccess) -> str | None:
if record.source_type == 'QUERY':
return self._query_owner(record)
if record.source_type == 'QUERY' and record.query_id:
return self._legacy_query_ownership.owner_of(record.query_id)
if record.source_type in {'NOTEBOOK', 'FILE'}:
return self._notebook_owner(record)
logger.warning(f"Unknown source type {record.source_type} for {record.source_id}")
Expand All @@ -94,8 +101,3 @@ def _notebook_owner(self, record):
return owner
except NotFound:
return None

def _query_owner(self, record):
query_id = record.source_lineage[-1].object_id.split('/')[1]
legacy_query = self._workspace_client.queries.get(query_id)
return legacy_query.owner_user_name
5 changes: 2 additions & 3 deletions tests/integration/hive_metastore/test_table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationOwnership,
TableMigrationStatus,
TableMigrationStatusRefresher,
)
from databricks.labs.ucx.hive_metastore.tables import TableOwnership
from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership


def test_table_migration_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None:
Expand All @@ -32,7 +31,7 @@ def is_migration_record_for_table(record: TableMigrationStatus) -> bool:
synthetic_record = dataclasses.replace(table_migration_record, src_table="does_not_exist")

# Verify for the table that the table owner and the migration status are a match.
table_ownership = TableOwnership(runtime_ctx.administrator_locator)
table_ownership = runtime_ctx.table_ownership
table_migration_ownership = TableMigrationOwnership(tables_crawler, table_ownership)
assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record)

Expand Down
8 changes: 4 additions & 4 deletions tests/integration/hive_metastore/test_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from databricks.sdk.retries import retried

from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.tables import What, TableOwnership
from databricks.labs.ucx.hive_metastore.tables import What

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,7 +90,6 @@ def test_partitioned_tables(ws, sql_backend, make_schema, make_table):

def test_table_ownership(runtime_ctx, inventory_schema, sql_backend) -> None:
"""Verify the ownership can be determined for crawled tables."""
# This currently isn't very useful: we don't currently locate specific owners for tables.

# A table for which we'll determine the owner.
table = runtime_ctx.make_table()
Expand All @@ -103,5 +102,6 @@ def test_table_ownership(runtime_ctx, inventory_schema, sql_backend) -> None:
table_record = next(record for record in records if record.full_name == table.full_name)

# Verify ownership can be made.
ownership = TableOwnership(runtime_ctx.administrator_locator)
assert ownership.owner_of(table_record) == runtime_ctx.administrator_locator.get_workspace_administrator()
my_user = runtime_ctx.workspace_client.current_user.me()
owner = runtime_ctx.table_ownership.owner_of(table_record)
assert owner == my_user.user_name
3 changes: 1 addition & 2 deletions tests/unit/hive_metastore/test_table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationStatusRefresher,
TableMigrationIndex,
TableMigrationOwnership,
TableMigrationStatus,
TableView,
)
from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
from databricks.labs.ucx.hive_metastore.tables import (
Table,
TableOwnership,
TablesCrawler,
What,
)
Expand Down
Loading
Loading