diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 87983fb794cb6..60a3a1c1b0960 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -332,7 +332,8 @@ def operational_data_for_time_window( ON access_history.user_name = users.name WHERE query_start_time >= to_timestamp_ltz({start_time_millis}, 3) AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) - AND query_history.query_type in ('INSERT', 'UPDATE', 'DELETE', 'CREATE', 'CREATE_TABLE', 'CREATE_TABLE_AS_SELECT') + AND access_history.objects_modified is not null + AND ARRAY_SIZE(access_history.objects_modified) > 0 ORDER BY query_start_time DESC ;""" diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 6d92afa5c8463..b86b6054d0f83 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -43,6 +43,20 @@ "CREATE": OperationTypeClass.CREATE, "CREATE_TABLE": OperationTypeClass.CREATE, "CREATE_TABLE_AS_SELECT": OperationTypeClass.CREATE, + "MERGE": OperationTypeClass.CUSTOM, + "COPY": OperationTypeClass.CUSTOM, + "TRUNCATE_TABLE": OperationTypeClass.CUSTOM, + # TODO: Datasets for the query types below are not detected by Snowflake in snowflake.access_history.objects_modified. + # However, it seems possible to support these using SQL parsing in the future. + # When this support is added, snowflake_query.operational_data_for_time_window needs to be updated. 
+ # "CREATE_VIEW": OperationTypeClass.CREATE, + # "CREATE_EXTERNAL_TABLE": OperationTypeClass.CREATE, + # "ALTER_TABLE_MODIFY_COLUMN": OperationTypeClass.ALTER, + # "ALTER_TABLE_ADD_COLUMN": OperationTypeClass.ALTER, + # "RENAME_COLUMN": OperationTypeClass.ALTER, + # "ALTER_SET_TAG": OperationTypeClass.ALTER, + # "ALTER_TABLE_DROP_COLUMN": OperationTypeClass.ALTER, + # "ALTER": OperationTypeClass.ALTER, } @@ -328,12 +342,14 @@ def _check_usage_date_ranges(self) -> Any: def _get_operation_aspect_work_unit( self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: - if event.query_start_time and event.query_type in OPERATION_STATEMENT_TYPES: + if event.query_start_time and event.query_type: start_time = event.query_start_time query_type = event.query_type user_email = event.email user_name = event.user_name - operation_type = OPERATION_STATEMENT_TYPES[query_type] + operation_type = OPERATION_STATEMENT_TYPES.get( + query_type, OperationTypeClass.CUSTOM + ) reported_time: int = int(time.time() * 1000) last_updated_timestamp: int = int(start_time.timestamp() * 1000) user_urn = make_user_urn(self.get_user_identifier(user_name, user_email)) @@ -363,6 +379,9 @@ def _get_operation_aspect_work_unit( lastUpdatedTimestamp=last_updated_timestamp, actor=user_urn, operationType=operation_type, + customOperationType=query_type + if operation_type is OperationTypeClass.CUSTOM + else None, ) mcp = MetadataChangeProposalWrapper( entityUrn=dataset_urn,