feat(ingest/dynamodb): add support for classification #10138

Merged · 4 commits · Mar 27, 2024
Changes from all commits
1 change: 1 addition & 0 deletions metadata-ingestion/scripts/docgen.py
@@ -413,6 +413,7 @@ def get_capability_text(src_capability: SourceCapability) -> str:
        SourceCapability.DOMAINS: "../../../domains.md",
        SourceCapability.PLATFORM_INSTANCE: "../../../platform-instances.md",
        SourceCapability.DATA_PROFILING: "../../../../metadata-ingestion/docs/dev_guides/sql_profiles.md",
+       SourceCapability.CLASSIFICATION: "../../../../metadata-ingestion/docs/dev_guides/classification.md",
    }

    capability_doc = capability_docs_mapping.get(src_capability)
4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
@@ -103,7 +103,7 @@
}

classification_lib = {
-    "acryl-datahub-classify==0.0.9",
+    "acryl-datahub-classify==0.0.10",
}

sql_common = (
@@ -445,7 +445,7 @@
"types-click==0.1.12",
# The boto3-stubs package seems to have regularly breaking minor releases,
# we pin to a specific version to avoid this.
"boto3-stubs[s3,glue,sagemaker,sts]==1.28.15",
"boto3-stubs[s3,glue,sagemaker,sts,dynamodb]==1.28.15",
"mypy-boto3-sagemaker==1.28.15", # For some reason, above pin only restricts `mypy-boto3-sagemaker<1.29.0,>=1.28.0`
"types-tabulate",
# avrogen package requires this
@@ -14,12 +14,14 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.glossary.classifier import ClassificationConfig, Classifier
from datahub.ingestion.glossary.classifier_registry import classifier_registry
-from datahub.ingestion.source.sql.data_reader import DataReader
+from datahub.ingestion.source.common.data_reader import DataReader
+from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER
from datahub.metadata.com.linkedin.pegasus2avro.common import (
    AuditStamp,
    GlossaryTermAssociation,
    GlossaryTerms,
)
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
@@ -289,12 +291,22 @@ def classification_workunit_processor(
    table_id: List[str],
    data_reader_kwargs: dict = {},
) -> Iterable[MetadataWorkUnit]:
+    """
+    Classification handling for a particular table.
+    Currently works only for workunits carrying an MCP or MCPW.
+    """
    table_name = ".".join(table_id)
    if not classification_handler.is_classification_enabled_for_table(table_name):
        yield from table_wu_generator
    for wu in table_wu_generator:
        maybe_schema_metadata = wu.get_aspect_of_type(SchemaMetadata)
-        if maybe_schema_metadata:
+        if (
+            isinstance(wu.metadata, MetadataChangeEvent)
+            and len(wu.metadata.proposedSnapshot.aspects) > 1
+        ) or not maybe_schema_metadata:
+            yield wu
Review comment (Collaborator): Shouldn't we remove the SchemaMetadata aspect from the snapshot, since we're emitting it after?

Reply (Author): For now, this is leaving the MCEs as-is and not processing them.

Reply (Author): In a follow-up, I am planning to add support for classifying SchemaMetadata in MCEs and refactoring sql_common to use this.
+            continue
+        else:  # This is MCP or MCPW workunit with SchemaMetadata aspect
            try:
                classification_handler.classify_schema_fields(
                    table_name,
@@ -304,7 +316,7 @@
                            data_reader.get_sample_data_for_table,
                            table_id,
                            classification_handler.config.classification.sample_size
-                            * 1.2,
+                            * SAMPLE_SIZE_MULTIPLIER,
                            **data_reader_kwargs,
                        )
                        if data_reader
@@ -317,10 +329,8 @@
                    is_primary_source=wu.is_primary_source,
                )
            except Exception as e:
-                logger.debug(
+                logger.warning(
                    f"Failed to classify table columns for {table_name} due to error -> {e}",
                    exc_info=e,
                )
                yield wu
-        else:
-            yield wu
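For orientation, here is a minimal sketch of how a source can route its per-table workunits through this processor. It is illustrative only: MySource, _gen_table_workunits, and the handler/reader attributes are hypothetical placeholders rather than names from this PR.

from typing import Iterable, List

from datahub.ingestion.api.workunit import MetadataWorkUnit


class MySource:  # hypothetical source, for illustration only
    def gen_classified_table_workunits(
        self, table_id: List[str]
    ) -> Iterable[MetadataWorkUnit]:
        # Ordinary per-table workunits, one of which carries SchemaMetadata.
        table_wu_generator = self._gen_table_workunits(table_id)  # hypothetical helper
        # classification_workunit_processor (defined in the diff above) passes
        # non-schema workunits through unchanged and re-emits the SchemaMetadata
        # workunit with glossary terms attached to any classified fields.
        yield from classification_workunit_processor(
            table_wu_generator,
            self.classification_handler,  # a ClassificationHandler instance
            self.data_reader,  # a DataReader, or None to classify without samples
            table_id,  # e.g. ["my_db", "my_schema", "my_table"]
        )

If classification is disabled for the table, the processor simply forwards the generator's workunits untouched.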
@@ -77,6 +77,7 @@
    gen_schema_container,
    get_domain_wu,
)
+from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantLineageRunSkipHandler,
@@ -764,7 +765,7 @@ def _process_schema(
            data_reader_kwargs=dict(
                sample_size_percent=(
                    self.config.classification.sample_size
-                    * 1.2
+                    * SAMPLE_SIZE_MULTIPLIER
                    / table.rows_count
                    if table.rows_count
                    else None
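To make the sampling arithmetic concrete, here is a small worked example with made-up numbers; the constant's value of 1.2 is taken from the literal it replaces:

SAMPLE_SIZE_MULTIPLIER = 1.2  # value of the hard-coded 1.2 replaced above

sample_size = 1_000  # hypothetical configured classification sample size
rows_count = 1_000_000  # hypothetical table row count

# Mirrors the expression in _process_schema: oversample by 20% so that,
# after any null values are dropped, roughly sample_size usable values remain.
sample_size_percent = (
    sample_size * SAMPLE_SIZE_MULTIPLIER / rows_count if rows_count else None
)
print(sample_size_percent)  # 0.0012, i.e. sample about 1,200 of the 1,000,000 rows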
@@ -4,7 +4,7 @@

from google.cloud import bigquery

-from datahub.ingestion.source.sql.data_reader import DataReader
+from datahub.ingestion.source.common.data_reader import DataReader
from datahub.utilities.perf_timer import PerfTimer

logger = logging.Logger(__name__)
51 changes: 51 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/common/data_reader.py
@@ -0,0 +1,51 @@
from abc import abstractmethod
from typing import Dict, List, Optional

from datahub.ingestion.api.closeable import Closeable


class DataReader(Closeable):
    def get_sample_data_for_column(
        self, table_id: List[str], column_name: str, sample_size: int
    ) -> list:
        raise NotImplementedError()

    @abstractmethod
    def get_sample_data_for_table(
        self,
        table_id: List[str],
        sample_size: int,
        *,
        sample_size_percent: Optional[float] = None,
        filter: Optional[str] = None,
    ) -> Dict[str, list]:
        """
        Fetches table values, approximately sample_size rows.

        Args:
            table_id (List[str]): Table name identifier. One of
                - [<db_name>, <schema_name>, <table_name>] or
                - [<schema_name>, <table_name>] or
                - [<table_name>]
            sample_size (int): sample size

        Keyword Args:
            sample_size_percent (float, between 0 and 1): For bigquery-like data platforms
                that provide only percentage-based sampling methods. If present, the actual
                sample_size may be ignored.
            filter (string): For bigquery-like data platforms that require a mandatory
                filter on the partition column in some cases.

        Returns:
            Dict[str, list]: dictionary of (column name -> list of column values)
        """

        # Ideally we do not want null values in sample data for a column.
        # However, that would require a separate query per column, which
        # would be expensive, hence it is not done. To compensate for the
        # possibility of some null values in the collected sample, it's usually
        # recommended to fetch extra (20% more) rows than the configured sample_size.

        pass
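To illustrate the contract, here is a toy implementation against an in-memory fixture; InMemoryDataReader and TABLES are invented for this example and appear nowhere in the PR:

from typing import Any, Dict, List

# Hypothetical fixture: table name -> (column -> values).
TABLES: Dict[str, Dict[str, list]] = {
    "users": {"id": [1, 2, 3], "email": ["a@x.com", "b@x.com", "c@x.com"]},
}


class InMemoryDataReader(DataReader):
    def get_sample_data_for_table(
        self, table_id: List[str], sample_size: int, **kwargs: Any
    ) -> Dict[str, list]:
        # The last element of table_id is always the table name, whether or
        # not db/schema qualifiers are present.
        table = TABLES.get(table_id[-1], {})
        return {column: values[:sample_size] for column, values in table.items()}

    def close(self) -> None:
        pass


# InMemoryDataReader().get_sample_data_for_table(["users"], 2)
# -> {"id": [1, 2], "email": ["a@x.com", "b@x.com"]}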
@@ -0,0 +1,75 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List

from datahub.ingestion.source.common.data_reader import DataReader

if TYPE_CHECKING:
    from mypy_boto3_dynamodb import DynamoDBClient

PAGE_SIZE = 100


class DynamoDBTableItemsReader(DataReader):
    """
    DynamoDB is a NoSQL database and may have different attributes (columns) present
    in different items (rows) of the table.
    """

    @staticmethod
    def create(client: "DynamoDBClient") -> "DynamoDBTableItemsReader":
        return DynamoDBTableItemsReader(client)

    def __init__(self, client: "DynamoDBClient") -> None:
        # The lifecycle of this client is managed externally
        self.client = client

    def get_sample_data_for_table(
        self, table_id: List[str], sample_size: int, **kwargs: Any
    ) -> Dict[str, list]:
        """
        For DynamoDB, table_id should be of the form (table-name) or (region, table-name).
        """
        column_values: Dict[str, list] = defaultdict(list)
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html
        paginator = self.client.get_paginator("scan")
        response_iterator = paginator.paginate(
            TableName=table_id[-1],
            PaginationConfig={
                "MaxItems": sample_size,
                "PageSize": PAGE_SIZE,
            },
        )
        # iterate through pagination result to retrieve items
        for page in response_iterator:
            items: List[Dict] = page["Items"]
            if len(items) > 0:
                for item in items:
                    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html#scan
                    # For an item (row), the key is the attribute name and the value is a dict with only one entry,
                    # whose key is the data type and value is the data.
                    # For complex data types - L (list) or M (map) - we recursively process the value into a json-like form.
                    for attribute_name, attribute_value in item.items():
                        column_values[attribute_name].append(
                            self._get_value(attribute_value)
                        )

        # Note: consider including items configured via `include_table_item` in sample data?

        return column_values

    def _get_value(self, attribute_value: Dict[str, Any]) -> Any:
        # Each attribute value is described as a name-value pair.
        # The name is the data type, and the value is the data itself.
        for data_type, value in attribute_value.items():
            if data_type == "L" and isinstance(value, list):
                return [self._get_value(e) for e in value]
            elif data_type == "M" and isinstance(value, dict):
                return {
                    nested_attr: self._get_value(nested_value)
                    for nested_attr, nested_value in value.items()
                }
            else:
                return value

    def close(self) -> None:
        pass
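Finally, a usage sketch for the reader above. It assumes AWS credentials are configured and a hypothetical purchases table exists; boto3.client and the paginated scan are standard boto3 APIs:

import boto3

client = boto3.client("dynamodb", region_name="us-west-2")  # region is made up
reader = DynamoDBTableItemsReader.create(client)

# Scans at most 100 items and pivots them into column -> values form. Scalar
# values stay as DynamoDB returns them, e.g. numbers arrive as {"N": "9.99"}
# and are therefore strings in the sample.
samples = reader.get_sample_data_for_table(["us-west-2", "purchases"], sample_size=100)
for column, values in samples.items():
    print(column, values[:5])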