From e461d03d94ee77c4251cde9617418f17ba10e180 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 18 Apr 2023 00:48:21 +0530 Subject: [PATCH] feat(ingest/unity): capture create/lastModified timestamps (#7819) Co-authored-by: Tamas Nemeth --- .../datahub/ingestion/source/unity/source.py | 49 ++- .../utilities/file_backed_collections.py | 2 +- .../unity/unity_catalog_mces_golden.json | 315 +++++++++++++++++- 3 files changed, 355 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 6ff150f5daf76..554bf79b167e1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -1,5 +1,6 @@ import logging import re +import time from typing import Dict, Iterable, List, Optional from datahub.emitter.mce_builder import ( @@ -58,12 +59,15 @@ DatasetPropertiesClass, DomainsClass, MySqlDDLClass, + OperationClass, + OperationTypeClass, OwnerClass, OwnershipClass, OwnershipTypeClass, SchemaFieldClass, SchemaMetadataClass, SubTypesClass, + TimeStampClass, UpstreamClass, UpstreamLineageClass, ) @@ -254,6 +258,7 @@ def process_table( sub_type = self._create_table_sub_type_aspect(table) schema_metadata = self._create_schema_metadata_aspect(table) + operation = self._create_table_operation_aspect(table) domain = self._get_domain_aspect( dataset_name=str( @@ -279,6 +284,7 @@ def process_table( view_props, sub_type, schema_metadata, + operation, domain, ownership, lineage, @@ -454,18 +460,59 @@ def _create_table_property_aspect( custom_properties["created_by"] = table.created_by custom_properties["created_at"] = str(table.created_at) if table.properties: - custom_properties["properties"] = str(table.properties) + custom_properties.update({k: str(v) for k, v in table.properties.items()}) custom_properties["table_id"] = table.table_id custom_properties["owner"] = table.owner custom_properties["updated_by"] = table.updated_by custom_properties["updated_at"] = str(table.updated_at) + created = TimeStampClass( + int(table.created_at.timestamp() * 1000), make_user_urn(table.created_by) + ) + last_modified = created + if table.updated_at and table.updated_by is not None: + last_modified = TimeStampClass( + int(table.updated_at.timestamp() * 1000), + make_user_urn(table.updated_by), + ) + return DatasetPropertiesClass( name=table.name, description=table.comment, customProperties=custom_properties, + created=created, + lastModified=last_modified, ) + def _create_table_operation_aspect(self, table: proxy.Table) -> OperationClass: + """Produce an operation aspect for a table. + + If a last updated time is present, we produce an update operation. + Otherwise, we produce a create operation. We do this in addition to + setting the last updated time in the dataset properties aspect, as + the UI is currently missing the ability to display the last updated + from the properties aspect. + """ + + reported_time = int(time.time() * 1000) + + operation = OperationClass( + timestampMillis=reported_time, + lastUpdatedTimestamp=int(table.created_at.timestamp() * 1000), + actor=make_user_urn(table.created_by), + operationType=OperationTypeClass.CREATE, + ) + + if table.updated_at and table.updated_by is not None: + operation = OperationClass( + timestampMillis=reported_time, + lastUpdatedTimestamp=int(table.updated_at.timestamp() * 1000), + actor=make_user_urn(table.updated_by), + operationType=OperationTypeClass.UPDATE, + ) + + return operation + def _create_table_ownership_aspect( self, table: proxy.Table ) -> Optional[OwnershipClass]: diff --git a/metadata-ingestion/src/datahub/utilities/file_backed_collections.py b/metadata-ingestion/src/datahub/utilities/file_backed_collections.py index b6450c23d43f4..58e693b315653 100644 --- a/metadata-ingestion/src/datahub/utilities/file_backed_collections.py +++ b/metadata-ingestion/src/datahub/utilities/file_backed_collections.py @@ -136,7 +136,7 @@ def _default_deserializer(value: Any) -> Any: @dataclass(eq=False) -class FileBackedDict(MutableMapping[str, _VT], Generic[_VT], Closeable): +class FileBackedDict(MutableMapping[str, _VT], Closeable, Generic[_VT]): """ A dict-like object that stores its data in a temporary SQLite database. This is useful for storing large amounts of data that don't fit in memory. diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json index cae5ff200fa61..a6f8f519053d6 100644 --- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json +++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json @@ -261,13 +261,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -350,6 +361,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.default.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.default.quickstart_table,PROD)", @@ -487,13 +520,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -576,6 +620,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.information_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.information_schema.quickstart_table,PROD)", @@ -713,13 +779,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -802,6 +879,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.quickstart_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.quickstart_schema.quickstart_table,PROD)", @@ -1022,13 +1121,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -1111,6 +1221,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.default.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.default.quickstart_table,PROD)", @@ -1248,13 +1380,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -1337,6 +1480,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.information_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.information_schema.quickstart_table,PROD)", @@ -1474,13 +1639,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -1563,6 +1739,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.quickstart_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.quickstart_schema.quickstart_table,PROD)", @@ -1783,13 +1981,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -1872,6 +2081,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.default.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.default.quickstart_table,PROD)", @@ -2009,13 +2240,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -2098,6 +2340,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.information_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.information_schema.quickstart_table,PROD)", @@ -2235,13 +2499,24 @@ "table_type": "MANAGED", "created_by": "abc@acryl.io", "created_at": "2022-10-19 13:21:38.688000", - "properties": "{'delta.lastCommitTimestamp': '1666185711000', 'delta.lastUpdateVersion': '1', 'delta.minReaderVersion': '1', 'delta.minWriterVersion': '2'}", + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", "updated_at": "2022-10-19 13:27:29.633000" }, "name": "quickstart_table", + "created": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + }, + "lastModified": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + }, "tags": [] } }, @@ -2324,6 +2599,28 @@ "runId": "unity-catalog-test" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.quickstart_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1638860400000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:abc@acryl.io", + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1666186049633 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.quickstart_schema.quickstart_table,PROD)",