Added collection of used tables from Python notebooks and files and SQL queries #2772

Merged on Oct 2, 2024 (26 commits)
Changes from 10 commits
4 changes: 4 additions & 0 deletions src/databricks/labs/ucx/install.py
@@ -75,6 +75,7 @@
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
from databricks.labs.ucx.recon.migration_recon import ReconResult
from databricks.labs.ucx.runtime import Workflows
from databricks.labs.ucx.source_code.base import TableInfo
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess
from databricks.labs.ucx.source_code.jobs import JobProblem
from databricks.labs.ucx.source_code.queries import QueryProblem
@@ -124,6 +125,8 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
functools.partial(table, "recon_results", ReconResult),
functools.partial(table, "directfs_in_paths", DirectFsAccess),
functools.partial(table, "directfs_in_queries", DirectFsAccess),
functools.partial(table, "table_infos_in_paths", TableInfo),
functools.partial(table, "table_infos_in_queries", TableInfo),
],
)
deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")
@@ -133,6 +136,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
deployer.deploy_view("code_patterns", "queries/views/code_patterns.sql")
deployer.deploy_view("reconciliation_results", "queries/views/reconciliation_results.sql")
deployer.deploy_view("directfs", "queries/views/directfs.sql")
deployer.deploy_view("table_infos", "queries/views/table_infos.sql")


def extract_major_minor(version_string):
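For readers unfamiliar with the pattern: each functools.partial entry above is a deferred table-creation call that the schema deployer invokes later. A minimal sketch of the idea, with a hypothetical stand-in for the table helper that deploy_schema binds to its SqlBackend:

import functools

class TableInfo: ...  # stand-in for databricks.labs.ucx.source_code.base.TableInfo

def table(name: str, klass: type) -> None:
    # hypothetical stand-in: the real helper creates `name` in the inventory
    # schema from the dataclass fields of `klass`
    print(f"would create {name} from {klass.__name__}")

deployers = [
    functools.partial(table, "table_infos_in_paths", TableInfo),
    functools.partial(table, "table_infos_in_queries", TableInfo),
]
for deploy in deployers:
    deploy()  # each partial already carries its table name and dataclass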
21 changes: 21 additions & 0 deletions src/databricks/labs/ucx/queries/views/table_infos.sql
@@ -0,0 +1,21 @@
SELECT
catalog_name,
schema_name,
table_name,
source_id,
source_timestamp,
source_lineage,
assessment_start_timestamp,
assessment_end_timestamp
FROM $inventory.table_infos_in_paths
UNION ALL
SELECT
catalog_name,
schema_name,
table_name,
source_id,
source_timestamp,
source_lineage,
assessment_start_timestamp,
assessment_end_timestamp
FROM $inventory.table_infos_in_queries
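Note the $inventory placeholder above: it stands for the installation's inventory schema and is resolved when deploy_view runs. A rough sketch of the idea only; the actual substitution lives inside the deployer, and 'hive_metastore.ucx' is an assumed example value:

view_sql = (
    "SELECT catalog_name, schema_name, table_name FROM $inventory.table_infos_in_paths "
    "UNION ALL "
    "SELECT catalog_name, schema_name, table_name FROM $inventory.table_infos_in_queries"
)
# the deployer replaces the placeholder with the configured inventory schema
print(view_sql.replace("$inventory", "hive_metastore.ucx"))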
197 changes: 192 additions & 5 deletions src/databricks/labs/ucx/source_code/base.py
@@ -6,8 +6,10 @@
import logging
from abc import abstractmethod, ABC
from collections.abc import Iterable
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Self, Any

from astroid import AstroidSyntaxError, NodeNG # type: ignore
from sqlglot import Expression, parse as parse_sql, ParseError as SqlParseError
@@ -174,6 +176,145 @@ def name(self) -> str: ...
def apply(self, code: str) -> str: ...


@dataclass
class LineageAtom:

object_type: str
object_id: str
other: dict[str, str] | None = None


@dataclass
class SourceInfo:

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
source_lineage = data.get("source_lineage", None)
if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict):
lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage]
data["source_lineage"] = lineage_atoms
return cls(**data)

UNKNOWN = "unknown"

source_id: str = UNKNOWN
source_timestamp: datetime = datetime.fromtimestamp(0)
source_lineage: list[LineageAtom] = field(default_factory=list)
assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

def replace_source(
self,
source_id: str | None = None,
source_lineage: list[LineageAtom] | None = None,
source_timestamp: datetime | None = None,
):
return dataclasses.replace(
self,
source_id=source_id or self.source_id,
source_timestamp=source_timestamp or self.source_timestamp,
source_lineage=source_lineage or self.source_lineage,
)

def replace_assessment_infos(
self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
):
return dataclasses.replace(
self,
assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
)
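A quick sketch of how these helpers behave, using the classes defined above (the row values are made up):

from datetime import datetime, timezone

row = {
    "source_id": "/Workspace/Users/me/notebook.py",
    "source_lineage": [{"object_type": "NOTEBOOK", "object_id": "123"}],
}
info = SourceInfo.from_dict(row)
assert isinstance(info.source_lineage[0], LineageAtom)  # raw dicts are upgraded to LineageAtom

updated = info.replace_source(source_timestamp=datetime.now(timezone.utc))
assert updated.source_id == info.source_id  # omitted arguments keep their old values
assert updated is not info                  # dataclasses.replace returns a fresh record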


@dataclass
class TableInfo(SourceInfo):
Collaborator:

Suggested change:
-class TableInfo(SourceInfo):
+class UsedTable(SourceInfo):

We already have a TableInfo in the Databricks SDK - https://databricks-sdk-py.readthedocs.io/en/latest/dbdataclasses/catalog.html#databricks.sdk.service.catalog.TableInfo

Contributor Author:

done
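To make the reviewer's point concrete, the clash would look like this in any module needing both types (a sketch; after the rename to UsedTable no alias is needed):

from databricks.sdk.service.catalog import TableInfo as CatalogTableInfo  # SDK's Unity Catalog dataclass
from databricks.labs.ucx.source_code.base import TableInfo  # this PR's dataclass, same name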


@classmethod
def parse(cls, value: str, default_schema: str) -> TableInfo:
parts = value.split(".")
if len(parts) >= 3:
catalog_name = parts.pop(0)
else:
catalog_name = "hive_metastore"
if len(parts) >= 2:
schema_name = parts.pop(0)
else:
schema_name = default_schema
return TableInfo(catalog_name=catalog_name, schema_name=schema_name, table_name=parts[0])

catalog_name: str = SourceInfo.UNKNOWN
schema_name: str = SourceInfo.UNKNOWN
table_name: str = SourceInfo.UNKNOWN
Collaborator:

Can we also add is_read and is_write?

Contributor Author:

Done. Populated for SQL. For Python calls, I suggest doing it in a separate PR since it's a lot of work.

Collaborator:

Separate PR works.
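Returning to the parse classmethod above, a quick sketch of how partially-qualified names resolve; 'ucx' as the default schema is an arbitrary example value:

t = TableInfo.parse("catalog.schema.table", default_schema="ucx")
assert (t.catalog_name, t.schema_name, t.table_name) == ("catalog", "schema", "table")

t = TableInfo.parse("schema.table", default_schema="ucx")
assert t.catalog_name == "hive_metastore"  # a missing catalog falls back to the legacy metastore

t = TableInfo.parse("table", default_schema="ucx")
assert (t.schema_name, t.table_name) == ("ucx", "table")  # a missing schema uses the default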



class TableCollector(ABC):

@abstractmethod
def collect_tables(self, source_code: str) -> Iterable[TableInfo]: ...


@dataclass
class TableInfoNode:
table: TableInfo
node: NodeNG


class TablePyCollector(TableCollector, ABC):

def collect_tables(self, source_code: str):
tree = Tree.normalize_and_parse(source_code)
for table_node in self.collect_tables_from_tree(tree):
yield table_node.table

@abstractmethod
def collect_tables_from_source(self, source_code: str, inherited_tree: Tree | None) -> Iterable[TableInfoNode]: ...
Collaborator:

This method is not used in this abstract class.

Contributor Author:

Removed.

@abstractmethod
def collect_tables_from_tree(self, tree: Tree) -> Iterable[TableInfoNode]: ...


class TableSqlCollector(TableCollector, ABC): ...


@dataclass
class DirectFsAccess(SourceInfo):
"""A record describing a Direct File System Access"""

path: str = SourceInfo.UNKNOWN
is_read: bool = False
is_write: bool = False


@dataclass
class DirectFsAccessNode:
dfsa: DirectFsAccess
node: NodeNG


class DfsaCollector(ABC):

@abstractmethod
def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]: ...


class DfsaPyCollector(DfsaCollector, ABC):

def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]:
tree = Tree.normalize_and_parse(source_code)
for dfsa_node in self.collect_dfsas_from_tree(tree):
yield dfsa_node.dfsa

@abstractmethod
def collect_dfsas_from_source(
self, source_code: str, inherited_tree: Tree | None
) -> Iterable[DirectFsAccessNode]: ...

@abstractmethod
def collect_dfsas_from_tree(self, tree: Tree) -> Iterable[DirectFsAccessNode]: ...


class DfsaSqlCollector(DfsaCollector, ABC): ...


# The default catalog to use when the catalog is not specified in a table reference
# See: https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-qry-select-usedb.html
DEFAULT_CATALOG = 'hive_metastore'
@@ -221,20 +362,42 @@ def parse_security_mode(mode_str: str | None) -> compute.DataSecurityMode | None
return None


class SqlSequentialLinter(SqlLinter):
class SqlSequentialLinter(SqlLinter, DfsaCollector, TableCollector):

def __init__(self, linters: list[SqlLinter]):
def __init__(
self,
linters: list[SqlLinter],
dfsa_collectors: list[DfsaSqlCollector],
table_collectors: list[TableSqlCollector],
):
self._linters = linters
self._dfsa_collectors = dfsa_collectors
self._table_collectors = table_collectors

def lint_expression(self, expression: Expression) -> Iterable[Advice]:
for linter in self._linters:
yield from linter.lint_expression(expression)

def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]:
for collector in self._dfsa_collectors:
yield from collector.collect_dfsas(source_code)

def collect_tables(self, source_code: str) -> Iterable[TableInfo]:
for collector in self._table_collectors:
yield from collector.collect_tables(source_code)
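To illustrate the fan-out, a hypothetical TableSqlCollector built on sqlglot; ucx's real SQL collectors are more careful than this, and the sketch assumes the module's existing sqlglot and Iterable imports:

from sqlglot import parse_one
from sqlglot.expressions import Table

class SketchTableCollector(TableSqlCollector):
    def collect_tables(self, source_code: str) -> Iterable[TableInfo]:
        # walk the parsed statement and record every table reference
        for node in parse_one(source_code).find_all(Table):
            yield TableInfo.parse(node.sql(), default_schema="default")

linter = SqlSequentialLinter(linters=[], dfsa_collectors=[], table_collectors=[SketchTableCollector()])
for info in linter.collect_tables("SELECT * FROM sales.orders"):
    print(info.catalog_name, info.schema_name, info.table_name)  # hive_metastore sales orders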


class PythonSequentialLinter(Linter):
class PythonSequentialLinter(Linter, DfsaCollector, TableCollector):

def __init__(self, linters: list[PythonLinter]):
def __init__(
self,
linters: list[PythonLinter],
dfsa_collectors: list[DfsaPyCollector],
table_collectors: list[TablePyCollector],
):
self._linters = linters
self._dfsa_collectors = dfsa_collectors
self._table_collectors = table_collectors
self._tree: Tree | None = None

def lint(self, code: str) -> Iterable[Advice]:
@@ -271,6 +434,30 @@ def process_child_cell(self, code: str):
# error already reported when linting enclosing notebook
logger.warning(f"Failed to parse Python cell: {code}", exc_info=e)

def collect_dfsas(self, source_code: str) -> Iterable[DirectFsAccess]:
try:
tree = self._parse_and_append(source_code)
for dfsa_node in self.collect_dfsas_from_tree(tree):
yield dfsa_node.dfsa
except AstroidSyntaxError as e:
logger.warning('syntax-error', exc_info=e)

def collect_dfsas_from_tree(self, tree: Tree) -> Iterable[DirectFsAccessNode]:
for collector in self._dfsa_collectors:
yield from collector.collect_dfsas_from_tree(tree)

def collect_tables(self, source_code: str) -> Iterable[TableInfo]:
try:
tree = self._parse_and_append(source_code)
for table_node in self.collect_tables_from_tree(tree):
yield table_node.table
except AstroidSyntaxError as e:
logger.warning('syntax-error', exc_info=e)

def collect_tables_from_tree(self, tree: Tree) -> Iterable[TableInfoNode]:
for collector in self._table_collectors:
yield from collector.collect_tables_from_tree(tree)

def _make_tree(self) -> Tree:
if self._tree is None:
self._tree = Tree.new_module()
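Closing out the base.py changes, a rough usage sketch of the new collector entry points on PythonSequentialLinter; with no collectors registered, valid code yields nothing and broken code is logged rather than raised (this assumes _parse_and_append surfaces AstroidSyntaxError as collect_tables above expects):

linter = PythonSequentialLinter(linters=[], dfsa_collectors=[], table_collectors=[])
assert list(linter.collect_tables("x = 1")) == []
assert list(linter.collect_tables("def broken(:")) == []  # 'syntax-error' warning, no exception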
66 changes: 1 addition & 65 deletions src/databricks/labs/ucx/source_code/directfs_access.py
@@ -1,82 +1,18 @@
from __future__ import annotations

import dataclasses
import logging
import sys
from collections.abc import Sequence, Iterable
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import DatabricksError

from databricks.labs.ucx.framework.utils import escape_sql_identifier

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

from databricks.labs.ucx.source_code.base import DirectFsAccess

logger = logging.getLogger(__name__)


@dataclass
class LineageAtom:

object_type: str
object_id: str
other: dict[str, str] | None = None


@dataclass
class DirectFsAccess:
"""A record describing a Direct File System Access"""

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
source_lineage = data.get("source_lineage", None)
if isinstance(source_lineage, list) and len(source_lineage) > 0 and isinstance(source_lineage[0], dict):
lineage_atoms = [LineageAtom(**lineage) for lineage in source_lineage]
data["source_lineage"] = lineage_atoms
return cls(**data)

UNKNOWN = "unknown"

path: str
is_read: bool
is_write: bool
source_id: str = UNKNOWN
source_timestamp: datetime = datetime.fromtimestamp(0)
source_lineage: list[LineageAtom] = field(default_factory=list)
assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

def replace_source(
self,
source_id: str | None = None,
source_lineage: list[LineageAtom] | None = None,
source_timestamp: datetime | None = None,
):
return dataclasses.replace(
self,
source_id=source_id or self.source_id,
source_timestamp=source_timestamp or self.source_timestamp,
source_lineage=source_lineage or self.source_lineage,
)

def replace_assessment_infos(
self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
):
return dataclasses.replace(
self,
assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
)


class DirectFsAccessCrawler(CrawlerBase[DirectFsAccess]):

@classmethod
3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/source_code/graph.py
@@ -12,8 +12,7 @@
from astroid import ( # type: ignore
NodeNG,
)
from databricks.labs.ucx.source_code.base import Advisory, CurrentSessionState, is_a_notebook
from databricks.labs.ucx.source_code.directfs_access import LineageAtom
from databricks.labs.ucx.source_code.base import Advisory, CurrentSessionState, is_a_notebook, LineageAtom
from databricks.labs.ucx.source_code.python.python_ast import Tree
from databricks.labs.ucx.source_code.path_lookup import PathLookup
