From d2eb42373fa1d36011c020500668da8cf863e165 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 18 Oct 2023 11:34:45 -0400 Subject: [PATCH 1/9] fix(ingest/sqlalchemy): Fix URL parsing when sqlalchemy_uri provided (#9032) --- .../ingestion/source/sql/sql_config.py | 47 +++++++++++-------- .../source/sql/two_tier_sql_source.py | 31 ++++++++---- .../tests/unit/test_athena_source.py | 12 ++--- .../tests/unit/test_clickhouse_source.py | 4 +- .../tests/unit/test_snowflake_source.py | 30 +++++++----- 5 files changed, 76 insertions(+), 48 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 08cc74aec39775..57aae32b361cf5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -1,10 +1,10 @@ import logging from abc import abstractmethod from typing import Any, Dict, Optional -from urllib.parse import quote_plus import pydantic from pydantic import Field +from sqlalchemy.engine import URL from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import ( @@ -125,7 +125,11 @@ class SQLAlchemyConnectionConfig(ConfigModel): # Duplicate of SQLCommonConfig.options options: dict = pydantic.Field( default_factory=dict, - description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.", + description=( + "Any options specified here will be passed to " + "[SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs." + " To set connection arguments in the URL, specify them under `connect_args`." 
+ ), ) _database_alias_deprecation = pydantic_field_deprecated( @@ -161,21 +165,26 @@ def make_sqlalchemy_uri( db: Optional[str], uri_opts: Optional[Dict[str, Any]] = None, ) -> str: - url = f"{scheme}://" - if username is not None: - url += f"{quote_plus(username)}" - if password is not None: - url += f":{quote_plus(password)}" - url += "@" - if at is not None: - url += f"{at}" - if db is not None: - url += f"/{db}" - if uri_opts is not None: - if db is None: - url += "/" - params = "&".join( - f"{key}={quote_plus(value)}" for (key, value) in uri_opts.items() if value + host: Optional[str] = None + port: Optional[int] = None + if at: + try: + host, port_str = at.rsplit(":", 1) + port = int(port_str) + except ValueError: + host = at + port = None + if uri_opts: + uri_opts = {k: v for k, v in uri_opts.items() if v is not None} + + return str( + URL.create( + drivername=scheme, + username=username, + password=password, + host=host, + port=port, + database=db, + query=uri_opts or {}, ) - url = f"{url}?{params}" - return url + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py index d9062cef06eae0..7a49551dc12351 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py @@ -1,8 +1,10 @@ import typing +import urllib.parse from typing import Any, Dict, Iterable, Optional from pydantic.fields import Field from sqlalchemy import create_engine, inspect +from sqlalchemy.engine import URL from sqlalchemy.engine.reflection import Inspector from datahub.configuration.common import AllowDenyPattern @@ -41,14 +43,27 @@ def get_sql_alchemy_url( uri_opts: typing.Optional[typing.Dict[str, typing.Any]] = None, current_db: typing.Optional[str] = None, ) -> str: - return self.sqlalchemy_uri or make_sqlalchemy_uri( - self.scheme, - self.username, - self.password.get_secret_value() if self.password else None, - self.host_port, - current_db if current_db else self.database, - uri_opts=uri_opts, - ) + if self.sqlalchemy_uri: + parsed_url = urllib.parse.urlsplit(self.sqlalchemy_uri) + url = URL.create( + drivername=parsed_url.scheme, + username=parsed_url.username, + password=parsed_url.password, + host=parsed_url.hostname, + port=parsed_url.port, + database=current_db or parsed_url.path.lstrip("/"), + query=urllib.parse.parse_qs(parsed_url.query), + ).update_query_dict(uri_opts or {}) + return str(url) + else: + return make_sqlalchemy_uri( + self.scheme, + self.username, + self.password.get_secret_value() if self.password else None, + self.host_port, + current_db or self.database, + uri_opts=uri_opts, + ) class TwoTierSQLAlchemySource(SQLAlchemySource): diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py index 2558f6a46715e6..7a947e8f86bfee 100644 --- a/metadata-ingestion/tests/unit/test_athena_source.py +++ b/metadata-ingestion/tests/unit/test_athena_source.py @@ -10,7 +10,6 @@ FROZEN_TIME = "2020-04-14 07:00:00" -@pytest.mark.integration def test_athena_config_query_location_old_plus_new_value_not_allowed(): from datahub.ingestion.source.sql.athena import AthenaConfig @@ -25,7 +24,6 @@ def test_athena_config_query_location_old_plus_new_value_not_allowed(): ) -@pytest.mark.integration def test_athena_config_staging_dir_is_set_as_query_result(): from datahub.ingestion.source.sql.athena import AthenaConfig @@ -48,7 +46,6 @@ 
def test_athena_config_staging_dir_is_set_as_query_result(): assert config.json() == expected_config.json() -@pytest.mark.integration def test_athena_uri(): from datahub.ingestion.source.sql.athena import AthenaConfig @@ -59,9 +56,12 @@ def test_athena_uri(): "work_group": "test-workgroup", } ) - assert ( - config.get_sql_alchemy_url() - == "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fquery-result-location%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600" + assert config.get_sql_alchemy_url() == ( + "awsathena+rest://@athena.us-west-1.amazonaws.com:443" + "?catalog_name=awsdatacatalog" + "&duration_seconds=3600" + "&s3_staging_dir=s3%3A%2F%2Fquery-result-location%2F" + "&work_group=test-workgroup" ) diff --git a/metadata-ingestion/tests/unit/test_clickhouse_source.py b/metadata-ingestion/tests/unit/test_clickhouse_source.py index de7e7d66f21290..1b2ffb70c8d190 100644 --- a/metadata-ingestion/tests/unit/test_clickhouse_source.py +++ b/metadata-ingestion/tests/unit/test_clickhouse_source.py @@ -26,9 +26,7 @@ def test_clickhouse_uri_native(): "scheme": "clickhouse+native", } ) - assert ( - config.get_sql_alchemy_url() == "clickhouse+native://user:password@host:1111/" - ) + assert config.get_sql_alchemy_url() == "clickhouse+native://user:password@host:1111" def test_clickhouse_uri_native_secure(): diff --git a/metadata-ingestion/tests/unit/test_snowflake_source.py b/metadata-ingestion/tests/unit/test_snowflake_source.py index 1c26ca2487e5ca..888a7c04415542 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/test_snowflake_source.py @@ -179,10 +179,12 @@ def test_snowflake_uri_default_authentication(): } ) - assert ( - config.get_sql_alchemy_url() - == "snowflake://user:password@acctname/?authenticator=SNOWFLAKE&warehouse=COMPUTE_WH&role" - "=sysadmin&application=acryl_datahub" + assert config.get_sql_alchemy_url() == ( + "snowflake://user:password@acctname" + "?application=acryl_datahub" + "&authenticator=SNOWFLAKE" + "&role=sysadmin" + "&warehouse=COMPUTE_WH" ) @@ -198,10 +200,12 @@ def test_snowflake_uri_external_browser_authentication(): } ) - assert ( - config.get_sql_alchemy_url() - == "snowflake://user@acctname/?authenticator=EXTERNALBROWSER&warehouse=COMPUTE_WH&role" - "=sysadmin&application=acryl_datahub" + assert config.get_sql_alchemy_url() == ( + "snowflake://user@acctname" + "?application=acryl_datahub" + "&authenticator=EXTERNALBROWSER" + "&role=sysadmin" + "&warehouse=COMPUTE_WH" ) @@ -219,10 +223,12 @@ def test_snowflake_uri_key_pair_authentication(): } ) - assert ( - config.get_sql_alchemy_url() - == "snowflake://user@acctname/?authenticator=SNOWFLAKE_JWT&warehouse=COMPUTE_WH&role" - "=sysadmin&application=acryl_datahub" + assert config.get_sql_alchemy_url() == ( + "snowflake://user@acctname" + "?application=acryl_datahub" + "&authenticator=SNOWFLAKE_JWT" + "&role=sysadmin" + "&warehouse=COMPUTE_WH" ) From 1eaf9c8c5ff2676a5c4ac456d7b4a6d351697e73 Mon Sep 17 00:00:00 2001 From: Tim <50115603+bossenti@users.noreply.github.com> Date: Wed, 18 Oct 2023 18:39:59 +0200 Subject: [PATCH 2/9] feature(ingest/athena): introduce support for complex and nested schemas in Athena (#8137) Co-authored-by: dnks23 Co-authored-by: Tamas Nemeth Co-authored-by: Tim Co-authored-by: Harshal Sheth --- metadata-ingestion/setup.py | 5 +- .../datahub/ingestion/source/sql/athena.py | 200 +++++++++++++++++- .../ingestion/source/sql/sql_common.py | 4 + 
.../datahub/ingestion/source/sql/sql_types.py | 12 +- .../utilities/sqlalchemy_type_converter.py | 200 ++++++++++++++++++ .../tests/unit/test_athena_source.py | 86 +++++++- .../test_sqlalchemy_type_converter.py | 93 ++++++++ 7 files changed, 589 insertions(+), 11 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py create mode 100644 metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 0b0a2b13fb52af..c46409ecbf52fa 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -280,8 +280,9 @@ # Misc plugins. "sql-parser": sqlglot_lib, # Source plugins - # PyAthena is pinned with exact version because we use private method in PyAthena - "athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"}, + # sqlalchemy-bigquery is included here since it provides an implementation of + # a SQLalchemy-conform STRUCT type definition + "athena": sql_common | {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"}, "azure-ad": set(), "bigquery": sql_common | bigquery_common diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 9cb613bde1e9f8..dad61e51731667 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -1,12 +1,17 @@ import json import logging +import re import typing -from typing import Any, Dict, Iterable, List, Optional, Tuple, cast +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast import pydantic from pyathena.common import BaseCursor from pyathena.model import AthenaTableMetadata +from pyathena.sqlalchemy_athena import AthenaRestDialect +from sqlalchemy import create_engine, inspect, types from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.types import TypeEngine +from sqlalchemy_bigquery import STRUCT from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.emitter.mcp_builder import ContainerKey, DatabaseKey @@ -21,13 +26,164 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes -from datahub.ingestion.source.sql.sql_common import SQLAlchemySource +from datahub.ingestion.source.sql.sql_common import ( + SQLAlchemySource, + register_custom_type, +) from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri +from datahub.ingestion.source.sql.sql_types import MapType from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, gen_database_container, gen_database_key, ) +from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField +from datahub.metadata.schema_classes import RecordTypeClass +from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column +from datahub.utilities.sqlalchemy_type_converter import ( + get_schema_fields_for_sqlalchemy_column, +) + +logger = logging.getLogger(__name__) + +register_custom_type(STRUCT, RecordTypeClass) + + +class CustomAthenaRestDialect(AthenaRestDialect): + """Custom definition of the Athena dialect. 
+ + Custom implementation that allows to extend/modify the behavior of the SQLalchemy + dialect that is used by PyAthena (which is the library that is used by DataHub + to extract metadata from Athena). + This dialect can then be used by the inspector (see get_inspectors()). + + """ + + # regex to identify complex types in DDL strings which are embedded in `<>`. + _complex_type_pattern = re.compile(r"(<.+>)") + + @typing.no_type_check + def _get_column_type( + self, type_: Union[str, Dict[str, Any]] + ) -> TypeEngine: # noqa: C901 + """Derives the data type of the Athena column. + + This method is overwritten to extend the behavior of PyAthena. + Pyathena is not capable of detecting complex data types, e.g., + arrays, maps, or, structs (as of version 2.25.2). + The custom implementation extends the functionality by the above-mentioned data types. + """ + + # Originally, this method only handles `type_` as a string + # With the workaround used below to parse DDL strings for structs, + # `type` might also be a dictionary + if isinstance(type_, str): + match = self._pattern_column_type.match(type_) + if match: + type_name = match.group(1).lower() + type_meta_information = match.group(2) + else: + type_name = type_.lower() + type_meta_information = None + elif isinstance(type_, dict): + # this occurs only when a type parsed as part of a STRUCT is passed + # in such case type_ is a dictionary whose type can be retrieved from the attribute + type_name = type_.get("type", None) + type_meta_information = None + else: + raise RuntimeError(f"Unsupported type definition: {type_}") + + args = [] + + if type_name in ["array"]: + detected_col_type = types.ARRAY + + # here we need to account again for two options how `type_` is passed to this method + # first, the simple array definition as a DDL string (something like array) + # this is always the case when the array is not part of a complex data type (mainly STRUCT) + # second, the array definition can also be passed in form of dictionary + # this is the case when the array is part of a complex data type + if isinstance(type_, str): + # retrieve the raw name of the data type as a string + array_type_raw = self._complex_type_pattern.findall(type_)[0][ + 1:-1 + ] # array type without enclosing <> + # convert the string name of the data type into a SQLalchemy type (expected return) + array_type = self._get_column_type(array_type_raw) + elif isinstance(type_, dict): + # retrieve the data type of the array items and + # transform it into a SQLalchemy type + array_type = self._get_column_type(type_["items"]) + else: + raise RuntimeError(f"Unsupported array definition: {type_}") + + args = [array_type] + + elif type_name in ["struct", "record"]: + # STRUCT is not part of the SQLalchemy types selection + # but is provided by another official SQLalchemy library and + # compatible with the other SQLalchemy types + detected_col_type = STRUCT + + if isinstance(type_, dict): + # in case a struct as part of another struct is passed + # it is provided in form of a dictionary and + # can simply be used for the further processing + struct_type = type_ + else: + # this is the case when the type definition of the struct is passed as a DDL string + # therefore, it is required to parse the DDL string + # here a method provided in another Datahub source is used so that the parsing + # doesn't need to be implemented twice + # `get_avro_schema_for_hive_column` accepts a DDL description as column type and + # returns the parsed data types in form of a dictionary + schema = 
get_avro_schema_for_hive_column( + hive_column_name=type_name, hive_column_type=type_ + ) + + # the actual type description needs to be extracted + struct_type = schema["fields"][0]["type"] + + # A STRUCT consist of multiple attributes which are expected to be passed as + # a list of tuples consisting of name data type pairs. e.g., `('age', Integer())` + # See the reference: + # https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/_struct.py#L53 + # + # To extract all of them, we simply iterate over all detected fields and + # convert them to SQLalchemy types + struct_args = [] + for field in struct_type["fields"]: + struct_args.append( + ( + field["name"], + self._get_column_type(field["type"]["type"]) + if field["type"]["type"] not in ["record", "array"] + else self._get_column_type(field["type"]), + ) + ) + + args = struct_args + + elif type_name in ["map"]: + # Instead of SQLalchemy's TupleType the custom MapType is used here + # which is just a simple wrapper around TupleType + detected_col_type = MapType + + # the type definition for maps looks like the following: key_type:val_type (e.g., string:string) + key_type_raw, value_type_raw = type_meta_information.split(",") + + # convert both type names to actual SQLalchemy types + args = [ + self._get_column_type(key_type_raw), + self._get_column_type(value_type_raw), + ] + # by using get_avro_schema_for_hive_column() for parsing STRUCTs the data type `long` + # can also be returned, so we need to extend the handling here as well + elif type_name in ["bigint", "long"]: + detected_col_type = types.BIGINT + else: + return super()._get_column_type(type_name) + return detected_col_type(*args) class AthenaConfig(SQLCommonConfig): @@ -129,6 +285,18 @@ def create(cls, config_dict, ctx): config = AthenaConfig.parse_obj(config_dict) return cls(config, ctx) + # overwrite this method to allow to specify the usage of a custom dialect + def get_inspectors(self) -> Iterable[Inspector]: + url = self.config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") + engine = create_engine(url, **self.config.options) + + # set custom dialect to be used by the inspector + engine.dialect = CustomAthenaRestDialect() + with engine.connect() as conn: + inspector = inspect(conn) + yield inspector + def get_table_properties( self, inspector: Inspector, schema: str, table: str ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]: @@ -136,9 +304,7 @@ def get_table_properties( self.cursor = cast(BaseCursor, inspector.engine.raw_connection().cursor()) assert self.cursor - # Unfortunately properties can be only get through private methods as those are not exposed - # https://github.com/laughingman7743/PyAthena/blob/9e42752b0cc7145a87c3a743bb2634fe125adfa7/pyathena/model.py#L201 - metadata: AthenaTableMetadata = self.cursor._get_table_metadata( + metadata: AthenaTableMetadata = self.cursor.get_table_metadata( table_name=table, schema_name=schema ) description = metadata.comment @@ -241,6 +407,30 @@ def get_schema_names(self, inspector: Inspector) -> List[str]: return [schema for schema in schemas if schema == athena_config.database] return schemas + # Overwrite to modify the creation of schema fields + def get_schema_fields_for_column( + self, + dataset_name: str, + column: Dict, + pk_constraints: Optional[dict] = None, + tags: Optional[List[str]] = None, + ) -> List[SchemaField]: + fields = get_schema_fields_for_sqlalchemy_column( + column_name=column["name"], + column_type=column["type"], + description=column.get("comment", 
None), + nullable=column.get("nullable", True), + is_part_of_key=True + if ( + pk_constraints is not None + and isinstance(pk_constraints, dict) + and column["name"] in pk_constraints.get("constrained_columns", []) + ) + else False, + ) + + return fields + def close(self): if self.cursor: self.cursor.close() diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 056be6c2e50ac9..6524eea8222d41 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -37,6 +37,7 @@ DatasetSubTypes, ) from datahub.ingestion.source.sql.sql_config import SQLCommonConfig +from datahub.ingestion.source.sql.sql_types import MapType from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, downgrade_schema_from_v2, @@ -80,6 +81,7 @@ DatasetLineageTypeClass, DatasetPropertiesClass, GlobalTagsClass, + MapTypeClass, SubTypesClass, TagAssociationClass, UpstreamClass, @@ -154,6 +156,8 @@ class SqlWorkUnit(MetadataWorkUnit): types.DATETIME: TimeTypeClass, types.TIMESTAMP: TimeTypeClass, types.JSON: RecordTypeClass, + # additional type definitions that are used by the Athena source + MapType: MapTypeClass, # type: ignore # Because the postgresql dialect is used internally by many other dialects, # we add some postgres types here. This is ok to do because the postgresql # dialect is built-in to sqlalchemy. diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py index 3b4a7e1dc02879..51626891e9fefb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py @@ -1,13 +1,15 @@ import re from typing import Any, Dict, ValuesView +from sqlalchemy import types + from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayType, BooleanType, BytesType, DateType, EnumType, - MapType, + MapType as MapTypeAvro, NullType, NumberType, RecordType, @@ -363,10 +365,16 @@ def resolve_vertica_modified_type(type_string: str) -> Any: "time": TimeType, "timestamp": TimeType, "row": RecordType, - "map": MapType, + "map": MapTypeAvro, "array": ArrayType, } + +class MapType(types.TupleType): + # Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub + pass + + # https://docs.aws.amazon.com/athena/latest/ug/data-types.html # https://github.com/dbt-athena/dbt-athena/tree/main ATHENA_SQL_TYPES_MAP: Dict[str, Any] = { diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py new file mode 100644 index 00000000000000..a431f262a85fd8 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py @@ -0,0 +1,200 @@ +import json +import logging +import uuid +from typing import Any, Dict, List, Optional, Type, Union + +from sqlalchemy import types +from sqlalchemy_bigquery import STRUCT + +from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields +from datahub.ingestion.source.sql.sql_types import MapType +from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField +from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass + +logger = logging.getLogger(__name__) + + +class SqlAlchemyColumnToAvroConverter: + """Helper class that collects some methods 
to convert SQLalchemy columns to Avro schema.""" + + # tuple of complex data types that require a special handling + _COMPLEX_TYPES = (STRUCT, types.ARRAY, MapType) + + # mapping of primitive SQLalchemy data types to AVRO schema data types + PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE: Dict[Type[types.TypeEngine], str] = { + types.String: "string", + types.BINARY: "string", + types.BOOLEAN: "boolean", + types.FLOAT: "float", + types.INTEGER: "int", + types.BIGINT: "long", + types.VARCHAR: "string", + types.CHAR: "string", + } + + @classmethod + def get_avro_type( + cls, column_type: Union[types.TypeEngine, STRUCT, MapType], nullable: bool + ) -> Dict[str, Any]: + """Determines the concrete AVRO schema type for a SQLalchemy-typed column""" + + if type(column_type) in cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys(): + return { + "type": cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE[type(column_type)], + "native_data_type": str(column_type), + "_nullable": nullable, + } + if isinstance(column_type, types.DECIMAL): + return { + "type": "bytes", + "logicalType": "decimal", + "precision": int(column_type.precision), + "scale": int(column_type.scale), + "native_data_type": str(column_type), + "_nullable": nullable, + } + if isinstance(column_type, types.DATE): + return { + "type": "int", + "logicalType": "date", + "native_data_type": str(column_type), + "_nullable": nullable, + } + if isinstance(column_type, types.TIMESTAMP): + return { + "type": "long", + "logicalType": "timestamp-millis", + "native_data_type": str(column_type), + "_nullable": nullable, + } + if isinstance(column_type, types.ARRAY): + array_type = column_type.item_type + return { + "type": "array", + "items": cls.get_avro_type(column_type=array_type, nullable=nullable), + "native_data_type": f"array<{str(column_type.item_type)}>", + } + if isinstance(column_type, MapType): + key_type = column_type.types[0] + value_type = column_type.types[1] + return { + "type": "map", + "values": cls.get_avro_type(column_type=value_type, nullable=nullable), + "native_data_type": str(column_type), + "key_type": cls.get_avro_type(column_type=key_type, nullable=nullable), + "key_native_data_type": str(key_type), + } + if isinstance(column_type, STRUCT): + fields = [] + for field_def in column_type._STRUCT_fields: + field_name, field_type = field_def + fields.append( + { + "name": field_name, + "type": cls.get_avro_type( + column_type=field_type, nullable=nullable + ), + } + ) + struct_name = f"__struct_{str(uuid.uuid4()).replace('-', '')}" + + return { + "type": "record", + "name": struct_name, + "fields": fields, + "native_data_type": str(column_type), + "_nullable": nullable, + } + + return { + "type": "null", + "native_data_type": str(column_type), + "_nullable": nullable, + } + + @classmethod + def get_avro_for_sqlalchemy_column( + cls, + column_name: str, + column_type: types.TypeEngine, + nullable: bool, + ) -> Union[object, Dict[str, object]]: + """Returns the AVRO schema representation of a SQLalchemy column.""" + if isinstance(column_type, cls._COMPLEX_TYPES): + return { + "type": "record", + "name": "__struct_", + "fields": [ + { + "name": column_name, + "type": cls.get_avro_type( + column_type=column_type, nullable=nullable + ), + } + ], + } + return cls.get_avro_type(column_type=column_type, nullable=nullable) + + +def get_schema_fields_for_sqlalchemy_column( + column_name: str, + column_type: types.TypeEngine, + description: Optional[str] = None, + nullable: Optional[bool] = True, + is_part_of_key: Optional[bool] = False, +) -> 
List[SchemaField]: + """Creates SchemaFields from a given SQLalchemy column. + + This function is analogous to `get_schema_fields_for_hive_column` from datahub.utilities.hive_schema_to_avro. + The main purpose of implementing it this way, is to make it ready/compatible for second field path generation, + which allows to explore nested structures within the UI. + """ + + if nullable is None: + nullable = True + + try: + # as a first step, the column is converted to AVRO JSON which can then be used by an existing function + avro_schema_json = ( + SqlAlchemyColumnToAvroConverter.get_avro_for_sqlalchemy_column( + column_name=column_name, + column_type=column_type, + nullable=nullable, + ) + ) + # retrieve schema field definitions from the above generated AVRO JSON structure + schema_fields = avro_schema_to_mce_fields( + avro_schema=json.dumps(avro_schema_json), + default_nullable=nullable, + swallow_exceptions=False, + ) + except Exception as e: + logger.warning( + f"Unable to parse column {column_name} and type {column_type} the error was: {e}" + ) + + # fallback description in case any exception occurred + schema_fields = [ + SchemaField( + fieldPath=column_name, + type=SchemaFieldDataTypeClass(type=NullTypeClass()), + nativeDataType=str(column_type), + ) + ] + + # for all non-nested data types an additional modification of the `fieldPath` property is required + if type(column_type) in ( + *SqlAlchemyColumnToAvroConverter.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys(), + types.TIMESTAMP, + types.DATE, + types.DECIMAL, + ): + schema_fields[0].fieldPath += f".{column_name}" + + if description: + schema_fields[0].description = description + schema_fields[0].isPartOfKey = ( + is_part_of_key if is_part_of_key is not None else False + ) + + return schema_fields diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py index 7a947e8f86bfee..6d3ed20eafde2c 100644 --- a/metadata-ingestion/tests/unit/test_athena_source.py +++ b/metadata-ingestion/tests/unit/test_athena_source.py @@ -3,9 +3,13 @@ import pytest from freezegun import freeze_time +from sqlalchemy import types +from sqlalchemy_bigquery import STRUCT from datahub.ingestion.api.common import PipelineContext -from src.datahub.ingestion.source.aws.s3_util import make_s3_urn +from datahub.ingestion.source.aws.s3_util import make_s3_urn +from datahub.ingestion.source.sql.athena import CustomAthenaRestDialect +from datahub.ingestion.source.sql.sql_types import MapType FROZEN_TIME = "2020-04-14 07:00:00" @@ -104,7 +108,7 @@ def test_athena_get_table_properties(): mock_cursor = mock.MagicMock() mock_inspector = mock.MagicMock() mock_inspector.engine.raw_connection().cursor.return_value = mock_cursor - mock_cursor._get_table_metadata.return_value = AthenaTableMetadata( + mock_cursor.get_table_metadata.return_value = AthenaTableMetadata( response=table_metadata ) @@ -126,3 +130,81 @@ def test_athena_get_table_properties(): } assert location == make_s3_urn("s3://testLocation", "PROD") + + +def test_get_column_type_simple_types(): + assert isinstance( + CustomAthenaRestDialect()._get_column_type(type_="int"), types.Integer + ) + assert isinstance( + CustomAthenaRestDialect()._get_column_type(type_="string"), types.String + ) + assert isinstance( + CustomAthenaRestDialect()._get_column_type(type_="boolean"), types.BOOLEAN + ) + assert isinstance( + CustomAthenaRestDialect()._get_column_type(type_="long"), types.BIGINT + ) + assert isinstance( + 
CustomAthenaRestDialect()._get_column_type(type_="double"), types.FLOAT + ) + + +def test_get_column_type_array(): + result = CustomAthenaRestDialect()._get_column_type(type_="array") + + assert isinstance(result, types.ARRAY) + assert isinstance(result.item_type, types.String) + + +def test_get_column_type_map(): + result = CustomAthenaRestDialect()._get_column_type(type_="map") + + assert isinstance(result, MapType) + assert isinstance(result.types[0], types.String) + assert isinstance(result.types[1], types.Integer) + + +def test_column_type_struct(): + + result = CustomAthenaRestDialect()._get_column_type(type_="struct") + + assert isinstance(result, STRUCT) + assert isinstance(result._STRUCT_fields[0], tuple) + assert result._STRUCT_fields[0][0] == "test" + assert isinstance(result._STRUCT_fields[0][1], types.String) + + +def test_column_type_complex_combination(): + + result = CustomAthenaRestDialect()._get_column_type( + type_="struct>>" + ) + + assert isinstance(result, STRUCT) + + assert isinstance(result._STRUCT_fields[0], tuple) + assert result._STRUCT_fields[0][0] == "id" + assert isinstance(result._STRUCT_fields[0][1], types.String) + + assert isinstance(result._STRUCT_fields[1], tuple) + assert result._STRUCT_fields[1][0] == "name" + assert isinstance(result._STRUCT_fields[1][1], types.String) + + assert isinstance(result._STRUCT_fields[2], tuple) + assert result._STRUCT_fields[2][0] == "choices" + assert isinstance(result._STRUCT_fields[2][1], types.ARRAY) + + assert isinstance(result._STRUCT_fields[2][1].item_type, STRUCT) + + assert isinstance(result._STRUCT_fields[2][1].item_type._STRUCT_fields[0], tuple) + assert result._STRUCT_fields[2][1].item_type._STRUCT_fields[0][0] == "id" + assert isinstance( + result._STRUCT_fields[2][1].item_type._STRUCT_fields[0][1], types.String + ) + + assert isinstance(result._STRUCT_fields[2][1].item_type._STRUCT_fields[1], tuple) + assert result._STRUCT_fields[2][1].item_type._STRUCT_fields[1][0] == "label" + assert isinstance( + result._STRUCT_fields[2][1].item_type._STRUCT_fields[1][1], types.String + ) diff --git a/metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py b/metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py new file mode 100644 index 00000000000000..959da0987a8251 --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py @@ -0,0 +1,93 @@ +from typing import no_type_check + +from sqlalchemy import types +from sqlalchemy_bigquery import STRUCT + +from datahub.ingestion.source.sql.sql_types import MapType +from datahub.metadata.schema_classes import ( + ArrayTypeClass, + MapTypeClass, + NullTypeClass, + NumberTypeClass, + RecordTypeClass, +) +from datahub.utilities.sqlalchemy_type_converter import ( + get_schema_fields_for_sqlalchemy_column, +) + + +def test_get_avro_schema_for_sqlalchemy_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=types.INTEGER() + ) + assert len(schema_fields) == 1 + assert schema_fields[0].fieldPath == "[version=2.0].[type=int].test" + assert schema_fields[0].type.type == NumberTypeClass() + assert schema_fields[0].nativeDataType == "INTEGER" + assert schema_fields[0].nullable is True + + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=types.String(), nullable=False + ) + assert len(schema_fields) == 1 + assert schema_fields[0].fieldPath == "[version=2.0].[type=string].test" + assert schema_fields[0].type.type == 
NumberTypeClass() + assert schema_fields[0].nativeDataType == "VARCHAR" + assert schema_fields[0].nullable is False + + +def test_get_avro_schema_for_sqlalchemy_array_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=types.ARRAY(types.FLOAT()) + ) + assert len(schema_fields) == 1 + assert ( + schema_fields[0].fieldPath + == "[version=2.0].[type=struct].[type=array].[type=float].test" + ) + assert schema_fields[0].type.type == ArrayTypeClass(nestedType=["float"]) + assert schema_fields[0].nativeDataType == "array" + + +def test_get_avro_schema_for_sqlalchemy_map_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=MapType(types.String(), types.BOOLEAN()) + ) + assert len(schema_fields) == 1 + assert ( + schema_fields[0].fieldPath + == "[version=2.0].[type=struct].[type=map].[type=boolean].test" + ) + assert schema_fields[0].type.type == MapTypeClass( + keyType="string", valueType="boolean" + ) + assert schema_fields[0].nativeDataType == "MapType(String(), BOOLEAN())" + + +def test_get_avro_schema_for_sqlalchemy_struct_column() -> None: + + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=STRUCT(("test", types.INTEGER())) + ) + assert len(schema_fields) == 2 + assert ( + schema_fields[0].fieldPath == "[version=2.0].[type=struct].[type=struct].test" + ) + assert schema_fields[0].type.type == RecordTypeClass() + assert schema_fields[0].nativeDataType == "STRUCT" + + assert ( + schema_fields[1].fieldPath + == "[version=2.0].[type=struct].[type=struct].test.[type=int].test" + ) + assert schema_fields[1].type.type == NumberTypeClass() + assert schema_fields[1].nativeDataType == "INTEGER" + + +@no_type_check +def test_get_avro_schema_for_sqlalchemy_unknown_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column("invalid", "test") + assert len(schema_fields) == 1 + assert schema_fields[0].type.type == NullTypeClass() + assert schema_fields[0].fieldPath == "[version=2.0].[type=null]" + assert schema_fields[0].nativeDataType == "test" From bd5c4e0d70681b4640d25e62326e05be1c9deb65 Mon Sep 17 00:00:00 2001 From: Saketh-Mahesh <81051119+Saketh-Mahesh@users.noreply.github.com> Date: Wed, 18 Oct 2023 11:48:39 -0500 Subject: [PATCH 3/9] docs: adding documentation for deployment of DataHub on Azure (#8612) Co-authored-by: Saketh Mahesh --- docs-website/sidebars.js | 1 + docs/deploy/azure.md | 234 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 235 insertions(+) create mode 100644 docs/deploy/azure.md diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 4fa73c995157a3..b2b3df4dfb33c4 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -158,6 +158,7 @@ module.exports = { // The purpose of this section is to provide the minimum steps required to deploy DataHub to the vendor of your choosing "docs/deploy/aws", "docs/deploy/gcp", + "docs/deploy/azure", "docker/README", "docs/deploy/kubernetes", "docs/deploy/environment-vars", diff --git a/docs/deploy/azure.md b/docs/deploy/azure.md new file mode 100644 index 00000000000000..b940b82827e947 --- /dev/null +++ b/docs/deploy/azure.md @@ -0,0 +1,234 @@ +--- +title: "Deploying to Azure" +--- + +# Azure setup guide + +The following is a set of instructions to quickstart DataHub on Azure Kubernetes Service (AKS). Note, the guide +assumes that you do not have a Kubernetes cluster set up. 
+ +## Prerequisites + +This guide requires the following tools: + +- [kubectl](https://kubernetes.io/docs/tasks/tools/) to manage Kubernetes resources +- [helm](https://helm.sh/docs/intro/install/) to deploy the resources based on helm charts. Note, we only support Helm + 3. +- [AZ CLI](https://learn.microsoft.com/en-us/cli/azure/install-azure-cli) to manage Azure resources + +To use the above tools, you need to set up Azure credentials by following +this [guide](https://learn.microsoft.com/en-us/cli/azure/authenticate-azure-cli). + +## Start up a Kubernetes cluster on AKS + +You can follow this [guide](https://learn.microsoft.com/en-us/azure/aks/learn/quick-kubernetes-deploy-cli) to create a new +cluster using az cli. + +Note: you can skip the application deployment step since we are deploying DataHub instead. If you are deploying DataHub to an existing cluster, please +skip the corresponding sections. + +- Verify you have the Microsoft.OperationsManagement and Microsoft.OperationalInsights providers registered on your subscription. These Azure resource providers are required to support Container insights. Check the registration status using the following commands: + +``` +az provider show -n Microsoft.OperationsManagement -o table +az provider show -n Microsoft.OperationalInsights -o table +``` + +If they're not registered, register them using the following commands: + +``` +az provider register --namespace Microsoft.OperationsManagement +az provider register --namespace Microsoft.OperationalInsights +``` + +- Create a resource group. Change name, location to your choosing. + +``` +az group create --name myResourceGroup --location eastus +``` + +The following output indicates that the command execution was successful: + +``` +{ + "id": "/subscriptions//resourceGroups/myResourceGroup", + "location": "eastus", + "managedBy": null, + "name": "myResourceGroup", + "properties": { + "provisioningState": "Succeeded" + }, + "tags": null +} +``` +- Create an AKS Cluster. For this project, it is best to increase node count to at least 3. Change cluster name, node count, and addons to your choosing. + +``` +az aks create -g myResourceGroup -n myAKSCluster --enable-managed-identity --node-count 3 --enable-addons monitoring --generate-ssh-keys +``` + +After a few minutes, the command completes and returns JSON-formatted information about the cluster. + +- Connect to the cluster + +Configure kubectl to connect to your Kubernetes cluster using the az aks get-credentials command. + +``` +az aks get-credentials --resource-group myResourceGroup --name myAKSCluster +``` + +Verify the connection to your cluster using the `kubectl get` command. This command returns a list of the cluster nodes. + +``` +kubectl get nodes +``` + +You should get results like below. Make sure node status is Ready. + +``` +NAME STATUS ROLES AGE VERSION +aks-nodepool1-37660971-vmss000000 Ready agent 24h v1.25.6 +aks-nodepool1-37660971-vmss000001 Ready agent 24h v1.25.6 +aks-nodepool1-37660971-vmss000002 Ready agent 24h v1.25.6 +``` + +## Setup DataHub using Helm + +Once the Kubernetes cluster has been set up, you can deploy DataHub and its prerequisites using helm. Please follow the +steps in this [guide](kubernetes.md). + + +Notes: +Since we are using PostgreSQL as the storage layer, change postgresql enabled to true and mysql to false in the values.yaml file of prerequisites. +Additionally, create a postgresql secret. 
Make sure to include 3 passwords for the postgresql secret: postgres-password, replication-password, and password. + +## Expose endpoints using a load balancer + +Now that all the pods are up and running, you need to expose the datahub-frontend end point by setting +up [ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/). To do this, you need to first set up an +ingress controller. + + +There are many [ingress controllers](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/) to choose +from, but here, we will follow this [guide](https://learn.microsoft.com/en-us/azure/application-gateway/tutorial-ingress-controller-add-on-existing) to set up the Azure +Application Gateway Ingress Controller. + +- Deploy a New Application Gateway. + +First, you need to create a WAF policy + +``` +az network application-gateway waf-policy create -g myResourceGroup -n myWAFPolicy +``` + +- Before the application gateway can be deployed, you'll also need to create a public IP resource, a new virtual network with address space 10.0.0.0/16, and a subnet with address space 10.0.0.0/24. +Then, you can deploy your application gateway in the subnet using the publicIP. + +Caution: When you use an AKS cluster and application gateway in separate virtual networks, the address spaces of the two virtual networks must not overlap. The default address space that an AKS cluster deploys in is 10.224.0.0/12. + + +``` +az network public-ip create -n myPublicIp -g myResourceGroup --allocation-method Static --sku Standard +az network vnet create -n myVnet -g myResourceGroup --address-prefix 10.0.0.0/16 --subnet-name mySubnet --subnet-prefix 10.0.0.0/24 +az network application-gateway create -n myApplicationGateway -l eastus -g myResourceGroup --sku WAF_v2 --public-ip-address myPublicIp --vnet-name myVnet --subnet mySubnet --priority 100 --waf-policy /subscriptions/{subscription_id}/resourceGroups/myResourceGroup/providers/Microsoft.Network/ApplicationGatewayWebApplicationFirewallPolicies/myWAFPolicy +``` +Change myPublicIp, myResourceGroup, myVnet, mySubnet, and myApplicationGateway to names of your choosing. + + +- Enable the AGIC Add-On in Existing AKS Cluster Through Azure CLI + +``` +appgwId=$(az network application-gateway show -n myApplicationGateway -g myResourceGroup -o tsv --query "id") +az aks enable-addons -n myCluster -g myResourceGroup -a ingress-appgw --appgw-id $appgwId +``` + +- Peer the Two Virtual Networks Together + +Since you deployed the AKS cluster in its own virtual network and the Application gateway in another virtual network, you'll need to peer the two virtual networks together in order for traffic to flow from the Application gateway to the pods in the cluster. 
+ +``` +nodeResourceGroup=$(az aks show -n myCluster -g myResourceGroup -o tsv --query "nodeResourceGroup") +aksVnetName=$(az network vnet list -g $nodeResourceGroup -o tsv --query "[0].name") + +aksVnetId=$(az network vnet show -n $aksVnetName -g $nodeResourceGroup -o tsv --query "id") +az network vnet peering create -n AppGWtoAKSVnetPeering -g myResourceGroup --vnet-name myVnet --remote-vnet $aksVnetId --allow-vnet-access + +appGWVnetId=$(az network vnet show -n myVnet -g myResourceGroup -o tsv --query "id") +az network vnet peering create -n AKStoAppGWVnetPeering -g $nodeResourceGroup --vnet-name $aksVnetName --remote-vnet $appGWVnetId --allow-vnet-access +``` + +- Deploy the Ingress on the Frontend Pod + +In order to use the ingress controller to expose frontend pod, we need to update the datahub-frontend section of the values.yaml file that was used to deploy DataHub. Here is a sample configuration: + +``` +datahub-frontend: + enabled: true + image: + repository: linkedin/datahub-frontend-react + # tag: "v0.10.0 # defaults to .global.datahub.version + + # Set up ingress to expose react front-end + ingress: + enabled: true + annotations: + kubernetes.io/ingress.class: azure/application-gateway + appgw.ingress.kubernetes.io/backend-protocol: "http" + + hosts: + - paths: + - /* + defaultUserCredentials: {} +``` + +You can then apply the updates: + +``` +helm upgrade --install datahub datahub/datahub --values values.yaml +``` + +You can now verify that the ingress was created correctly + +``` +kubectl get ingress +``` + +You should see a result like this: + +![frontend-image](https://github.com/Saketh-Mahesh/azure-docs-images/blob/main/frontend-status.png?raw=true) + +## Use PostgresSQL for the storage layer +Configure a PostgreSQL database in the same virtual network as the Kubernetes cluster or implement virtual network peering to connect both networks. Once the database is provisioned, you should be able to see the following page under the Connect tab on the left side. + + +Note: PostgreSQL Database MUST be deployed in same location as AKS/resource group (eastus, centralus, etc.) +Take a note of the connection details: + +![postgres-info](https://github.com/Saketh-Mahesh/azure-docs-images/blob/main/postgres-info.png?raw=true) + + + + + +- Update the postgresql settings under global in the values.yaml as follows. + +``` +global: + sql: + datasource: + host: "${POSTGRES_HOST}.postgres.database.azure.com:5432" + hostForpostgresqlClient: "${POSTGRES_HOST}.postgres.database.azure.com" + port: "5432" + url: "jdbc:postgresql://${POSTGRES_HOST}.postgres.database.azure.com:5432/datahub?user=${POSTGRES_ADMIN_LOGIN}&password=${POSTGRES_ADMIN_PASSWORD}&sslmode=require" + driver: "org.postgresql.Driver" + username: "${POSTGRES_ADMIN_LOGIN}" + password: + value: "${POSTGRES_ADMIN_PASSWORD}" +``` +Run this command helm command to update datahub configuration + +``` +helm upgrade --install datahub datahub/datahub --values values.yaml +``` + +And there you go! You have now installed DataHub on an Azure Kubernetes Cluster with an ingress controller set up to expose the frontend. Additionally you have utilized PostgreSQL as the storage layer of DataHub. 
\ No newline at end of file From b3ac42b1e4f43ae2ddcd9e884dc182d3a963f99a Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 18 Oct 2023 13:42:03 -0400 Subject: [PATCH 4/9] feat(frontend/ingestion): Support flagged / warning / connection failure statuses; add recipe (#8920) --- .../ExecutionRequestDetailsModal.tsx | 45 +++++++++++++++++-- .../src/app/ingest/source/utils.ts | 40 +++++++++++------ .../src/graphql/ingestion.graphql | 4 ++ 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/datahub-web-react/src/app/ingest/source/executions/ExecutionRequestDetailsModal.tsx b/datahub-web-react/src/app/ingest/source/executions/ExecutionRequestDetailsModal.tsx index 849efabdcde97b..00fdc89964f88a 100644 --- a/datahub-web-react/src/app/ingest/source/executions/ExecutionRequestDetailsModal.tsx +++ b/datahub-web-react/src/app/ingest/source/executions/ExecutionRequestDetailsModal.tsx @@ -2,6 +2,7 @@ import { DownloadOutlined } from '@ant-design/icons'; import { Button, message, Modal, Typography } from 'antd'; import React, { useEffect, useState } from 'react'; import styled from 'styled-components'; +import YAML from 'yamljs'; import { useGetIngestionExecutionRequestQuery } from '../../../../graphql/ingestion.generated'; import { ANTD_GRAY } from '../../../entity/shared/constants'; import { downloadFile } from '../../../search/utils/csvUtils'; @@ -65,6 +66,13 @@ const IngestedAssetsSection = styled.div` padding-right: 30px; `; +const RecipeSection = styled.div` + border-top: 1px solid ${ANTD_GRAY[4]}; + padding-top: 16px; + padding-left: 30px; + padding-right: 30px; +`; + const LogsSection = styled.div` padding-top: 16px; padding-left: 30px; @@ -91,6 +99,8 @@ type Props = { export const ExecutionDetailsModal = ({ urn, visible, onClose }: Props) => { const [showExpandedLogs, setShowExpandedLogs] = useState(false); + const [showExpandedRecipe, setShowExpandedRecipe] = useState(false); + const { data, loading, error, refetch } = useGetIngestionExecutionRequestQuery({ variables: { urn } }); const output = data?.executionRequest?.result?.report || 'No output found.'; @@ -120,7 +130,18 @@ export const ExecutionDetailsModal = ({ urn, visible, onClose }: Props) => { const resultSummaryText = (result && {getExecutionRequestSummaryText(result)}) || undefined; - const isOutputExpandable = output.length > 100; + + const recipeJson = data?.executionRequest?.input.arguments?.find((arg) => arg.key === 'recipe')?.value; + let recipeYaml: string; + try { + recipeYaml = recipeJson && YAML.stringify(JSON.parse(recipeJson), 8, 2).trim(); + } catch (e) { + recipeYaml = ''; + } + const recipe = showExpandedRecipe ? recipeYaml : recipeYaml?.split('\n').slice(0, 1).join('\n'); + + const areLogsExpandable = output.length > 100; + const isRecipeExpandable = recipeYaml?.includes('\n'); return ( { -
{`${logs}${!showExpandedLogs && isOutputExpandable ? '...' : ''}`}
- {isOutputExpandable && ( +
{`${logs}${!showExpandedLogs && areLogsExpandable ? '...' : ''}`}
+ {areLogsExpandable && ( setShowExpandedLogs(!showExpandedLogs)}> {showExpandedLogs ? 'Hide' : 'Show More'} )}
+ {recipe && ( + + Recipe + + + The recipe used for this ingestion run. + + + +
{`${recipe}${!showExpandedRecipe && isRecipeExpandable ? '\n...' : ''}`}
+
+ {isRecipeExpandable && ( + setShowExpandedRecipe((v) => !v)}> + {showExpandedRecipe ? 'Hide' : 'Show More'} + + )} +
+ )}
); diff --git a/datahub-web-react/src/app/ingest/source/utils.ts b/datahub-web-react/src/app/ingest/source/utils.ts index c372388e958b78..f789ed8434721d 100644 --- a/datahub-web-react/src/app/ingest/source/utils.ts +++ b/datahub-web-react/src/app/ingest/source/utils.ts @@ -1,17 +1,19 @@ -import YAML from 'yamljs'; import { CheckCircleOutlined, ClockCircleOutlined, CloseCircleOutlined, + ExclamationCircleOutlined, LoadingOutlined, + StopOutlined, WarningOutlined, } from '@ant-design/icons'; -import { ANTD_GRAY, REDESIGN_COLORS } from '../../entity/shared/constants'; +import YAML from 'yamljs'; +import { ListIngestionSourcesDocument, ListIngestionSourcesQuery } from '../../../graphql/ingestion.generated'; import { EntityType, FacetMetadata } from '../../../types.generated'; -import { capitalizeFirstLetterOnly, pluralize } from '../../shared/textUtil'; import EntityRegistry from '../../entity/EntityRegistry'; +import { ANTD_GRAY, REDESIGN_COLORS } from '../../entity/shared/constants'; +import { capitalizeFirstLetterOnly, pluralize } from '../../shared/textUtil'; import { SourceConfig } from './builder/types'; -import { ListIngestionSourcesDocument, ListIngestionSourcesQuery } from '../../../graphql/ingestion.generated'; export const getSourceConfigs = (ingestionSources: SourceConfig[], sourceType: string) => { const sourceConfigs = ingestionSources.find((source) => source.name === sourceType); @@ -40,7 +42,9 @@ export function getPlaceholderRecipe(ingestionSources: SourceConfig[], type?: st export const RUNNING = 'RUNNING'; export const SUCCESS = 'SUCCESS'; +export const WARNING = 'WARNING'; export const FAILURE = 'FAILURE'; +export const CONNECTION_FAILURE = 'CONNECTION_FAILURE'; export const CANCELLED = 'CANCELLED'; export const UP_FOR_RETRY = 'UP_FOR_RETRY'; export const ROLLING_BACK = 'ROLLING_BACK'; @@ -56,8 +60,10 @@ export const getExecutionRequestStatusIcon = (status: string) => { return ( (status === RUNNING && LoadingOutlined) || (status === SUCCESS && CheckCircleOutlined) || + (status === WARNING && ExclamationCircleOutlined) || (status === FAILURE && CloseCircleOutlined) || - (status === CANCELLED && CloseCircleOutlined) || + (status === CONNECTION_FAILURE && CloseCircleOutlined) || + (status === CANCELLED && StopOutlined) || (status === UP_FOR_RETRY && ClockCircleOutlined) || (status === ROLLED_BACK && WarningOutlined) || (status === ROLLING_BACK && LoadingOutlined) || @@ -70,7 +76,9 @@ export const getExecutionRequestStatusDisplayText = (status: string) => { return ( (status === RUNNING && 'Running') || (status === SUCCESS && 'Succeeded') || + (status === WARNING && 'Completed') || (status === FAILURE && 'Failed') || + (status === CONNECTION_FAILURE && 'Connection Failed') || (status === CANCELLED && 'Cancelled') || (status === UP_FOR_RETRY && 'Up for Retry') || (status === ROLLED_BACK && 'Rolled Back') || @@ -83,21 +91,25 @@ export const getExecutionRequestStatusDisplayText = (status: string) => { export const getExecutionRequestSummaryText = (status: string) => { switch (status) { case RUNNING: - return 'Ingestion is running'; + return 'Ingestion is running...'; case SUCCESS: - return 'Ingestion successfully completed'; + return 'Ingestion succeeded with no errors or suspected missing data.'; + case WARNING: + return 'Ingestion completed with minor or intermittent errors.'; case FAILURE: - return 'Ingestion completed with errors'; + return 'Ingestion failed to complete, or completed with serious errors.'; + case CONNECTION_FAILURE: + return 'Ingestion failed due to network, 
authentication, or permission issues.'; case CANCELLED: - return 'Ingestion was cancelled'; + return 'Ingestion was cancelled.'; case ROLLED_BACK: - return 'Ingestion was rolled back'; + return 'Ingestion was rolled back.'; case ROLLING_BACK: - return 'Ingestion is in the process of rolling back'; + return 'Ingestion is in the process of rolling back.'; case ROLLBACK_FAILED: - return 'Ingestion rollback failed'; + return 'Ingestion rollback failed.'; default: - return 'Ingestion status not recognized'; + return 'Ingestion status not recognized.'; } }; @@ -105,7 +117,9 @@ export const getExecutionRequestStatusDisplayColor = (status: string) => { return ( (status === RUNNING && REDESIGN_COLORS.BLUE) || (status === SUCCESS && 'green') || + (status === WARNING && 'orangered') || (status === FAILURE && 'red') || + (status === CONNECTION_FAILURE && 'crimson') || (status === UP_FOR_RETRY && 'orange') || (status === CANCELLED && ANTD_GRAY[9]) || (status === ROLLED_BACK && 'orange') || diff --git a/datahub-web-react/src/graphql/ingestion.graphql b/datahub-web-react/src/graphql/ingestion.graphql index 80f66642fe11f8..c127e9ec03f9a4 100644 --- a/datahub-web-react/src/graphql/ingestion.graphql +++ b/datahub-web-react/src/graphql/ingestion.graphql @@ -90,6 +90,10 @@ query getIngestionExecutionRequest($urn: String!) { source { type } + arguments { + key + value + } } result { status From 1b737243b266843136918ec92f6d20573b999272 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Wed, 18 Oct 2023 13:45:46 -0500 Subject: [PATCH 5/9] feat(avro): upgrade avro to 1.11 (#9031) --- build.gradle | 7 +++---- buildSrc/build.gradle | 9 ++++++++- docker/datahub-frontend/start.sh | 1 + metadata-dao-impl/kafka-producer/build.gradle | 4 ++-- metadata-events/{mxe-avro-1.7 => mxe-avro}/.gitignore | 0 metadata-events/{mxe-avro-1.7 => mxe-avro}/build.gradle | 6 +++--- metadata-events/mxe-registration/build.gradle | 2 +- metadata-events/mxe-schemas/build.gradle | 2 +- .../{mxe-utils-avro-1.7 => mxe-utils-avro}/.gitignore | 0 .../{mxe-utils-avro-1.7 => mxe-utils-avro}/build.gradle | 2 +- .../src/main/java/com/linkedin/metadata/EventUtils.java | 0 .../test/java/com/linkedin/metadata/EventUtilsTests.java | 0 .../src/test/resources/test-avro2pegasus-mae.json | 0 .../src/test/resources/test-avro2pegasus-mce.json | 0 .../src/test/resources/test-pegasus2avro-fmce.json | 0 .../src/test/resources/test-pegasus2avro-mae.json | 0 .../src/test/resources/test-pegasus2avro-mce.json | 0 metadata-integration/java/datahub-client/build.gradle | 2 +- .../main/java/datahub/client/kafka/AvroSerializer.java | 4 +++- metadata-io/build.gradle | 4 ++-- metadata-jobs/mae-consumer/build.gradle | 4 ++-- metadata-jobs/mce-consumer/build.gradle | 4 ++-- metadata-jobs/pe-consumer/build.gradle | 4 ++-- metadata-service/restli-servlet-impl/build.gradle | 2 +- metadata-service/services/build.gradle | 4 ++-- metadata-utils/build.gradle | 6 +++--- settings.gradle | 4 ++-- 27 files changed, 40 insertions(+), 31 deletions(-) rename metadata-events/{mxe-avro-1.7 => mxe-avro}/.gitignore (100%) rename metadata-events/{mxe-avro-1.7 => mxe-avro}/build.gradle (81%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/.gitignore (100%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/build.gradle (95%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/main/java/com/linkedin/metadata/EventUtils.java (100%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/test/java/com/linkedin/metadata/EventUtilsTests.java (100%) 
rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/test/resources/test-avro2pegasus-mae.json (100%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/test/resources/test-avro2pegasus-mce.json (100%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/test/resources/test-pegasus2avro-fmce.json (100%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/test/resources/test-pegasus2avro-mae.json (100%) rename metadata-events/{mxe-utils-avro-1.7 => mxe-utils-avro}/src/test/resources/test-pegasus2avro-mce.json (100%) diff --git a/build.gradle b/build.gradle index 025c588da2b523..cf55a59cfe6942 100644 --- a/build.gradle +++ b/build.gradle @@ -27,7 +27,7 @@ buildscript { dependencies { classpath 'com.linkedin.pegasus:gradle-plugins:' + pegasusVersion classpath 'com.github.node-gradle:gradle-node-plugin:2.2.4' - classpath 'io.acryl.gradle.plugin:gradle-avro-plugin:0.8.1' + classpath 'io.acryl.gradle.plugin:gradle-avro-plugin:0.2.0' classpath 'org.springframework.boot:spring-boot-gradle-plugin:' + springBootVersion classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.30.0" classpath "com.palantir.gradle.gitversion:gradle-git-version:3.0.0" @@ -67,8 +67,8 @@ project.ext.externalDependency = [ 'antlr4Runtime': 'org.antlr:antlr4-runtime:4.7.2', 'antlr4': 'org.antlr:antlr4:4.7.2', 'assertJ': 'org.assertj:assertj-core:3.11.1', - 'avro_1_7': 'org.apache.avro:avro:1.7.7', - 'avroCompiler_1_7': 'org.apache.avro:avro-compiler:1.7.7', + 'avro': 'org.apache.avro:avro:1.11.3', + 'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3', 'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.10', 'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:1.1.1', 'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.8', @@ -127,7 +127,6 @@ project.ext.externalDependency = [ 'jgrapht': 'org.jgrapht:jgrapht-core:1.5.1', 'jna': 'net.java.dev.jna:jna:5.12.1', 'jsonPatch': 'com.github.java-json-tools:json-patch:1.13', - 'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4', 'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1', 'jsonSmart': 'net.minidev:json-smart:2.4.9', 'json': 'org.json:json:20230227', diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 65b3780431db9d..1f9d30d520171b 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -5,7 +5,14 @@ buildscript { } dependencies { - implementation('io.acryl:json-schema-avro:0.1.5') { + /** + * Forked version of abandoned repository: https://github.com/fge/json-schema-avro + * Maintainer last active 2014, we maintain an active fork of this repository to utilize mapping Avro schemas to Json Schemas, + * repository is as close to official library for this as you can get. Original maintainer is one of the authors of Json Schema spec. + * Other companies are also separately maintaining forks (like: https://github.com/java-json-tools/json-schema-avro). 
+ * We have built several customizations on top of it for various bug fixes, especially around union scheams + */ + implementation('io.acryl:json-schema-avro:0.2.2') { exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' exclude group: 'com.google.guava', module: 'guava' } diff --git a/docker/datahub-frontend/start.sh b/docker/datahub-frontend/start.sh index 9dc1514144bb1a..430982aa2456ba 100755 --- a/docker/datahub-frontend/start.sh +++ b/docker/datahub-frontend/start.sh @@ -50,6 +50,7 @@ export JAVA_OPTS="-Xms512m \ -Djava.security.auth.login.config=datahub-frontend/conf/jaas.conf \ -Dlogback.configurationFile=datahub-frontend/conf/logback.xml \ -Dlogback.debug=false \ + -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 \ ${PROMETHEUS_AGENT:-} ${OTEL_AGENT:-} \ ${TRUSTSTORE_FILE:-} ${TRUSTSTORE_TYPE:-} ${TRUSTSTORE_PASSWORD:-} \ ${HTTP_PROXY:-} ${HTTPS_PROXY:-} ${NO_PROXY:-} \ diff --git a/metadata-dao-impl/kafka-producer/build.gradle b/metadata-dao-impl/kafka-producer/build.gradle index 393b10b0e9d246..bc3415b2ccc8c1 100644 --- a/metadata-dao-impl/kafka-producer/build.gradle +++ b/metadata-dao-impl/kafka-producer/build.gradle @@ -1,9 +1,9 @@ apply plugin: 'java' dependencies { - implementation project(':metadata-events:mxe-avro-1.7') + implementation project(':metadata-events:mxe-avro') implementation project(':metadata-events:mxe-registration') - implementation project(':metadata-events:mxe-utils-avro-1.7') + implementation project(':metadata-events:mxe-utils-avro') implementation project(':entity-registry') implementation project(':metadata-io') diff --git a/metadata-events/mxe-avro-1.7/.gitignore b/metadata-events/mxe-avro/.gitignore similarity index 100% rename from metadata-events/mxe-avro-1.7/.gitignore rename to metadata-events/mxe-avro/.gitignore diff --git a/metadata-events/mxe-avro-1.7/build.gradle b/metadata-events/mxe-avro/build.gradle similarity index 81% rename from metadata-events/mxe-avro-1.7/build.gradle rename to metadata-events/mxe-avro/build.gradle index 8c0a26d22dc7d2..9d11eeb160ff0f 100644 --- a/metadata-events/mxe-avro-1.7/build.gradle +++ b/metadata-events/mxe-avro/build.gradle @@ -6,8 +6,8 @@ apply plugin: 'io.acryl.gradle.plugin.avro' apply plugin: 'java-library' dependencies { - api externalDependency.avro_1_7 - implementation(externalDependency.avroCompiler_1_7) { + api externalDependency.avro + implementation(externalDependency.avroCompiler) { exclude group: 'org.apache.velocity', module: 'velocity' } constraints { @@ -21,7 +21,7 @@ dependencies { def genDir = file("src/generated/java") -task avroCodeGen(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask, dependsOn: configurations.avsc) { +task avroCodeGen(type: com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask, dependsOn: configurations.avsc) { source("$rootDir/metadata-events/mxe-schemas/src/renamed/avro") outputDir = genDir dependsOn(':metadata-events:mxe-schemas:renameNamespace') diff --git a/metadata-events/mxe-registration/build.gradle b/metadata-events/mxe-registration/build.gradle index 60e0da59616d93..032870d93329ff 100644 --- a/metadata-events/mxe-registration/build.gradle +++ b/metadata-events/mxe-registration/build.gradle @@ -5,7 +5,7 @@ configurations { } dependencies { - implementation project(':metadata-events:mxe-avro-1.7') + implementation project(':metadata-events:mxe-avro') implementation project(':metadata-models') implementation spec.product.pegasus.dataAvro1_6 diff --git a/metadata-events/mxe-schemas/build.gradle 
b/metadata-events/mxe-schemas/build.gradle index fe46601fb68b79..8dc8b71bd1cd83 100644 --- a/metadata-events/mxe-schemas/build.gradle +++ b/metadata-events/mxe-schemas/build.gradle @@ -1,4 +1,4 @@ -apply plugin: 'java' +apply plugin: 'java-library' apply plugin: 'pegasus' dependencies { diff --git a/metadata-events/mxe-utils-avro-1.7/.gitignore b/metadata-events/mxe-utils-avro/.gitignore similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/.gitignore rename to metadata-events/mxe-utils-avro/.gitignore diff --git a/metadata-events/mxe-utils-avro-1.7/build.gradle b/metadata-events/mxe-utils-avro/build.gradle similarity index 95% rename from metadata-events/mxe-utils-avro-1.7/build.gradle rename to metadata-events/mxe-utils-avro/build.gradle index 3b137965d6c19f..a7bf287ab224d3 100644 --- a/metadata-events/mxe-utils-avro-1.7/build.gradle +++ b/metadata-events/mxe-utils-avro/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'java-library' dependencies { - api project(':metadata-events:mxe-avro-1.7') + api project(':metadata-events:mxe-avro') api project(':metadata-models') api spec.product.pegasus.dataAvro1_6 diff --git a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java rename to metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java diff --git a/metadata-events/mxe-utils-avro-1.7/src/test/java/com/linkedin/metadata/EventUtilsTests.java b/metadata-events/mxe-utils-avro/src/test/java/com/linkedin/metadata/EventUtilsTests.java similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/test/java/com/linkedin/metadata/EventUtilsTests.java rename to metadata-events/mxe-utils-avro/src/test/java/com/linkedin/metadata/EventUtilsTests.java diff --git a/metadata-events/mxe-utils-avro-1.7/src/test/resources/test-avro2pegasus-mae.json b/metadata-events/mxe-utils-avro/src/test/resources/test-avro2pegasus-mae.json similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/test/resources/test-avro2pegasus-mae.json rename to metadata-events/mxe-utils-avro/src/test/resources/test-avro2pegasus-mae.json diff --git a/metadata-events/mxe-utils-avro-1.7/src/test/resources/test-avro2pegasus-mce.json b/metadata-events/mxe-utils-avro/src/test/resources/test-avro2pegasus-mce.json similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/test/resources/test-avro2pegasus-mce.json rename to metadata-events/mxe-utils-avro/src/test/resources/test-avro2pegasus-mce.json diff --git a/metadata-events/mxe-utils-avro-1.7/src/test/resources/test-pegasus2avro-fmce.json b/metadata-events/mxe-utils-avro/src/test/resources/test-pegasus2avro-fmce.json similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/test/resources/test-pegasus2avro-fmce.json rename to metadata-events/mxe-utils-avro/src/test/resources/test-pegasus2avro-fmce.json diff --git a/metadata-events/mxe-utils-avro-1.7/src/test/resources/test-pegasus2avro-mae.json b/metadata-events/mxe-utils-avro/src/test/resources/test-pegasus2avro-mae.json similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/test/resources/test-pegasus2avro-mae.json rename to metadata-events/mxe-utils-avro/src/test/resources/test-pegasus2avro-mae.json diff --git a/metadata-events/mxe-utils-avro-1.7/src/test/resources/test-pegasus2avro-mce.json 
b/metadata-events/mxe-utils-avro/src/test/resources/test-pegasus2avro-mce.json similarity index 100% rename from metadata-events/mxe-utils-avro-1.7/src/test/resources/test-pegasus2avro-mce.json rename to metadata-events/mxe-utils-avro/src/test/resources/test-pegasus2avro-mce.json diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index 95de3cdb3c5262..e6210f1f073f6b 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -30,7 +30,7 @@ dependencies { implementation(externalDependency.kafkaAvroSerializer) { exclude group: "org.apache.avro" } - implementation externalDependency.avro_1_7 + implementation externalDependency.avro constraints { implementation('commons-collections:commons-collections:3.2.2') { because 'Vulnerability Issue' diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java index ee0d459aaa7d3b..6212e57470be4f 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java @@ -16,12 +16,14 @@ class AvroSerializer { private final Schema _recordSchema; private final Schema _genericAspectSchema; + private final Schema _changeTypeEnumSchema; private final EventFormatter _eventFormatter; public AvroSerializer() throws IOException { _recordSchema = new Schema.Parser() .parse(this.getClass().getClassLoader().getResourceAsStream("MetadataChangeProposal.avsc")); _genericAspectSchema = this._recordSchema.getField("aspect").schema().getTypes().get(1); + _changeTypeEnumSchema = this._recordSchema.getField("changeType").schema(); _eventFormatter = new EventFormatter(EventFormatter.Format.PEGASUS_JSON); } @@ -43,7 +45,7 @@ public GenericRecord serialize(MetadataChangeProposal mcp) throws IOException { genericRecord.put("aspect", genericAspect); genericRecord.put("aspectName", mcp.getAspectName()); genericRecord.put("entityType", mcp.getEntityType()); - genericRecord.put("changeType", mcp.getChangeType()); + genericRecord.put("changeType", new GenericData.EnumSymbol(_changeTypeEnumSchema, mcp.getChangeType())); return genericRecord; } } \ No newline at end of file diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index ad54cf65243982..740fed61f13d56 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -8,9 +8,9 @@ configurations { dependencies { implementation project(':entity-registry') api project(':metadata-utils') - api project(':metadata-events:mxe-avro-1.7') + api project(':metadata-events:mxe-avro') api project(':metadata-events:mxe-registration') - api project(':metadata-events:mxe-utils-avro-1.7') + api project(':metadata-events:mxe-utils-avro') api project(':metadata-models') api project(':metadata-service:restli-client') api project(':metadata-service:configuration') diff --git a/metadata-jobs/mae-consumer/build.gradle b/metadata-jobs/mae-consumer/build.gradle index d36fd0de40d035..fcb8b62e4ac9d5 100644 --- a/metadata-jobs/mae-consumer/build.gradle +++ b/metadata-jobs/mae-consumer/build.gradle @@ -21,9 +21,9 @@ dependencies { implementation project(':ingestion-scheduler') implementation project(':metadata-utils') implementation project(":entity-registry") - implementation 
project(':metadata-events:mxe-avro-1.7') + implementation project(':metadata-events:mxe-avro') implementation project(':metadata-events:mxe-registration') - implementation project(':metadata-events:mxe-utils-avro-1.7') + implementation project(':metadata-events:mxe-utils-avro') implementation project(':datahub-graphql-core') implementation externalDependency.elasticSearchRest diff --git a/metadata-jobs/mce-consumer/build.gradle b/metadata-jobs/mce-consumer/build.gradle index 0bca55e0e5f92d..97eec9fcff051c 100644 --- a/metadata-jobs/mce-consumer/build.gradle +++ b/metadata-jobs/mce-consumer/build.gradle @@ -17,9 +17,9 @@ dependencies { } implementation project(':metadata-utils') implementation project(':metadata-events:mxe-schemas') - implementation project(':metadata-events:mxe-avro-1.7') + implementation project(':metadata-events:mxe-avro') implementation project(':metadata-events:mxe-registration') - implementation project(':metadata-events:mxe-utils-avro-1.7') + implementation project(':metadata-events:mxe-utils-avro') implementation project(':metadata-io') implementation project(':metadata-service:restli-client') implementation spec.product.pegasus.restliClient diff --git a/metadata-jobs/pe-consumer/build.gradle b/metadata-jobs/pe-consumer/build.gradle index 1899a4de15635a..81e8b8c9971f00 100644 --- a/metadata-jobs/pe-consumer/build.gradle +++ b/metadata-jobs/pe-consumer/build.gradle @@ -10,9 +10,9 @@ configurations { dependencies { avro project(path: ':metadata-models', configuration: 'avroSchema') implementation project(':li-utils') - implementation project(':metadata-events:mxe-avro-1.7') + implementation project(':metadata-events:mxe-avro') implementation project(':metadata-events:mxe-registration') - implementation project(':metadata-events:mxe-utils-avro-1.7') + implementation project(':metadata-events:mxe-utils-avro') implementation(project(':metadata-service:factories')) { exclude group: 'org.neo4j.test' } diff --git a/metadata-service/restli-servlet-impl/build.gradle b/metadata-service/restli-servlet-impl/build.gradle index cb307863748c31..de6fb6690e693b 100644 --- a/metadata-service/restli-servlet-impl/build.gradle +++ b/metadata-service/restli-servlet-impl/build.gradle @@ -48,7 +48,7 @@ dependencies { implementation externalDependency.dropwizardMetricsCore implementation externalDependency.dropwizardMetricsJmx - compileOnly externalDependency.lombok + implementation externalDependency.lombok implementation externalDependency.neo4jJavaDriver implementation externalDependency.opentelemetryAnnotations diff --git a/metadata-service/services/build.gradle b/metadata-service/services/build.gradle index 22c62af324c12d..b6af3d330d185b 100644 --- a/metadata-service/services/build.gradle +++ b/metadata-service/services/build.gradle @@ -9,9 +9,9 @@ dependencies { implementation externalDependency.jsonPatch implementation project(':entity-registry') implementation project(':metadata-utils') - implementation project(':metadata-events:mxe-avro-1.7') + implementation project(':metadata-events:mxe-avro') implementation project(':metadata-events:mxe-registration') - implementation project(':metadata-events:mxe-utils-avro-1.7') + implementation project(':metadata-events:mxe-utils-avro') implementation project(':metadata-models') implementation project(':metadata-service:restli-client') implementation project(':metadata-service:configuration') diff --git a/metadata-utils/build.gradle b/metadata-utils/build.gradle index 1c1c368611488f..7bc6aa2d434424 100644 --- a/metadata-utils/build.gradle +++ 
b/metadata-utils/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'java-library' dependencies { - api externalDependency.avro_1_7 + api externalDependency.avro implementation externalDependency.commonsLang api externalDependency.dropwizardMetricsCore implementation externalDependency.dropwizardMetricsJmx @@ -16,8 +16,8 @@ dependencies { api project(':li-utils') api project(':entity-registry') - api project(':metadata-events:mxe-avro-1.7') - api project(':metadata-events:mxe-utils-avro-1.7') + api project(':metadata-events:mxe-avro') + api project(':metadata-events:mxe-utils-avro') implementation externalDependency.slf4jApi compileOnly externalDependency.lombok diff --git a/settings.gradle b/settings.gradle index d6777b07b3fb3c..52de461383b5e8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -20,10 +20,10 @@ include 'metadata-service:openapi-analytics-servlet' include 'metadata-service:plugin' include 'metadata-service:plugin:src:test:sample-test-plugins' include 'metadata-dao-impl:kafka-producer' -include 'metadata-events:mxe-avro-1.7' +include 'metadata-events:mxe-avro' include 'metadata-events:mxe-registration' include 'metadata-events:mxe-schemas' -include 'metadata-events:mxe-utils-avro-1.7' +include 'metadata-events:mxe-utils-avro' include 'metadata-ingestion' include 'metadata-jobs:mae-consumer' include 'metadata-jobs:mce-consumer' From aae1347efce9edf1b5c4512ba3c72569e165947d Mon Sep 17 00:00:00 2001 From: Indy Prentice Date: Wed, 18 Oct 2023 16:26:24 -0300 Subject: [PATCH 6/9] fix(search): Detect field type for use in defining the sort order (#8992) Co-authored-by: Indy Prentice --- .../indexbuilder/MappingsBuilder.java | 48 +++++------- .../query/request/SearchRequestHandler.java | 8 +- .../metadata/search/utils/ESUtils.java | 74 ++++++++++++++++++- .../fixtures/SampleDataFixtureTestBase.java | 64 ++++++++++++++-- 4 files changed, 154 insertions(+), 40 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java index 004b2e0a2adc4c..1edc77bbd214c9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/MappingsBuilder.java @@ -5,6 +5,7 @@ import com.linkedin.metadata.models.SearchScoreFieldSpec; import com.linkedin.metadata.models.SearchableFieldSpec; import com.linkedin.metadata.models.annotation.SearchableAnnotation.FieldType; +import com.linkedin.metadata.search.utils.ESUtils; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,15 +32,6 @@ public static Map getPartialNgramConfigWithOverrides(Map KEYWORD_TYPE_MAP = ImmutableMap.of(TYPE, KEYWORD); - // Field Types - public static final String BOOLEAN = "boolean"; - public static final String DATE = "date"; - public static final String DOUBLE = "double"; - public static final String LONG = "long"; - public static final String OBJECT = "object"; - public static final String TEXT = "text"; - public static final String TOKEN_COUNT = "token_count"; - // Subfields public static final String DELIMITED = "delimited"; public static final String LENGTH = "length"; @@ -74,7 +66,7 @@ public static Map getMappings(@Nonnull final EntitySpec entitySp private static Map getMappingsForUrn() { Map subFields = new HashMap<>(); subFields.put(DELIMITED, ImmutableMap.of( - TYPE, TEXT, + TYPE, 
ESUtils.TEXT_FIELD_TYPE, ANALYZER, URN_ANALYZER, SEARCH_ANALYZER, URN_SEARCH_ANALYZER, SEARCH_QUOTE_ANALYZER, CUSTOM_QUOTE_ANALYZER) @@ -85,13 +77,13 @@ private static Map getMappingsForUrn() { ) )); return ImmutableMap.builder() - .put(TYPE, KEYWORD) + .put(TYPE, ESUtils.KEYWORD_FIELD_TYPE) .put(FIELDS, subFields) .build(); } private static Map getMappingsForRunId() { - return ImmutableMap.builder().put(TYPE, KEYWORD).build(); + return ImmutableMap.builder().put(TYPE, ESUtils.KEYWORD_FIELD_TYPE).build(); } private static Map getMappingsForField(@Nonnull final SearchableFieldSpec searchableFieldSpec) { @@ -104,23 +96,23 @@ private static Map getMappingsForField(@Nonnull final Searchable } else if (fieldType == FieldType.TEXT || fieldType == FieldType.TEXT_PARTIAL || fieldType == FieldType.WORD_GRAM) { mappingForField.putAll(getMappingsForSearchText(fieldType)); } else if (fieldType == FieldType.BROWSE_PATH) { - mappingForField.put(TYPE, TEXT); + mappingForField.put(TYPE, ESUtils.TEXT_FIELD_TYPE); mappingForField.put(FIELDS, ImmutableMap.of(LENGTH, ImmutableMap.of( - TYPE, TOKEN_COUNT, + TYPE, ESUtils.TOKEN_COUNT_FIELD_TYPE, ANALYZER, SLASH_PATTERN_ANALYZER))); mappingForField.put(ANALYZER, BROWSE_PATH_HIERARCHY_ANALYZER); mappingForField.put(FIELDDATA, true); } else if (fieldType == FieldType.BROWSE_PATH_V2) { - mappingForField.put(TYPE, TEXT); + mappingForField.put(TYPE, ESUtils.TEXT_FIELD_TYPE); mappingForField.put(FIELDS, ImmutableMap.of(LENGTH, ImmutableMap.of( - TYPE, TOKEN_COUNT, + TYPE, ESUtils.TOKEN_COUNT_FIELD_TYPE, ANALYZER, UNIT_SEPARATOR_PATTERN_ANALYZER))); mappingForField.put(ANALYZER, BROWSE_PATH_V2_HIERARCHY_ANALYZER); mappingForField.put(FIELDDATA, true); } else if (fieldType == FieldType.URN || fieldType == FieldType.URN_PARTIAL) { - mappingForField.put(TYPE, TEXT); + mappingForField.put(TYPE, ESUtils.TEXT_FIELD_TYPE); mappingForField.put(ANALYZER, URN_ANALYZER); mappingForField.put(SEARCH_ANALYZER, URN_SEARCH_ANALYZER); mappingForField.put(SEARCH_QUOTE_ANALYZER, CUSTOM_QUOTE_ANALYZER); @@ -135,13 +127,13 @@ private static Map getMappingsForField(@Nonnull final Searchable subFields.put(KEYWORD, KEYWORD_TYPE_MAP); mappingForField.put(FIELDS, subFields); } else if (fieldType == FieldType.BOOLEAN) { - mappingForField.put(TYPE, BOOLEAN); + mappingForField.put(TYPE, ESUtils.BOOLEAN_FIELD_TYPE); } else if (fieldType == FieldType.COUNT) { - mappingForField.put(TYPE, LONG); + mappingForField.put(TYPE, ESUtils.LONG_FIELD_TYPE); } else if (fieldType == FieldType.DATETIME) { - mappingForField.put(TYPE, DATE); + mappingForField.put(TYPE, ESUtils.DATE_FIELD_TYPE); } else if (fieldType == FieldType.OBJECT) { - mappingForField.put(TYPE, OBJECT); + mappingForField.put(TYPE, ESUtils.DATE_FIELD_TYPE); } else { log.info("FieldType {} has no mappings implemented", fieldType); } @@ -149,10 +141,10 @@ private static Map getMappingsForField(@Nonnull final Searchable searchableFieldSpec.getSearchableAnnotation() .getHasValuesFieldName() - .ifPresent(fieldName -> mappings.put(fieldName, ImmutableMap.of(TYPE, BOOLEAN))); + .ifPresent(fieldName -> mappings.put(fieldName, ImmutableMap.of(TYPE, ESUtils.BOOLEAN_FIELD_TYPE))); searchableFieldSpec.getSearchableAnnotation() .getNumValuesFieldName() - .ifPresent(fieldName -> mappings.put(fieldName, ImmutableMap.of(TYPE, LONG))); + .ifPresent(fieldName -> mappings.put(fieldName, ImmutableMap.of(TYPE, ESUtils.LONG_FIELD_TYPE))); mappings.putAll(getMappingsForFieldNameAliases(searchableFieldSpec)); return mappings; @@ -160,7 +152,7 @@ private static Map 
getMappingsForField(@Nonnull final Searchable private static Map getMappingsForKeyword() { Map mappingForField = new HashMap<>(); - mappingForField.put(TYPE, KEYWORD); + mappingForField.put(TYPE, ESUtils.KEYWORD_FIELD_TYPE); mappingForField.put(NORMALIZER, KEYWORD_NORMALIZER); // Add keyword subfield without lowercase filter mappingForField.put(FIELDS, ImmutableMap.of(KEYWORD, KEYWORD_TYPE_MAP)); @@ -169,7 +161,7 @@ private static Map getMappingsForKeyword() { private static Map getMappingsForSearchText(FieldType fieldType) { Map mappingForField = new HashMap<>(); - mappingForField.put(TYPE, KEYWORD); + mappingForField.put(TYPE, ESUtils.KEYWORD_FIELD_TYPE); mappingForField.put(NORMALIZER, KEYWORD_NORMALIZER); Map subFields = new HashMap<>(); if (fieldType == FieldType.TEXT_PARTIAL || fieldType == FieldType.WORD_GRAM) { @@ -186,14 +178,14 @@ private static Map getMappingsForSearchText(FieldType fieldType) String fieldName = entry.getKey(); String analyzerName = entry.getValue(); subFields.put(fieldName, ImmutableMap.of( - TYPE, TEXT, + TYPE, ESUtils.TEXT_FIELD_TYPE, ANALYZER, analyzerName )); } } } subFields.put(DELIMITED, ImmutableMap.of( - TYPE, TEXT, + TYPE, ESUtils.TEXT_FIELD_TYPE, ANALYZER, TEXT_ANALYZER, SEARCH_ANALYZER, TEXT_SEARCH_ANALYZER, SEARCH_QUOTE_ANALYZER, CUSTOM_QUOTE_ANALYZER)); @@ -206,7 +198,7 @@ private static Map getMappingsForSearchText(FieldType fieldType) private static Map getMappingsForSearchScoreField( @Nonnull final SearchScoreFieldSpec searchScoreFieldSpec) { return ImmutableMap.of(searchScoreFieldSpec.getSearchScoreAnnotation().getFieldName(), - ImmutableMap.of(TYPE, DOUBLE)); + ImmutableMap.of(TYPE, ESUtils.DOUBLE_FIELD_TYPE)); } private static Map getMappingsForFieldNameAliases(@Nonnull final SearchableFieldSpec searchableFieldSpec) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java index 5fcc10b7af5cfa..c06907e800d5e7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java @@ -202,7 +202,7 @@ public SearchRequest getSearchRequest(@Nonnull String input, @Nullable Filter fi if (!finalSearchFlags.isSkipHighlighting()) { searchSourceBuilder.highlighter(_highlights); } - ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion); + ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, _entitySpecs); if (finalSearchFlags.isGetSuggestions()) { ESUtils.buildNameSuggestions(searchSourceBuilder, input); @@ -243,7 +243,7 @@ public SearchRequest getSearchRequest(@Nonnull String input, @Nullable Filter fi searchSourceBuilder.query(QueryBuilders.boolQuery().must(getQuery(input, finalSearchFlags.isFulltext())).filter(filterQuery)); _aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation); searchSourceBuilder.highlighter(getHighlights()); - ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion); + ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, _entitySpecs); searchRequest.source(searchSourceBuilder); log.debug("Search request is: " + searchRequest); searchRequest.indicesOptions(null); @@ -270,7 +270,7 @@ public SearchRequest getFilterRequest(@Nullable Filter filters, @Nullable SortCr final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 
searchSourceBuilder.query(filterQuery); searchSourceBuilder.from(from).size(size); - ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion); + ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, _entitySpecs); searchRequest.source(searchSourceBuilder); return searchRequest; @@ -301,7 +301,7 @@ public SearchRequest getFilterRequest(@Nullable Filter filters, @Nullable SortCr searchSourceBuilder.size(size); ESUtils.setSearchAfter(searchSourceBuilder, sort, pitId, keepAlive); - ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion); + ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, _entitySpecs); searchRequest.source(searchSourceBuilder); return searchRequest; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java index 9a7d9a1b4c4207..53765acb8e29e8 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java @@ -2,6 +2,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.SearchableFieldSpec; +import com.linkedin.metadata.models.annotation.SearchableAnnotation; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterion; import com.linkedin.metadata.query.filter.Criterion; @@ -49,7 +52,28 @@ public class ESUtils { public static final int MAX_RESULT_SIZE = 10000; public static final String OPAQUE_ID_HEADER = "X-Opaque-Id"; public static final String HEADER_VALUE_DELIMITER = "|"; - public static final String KEYWORD_TYPE = "keyword"; + + // Field types + public static final String KEYWORD_FIELD_TYPE = "keyword"; + public static final String BOOLEAN_FIELD_TYPE = "boolean"; + public static final String DATE_FIELD_TYPE = "date"; + public static final String DOUBLE_FIELD_TYPE = "double"; + public static final String LONG_FIELD_TYPE = "long"; + public static final String OBJECT_FIELD_TYPE = "object"; + public static final String TEXT_FIELD_TYPE = "text"; + public static final String TOKEN_COUNT_FIELD_TYPE = "token_count"; + // End of field types + + public static final Set FIELD_TYPES_STORED_AS_KEYWORD = Set.of( + SearchableAnnotation.FieldType.KEYWORD, + SearchableAnnotation.FieldType.TEXT, + SearchableAnnotation.FieldType.TEXT_PARTIAL, + SearchableAnnotation.FieldType.WORD_GRAM); + public static final Set FIELD_TYPES_STORED_AS_TEXT = Set.of( + SearchableAnnotation.FieldType.BROWSE_PATH, + SearchableAnnotation.FieldType.BROWSE_PATH_V2, + SearchableAnnotation.FieldType.URN, + SearchableAnnotation.FieldType.URN_PARTIAL); public static final String ENTITY_NAME_FIELD = "_entityName"; public static final String NAME_SUGGESTION = "nameSuggestion"; @@ -174,6 +198,25 @@ public static QueryBuilder getQueryBuilderFromCriterion(@Nonnull final Criterion return getQueryBuilderFromCriterionForSingleField(criterion, isTimeseries); } + public static String getElasticTypeForFieldType(SearchableAnnotation.FieldType fieldType) { + if (FIELD_TYPES_STORED_AS_KEYWORD.contains(fieldType)) { + return KEYWORD_FIELD_TYPE; + } else if (FIELD_TYPES_STORED_AS_TEXT.contains(fieldType)) { + return TEXT_FIELD_TYPE; + } else if (fieldType == SearchableAnnotation.FieldType.BOOLEAN) { + return BOOLEAN_FIELD_TYPE; + } else if (fieldType == SearchableAnnotation.FieldType.COUNT) { + return LONG_FIELD_TYPE; + } else 
if (fieldType == SearchableAnnotation.FieldType.DATETIME) { + return DATE_FIELD_TYPE; + } else if (fieldType == SearchableAnnotation.FieldType.OBJECT) { + return OBJECT_FIELD_TYPE; + } else { + log.warn("FieldType {} has no mappings implemented", fieldType); + return null; + } + } + /** * Populates source field of search query with the sort order as per the criterion provided. * @@ -189,14 +232,39 @@ public static QueryBuilder getQueryBuilderFromCriterion(@Nonnull final Criterion * @param sortCriterion {@link SortCriterion} to be applied to the search results */ public static void buildSortOrder(@Nonnull SearchSourceBuilder searchSourceBuilder, - @Nullable SortCriterion sortCriterion) { + @Nullable SortCriterion sortCriterion, List entitySpecs) { if (sortCriterion == null) { searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); } else { + Optional fieldTypeForDefault = Optional.empty(); + for (EntitySpec entitySpec : entitySpecs) { + List fieldSpecs = entitySpec.getSearchableFieldSpecs(); + for (SearchableFieldSpec fieldSpec : fieldSpecs) { + SearchableAnnotation annotation = fieldSpec.getSearchableAnnotation(); + if (annotation.getFieldName().equals(sortCriterion.getField()) + || annotation.getFieldNameAliases().contains(sortCriterion.getField())) { + fieldTypeForDefault = Optional.of(fieldSpec.getSearchableAnnotation().getFieldType()); + break; + } + } + if (fieldTypeForDefault.isPresent()) { + break; + } + } + if (fieldTypeForDefault.isEmpty()) { + log.warn("Sort criterion field " + sortCriterion.getField() + " was not found in any entity spec to be searched"); + } final SortOrder esSortOrder = (sortCriterion.getOrder() == com.linkedin.metadata.query.filter.SortOrder.ASCENDING) ? SortOrder.ASC : SortOrder.DESC; - searchSourceBuilder.sort(new FieldSortBuilder(sortCriterion.getField()).order(esSortOrder).unmappedType(KEYWORD_TYPE)); + FieldSortBuilder sortBuilder = new FieldSortBuilder(sortCriterion.getField()).order(esSortOrder); + if (fieldTypeForDefault.isPresent()) { + String esFieldtype = getElasticTypeForFieldType(fieldTypeForDefault.get()); + if (esFieldtype != null) { + sortBuilder.unmappedType(esFieldtype); + } + } + searchSourceBuilder.sort(sortBuilder); } if (sortCriterion == null || !sortCriterion.getField().equals(DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD)) { searchSourceBuilder.sort(new FieldSortBuilder(DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD).order(SortOrder.ASC)); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java index 16605048102965..69dd5c80bef1d1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/fixtures/SampleDataFixtureTestBase.java @@ -22,12 +22,15 @@ import com.linkedin.metadata.query.filter.Criterion; import com.linkedin.metadata.query.filter.CriterionArray; import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.query.filter.SortOrder; import com.linkedin.metadata.search.AggregationMetadata; import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchResult; import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.search.elasticsearch.query.request.SearchFieldConfig; +import 
com.linkedin.metadata.search.utils.ESUtils; import com.linkedin.r2.RemoteInvocationException; import org.junit.Assert; import org.opensearch.client.RequestOptions; @@ -36,6 +39,9 @@ import org.opensearch.client.indices.AnalyzeResponse; import org.opensearch.client.indices.GetMappingsRequest; import org.opensearch.client.indices.GetMappingsResponse; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.FieldSortBuilder; +import org.opensearch.search.sort.SortBuilder; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.Test; @@ -54,11 +60,7 @@ import static com.linkedin.metadata.Constants.DATA_JOB_ENTITY_NAME; import static com.linkedin.metadata.search.elasticsearch.query.request.SearchQueryBuilder.STRUCTURED_QUERY_PREFIX; import static com.linkedin.metadata.utils.SearchUtil.AGGREGATION_SEPARATOR_CHAR; -import static io.datahubproject.test.search.SearchTestUtils.autocomplete; -import static io.datahubproject.test.search.SearchTestUtils.scroll; -import static io.datahubproject.test.search.SearchTestUtils.search; -import static io.datahubproject.test.search.SearchTestUtils.searchAcrossEntities; -import static io.datahubproject.test.search.SearchTestUtils.searchStructured; +import static io.datahubproject.test.search.SearchTestUtils.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -174,6 +176,48 @@ public void testSearchFieldConfig() throws IOException { } } + @Test + public void testGetSortOrder() { + String dateFieldName = "lastOperationTime"; + List entityNamesToTestSearch = List.of("dataset", "chart", "corpgroup"); + List entitySpecs = entityNamesToTestSearch.stream().map( + name -> getEntityRegistry().getEntitySpec(name)) + .collect(Collectors.toList()); + SearchSourceBuilder builder = new SearchSourceBuilder(); + SortCriterion sortCriterion = new SortCriterion().setOrder(SortOrder.DESCENDING).setField(dateFieldName); + ESUtils.buildSortOrder(builder, sortCriterion, entitySpecs); + List> sorts = builder.sorts(); + assertEquals(sorts.size(), 2); // sort by last modified and then by urn + for (SortBuilder sort : sorts) { + assertTrue(sort instanceof FieldSortBuilder); + FieldSortBuilder fieldSortBuilder = (FieldSortBuilder) sort; + if (fieldSortBuilder.getFieldName().equals(dateFieldName)) { + assertEquals(fieldSortBuilder.order(), org.opensearch.search.sort.SortOrder.DESC); + assertEquals(fieldSortBuilder.unmappedType(), "date"); + } else { + assertEquals(fieldSortBuilder.getFieldName(), "urn"); + } + } + + // Test alias field + String entityNameField = "_entityName"; + SearchSourceBuilder nameBuilder = new SearchSourceBuilder(); + SortCriterion nameCriterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField(entityNameField); + ESUtils.buildSortOrder(nameBuilder, nameCriterion, entitySpecs); + sorts = nameBuilder.sorts(); + assertEquals(sorts.size(), 2); + for (SortBuilder sort : sorts) { + assertTrue(sort instanceof FieldSortBuilder); + FieldSortBuilder fieldSortBuilder = (FieldSortBuilder) sort; + if (fieldSortBuilder.getFieldName().equals(entityNameField)) { + assertEquals(fieldSortBuilder.order(), org.opensearch.search.sort.SortOrder.ASC); + assertEquals(fieldSortBuilder.unmappedType(), "keyword"); + } else { + assertEquals(fieldSortBuilder.getFieldName(), "urn"); + } + } + } + @Test public void testDatasetHasTags() throws IOException { GetMappingsRequest req = new GetMappingsRequest() 
@@ -1454,6 +1498,16 @@ public void testColumnExactMatch() { "Expected table with column name exact match first"); } + @Test + public void testSortOrdering() { + String query = "unit_data"; + SortCriterion criterion = new SortCriterion().setOrder(SortOrder.ASCENDING).setField("lastOperationTime"); + SearchResult result = getSearchService().searchAcrossEntities(SEARCHABLE_ENTITIES, query, null, criterion, 0, + 100, new SearchFlags().setFulltext(true).setSkipCache(true), null); + assertTrue(result.getEntities().size() > 2, + String.format("%s - Expected search results to have at least two results", query)); + } + private Stream getTokens(AnalyzeRequest request) throws IOException { return getSearchClient().indices().analyze(request, RequestOptions.DEFAULT).getTokens().stream(); } From 7855fb60a7e96e6d04d8d96f7505f8b4dd62a7c4 Mon Sep 17 00:00:00 2001 From: Indy Prentice Date: Wed, 18 Oct 2023 17:19:10 -0300 Subject: [PATCH 7/9] fix(api): Add preceding / to get index sizes path (#9043) Co-authored-by: Indy Prentice --- .../ElasticSearchTimeseriesAspectService.java | 2 +- .../search/TimeseriesAspectServiceTestBase.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java index a496fc427138e9..3e8f83a531b591 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java @@ -169,7 +169,7 @@ public List getIndexSizes() { List res = new ArrayList<>(); try { String indicesPattern = _indexConvention.getAllTimeseriesAspectIndicesPattern(); - Response r = _searchClient.getLowLevelClient().performRequest(new Request("GET", indicesPattern + "/_stats")); + Response r = _searchClient.getLowLevelClient().performRequest(new Request("GET", "/" + indicesPattern + "/_stats")); JsonNode body = new ObjectMapper().readTree(r.getEntity().getContent()); body.get("indices").fields().forEachRemaining(entry -> { TimeseriesIndexSizeResult elemResult = new TimeseriesIndexSizeResult(); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java index cc60ba8679e1f0..f9b8f84b10ad20 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java @@ -45,6 +45,7 @@ import com.linkedin.timeseries.GroupingBucket; import com.linkedin.timeseries.GroupingBucketType; import com.linkedin.timeseries.TimeWindowSize; +import com.linkedin.timeseries.TimeseriesIndexSizeResult; import org.opensearch.client.RestHighLevelClient; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.BeforeClass; @@ -884,4 +885,19 @@ public void testCountByFilterAfterDelete() throws InterruptedException { _elasticSearchTimeseriesAspectService.countByFilter(ENTITY_NAME, ASPECT_NAME, urnAndTimeFilter); assertEquals(count, 0L); } + + @Test(groups = {"getAggregatedStats"}, dependsOnGroups = {"upsert"}) + public void testGetIndexSizes() { + List result = 
_elasticSearchTimeseriesAspectService.getIndexSizes(); + /* + Example result: + {aspectName=testentityprofile, sizeMb=52.234, indexName=es_timeseries_aspect_service_test_testentity_testentityprofileaspect_v1, entityName=testentity} + {aspectName=testentityprofile, sizeMb=0.208, indexName=es_timeseries_aspect_service_test_testentitywithouttests_testentityprofileaspect_v1, entityName=testentitywithouttests} + */ + // There may be other indices in there from other tests, so just make sure that index for entity + aspect is in there + assertTrue(result.size() > 1); + assertTrue( + result.stream().anyMatch(idxSizeResult -> idxSizeResult.getIndexName().equals( + "es_timeseries_aspect_service_test_testentitywithouttests_testentityprofileaspect_v1"))); + } } From 409f981fd3e12a1d470a79cb091ac92e1a4a2c46 Mon Sep 17 00:00:00 2001 From: Indy Prentice Date: Wed, 18 Oct 2023 18:25:54 -0300 Subject: [PATCH 8/9] fix(search): Apply SearchFlags passed in through to scroll queries (#9041) Co-authored-by: Indy Prentice --- .../client/CachingEntitySearchService.java | 13 ++++++---- .../elasticsearch/ElasticSearchService.java | 13 ++++++---- .../query/request/SearchRequestHandler.java | 4 +++- .../search/LineageServiceTestBase.java | 16 ++++++++++--- .../request/SearchRequestHandlerTest.java | 24 +++++++++++++++++++ .../metadata/search/EntitySearchService.java | 6 +++-- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java index 13a7d16b723a78..ceaf37a1289d99 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java @@ -256,13 +256,13 @@ public ScrollResult getCachedScrollResults( cacheAccess.stop(); if (result == null) { Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "scroll_cache_miss").time(); - result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText); + result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText, flags); cache.put(cacheKey, toJsonString(result)); cacheMiss.stop(); MetricUtils.counter(this.getClass(), "scroll_cache_miss_count").inc(); } } else { - result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText); + result = getRawScrollResults(entities, query, filters, sortCriterion, scrollId, keepAlive, size, isFullText, flags); } return result; } @@ -328,7 +328,8 @@ private ScrollResult getRawScrollResults( @Nullable final String scrollId, @Nullable final String keepAlive, final int count, - final boolean fulltext) { + final boolean fulltext, + @Nullable final SearchFlags searchFlags) { if (fulltext) { return entitySearchService.fullTextScroll( entities, @@ -337,7 +338,8 @@ private ScrollResult getRawScrollResults( sortCriterion, scrollId, keepAlive, - count); + count, + searchFlags); } else { return entitySearchService.structuredScroll(entities, input, @@ -345,7 +347,8 @@ private ScrollResult getRawScrollResults( sortCriterion, scrollId, keepAlive, - count); + count, + searchFlags); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java 
index ef5a555e95ba89..024cf2b0abec23 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -175,23 +175,26 @@ public List getBrowsePaths(@Nonnull String entityName, @Nonnull Urn urn) @Nonnull @Override public ScrollResult fullTextScroll(@Nonnull List entities, @Nonnull String input, @Nullable Filter postFilters, - @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size) { + @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags) { log.debug(String.format( "Scrolling Structured Search documents entities: %s, input: %s, postFilters: %s, sortCriterion: %s, scrollId: %s, size: %s", entities, input, postFilters, sortCriterion, scrollId, size)); + SearchFlags flags = Optional.ofNullable(searchFlags).orElse(new SearchFlags()); + flags.setFulltext(true); return esSearchDAO.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size, - new SearchFlags().setFulltext(true)); + flags); } @Nonnull @Override public ScrollResult structuredScroll(@Nonnull List entities, @Nonnull String input, @Nullable Filter postFilters, - @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size) { + @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nullable String keepAlive, int size, @Nullable SearchFlags searchFlags) { log.debug(String.format( "Scrolling FullText Search documents entities: %s, input: %s, postFilters: %s, sortCriterion: %s, scrollId: %s, size: %s", entities, input, postFilters, sortCriterion, scrollId, size)); - return esSearchDAO.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size, - new SearchFlags().setFulltext(false)); + SearchFlags flags = Optional.ofNullable(searchFlags).orElse(new SearchFlags()); + flags.setFulltext(false); + return esSearchDAO.scroll(entities, input, postFilters, sortCriterion, scrollId, keepAlive, size, flags); } public Optional raw(@Nonnull String indexName, @Nullable String jsonQuery) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java index c06907e800d5e7..49571a60d5f211 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java @@ -242,7 +242,9 @@ public SearchRequest getSearchRequest(@Nonnull String input, @Nullable Filter fi BoolQueryBuilder filterQuery = getFilterQuery(filter); searchSourceBuilder.query(QueryBuilders.boolQuery().must(getQuery(input, finalSearchFlags.isFulltext())).filter(filterQuery)); _aggregationQueryBuilder.getAggregations().forEach(searchSourceBuilder::aggregation); - searchSourceBuilder.highlighter(getHighlights()); + if (!finalSearchFlags.isSkipHighlighting()) { + searchSourceBuilder.highlighter(_highlights); + } ESUtils.buildSortOrder(searchSourceBuilder, sortCriterion, _entitySpecs); searchRequest.source(searchSourceBuilder); log.debug("Search request is: " + searchRequest); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java 
b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java index 461a146022446c..696e3b62834bdb 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java @@ -47,8 +47,10 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import org.junit.Assert; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.action.search.SearchRequest; import org.springframework.cache.CacheManager; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; @@ -108,6 +110,7 @@ abstract public class LineageServiceTestBase extends AbstractTestNGSpringContext private GraphService _graphService; private CacheManager _cacheManager; private LineageSearchService _lineageSearchService; + private RestHighLevelClient _searchClientSpy; private static final String ENTITY_NAME = "testEntity"; private static final Urn TEST_URN = TestEntityUtil.getTestEntityUrn(); @@ -162,10 +165,11 @@ private ElasticSearchService buildEntitySearchService() { EntityIndexBuilders indexBuilders = new EntityIndexBuilders(getIndexBuilder(), _entityRegistry, _indexConvention, _settingsBuilder); - ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, getSearchClient(), _indexConvention, false, + _searchClientSpy = spy(getSearchClient()); + ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, _searchClientSpy, _indexConvention, false, ELASTICSEARCH_IMPLEMENTATION_ELASTICSEARCH, getSearchConfiguration(), null); - ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, getSearchClient(), _indexConvention, getSearchConfiguration(), getCustomSearchConfiguration()); - ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, getSearchClient(), _indexConvention, getBulkProcessor(), 1); + ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, _searchClientSpy, _indexConvention, getSearchConfiguration(), getCustomSearchConfiguration()); + ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, _searchClientSpy, _indexConvention, getBulkProcessor(), 1); return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO); } @@ -246,9 +250,15 @@ public void testSearchService() throws Exception { _elasticSearchService.upsertDocument(ENTITY_NAME, document2.toString(), urn2.toString()); syncAfterWrite(getBulkProcessor()); + Mockito.reset(_searchClientSpy); searchResult = searchAcrossLineage(null, TEST1); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0).getEntity(), urn); + // Verify that highlighting was turned off in the query + ArgumentCaptor searchRequestCaptor = ArgumentCaptor.forClass(SearchRequest.class); + Mockito.verify(_searchClientSpy, times(1)).search(searchRequestCaptor.capture(), any()); + SearchRequest capturedRequest = searchRequestCaptor.getValue(); + assertNull(capturedRequest.source().highlighter()); clearCache(false); when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java index 90c6c523c588ff..0ea035a10f91da 100644 --- 
a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java @@ -97,6 +97,30 @@ public void testDatasetFieldsAndHighlights() { ), "unexpected lineage fields in highlights: " + highlightFields); } + @Test + public void testSearchRequestHandlerHighlightingTurnedOff() { + SearchRequestHandler requestHandler = SearchRequestHandler.getBuilder(TestEntitySpecBuilder.getSpec(), testQueryConfig, null); + SearchRequest searchRequest = requestHandler.getSearchRequest("testQuery", null, null, 0, + 10, new SearchFlags().setFulltext(false).setSkipHighlighting(true), null); + SearchSourceBuilder sourceBuilder = searchRequest.source(); + assertEquals(sourceBuilder.from(), 0); + assertEquals(sourceBuilder.size(), 10); + // Filters + Collection aggBuilders = sourceBuilder.aggregations().getAggregatorFactories(); + // Expect 2 aggregations: textFieldOverride and _index + assertEquals(aggBuilders.size(), 2); + for (AggregationBuilder aggBuilder : aggBuilders) { + if (aggBuilder.getName().equals("textFieldOverride")) { + TermsAggregationBuilder filterPanelBuilder = (TermsAggregationBuilder) aggBuilder; + assertEquals(filterPanelBuilder.field(), "textFieldOverride.keyword"); + } else if (!aggBuilder.getName().equals("_entityType")) { + fail("Found unexepected aggregation: " + aggBuilder.getName()); + } + } + // Highlights should not be present + assertNull(sourceBuilder.highlighter()); + } + @Test public void testSearchRequestHandler() { SearchRequestHandler requestHandler = SearchRequestHandler.getBuilder(TestEntitySpecBuilder.getSpec(), testQueryConfig, null); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index a46b58aabfb0b2..64f59780b887f3 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -188,11 +188,12 @@ BrowseResult browse(@Nonnull String entityName, @Nonnull String path, @Nullable * @param sortCriterion {@link SortCriterion} to be applied to search results * @param scrollId opaque scroll identifier to pass to search service * @param size the number of search hits to return + * @param searchFlags flags controlling search options * @return a {@link ScrollResult} that contains a list of matched documents and related search result metadata */ @Nonnull ScrollResult fullTextScroll(@Nonnull List entities, @Nonnull String input, @Nullable Filter postFilters, - @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size); + @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags); /** * Gets a list of documents that match given search request. 
The results are aggregated and filters are applied to the @@ -204,11 +205,12 @@ ScrollResult fullTextScroll(@Nonnull List entities, @Nonnull String inpu * @param sortCriterion {@link SortCriterion} to be applied to search results * @param scrollId opaque scroll identifier to pass to search service * @param size the number of search hits to return + * @param searchFlags flags controlling search options * @return a {@link ScrollResult} that contains a list of matched documents and related search result metadata */ @Nonnull ScrollResult structuredScroll(@Nonnull List entities, @Nonnull String input, @Nullable Filter postFilters, - @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size); + @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable SearchFlags searchFlags); /** * Max result size returned by the underlying search backend From 269c4eac7ef09d73224050e432bfbf60727e4d65 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Thu, 19 Oct 2023 01:43:05 +0100 Subject: [PATCH 9/9] fix(ownership): Corrects validation of ownership type and makes it consistent across graphQL calls (#9044) Co-authored-by: Ellie O'Neil --- .../resolvers/mutate/AddOwnerResolver.java | 27 ++- .../resolvers/mutate/AddOwnersResolver.java | 2 +- .../mutate/BatchAddOwnersResolver.java | 3 +- .../resolvers/mutate/util/OwnerUtils.java | 65 +++----- .../owner/AddOwnersResolverTest.java | 157 ++++++++++++++++-- 5 files changed, 183 insertions(+), 71 deletions(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnerResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnerResolver.java index 5ca7007d98e43c..3f2dab0a5ba711 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnerResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnerResolver.java @@ -2,14 +2,11 @@ import com.google.common.collect.ImmutableList; import com.linkedin.common.urn.CorpuserUrn; - import com.linkedin.common.urn.Urn; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.exception.AuthorizationException; import com.linkedin.datahub.graphql.generated.AddOwnerInput; -import com.linkedin.datahub.graphql.generated.OwnerEntityType; import com.linkedin.datahub.graphql.generated.OwnerInput; -import com.linkedin.datahub.graphql.generated.OwnershipType; import com.linkedin.datahub.graphql.generated.ResourceRefInput; import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils; import com.linkedin.metadata.entity.EntityService; @@ -20,7 +17,6 @@ import lombok.extern.slf4j.Slf4j; import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*; -import static com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils.*; @Slf4j @@ -32,30 +28,33 @@ public class AddOwnerResolver implements DataFetcher> @Override public CompletableFuture get(DataFetchingEnvironment environment) throws Exception { final AddOwnerInput input = bindArgument(environment.getArgument("input"), AddOwnerInput.class); - Urn ownerUrn = Urn.createFromString(input.getOwnerUrn()); - OwnerEntityType ownerEntityType = input.getOwnerEntityType(); - OwnershipType type = input.getType() == null ? OwnershipType.NONE : input.getType(); - String ownershipUrn = input.getOwnershipTypeUrn() == null ? 
mapOwnershipTypeToEntity(type.name()) : input.getOwnershipTypeUrn(); Urn targetUrn = Urn.createFromString(input.getResourceUrn()); + OwnerInput.Builder ownerInputBuilder = OwnerInput.builder(); + ownerInputBuilder.setOwnerUrn(input.getOwnerUrn()); + ownerInputBuilder.setOwnerEntityType(input.getOwnerEntityType()); + if (input.getType() != null) { + ownerInputBuilder.setType(input.getType()); + } + if (input.getOwnershipTypeUrn() != null) { + ownerInputBuilder.setOwnershipTypeUrn(input.getOwnershipTypeUrn()); + } + OwnerInput ownerInput = ownerInputBuilder.build(); if (!OwnerUtils.isAuthorizedToUpdateOwners(environment.getContext(), targetUrn)) { throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator."); } return CompletableFuture.supplyAsync(() -> { - OwnerUtils.validateAddInput( - ownerUrn, input.getOwnershipTypeUrn(), ownerEntityType, - targetUrn, - _entityService - ); + OwnerUtils.validateAddOwnerInput(ownerInput, ownerUrn, _entityService); + try { log.debug("Adding Owner. input: {}", input); Urn actor = CorpuserUrn.createFromString(((QueryContext) environment.getContext()).getActorUrn()); OwnerUtils.addOwnersToResources( - ImmutableList.of(new OwnerInput(input.getOwnerUrn(), ownerEntityType, type, ownershipUrn)), + ImmutableList.of(ownerInput), ImmutableList.of(new ResourceRefInput(input.getResourceUrn(), null, null)), actor, _entityService diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnersResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnersResolver.java index 06424efa83819f..4e5b5bdb2a651d 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnersResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/AddOwnersResolver.java @@ -39,7 +39,7 @@ public CompletableFuture get(DataFetchingEnvironment environment) throw throw new AuthorizationException("Unauthorized to perform this action. 
      }
-      OwnerUtils.validateAddInput(
+      OwnerUtils.validateAddOwnerInput(
          owners,
          targetUrn,
          _entityService
diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/BatchAddOwnersResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/BatchAddOwnersResolver.java
index 019c044d81ab32..5beaeecae673f0 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/BatchAddOwnersResolver.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/BatchAddOwnersResolver.java
@@ -53,8 +53,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
   private void validateOwners(List<OwnerInput> owners) {
     for (OwnerInput ownerInput : owners) {
-      OwnerUtils.validateOwner(UrnUtils.getUrn(ownerInput.getOwnerUrn()), ownerInput.getOwnerEntityType(),
-          UrnUtils.getUrn(ownerInput.getOwnershipTypeUrn()), _entityService);
+      OwnerUtils.validateOwner(ownerInput, _entityService);
     }
   }
diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java
index d2f7f896e59532..72339958044231 100644
--- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java
+++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java
@@ -50,7 +50,7 @@ public static void addOwnersToResources(
   ) {
     final List<MetadataChangeProposal> changes = new ArrayList<>();
     for (ResourceRefInput resource : resources) {
-      changes.add(buildAddOwnersProposal(owners, UrnUtils.getUrn(resource.getResourceUrn()), actor, entityService));
+      changes.add(buildAddOwnersProposal(owners, UrnUtils.getUrn(resource.getResourceUrn()), entityService));
     }
     EntityUtils.ingestChangeProposals(changes, entityService, actor, false);
   }
@@ -69,7 +69,7 @@ public static void removeOwnersFromResources(
   }
-  private static MetadataChangeProposal buildAddOwnersProposal(List<OwnerInput> owners, Urn resourceUrn, Urn actor, EntityService entityService) {
+  static MetadataChangeProposal buildAddOwnersProposal(List<OwnerInput> owners, Urn resourceUrn, EntityService entityService) {
     Ownership ownershipAspect = (Ownership) EntityUtils.getAspectFromEntity(
         resourceUrn.toString(), Constants.OWNERSHIP_ASPECT_NAME, entityService,
@@ -181,18 +181,13 @@ public static boolean isAuthorizedToUpdateOwners(@Nonnull QueryContext context,
       orPrivilegeGroups);
   }
-  public static Boolean validateAddInput(
+  public static Boolean validateAddOwnerInput(
       List<OwnerInput> owners,
       Urn resourceUrn,
       EntityService entityService
   ) {
     for (OwnerInput owner : owners) {
-      boolean result = validateAddInput(
-          UrnUtils.getUrn(owner.getOwnerUrn()),
-          owner.getOwnershipTypeUrn(),
-          owner.getOwnerEntityType(),
-          resourceUrn,
-          entityService);
+      boolean result = validateAddOwnerInput(owner, resourceUrn, entityService);
       if (!result) {
         return false;
       }
@@ -200,44 +195,29 @@ public static Boolean validateAddInput(
     return true;
   }
-  public static Boolean validateAddInput(
-      Urn ownerUrn,
-      String ownershipEntityUrn,
-      OwnerEntityType ownerEntityType,
+  public static Boolean validateAddOwnerInput(
+      OwnerInput owner,
       Urn resourceUrn,
       EntityService entityService
   ) {
-    if (OwnerEntityType.CORP_GROUP.equals(ownerEntityType) && !Constants.CORP_GROUP_ENTITY_NAME.equals(ownerUrn.getEntityType())) {
-      throw new IllegalArgumentException(String.format("Failed to change
ownership for resource %s. Expected a corp group urn.", resourceUrn)); - } - - if (OwnerEntityType.CORP_USER.equals(ownerEntityType) && !Constants.CORP_USER_ENTITY_NAME.equals(ownerUrn.getEntityType())) { - throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Expected a corp user urn.", resourceUrn)); - } - if (!entityService.exists(resourceUrn)) { throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Resource does not exist.", resourceUrn)); } - if (!entityService.exists(ownerUrn)) { - throw new IllegalArgumentException(String.format("Failed to change ownership for resource %s. Owner %s does not exist.", resourceUrn, ownerUrn)); - } - - if (ownershipEntityUrn != null && !entityService.exists(UrnUtils.getUrn(ownershipEntityUrn))) { - throw new IllegalArgumentException(String.format("Failed to change ownership type for resource %s. Ownership Type " - + "%s does not exist.", resourceUrn, ownershipEntityUrn)); - } + validateOwner(owner, entityService); return true; } public static void validateOwner( - Urn ownerUrn, - OwnerEntityType ownerEntityType, - Urn ownershipEntityUrn, + OwnerInput owner, EntityService entityService ) { + + OwnerEntityType ownerEntityType = owner.getOwnerEntityType(); + Urn ownerUrn = UrnUtils.getUrn(owner.getOwnerUrn()); + if (OwnerEntityType.CORP_GROUP.equals(ownerEntityType) && !Constants.CORP_GROUP_ENTITY_NAME.equals(ownerUrn.getEntityType())) { throw new IllegalArgumentException( String.format("Failed to change ownership for resource(s). Expected a corp group urn, found %s", ownerUrn)); @@ -252,9 +232,14 @@ public static void validateOwner( throw new IllegalArgumentException(String.format("Failed to change ownership for resource(s). Owner with urn %s does not exist.", ownerUrn)); } - if (!entityService.exists(ownershipEntityUrn)) { - throw new IllegalArgumentException(String.format("Failed to change ownership for resource(s). Ownership type with " - + "urn %s does not exist.", ownershipEntityUrn)); + if (owner.getOwnershipTypeUrn() != null && !entityService.exists(UrnUtils.getUrn(owner.getOwnershipTypeUrn()))) { + throw new IllegalArgumentException(String.format("Failed to change ownership for resource(s). Custom Ownership type with " + + "urn %s does not exist.", owner.getOwnershipTypeUrn())); + } + + if (owner.getType() == null && owner.getOwnershipTypeUrn() == null) { + throw new IllegalArgumentException("Failed to change ownership for resource(s). 
Expected either " + + "type or ownershipTypeUrn to be specified."); } } @@ -269,11 +254,11 @@ public static Boolean validateRemoveInput( } public static void addCreatorAsOwner( - QueryContext context, - String urn, - OwnerEntityType ownerEntityType, - OwnershipType ownershipType, - EntityService entityService) { + QueryContext context, + String urn, + OwnerEntityType ownerEntityType, + OwnershipType ownershipType, + EntityService entityService) { try { Urn actorUrn = CorpuserUrn.createFromString(context.getActorUrn()); String ownershipTypeUrn = mapOwnershipTypeToEntity(ownershipType.name()); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java index efc0c5dfcf36d8..329d71ec125db0 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java @@ -2,6 +2,11 @@ import com.google.common.collect.ImmutableList; import com.linkedin.common.AuditStamp; +import com.linkedin.common.Owner; +import com.linkedin.common.OwnerArray; +import com.linkedin.common.Ownership; +import com.linkedin.common.OwnershipSource; +import com.linkedin.common.OwnershipSourceType; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.datahub.graphql.QueryContext; @@ -28,6 +33,7 @@ public class AddOwnersResolverTest { private static final String TEST_ENTITY_URN = "urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)"; private static final String TEST_OWNER_1_URN = "urn:li:corpuser:test-id-1"; private static final String TEST_OWNER_2_URN = "urn:li:corpuser:test-id-2"; + private static final String TEST_OWNER_3_URN = "urn:li:corpGroup:test-id-3"; @Test public void testGetSuccessNoExistingOwners() throws Exception { @@ -75,33 +81,41 @@ public void testGetSuccessNoExistingOwners() throws Exception { } @Test - public void testGetSuccessExistingOwners() throws Exception { + public void testGetSuccessExistingOwnerNewType() throws Exception { EntityService mockService = getMockEntityService(); + com.linkedin.common.Ownership oldOwnership = new Ownership().setOwners(new OwnerArray( + ImmutableList.of(new Owner() + .setOwner(UrnUtils.getUrn(TEST_OWNER_1_URN)) + .setType(com.linkedin.common.OwnershipType.NONE) + .setSource(new OwnershipSource().setType(OwnershipSourceType.MANUAL)) + ))); + Mockito.when(mockService.getAspect( - Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN)), - Mockito.eq(Constants.OWNERSHIP_ASPECT_NAME), - Mockito.eq(0L))) - .thenReturn(null); + Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN)), + Mockito.eq(Constants.OWNERSHIP_ASPECT_NAME), + Mockito.eq(0L))) + .thenReturn(oldOwnership); Mockito.when(mockService.exists(Urn.createFromString(TEST_ENTITY_URN))).thenReturn(true); Mockito.when(mockService.exists(Urn.createFromString(TEST_OWNER_1_URN))).thenReturn(true); - Mockito.when(mockService.exists(Urn.createFromString(TEST_OWNER_2_URN))).thenReturn(true); Mockito.when(mockService.exists(Urn.createFromString( - OwnerUtils.mapOwnershipTypeToEntity(com.linkedin.datahub.graphql.generated.OwnershipType.TECHNICAL_OWNER.name())))) - .thenReturn(true); + OwnerUtils.mapOwnershipTypeToEntity(com.linkedin.datahub.graphql.generated.OwnershipType.TECHNICAL_OWNER.name())))) + .thenReturn(true); AddOwnersResolver resolver = new 
AddOwnersResolver(mockService); // Execute resolver QueryContext mockContext = getMockAllowContext(); DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + AddOwnersInput input = new AddOwnersInput(ImmutableList.of( - new OwnerInput(TEST_OWNER_1_URN, OwnerEntityType.CORP_USER, OwnershipType.TECHNICAL_OWNER, - OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name())), - new OwnerInput(TEST_OWNER_2_URN, OwnerEntityType.CORP_USER, OwnershipType.TECHNICAL_OWNER, - OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name())) + OwnerInput.builder() + .setOwnerUrn(TEST_OWNER_1_URN) + .setOwnershipTypeUrn(OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name())) + .setOwnerEntityType(OwnerEntityType.CORP_USER) + .build() ), TEST_ENTITY_URN); Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input); Mockito.when(mockEnv.getContext()).thenReturn(mockContext); @@ -111,11 +125,126 @@ public void testGetSuccessExistingOwners() throws Exception { verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( - Mockito.eq(Urn.createFromString(TEST_OWNER_1_URN)) + Mockito.eq(Urn.createFromString(TEST_OWNER_1_URN)) ); + } + + @Test + public void testGetSuccessDeprecatedTypeToOwnershipType() throws Exception { + EntityService mockService = getMockEntityService(); + + com.linkedin.common.Ownership oldOwnership = new Ownership().setOwners(new OwnerArray( + ImmutableList.of(new Owner() + .setOwner(UrnUtils.getUrn(TEST_OWNER_1_URN)) + .setType(com.linkedin.common.OwnershipType.TECHNICAL_OWNER) + .setSource(new OwnershipSource().setType(OwnershipSourceType.MANUAL)) + ))); + + Mockito.when(mockService.getAspect( + Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN)), + Mockito.eq(Constants.OWNERSHIP_ASPECT_NAME), + Mockito.eq(0L))) + .thenReturn(oldOwnership); + + Mockito.when(mockService.exists(Urn.createFromString(TEST_ENTITY_URN))).thenReturn(true); + Mockito.when(mockService.exists(Urn.createFromString(TEST_OWNER_1_URN))).thenReturn(true); + + Mockito.when(mockService.exists(Urn.createFromString( + OwnerUtils.mapOwnershipTypeToEntity(com.linkedin.datahub.graphql.generated.OwnershipType.TECHNICAL_OWNER.name())))) + .thenReturn(true); + + AddOwnersResolver resolver = new AddOwnersResolver(mockService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + + AddOwnersInput input = new AddOwnersInput(ImmutableList.of(OwnerInput.builder() + .setOwnerUrn(TEST_OWNER_1_URN) + .setOwnershipTypeUrn(OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name())) + .setOwnerEntityType(OwnerEntityType.CORP_USER) + .build() + ), TEST_ENTITY_URN); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + assertTrue(resolver.get(mockEnv).get()); + + // Unable to easily validate exact payload due to the injected timestamp + verifyIngestProposal(mockService, 1); Mockito.verify(mockService, Mockito.times(1)).exists( - Mockito.eq(Urn.createFromString(TEST_OWNER_2_URN)) + Mockito.eq(Urn.createFromString(TEST_OWNER_1_URN)) + ); + } + + @Test + public void testGetSuccessMultipleOwnerTypes() throws Exception { + EntityService mockService = getMockEntityService(); + + com.linkedin.common.Ownership oldOwnership = new Ownership().setOwners(new OwnerArray( + ImmutableList.of(new Owner() + .setOwner(UrnUtils.getUrn(TEST_OWNER_1_URN)) 
+ .setType(com.linkedin.common.OwnershipType.NONE) + .setSource(new OwnershipSource().setType(OwnershipSourceType.MANUAL)) + ))); + + Mockito.when(mockService.getAspect( + Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN)), + Mockito.eq(Constants.OWNERSHIP_ASPECT_NAME), + Mockito.eq(0L))) + .thenReturn(oldOwnership); + + Mockito.when(mockService.exists(Urn.createFromString(TEST_ENTITY_URN))).thenReturn(true); + Mockito.when(mockService.exists(Urn.createFromString(TEST_OWNER_1_URN))).thenReturn(true); + Mockito.when(mockService.exists(Urn.createFromString(TEST_OWNER_2_URN))).thenReturn(true); + Mockito.when(mockService.exists(Urn.createFromString(TEST_OWNER_3_URN))).thenReturn(true); + + Mockito.when(mockService.exists(Urn.createFromString( + OwnerUtils.mapOwnershipTypeToEntity(com.linkedin.datahub.graphql.generated.OwnershipType.TECHNICAL_OWNER.name())))) + .thenReturn(true); + Mockito.when(mockService.exists(Urn.createFromString( + OwnerUtils.mapOwnershipTypeToEntity(com.linkedin.datahub.graphql.generated.OwnershipType.BUSINESS_OWNER.name())))) + .thenReturn(true); + + AddOwnersResolver resolver = new AddOwnersResolver(mockService); + + // Execute resolver + QueryContext mockContext = getMockAllowContext(); + DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class); + + AddOwnersInput input = new AddOwnersInput(ImmutableList.of(OwnerInput.builder() + .setOwnerUrn(TEST_OWNER_1_URN) + .setOwnershipTypeUrn(OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name())) + .setOwnerEntityType(OwnerEntityType.CORP_USER) + .build(), + OwnerInput.builder() + .setOwnerUrn(TEST_OWNER_2_URN) + .setOwnershipTypeUrn(OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.BUSINESS_OWNER.name())) + .setOwnerEntityType(OwnerEntityType.CORP_USER) + .build(), + OwnerInput.builder() + .setOwnerUrn(TEST_OWNER_3_URN) + .setOwnershipTypeUrn(OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name())) + .setOwnerEntityType(OwnerEntityType.CORP_GROUP) + .build() + ), TEST_ENTITY_URN); + Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input); + Mockito.when(mockEnv.getContext()).thenReturn(mockContext); + assertTrue(resolver.get(mockEnv).get()); + + // Unable to easily validate exact payload due to the injected timestamp + verifyIngestProposal(mockService, 1); + + Mockito.verify(mockService, Mockito.times(1)).exists( + Mockito.eq(Urn.createFromString(TEST_OWNER_1_URN)) + ); + + Mockito.verify(mockService, Mockito.times(1)).exists( + Mockito.eq(Urn.createFromString(TEST_OWNER_2_URN)) + ); + + Mockito.verify(mockService, Mockito.times(1)).exists( + Mockito.eq(Urn.createFromString(TEST_OWNER_3_URN)) ); }
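For reference, a minimal sketch (not part of the diff above) of how calling code might construct an OwnerInput and exercise the new validation path, assuming the OwnerInput builder, OwnerUtils.validateAddOwnerInput(OwnerInput, Urn, EntityService) and OwnerUtils.mapOwnershipTypeToEntity shown in this patch; the URNs and the entityService parameter below are hypothetical placeholders:

    import com.linkedin.common.urn.Urn;
    import com.linkedin.common.urn.UrnUtils;
    import com.linkedin.datahub.graphql.generated.OwnerEntityType;
    import com.linkedin.datahub.graphql.generated.OwnerInput;
    import com.linkedin.datahub.graphql.generated.OwnershipType;
    import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils;
    import com.linkedin.metadata.entity.EntityService;

    class OwnerInputValidationSketch {
      // Builds an OwnerInput the way the updated resolvers do, then runs the new validation.
      // Per this patch, validation throws IllegalArgumentException when neither type nor
      // ownershipTypeUrn is set, or when the owner, custom ownership type, or resource
      // does not exist in the entity service.
      static void validateTechnicalOwner(EntityService entityService) {
        Urn resourceUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:mysql,my-test,PROD)"); // hypothetical resource
        OwnerInput ownerInput = OwnerInput.builder()
            .setOwnerUrn("urn:li:corpuser:test-id-1") // hypothetical owner
            .setOwnerEntityType(OwnerEntityType.CORP_USER)
            .setOwnershipTypeUrn(OwnerUtils.mapOwnershipTypeToEntity(OwnershipType.TECHNICAL_OWNER.name()))
            .build();
        OwnerUtils.validateAddOwnerInput(ownerInput, resourceUrn, entityService);
      }
    }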