diff --git a/metadata-ingestion/docs/sources/databricks/README.md b/metadata-ingestion/docs/sources/databricks/README.md
index b380a892c22b9..32b0b20c9480b 100644
--- a/metadata-ingestion/docs/sources/databricks/README.md
+++ b/metadata-ingestion/docs/sources/databricks/README.md
@@ -1,12 +1,13 @@
 DataHub supports integration with Databricks ecosystem using a multitude of connectors, depending on your exact setup.
 
-## Databricks Hive
+## Databricks Unity Catalog (new)
 
-The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
+The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have a Unity Catalog-enabled workspace, you can use the `unity-catalog` source (also known as the `databricks` source; see below for details) to integrate your metadata into DataHub as an alternative to the Hive pathway. This source also ingests the Hive metastore catalog in Databricks and is the recommended approach for ingesting the Databricks ecosystem into DataHub.
 
-## Databricks Unity Catalog (new)
+## Databricks Hive (old)
+
+An alternative way to integrate is via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
 
-The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the `unity-catalog` source (see below) to integrate your metadata into DataHub as an alternate to the Hive pathway.
 
 ## Databricks Spark
 
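The README above now steers users to the `unity-catalog` (aka `databricks`) source, which also covers the legacy Hive metastore catalog. As a hedged sketch of what that looks like when run programmatically — `workspace_url` and `token` are placeholders, `include_hive_metastore` is the option this change set builds on, and additional settings (for example a SQL warehouse for profiling) may be needed in practice:

```python
# Minimal sketch, not a complete recipe: run the unity-catalog source via the
# programmatic Pipeline API and send metadata to a local DataHub instance.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://my-workspace.cloud.databricks.com",
                "token": "<databricks-personal-access-token>",
                "include_hive_metastore": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```

A plain YAML recipe with the same keys works identically; the programmatic form is shown only to keep the example in one language.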
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
index 2a98dda1c79c5..140698a6c4b10 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
@@ -21,6 +21,7 @@
     TableProfile,
     TableReference,
 )
+from datahub.ingestion.source.unity.report import UnityCatalogReport
 
 logger = logging.getLogger(__name__)
 HIVE_METASTORE = "hive_metastore"
@@ -66,9 +67,12 @@ class HiveMetastoreProxy(Closeable):
     as unity catalog apis do not return details about this legacy metastore.
     """
 
-    def __init__(self, sqlalchemy_url: str, options: dict) -> None:
+    def __init__(
+        self, sqlalchemy_url: str, options: dict, report: UnityCatalogReport
+    ) -> None:
         try:
             self.inspector = HiveMetastoreProxy.get_inspector(sqlalchemy_url, options)
+            self.report = report
         except Exception:
             # This means that there is no `hive_metastore` catalog in databricks workspace
             # Not tested but seems like the logical conclusion.
@@ -100,22 +104,53 @@ def hive_metastore_schemas(self, catalog: Catalog) -> Iterable[Schema]:
         )
 
     def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]:
-        views = self.inspector.get_view_names(schema.name)
+        # NOTE: Ideally we would use `inspector.get_view_names` and `inspector.get_table_names` here instead of
+        # issuing SHOW queries from this class; however, the Databricks dialect for databricks-sql-connector<3.0.0
+        # does not back-quote schemas with special characters such as hyphens.
+        # databricks-sql-connector is currently pinned to <3.0.0 because later versions
+        # require SQLAlchemy > 2.0.21.
+        views = self.get_view_names(schema.name)
         for table_name in views:
             yield self._get_table(schema, table_name, True)
 
-        for table_name in self.inspector.get_table_names(schema.name):
+        for table_name in self.get_table_names(schema.name):
             if table_name in views:
                 continue
             yield self._get_table(schema, table_name, False)
 
+    def get_table_names(self, schema_name: str) -> List[str]:
+        try:
+            rows = self._execute_sql(f"SHOW TABLES FROM `{schema_name}`")
+            # 3 columns - database, tableName, isTemporary
+            return [row.tableName for row in rows]
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get tables for schema", f"{HIVE_METASTORE}.{schema_name}"
+            )
+            logger.warning(
+                f"Failed to get tables {schema_name} due to {e}", exc_info=True
+            )
+        return []
+
+    def get_view_names(self, schema_name: str) -> List[str]:
+        try:
+            rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`")
+            # 3 columns - database, tableName, isTemporary
+            return [row.tableName for row in rows]
+        except Exception as e:
+            self.report.report_warning("Failed to get views for schema", schema_name)
+            logger.warning(
+                f"Failed to get views {schema_name} due to {e}", exc_info=True
+            )
+        return []
+
     def _get_table(
         self,
         schema: Schema,
         table_name: str,
         is_view: bool = False,
     ) -> Table:
-        columns = self._get_columns(schema, table_name)
+        columns = self._get_columns(schema.name, table_name)
         detailed_info = self._get_table_info(schema.name, table_name)
 
         comment = detailed_info.pop("Comment", None)
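A note on the `SHOW TABLES FROM` / `SHOW VIEWS FROM` helpers added above: the schema name is back-quoted explicitly (the older Databricks dialect does not do this for names containing hyphens), and the result rows are read by attribute. A tiny stand-in for that row shape — the same trick the integration test further down uses to mock it:

```python
# Illustration only: databricks-sql-connector returns Row objects whose fields are
# addressable by name; a namedtuple reproduces that shape for the example.
from collections import namedtuple

Row = namedtuple("Row", ["database", "tableName", "isTemporary"])


def table_names(rows):
    # SHOW TABLES FROM `<schema>` yields (database, tableName, isTemporary) per row
    return [row.tableName for row in rows]


rows = [Row("bronze_kambi", "bet", False), Row("bronze_kambi", "view1", False)]
assert table_names(rows) == ["bet", "view1"]
```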
@@ -134,9 +169,9 @@ def _get_table(
             columns=columns,
             storage_location=storage_location,
             data_source_format=datasource_format,
-            view_definition=self._get_view_definition(schema.name, table_name)
-            if is_view
-            else None,
+            view_definition=(
+                self._get_view_definition(schema.name, table_name) if is_view else None
+            ),
             properties=detailed_info,
             owner=None,
             generation=None,
@@ -150,61 +185,69 @@ def _get_table(
 
     def get_table_profile(
         self, ref: TableReference, include_column_stats: bool = False
-    ) -> TableProfile:
+    ) -> Optional[TableProfile]:
         columns = self._get_columns(
-            Schema(
-                id=ref.schema,
-                name=ref.schema,
-                # This is okay, as none of this is used in profiling
-                catalog=self.hive_metastore_catalog(None),
-                comment=None,
-                owner=None,
-            ),
+            ref.schema,
             ref.table,
         )
         detailed_info = self._get_table_info(ref.schema, ref.table)
 
+        if not columns and not detailed_info:
+            return None
+
         table_stats = (
             self._get_cached_table_statistics(detailed_info["Statistics"])
             if detailed_info.get("Statistics")
             else {}
         )
 
+        column_profiles: List[ColumnProfile] = []
+        if include_column_stats:
+            for column in columns:
+                column_profile = self._get_column_profile(column.name, ref)
+                if column_profile:
+                    column_profiles.append(column_profile)
+
         return TableProfile(
-            num_rows=int(table_stats[ROWS])
-            if table_stats.get(ROWS) is not None
-            else None,
-            total_size=int(table_stats[BYTES])
-            if table_stats.get(BYTES) is not None
-            else None,
+            num_rows=(
+                int(table_stats[ROWS]) if table_stats.get(ROWS) is not None else None
+            ),
+            total_size=(
+                int(table_stats[BYTES]) if table_stats.get(BYTES) is not None else None
+            ),
             num_columns=len(columns),
-            column_profiles=[
-                self._get_column_profile(column.name, ref) for column in columns
-            ]
-            if include_column_stats
-            else [],
+            column_profiles=column_profiles,
        )
 
-    def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile:
-
-        props = self._column_describe_extended(ref.schema, ref.table, column)
-        col_stats = {}
-        for prop in props:
-            col_stats[prop[0]] = prop[1]
-        return ColumnProfile(
-            name=column,
-            null_count=int(col_stats[NUM_NULLS])
-            if col_stats.get(NUM_NULLS) is not None
-            else None,
-            distinct_count=int(col_stats[DISTINCT_COUNT])
-            if col_stats.get(DISTINCT_COUNT) is not None
-            else None,
-            min=col_stats.get(MIN),
-            max=col_stats.get(MAX),
-            avg_len=col_stats.get(AVG_COL_LEN),
-            max_len=col_stats.get(MAX_COL_LEN),
-            version=col_stats.get(VERSION),
-        )
+    def _get_column_profile(
+        self, column: str, ref: TableReference
+    ) -> Optional[ColumnProfile]:
+        try:
+            props = self._column_describe_extended(ref.schema, ref.table, column)
+            col_stats = {}
+            for prop in props:
+                col_stats[prop[0]] = prop[1]
+            return ColumnProfile(
+                name=column,
+                null_count=(
+                    int(col_stats[NUM_NULLS])
+                    if col_stats.get(NUM_NULLS) is not None
+                    else None
+                ),
+                distinct_count=(
+                    int(col_stats[DISTINCT_COUNT])
+                    if col_stats.get(DISTINCT_COUNT) is not None
+                    else None
+                ),
+                min=col_stats.get(MIN),
+                max=col_stats.get(MAX),
+                avg_len=col_stats.get(AVG_COL_LEN),
+                max_len=col_stats.get(MAX_COL_LEN),
+                version=col_stats.get(VERSION),
+            )
+        except Exception as e:
+            logger.debug(f"Failed to get column profile for {ref}.{column} due to {e}")
+            return None
 
     def _get_cached_table_statistics(self, statistics: str) -> dict:
         # statistics is in format "xx bytes" OR "1382 bytes, 2 rows"
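The reworked profiling path above collects column profiles one at a time and treats a failed `DESCRIBE` as "no profile for this column" rather than an error for the whole table; `get_table_profile` itself returns `None` only when neither columns nor table details could be fetched. A compact, self-contained sketch of that skip-on-failure pattern (the helper names here are made up):

```python
# Sketch of the pattern: per-column failures degrade to a partial profile.
from typing import List, Optional


def profile_column(column: str) -> Optional[dict]:
    try:
        if column == "broken":
            raise RuntimeError("DESCRIBE EXTENDED failed")  # simulated failure
        return {"name": column, "null_count": 0}
    except Exception:
        return None  # skip this column, keep profiling the rest


def profile_table(columns: List[str]) -> List[dict]:
    profiles = []
    for column in columns:
        profile = profile_column(column)
        if profile:
            profiles.append(profile)
    return profiles


assert [p["name"] for p in profile_table(["id", "broken", "amount"])] == ["id", "amount"]
```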
@@ -242,9 +285,14 @@ def _get_view_definition(self, schema_name: str, table_name: str) -> Optional[str]:
             )
             for row in rows:
                 return row[0]
-        except Exception:
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get view definition for table",
+                f"{HIVE_METASTORE}.{schema_name}.{table_name}",
+            )
             logger.debug(
-                f"Failed to get view definition for {schema_name}.{table_name}"
+                f"Failed to get view definition for {schema_name}.{table_name} due to {e}",
+                exc_info=True,
             )
         return None
 
@@ -258,60 +306,81 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType:
         else:
             return HiveTableType.UNKNOWN
 
+    @lru_cache(maxsize=1)
     def _get_table_info(self, schema_name: str, table_name: str) -> dict:
-        rows = self._describe_extended(schema_name, table_name)
-
-        index = rows.index(("# Detailed Table Information", "", ""))
-        rows = rows[index + 1 :]
-        # Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375
         # Generate properties dictionary.
         properties = {}
-        active_heading = None
-        for col_name, data_type, value in rows:
-            col_name = col_name.rstrip()
-            if col_name.startswith("# "):
-                continue
-            elif col_name == "" and data_type is None:
-                active_heading = None
-                continue
-            elif col_name != "" and data_type is None:
-                active_heading = col_name
-            elif col_name != "" and data_type is not None:
-                properties[col_name] = data_type.strip()
-            else:
-                # col_name == "", data_type is not None
-                prop_name = "{} {}".format(active_heading, data_type.rstrip())
-                properties[prop_name] = value.rstrip()
+        try:
+            rows = self._describe_extended(schema_name, table_name)
+
+            index = rows.index(("# Detailed Table Information", "", ""))
+            rows = rows[index + 1 :]
+            # Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375
+
+            active_heading = None
+            for col_name, data_type, value in rows:
+                col_name = col_name.rstrip()
+                if col_name.startswith("# "):
+                    continue
+                elif col_name == "" and data_type is None:
+                    active_heading = None
+                    continue
+                elif col_name != "" and data_type is None:
+                    active_heading = col_name
+                elif col_name != "" and data_type is not None:
+                    properties[col_name] = data_type.strip()
+                else:
+                    # col_name == "", data_type is not None
+                    prop_name = "{} {}".format(active_heading, data_type.rstrip())
+                    properties[prop_name] = value.rstrip()
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get detailed info for table",
+                f"{HIVE_METASTORE}.{schema_name}.{table_name}",
+            )
+            logger.debug(
+                f"Failed to get detailed info for table {schema_name}.{table_name} due to {e}",
+                exc_info=True,
+            )
 
         return properties
 
-    def _get_columns(self, schema: Schema, table_name: str) -> List[Column]:
-        rows = self._describe_extended(schema.name, table_name)
-
+    @lru_cache(maxsize=1)
+    def _get_columns(self, schema_name: str, table_name: str) -> List[Column]:
         columns: List[Column] = []
-        for i, row in enumerate(rows):
-            if i == 0 and row[0].strip() == "col_name":
-                continue  # first row
-            if row[0].strip() in (
-                "",
-                "# Partition Information",
-                "# Detailed Table Information",
-            ):
-                break
-            columns.append(
-                Column(
-                    name=row[0].strip(),
-                    id=f"{schema.id}.{table_name}.{row[0].strip()}",
-                    type_text=row[1].strip(),
-                    type_name=type_map.get(row[1].strip().lower()),
-                    type_scale=None,
-                    type_precision=None,
-                    position=None,
-                    nullable=None,
-                    comment=row[2],
+        try:
+            rows = self._describe_extended(schema_name, table_name)
+            for i, row in enumerate(rows):
+                if i == 0 and row[0].strip() == "col_name":
+                    continue  # first row
+                if row[0].strip() in (
+                    "",
+                    "# Partition Information",
+                    "# Detailed Table Information",
+                ):
+                    break
+                columns.append(
+                    Column(
+                        name=row[0].strip(),
+                        id=f"{HIVE_METASTORE}.{schema_name}.{table_name}.{row[0].strip()}",
+                        type_text=row[1].strip(),
+                        type_name=type_map.get(row[1].strip().lower()),
+                        type_scale=None,
+                        type_precision=None,
+                        position=None,
+                        nullable=None,
+                        comment=row[2],
+                    )
                 )
+        except Exception as e:
+            self.report.report_warning(
+                "Failed to get columns for table",
+                f"{HIVE_METASTORE}.{schema_name}.{table_name}",
+            )
+            logger.debug(
+                f"Failed to get columns for table {schema_name}.{table_name} due to {e}",
+                exc_info=True,
             )
-
         return columns
 
     @lru_cache(maxsize=1)
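The `@lru_cache(maxsize=1)` decorators introduced above exist because `_get_table_info` and `_get_columns` are both backed by the same `DESCRIBE EXTENDED` output and are called more than once per table (once while building the `Table`, again during profiling). Tables are processed one at a time, so caching only the most recent `(schema, table)` result is enough to avoid repeat round-trips. A minimal, hypothetical illustration of that sizing choice (class and method names invented for the example):

```python
# Sketch: a cache of size one still deduplicates back-to-back calls for the same table.
from functools import lru_cache


class CachedDescribe:
    def __init__(self) -> None:
        self.describe_calls = 0

    @lru_cache(maxsize=1)  # remembers only the most recent (self, schema, table) key
    def _describe_extended(self, schema_name: str, table_name: str):
        self.describe_calls += 1
        return [("id", "int", None), ("amount", "double", None)]  # fake DESCRIBE rows

    def get_columns(self, schema_name: str, table_name: str):
        return [row[0] for row in self._describe_extended(schema_name, table_name)]

    def get_table_info(self, schema_name: str, table_name: str):
        return {"num_cols": len(self._describe_extended(schema_name, table_name))}


proxy = CachedDescribe()
proxy.get_columns("bronze_kambi", "bet")
proxy.get_table_info("bronze_kambi", "bet")  # same table: DESCRIBE is not re-run
assert proxy.describe_calls == 1
proxy.get_columns("bronze_kambi", "view1")  # next table evicts the previous entry
assert proxy.describe_calls == 2
```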
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
index 20aa10305fa8f..1e90f3a044f42 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -1,6 +1,7 @@
 """
 Manage the communication with DataBricks Server and provide equivalent dataclasses for dependent modules
 """
+
 import dataclasses
 import logging
 from datetime import datetime, timezone
@@ -204,16 +205,16 @@ def workspace_notebooks(self) -> Iterable[Notebook]:
                 id=obj.object_id,
                 path=obj.path,
                 language=obj.language,
-                created_at=datetime.fromtimestamp(
-                    obj.created_at / 1000, tz=timezone.utc
-                )
-                if obj.created_at
-                else None,
-                modified_at=datetime.fromtimestamp(
-                    obj.modified_at / 1000, tz=timezone.utc
-                )
-                if obj.modified_at
-                else None,
+                created_at=(
+                    datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
+                    if obj.created_at
+                    else None
+                ),
+                modified_at=(
+                    datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc)
+                    if obj.modified_at
+                    else None
+                ),
             )
 
     def query_history(
@@ -268,12 +269,14 @@ def _query_history(
             response: dict = self._workspace_client.api_client.do(  # type: ignore
                 method, path, body={**body, "filter_by": filter_by.as_dict()}
             )
-        # we use default raw=False in above request, therefore will always get dict
+        # we use the default raw=False in the above request, therefore we will always get a dict
         while True:
             if "res" not in response or not response["res"]:
                 return
             for v in response["res"]:
                 yield QueryInfo.from_dict(v)
+            if not response.get("next_page_token"):  # last page
+                return
             response = self._workspace_client.api_client.do(  # type: ignore
                 method, path, body={**body, "page_token": response["next_page_token"]}
             )
@@ -434,22 +437,28 @@ def _create_table(
             schema=schema,
             storage_location=obj.storage_location,
             data_source_format=obj.data_source_format,
-            columns=list(self._extract_columns(obj.columns, table_id))
-            if obj.columns
-            else [],
+            columns=(
+                list(self._extract_columns(obj.columns, table_id))
+                if obj.columns
+                else []
+            ),
             view_definition=obj.view_definition or None,
             properties=obj.properties or {},
             owner=obj.owner,
             generation=obj.generation,
-            created_at=datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
-            if obj.created_at
-            else None,
+            created_at=(
+                datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
+                if obj.created_at
+                else None
+            ),
             created_by=obj.created_by,
-            updated_at=datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
-            if obj.updated_at
-            else None
-            if obj.updated_at
-            else None,
+            updated_at=(
+                datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
+                if obj.updated_at
+                else None
+                if obj.updated_at
+                else None
+            ),
             updated_by=obj.updated_by,
             table_id=obj.table_id,
             comment=obj.comment,
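The `_query_history` change above stops paginating as soon as the response carries no `next_page_token`; previously the loop indexed `response["next_page_token"]` unconditionally, which fails once the final page still contains results. A self-contained sketch of the corrected loop, with `fetch_page` standing in for `api_client.do(...)`:

```python
# Sketch of token-based pagination that terminates cleanly on the last page.
def iter_results(fetch_page):
    response = fetch_page(page_token=None)
    while True:
        if "res" not in response or not response["res"]:
            return
        yield from response["res"]
        if not response.get("next_page_token"):  # last page
            return
        response = fetch_page(page_token=response["next_page_token"])


pages = {
    None: {"res": [1, 2], "next_page_token": "t1"},
    "t1": {"res": [3]},  # no next_page_token: the generator ends instead of raising KeyError
}
assert list(iter_results(lambda page_token: pages[page_token])) == [1, 2, 3]
```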
"spark.sql.statistics.totalSize"), num_columns=len(columns_names), - column_profiles=[ - self._create_column_profile(column, table_info) - for column in columns_names - ] - if include_columns - else [], + column_profiles=( + [ + self._create_column_profile(column, table_info) + for column in columns_names + ] + if include_columns + else [] + ), ) def _create_column_profile( @@ -237,12 +239,16 @@ def _raise_if_error( StatementState.CLOSED, ]: raise DatabricksError( - response.status.error.message - if response.status.error and response.status.error.message - else "Unknown Error", - error_code=response.status.error.error_code.value - if response.status.error and response.status.error.error_code - else "Unknown Error Code", + ( + response.status.error.message + if response.status.error and response.status.error.message + else "Unknown Error" + ), + error_code=( + response.status.error.error_code.value + if response.status.error and response.status.error.error_code + else "Unknown Error Code" + ), status=response.status.state.value, context=key, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 9a326ec584d21..2008991dad72e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -213,7 +213,9 @@ def init_hive_metastore_proxy(self): if self.config.include_hive_metastore: try: self.hive_metastore_proxy = HiveMetastoreProxy( - self.config.get_sql_alchemy_url(HIVE_METASTORE), self.config.options + self.config.get_sql_alchemy_url(HIVE_METASTORE), + self.config.options, + self.report, ) self.report.hive_metastore_catalog_found = True diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index 56c7334ea90b1..f22e15da45df2 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -1,4 +1,5 @@ import uuid +from collections import namedtuple from unittest import mock from unittest.mock import patch @@ -274,6 +275,9 @@ def register_mock_data(workspace_client): ] +TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"]) + + def mock_hive_sql(query): if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` betStatusId": @@ -367,12 +371,24 @@ def mock_hive_sql(query): ("Type", "VIEW", ""), ("Owner", "root", ""), ] + elif query == "DESCRIBE EXTENDED `bronze_kambi`.`delta_error_table`": + raise Exception( + "[DELTA_PATH_DOES_NOT_EXIST] doesn't exist, or is not a Delta table." 
diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py
index 56c7334ea90b1..f22e15da45df2 100644
--- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py
+++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py
@@ -1,4 +1,5 @@
 import uuid
+from collections import namedtuple
 from unittest import mock
 from unittest.mock import patch
 
@@ -274,6 +275,9 @@ def register_mock_data(workspace_client):
     ]
 
 
+TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"])
+
+
 def mock_hive_sql(query):
 
     if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` betStatusId":
@@ -367,12 +371,24 @@ def mock_hive_sql(query):
             ("Type", "VIEW", ""),
             ("Owner", "root", ""),
         ]
+    elif query == "DESCRIBE EXTENDED `bronze_kambi`.`delta_error_table`":
+        raise Exception(
+            "[DELTA_PATH_DOES_NOT_EXIST] doesn't exist, or is not a Delta table."
+        )
     elif query == "SHOW CREATE TABLE `bronze_kambi`.`view1`":
         return [
             (
                 "CREATE VIEW `hive_metastore`.`bronze_kambi`.`view1` AS SELECT * FROM `hive_metastore`.`bronze_kambi`.`bet`",
             )
         ]
+    elif query == "SHOW TABLES FROM `bronze_kambi`":
+        return [
+            TableEntry("bronze_kambi", "bet", False),
+            TableEntry("bronze_kambi", "delta_error_table", False),
+            TableEntry("bronze_kambi", "view1", False),
+        ]
+    elif query == "SHOW VIEWS FROM `bronze_kambi`":
+        return [TableEntry("bronze_kambi", "view1", False)]
 
     return []
 
@@ -394,8 +410,6 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):
 
     inspector = mock.MagicMock()
     inspector.get_schema_names.return_value = ["bronze_kambi"]
-    inspector.get_view_names.return_value = ["view1"]
-    inspector.get_table_names.return_value = ["bet", "view1"]
    get_inspector.return_value = inspector
 
     execute_sql.side_effect = mock_hive_sql
diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json
index 1f0193fef6063..f01878fed1353 100644
--- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json
+++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json
@@ -1394,6 +1394,22 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "container",
+    "aspect": {
+        "json": {
+            "container": "urn:li:container:21058fb6993a790a4a43727021e52956"
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1638860400000,
+        "runId": "unity-catalog-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "container",
     "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6",
@@ -1417,6 +1433,74 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)",
+    "changeType": "PATCH",
+    "aspectName": "datasetProperties",
+    "aspect": {
+        "json": [
+            {
+                "op": "add",
+                "path": "/name",
+                "value": "delta_error_table"
+            },
+            {
+                "op": "add",
+                "path": "/qualifiedName",
+                "value": "hive_metastore.bronze_kambi.delta_error_table"
+            },
+            {
+                "op": "add",
+                "path": "/customProperties/table_type",
+                "value": "UNKNOWN"
+            },
+            {
+                "op": "add",
+                "path": "/customProperties/table_id",
+                "value": "hive_metastore.bronze_kambi.delta_error_table"
+            }
+        ]
+    },
+    "systemMetadata": {
+        "lastObserved": 1638860400000,
+        "runId": "unity-catalog-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "schemaMetadata",
+    "aspect": {
+        "json": {
+            "schemaName": "hive_metastore.bronze_kambi.delta_error_table",
+            "platform": "urn:li:dataPlatform:databricks",
+            "version": 0,
+            "created": {
+                "time": 0,
+                "actor": "urn:li:corpuser:unknown"
+            },
+            "lastModified": {
+                "time": 0,
+                "actor": "urn:li:corpuser:unknown"
+            },
+            "hash": "",
+            "platformSchema": {
+                "com.linkedin.schema.MySqlDDL": {
+                    "tableSchema": ""
+                }
+            },
+            "fields": []
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1638860400000,
+        "runId": "unity-catalog-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "container",
     "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6",
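For orientation, each golden entry added here corresponds to one aspect the connector now emits for the `hive_metastore.bronze_kambi.delta_error_table` dataset (a table whose `DESCRIBE EXTENDED` fails in the test, so it surfaces with an empty schema). The `status` entry further down, for example, is roughly what a proposal like the following serializes to — a hedged sketch using DataHub's standard emitter classes, not code from this change:

```python
# Sketch: the kind of change proposal behind the "status" golden entry below.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

mcp = MetadataChangeProposalWrapper(
    entityUrn=(
        "urn:li:dataset:(urn:li:dataPlatform:databricks,"
        "hive_metastore.bronze_kambi.delta_error_table,PROD)"
    ),
    aspect=StatusClass(removed=False),
)
```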
"lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -1465,6 +1567,31 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:d91b261e5da1bf1434c6318b8c2ac586", + "urn": "urn:li:container:d91b261e5da1bf1434c6318b8c2ac586" + }, + { + "id": "urn:li:container:21058fb6993a790a4a43727021e52956", + "urn": "urn:li:container:21058fb6993a790a4a43727021e52956" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -2402,6 +2529,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table_external,PROD)",