fix(ingest): downgrade column type mapping warning to info #11115

Merged · 1 commit · Aug 7, 2024
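The substance of the change, visible in the hunks below: the legacy two-argument `report_warning` call becomes a structured `info` entry, so an unmappable column type no longer counts as a warning. Side by side (both calls are copied from the diff; the surrounding function is abridged):

```python
# Before: every unmapped column type surfaced as a warning keyed by dataset name.
report.report_warning(
    dataset_name, f"unable to map type {column_type} to metadata schema"
)

# After: a structured info entry. title/message identify the class of problem,
# context carries the per-column detail, and log=False (by its name) skips the
# immediate log line so the entry only appears in the final ingestion report.
report.info(
    title="Unable to map column types to DataHub types",
    message="Got an unexpected column type. The column's parsed field type will not be populated.",
    context=f"{dataset_name} - {column_type}",
    log=False,
)
```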
74 changes: 2 additions & 72 deletions metadata-ingestion/src/datahub/ingestion/source/abs/source.py
@@ -8,29 +8,10 @@
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple

 import smart_open.compression as so_compression
 from more_itertools import peekable
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from smart_open import open as smart_open

 from datahub.emitter.mce_builder import (
@@ -48,7 +29,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
 from datahub.ingestion.source.abs.report import DataLakeSourceReport
@@ -72,22 +53,14 @@
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
     SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -100,55 +73,12 @@
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)

-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000

 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])


-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-
-    return SchemaFieldDataType(type=TypeClass())
-
-
 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
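For context on the deletion above: the removed helper resolved column types through `_field_type_mapping` with `isinstance` checks, so despite its `column_type: str` annotation it actually expected a pyspark `DataType` instance. A hypothetical invocation, assuming pyspark is installed and the deleted helper is still in scope (`StubReport` is a stand-in, not a DataHub class):

```python
from pyspark.sql.types import ArrayType, IntegerType, StringType


class StubReport:
    """Duck-typed stand-in exposing only the method the helper calls."""

    def report_warning(self, key: str, reason: str) -> None:
        print(f"WARN {key}: {reason}")


report = StubReport()

# Known Spark types resolve to DataHub type classes:
get_column_type(report, "my_dataset", StringType())   # SchemaFieldDataType(type=StringTypeClass())
get_column_type(report, "my_dataset", IntegerType())  # SchemaFieldDataType(type=NumberTypeClass())

# ArrayType was deliberately mapped to NullTypeClass, with no warning:
get_column_type(report, "my_dataset", ArrayType(StringType()))

# Anything outside the mapping fell through to the warning this PR downgrades:
get_column_type(report, "my_dataset", object())  # WARN my_dataset: unable to map type ...
```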
@@ -849,8 +849,11 @@ def get_column_type(
     # if still not found, report the warning
     if TypeClass is None:
         if column_type:
-            report.report_warning(
-                dataset_name, f"unable to map type {column_type} to metadata schema"
+            report.info(
+                title="Unable to map column types to DataHub types",
+                message="Got an unexpected column type. The column's parsed field type will not be populated.",
+                context=f"{dataset_name} - {column_type}",
+                log=False,
             )
         TypeClass = NullTypeClass

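The hunk above is the actual downgrade for sources that keep their own `get_column_type`. The diff only shows the call site; as an illustration of why `title`, `message`, and `context` are separate arguments, here is a toy reporter that aggregates repeated occurrences into a single entry. This is an assumption about the aggregation idea behind `SourceReport.info`, not DataHub's actual implementation:

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple


class TinyStructuredReport:
    """Illustrative only: entries are keyed by (title, message); each
    occurrence contributes just its context string, not a fresh log line."""

    def __init__(self) -> None:
        self.entries: DefaultDict[Tuple[str, str], List[str]] = defaultdict(list)

    def info(self, title: str, message: str, context: str, log: bool = True) -> None:
        self.entries[(title, message)].append(context)
        if log:
            print(f"INFO {title}: {context}")


report = TinyStructuredReport()
for column_type in ("GEOGRAPHY", "VARIANT"):  # hypothetical unmapped types
    report.info(
        title="Unable to map column types to DataHub types",
        message="Got an unexpected column type. The column's parsed field type will not be populated.",
        context=f"my_dataset - {column_type}",
        log=False,  # aggregate silently; surfaced once in the final report
    )

assert len(report.entries) == 1  # one entry, two contexts
```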
79 changes: 3 additions & 76 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -8,32 +8,13 @@
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple

 import smart_open.compression as so_compression
 from more_itertools import peekable
 from pyspark.conf import SparkConf
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from pyspark.sql.utils import AnalysisException
 from smart_open import open as smart_open

@@ -52,7 +33,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags, list_folders
 from datahub.ingestion.source.aws.s3_util import (
@@ -72,22 +53,13 @@
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
-    SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -101,55 +73,12 @@
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)

-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000

 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])


-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-
-    return SchemaFieldDataType(type=TypeClass())
-
-
 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
@@ -490,9 +419,7 @@ def add_partition_columns_to_schema(
                 if not is_fieldpath_v2
                 else f"[version=2.0].[type=string].{partition_key}",
                 nativeDataType="string",
-                type=SchemaFieldDataType(StringTypeClass())
-                if not is_fieldpath_v2
-                else SchemaFieldDataTypeClass(type=StringTypeClass()),
+                type=SchemaFieldDataTypeClass(StringTypeClass()),
                 isPartitioningKey=True,
                 nullable=True,
                 recursive=False,
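The last s3 hunk also collapses a redundant conditional: both branches wrapped `StringTypeClass` in the same data-type holder, since the pegasus2avro `SchemaFieldDataType` and `SchemaFieldDataTypeClass` refer to the same generated class (our reading of DataHub's codegen; the diff itself only shows the simplification). A sketch of the partition-key field the patched code builds, with a hypothetical `partition_key`:

```python
from datahub.metadata.schema_classes import (
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
)

partition_key = "year"  # hypothetical partition column name

field = SchemaFieldClass(
    fieldPath=f"[version=2.0].[type=string].{partition_key}",
    nativeDataType="string",
    type=SchemaFieldDataTypeClass(StringTypeClass()),
    isPartitioningKey=True,
    nullable=True,
    recursive=False,
)
```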
@@ -263,8 +263,11 @@ def get_column_type(
             break

     if TypeClass is None:
-        sql_report.report_warning(
-            dataset_name, f"unable to map type {column_type!r} to metadata schema"
+        sql_report.info(
+            title="Unable to map column types to DataHub types",
+            message="Got an unexpected column type. The column's parsed field type will not be populated.",
+            context=f"{dataset_name} - {column_type!r}",
+            log=False,
         )
         TypeClass = NullTypeClass

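Assembling the fragments from the hunks above, the post-patch shape of `get_column_type` looks roughly like the following self-contained sketch. The sqlalchemy mapping is an abridged stand-in for the real `_field_type_mapping`, and `sql_report` is duck-typed rather than the real `SQLSourceReport`:

```python
from typing import Any, Dict, Optional, Type

from sqlalchemy import types

from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    BooleanTypeClass,
    NullTypeClass,
    NumberTypeClass,
    SchemaFieldDataType,
    StringTypeClass,
)

# Abridged, illustrative subset of the real mapping in sql_common.
_field_type_mapping: Dict[Type, Type] = {
    types.Integer: NumberTypeClass,
    types.Numeric: NumberTypeClass,
    types.Boolean: BooleanTypeClass,
    types.String: StringTypeClass,
}


def get_column_type(
    sql_report: Any, dataset_name: str, column_type: Any
) -> SchemaFieldDataType:
    """Map a sqlalchemy type instance to a DataHub type, recording unmapped
    types as a structured info entry instead of a warning."""
    TypeClass: Optional[Type] = None
    for sql_type, type_class in _field_type_mapping.items():
        if isinstance(column_type, sql_type):
            TypeClass = type_class
            break

    if TypeClass is None:
        sql_report.info(
            title="Unable to map column types to DataHub types",
            message="Got an unexpected column type. The column's parsed field type will not be populated.",
            context=f"{dataset_name} - {column_type!r}",
            log=False,
        )
        TypeClass = NullTypeClass

    return SchemaFieldDataType(type=TypeClass())
```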