Skip to content

Commit

Permalink
feat(ingest/unity): capture create/lastModified timestamps (#7819)
Browse files Browse the repository at this point in the history
Co-authored-by: Tamas Nemeth <[email protected]>
  • Loading branch information
hsheth2 and treff7es authored Apr 17, 2023
1 parent a8681da commit e461d03
Show file tree
Hide file tree
Showing 3 changed files with 355 additions and 11 deletions.
49 changes: 48 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import re
import time
from typing import Dict, Iterable, List, Optional

from datahub.emitter.mce_builder import (
Expand Down Expand Up @@ -58,12 +59,15 @@
DatasetPropertiesClass,
DomainsClass,
MySqlDDLClass,
OperationClass,
OperationTypeClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
SchemaFieldClass,
SchemaMetadataClass,
SubTypesClass,
TimeStampClass,
UpstreamClass,
UpstreamLineageClass,
)
Expand Down Expand Up @@ -254,6 +258,7 @@ def process_table(

sub_type = self._create_table_sub_type_aspect(table)
schema_metadata = self._create_schema_metadata_aspect(table)
operation = self._create_table_operation_aspect(table)

domain = self._get_domain_aspect(
dataset_name=str(
Expand All @@ -279,6 +284,7 @@ def process_table(
view_props,
sub_type,
schema_metadata,
operation,
domain,
ownership,
lineage,
Expand Down Expand Up @@ -454,18 +460,59 @@ def _create_table_property_aspect(
custom_properties["created_by"] = table.created_by
custom_properties["created_at"] = str(table.created_at)
if table.properties:
custom_properties["properties"] = str(table.properties)
custom_properties.update({k: str(v) for k, v in table.properties.items()})
custom_properties["table_id"] = table.table_id
custom_properties["owner"] = table.owner
custom_properties["updated_by"] = table.updated_by
custom_properties["updated_at"] = str(table.updated_at)

created = TimeStampClass(
int(table.created_at.timestamp() * 1000), make_user_urn(table.created_by)
)
last_modified = created
if table.updated_at and table.updated_by is not None:
last_modified = TimeStampClass(
int(table.updated_at.timestamp() * 1000),
make_user_urn(table.updated_by),
)

return DatasetPropertiesClass(
name=table.name,
description=table.comment,
customProperties=custom_properties,
created=created,
lastModified=last_modified,
)

def _create_table_operation_aspect(self, table: proxy.Table) -> OperationClass:
"""Produce an operation aspect for a table.
If a last updated time is present, we produce an update operation.
Otherwise, we produce a create operation. We do this in addition to
setting the last updated time in the dataset properties aspect, as
the UI is currently missing the ability to display the last updated
from the properties aspect.
"""

reported_time = int(time.time() * 1000)

operation = OperationClass(
timestampMillis=reported_time,
lastUpdatedTimestamp=int(table.created_at.timestamp() * 1000),
actor=make_user_urn(table.created_by),
operationType=OperationTypeClass.CREATE,
)

if table.updated_at and table.updated_by is not None:
operation = OperationClass(
timestampMillis=reported_time,
lastUpdatedTimestamp=int(table.updated_at.timestamp() * 1000),
actor=make_user_urn(table.updated_by),
operationType=OperationTypeClass.UPDATE,
)

return operation

def _create_table_ownership_aspect(
self, table: proxy.Table
) -> Optional[OwnershipClass]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _default_deserializer(value: Any) -> Any:


@dataclass(eq=False)
class FileBackedDict(MutableMapping[str, _VT], Generic[_VT], Closeable):
class FileBackedDict(MutableMapping[str, _VT], Closeable, Generic[_VT]):
"""
A dict-like object that stores its data in a temporary SQLite database.
This is useful for storing large amounts of data that don't fit in memory.
Expand Down
Loading

0 comments on commit e461d03

Please sign in to comment.