Commit
feat(ingest/dynamodb): add support for classification (datahub-projec…
mayurinehate authored and sleeperdeep committed Jun 25, 2024
1 parent b62f57a commit 682d516
Showing 28 changed files with 330 additions and 198 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/scripts/docgen.py
@@ -413,6 +413,7 @@ def get_capability_text(src_capability: SourceCapability) -> str:
        SourceCapability.DOMAINS: "../../../domains.md",
        SourceCapability.PLATFORM_INSTANCE: "../../../platform-instances.md",
        SourceCapability.DATA_PROFILING: "../../../../metadata-ingestion/docs/dev_guides/sql_profiles.md",
+        SourceCapability.CLASSIFICATION: "../../../../metadata-ingestion/docs/dev_guides/classification.md",
    }

    capability_doc = capability_docs_mapping.get(src_capability)
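The mapping above tells docgen which page to link whenever a source advertises the classification capability. For context, a minimal sketch of how a connector typically declares it, assuming DataHub's standard `@capability` decorator and a hypothetical `MyCustomSource` class (neither appears in this diff):

```python
from datahub.ingestion.api.decorators import capability
from datahub.ingestion.api.source import Source, SourceCapability


# Hypothetical connector, used only to illustrate the capability declaration
# that docgen turns into a link to classification.md.
@capability(
    SourceCapability.CLASSIFICATION,
    "Optionally enabled via the `classification` section of the source config",
)
class MyCustomSource(Source):
    ...
```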
4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
@@ -103,7 +103,7 @@
}

classification_lib = {
"acryl-datahub-classify==0.0.9",
"acryl-datahub-classify==0.0.10",
}

sql_common = (
@@ -445,7 +445,7 @@
"types-click==0.1.12",
# The boto3-stubs package seems to have regularly breaking minor releases,
# we pin to a specific version to avoid this.
"boto3-stubs[s3,glue,sagemaker,sts]==1.28.15",
"boto3-stubs[s3,glue,sagemaker,sts,dynamodb]==1.28.15",
"mypy-boto3-sagemaker==1.28.15", # For some reason, above pin only restricts `mypy-boto3-sagemaker<1.29.0,>=1.28.0`
"types-tabulate",
# avrogen package requires this
@@ -14,12 +14,14 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.glossary.classifier import ClassificationConfig, Classifier
from datahub.ingestion.glossary.classifier_registry import classifier_registry
-from datahub.ingestion.source.sql.data_reader import DataReader
+from datahub.ingestion.source.common.data_reader import DataReader
+from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER
from datahub.metadata.com.linkedin.pegasus2avro.common import (
    AuditStamp,
    GlossaryTermAssociation,
    GlossaryTerms,
)
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
@@ -289,12 +291,22 @@ def classification_workunit_processor(
    table_id: List[str],
    data_reader_kwargs: dict = {},
) -> Iterable[MetadataWorkUnit]:
+    """
+    Classification handling for a particular table.
+    Currently works only for workunits having MCP or MCPW
+    """
    table_name = ".".join(table_id)
    if not classification_handler.is_classification_enabled_for_table(table_name):
        yield from table_wu_generator
    for wu in table_wu_generator:
        maybe_schema_metadata = wu.get_aspect_of_type(SchemaMetadata)
-        if maybe_schema_metadata:
+        if (
+            isinstance(wu.metadata, MetadataChangeEvent)
+            and len(wu.metadata.proposedSnapshot.aspects) > 1
+        ) or not maybe_schema_metadata:
+            yield wu
+            continue
+        else:  # This is MCP or MCPW workunit with SchemaMetadata aspect
            try:
                classification_handler.classify_schema_fields(
                    table_name,
@@ -304,7 +316,7 @@ def classification_workunit_processor(
                            data_reader.get_sample_data_for_table,
                            table_id,
                            classification_handler.config.classification.sample_size
-                            * 1.2,
+                            * SAMPLE_SIZE_MULTIPLIER,
                            **data_reader_kwargs,
                        )
                        if data_reader
@@ -317,10 +329,8 @@ def classification_workunit_processor(
                    is_primary_source=wu.is_primary_source,
                )
            except Exception as e:
-                logger.debug(
+                logger.warning(
                    f"Failed to classify table columns for {table_name} due to error -> {e}",
                    exc_info=e,
                )
                yield wu
-        else:
-            yield wu
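To make the processor's contract concrete, here is a hedged sketch of how a source wraps its per-table workunit stream with it. The wrapper function, the module path of the import, and the example table_id are assumptions for illustration; only `classification_workunit_processor` and its parameters come from the diff above.

```python
from typing import Iterable, List, Optional

from datahub.ingestion.api.workunit import MetadataWorkUnit
# Assumed module path: the file name is not visible in this extract of the diff.
from datahub.ingestion.glossary.classification_mixin import (
    ClassificationHandler,
    classification_workunit_processor,
)
from datahub.ingestion.source.common.data_reader import DataReader


def emit_table_workunits_with_classification(
    table_wu_generator: Iterable[MetadataWorkUnit],
    classification_handler: ClassificationHandler,
    data_reader: Optional[DataReader],
    table_id: List[str],  # e.g. ["us-west-2", "Orders"] -> dataset name "us-west-2.Orders"
) -> Iterable[MetadataWorkUnit]:
    # Workunits pass through unchanged, except the MCP/MCPW carrying a
    # SchemaMetadata aspect: its fields get classified, using sample data
    # pulled via data_reader.get_sample_data_for_table when a reader is given.
    yield from classification_workunit_processor(
        table_wu_generator,
        classification_handler,
        data_reader,
        table_id,
    )
```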
@@ -77,6 +77,7 @@
    gen_schema_container,
    get_domain_wu,
)
+from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantLineageRunSkipHandler,
@@ -764,7 +765,7 @@ def _process_schema(
                data_reader_kwargs=dict(
                    sample_size_percent=(
                        self.config.classification.sample_size
-                        * 1.2
+                        * SAMPLE_SIZE_MULTIPLIER
                        / table.rows_count
                        if table.rows_count
                        else None
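A quick worked example of that expression, assuming SAMPLE_SIZE_MULTIPLIER equals the 1.2 literal it replaces (the constant's value is not shown in this diff) and using made-up numbers:

```python
# Hedged arithmetic only: SAMPLE_SIZE_MULTIPLIER == 1.2 is assumed from the
# literal it replaces; sample_size and rows_count are illustrative.
SAMPLE_SIZE_MULTIPLIER = 1.2
sample_size = 1000        # classification.sample_size
rows_count = 2_000_000    # table.rows_count

sample_size_percent = sample_size * SAMPLE_SIZE_MULTIPLIER / rows_count
print(sample_size_percent)  # 0.0006 -> roughly 1200 of the 2,000,000 rows are sampled
```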
@@ -4,7 +4,7 @@

from google.cloud import bigquery

-from datahub.ingestion.source.sql.data_reader import DataReader
+from datahub.ingestion.source.common.data_reader import DataReader
from datahub.utilities.perf_timer import PerfTimer

logger = logging.Logger(__name__)
@@ -0,0 +1,51 @@
from abc import abstractmethod
from typing import Dict, List, Optional

from datahub.ingestion.api.closeable import Closeable


class DataReader(Closeable):
    def get_sample_data_for_column(
        self, table_id: List[str], column_name: str, sample_size: int
    ) -> list:
        raise NotImplementedError()

    @abstractmethod
    def get_sample_data_for_table(
        self,
        table_id: List[str],
        sample_size: int,
        *,
        sample_size_percent: Optional[float] = None,
        filter: Optional[str] = None,
    ) -> Dict[str, list]:
"""
Fetches table values , approx sample_size rows
Args:
table_id (List[str]): Table name identifier. One of
- [<db_name>, <schema_name>, <table_name>] or
- [<schema_name>, <table_name>] or
- [<table_name>]
sample_size (int): sample size
Keyword Args:
sample_size_percent(float, between 0 and 1): For bigquery-like data platforms that provide only
percentage based sampling methods. If present, actual sample_size
may be ignored.
filter (string): For bigquery-like data platforms that need mandatory filter on partition
column for some cases
Returns:
Dict[str, list]: dictionary of (column name -> list of column values)
"""

# Ideally we do not want null values in sample data for a column.
# However that would require separate query per column and
# that would be expensive, hence not done. To compensate for possibility
# of some null values in collected sample, its usually recommended to
# fetch extra (20% more) rows than configured sample_size.

pass
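To illustrate the contract, here is a toy in-memory implementation of this interface (purely hypothetical, not part of the commit); it shows the expected column-oriented return shape:

```python
from typing import Any, Dict, List

from datahub.ingestion.source.common.data_reader import DataReader


class InMemoryDataReader(DataReader):
    """Toy DataReader over a fixed dataset, for illustration only."""

    def __init__(self, rows_by_table: Dict[str, List[Dict[str, Any]]]) -> None:
        self.rows_by_table = rows_by_table

    def get_sample_data_for_table(
        self, table_id: List[str], sample_size: int, **kwargs: Any
    ) -> Dict[str, list]:
        # table_id may be [<db>, <schema>, <table>]; only the last part is used here.
        rows = self.rows_by_table.get(table_id[-1], [])[:sample_size]
        column_values: Dict[str, list] = {}
        for row in rows:
            for column, value in row.items():
                column_values.setdefault(column, []).append(value)
        return column_values

    def close(self) -> None:
        pass


reader = InMemoryDataReader({"users": [{"id": 1, "email": "a@x.io"}, {"id": 2}]})
print(reader.get_sample_data_for_table(["users"], sample_size=10))
# {'id': [1, 2], 'email': ['a@x.io']}
```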
@@ -0,0 +1,75 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List

from datahub.ingestion.source.common.data_reader import DataReader

if TYPE_CHECKING:
    from mypy_boto3_dynamodb import DynamoDBClient

PAGE_SIZE = 100


class DynamoDBTableItemsReader(DataReader):
    """
    DynamoDB is a NoSQL database and may have different attributes (columns) present
    in different items (rows) of the table.
    """

    @staticmethod
    def create(client: "DynamoDBClient") -> "DynamoDBTableItemsReader":
        return DynamoDBTableItemsReader(client)

    def __init__(self, client: "DynamoDBClient") -> None:
        # The lifecycle of this client is managed externally
        self.client = client

    def get_sample_data_for_table(
        self, table_id: List[str], sample_size: int, **kwargs: Any
    ) -> Dict[str, list]:
"""
For dynamoDB, table_id should be in formation ( table-name ) or (region, table-name )
"""
        column_values: Dict[str, list] = defaultdict(list)
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html
        paginator = self.client.get_paginator("scan")
        response_iterator = paginator.paginate(
            TableName=table_id[-1],
            PaginationConfig={
                "MaxItems": sample_size,
                "PageSize": PAGE_SIZE,
            },
        )
        # iterate through pagination result to retrieve items
        for page in response_iterator:
            items: List[Dict] = page["Items"]
            if len(items) > 0:
                for item in items:
                    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html#scan
                    # For an item (row), the key is the attribute name and the value is a dict with only one entry,
                    # whose key is the data type and value is the data.
                    # For complex data types - L (list) or M (map) - we recursively process the value into a json-like form.
                    for attribute_name, attribute_value in item.items():
                        column_values[attribute_name].append(
                            self._get_value(attribute_value)
                        )

        # Note: Consider including items configured via `include_table_item` in sample data?

        return column_values

    def _get_value(self, attribute_value: Dict[str, Any]) -> Any:
        # Each attribute value is described as a name-value pair.
        # The name is the data type, and the value is the data itself.
        for data_type, value in attribute_value.items():
            if data_type == "L" and isinstance(value, list):
                return [self._get_value(e) for e in value]
            elif data_type == "M" and isinstance(value, dict):
                return {
                    nested_attr: self._get_value(nested_value)
                    for nested_attr, nested_value in value.items()
                }
            else:
                return value

    def close(self) -> None:
        pass
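Finally, a hedged usage sketch for this reader: the import path, region, and table name are assumptions (the file name is not visible in this extract), and running it requires AWS credentials with scan access. It also shows the DynamoDB attribute-value shape that _get_value unwraps.

```python
import boto3

# Assumed module path; the diff does not show where this reader lives.
from datahub.ingestion.source.dynamodb.data_reader import DynamoDBTableItemsReader

# Illustrative region/table names; any DynamoDB table works the same way.
client = boto3.client("dynamodb", region_name="us-west-2")
reader = DynamoDBTableItemsReader.create(client)

# Scans pages of PAGE_SIZE items until ~sample_size items are collected,
# returning values grouped per attribute name.
sample = reader.get_sample_data_for_table(["us-west-2", "Orders"], sample_size=100)
print(sample)
# e.g. {'order_id': ['o-1', 'o-2'], 'total': ['12.5', '7'], ...}
# (the low-level client returns numbers as strings under the "N" type tag)

# Raw scan items look like {"order_id": {"S": "o-1"}, "address": {"M": {"city": {"S": "Pune"}}}};
# _get_value strips the type tag, recursing into L (list) and M (map) values:
print(reader._get_value({"M": {"city": {"S": "Pune"}, "zip": {"N": "411001"}}}))
# {'city': 'Pune', 'zip': '411001'}
```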