From c13679b55e13845442651ac39e1e8aaeb6383294 Mon Sep 17 00:00:00 2001 From: "Bharti, Aakash" Date: Thu, 30 May 2024 14:49:56 +0530 Subject: [PATCH] fix metadata-ingestion:lint issues --- .../source/confluent_schema_registry.py | 8 - .../integration/kafka/kafka_mces_golden.json | 1913 ++++++++++++----- .../tests/unit/test_kafka_source.py | 65 +- 3 files changed, 1457 insertions(+), 529 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index 5bc4b541165067..9f28a498f25c93 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -369,10 +369,6 @@ def _get_schema_metadata( self, dataset_subtype: str, platform_urn: str, is_subject: bool ) -> Optional[SchemaMetadata]: - schema: Optional[Schema] = None - key_schema: Optional[Schema] = None - fields: List[SchemaField] = [] - key_fields: List[SchemaField] = [] # Process the value schema schema, fields = self._get_schema_and_fields( dataset_subtype=dataset_subtype, @@ -415,10 +411,6 @@ def get_schema_metadata( ) -> Optional[SchemaMetadata]: logger.debug(f"Inside get_schema_metadata {dataset_subtype} {platform_urn}") - schema: Optional[Schema] = None - key_schema: Optional[Schema] = None - fields: List[SchemaField] = [] - key_fields: List[SchemaField] = [] # Process the value schema schema, fields = self._get_schema_and_fields( dataset_subtype=dataset_subtype, diff --git a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json index 7dd328168e84c0..26c680105cd946 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json @@ -1,573 +1,1458 @@ [ -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "key_topic", - "platform": "urn:li:dataPlatform:kafka", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "44fd7a7b325d6fdd4275b1f02a79c1a8", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "", - "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", - "keySchemaType": "AVRO" - } - }, - "fields": [ - { - "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", - "nullable": false, - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.NumberType": {} - } - }, - "nativeDataType": "id", - "recursive": false, - "isPartOfKey": true + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" }, - { - "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", - "nullable": false, - "type": { + "hash": "44fd7a7b325d6fdd4275b1f02a79c1a8", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true }, - "nativeDataType": "namespace", - "recursive": false, - "isPartOfKey": true - } - ] - } - }, - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/prod/kafka" - ] - } - }, - { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": { - "Partitions": "1", - "Replication Factor": "1", - "min.insync.replicas": "1", - "retention.bytes": "-1", - "retention.ms": "604800000", - "cleanup.policy": "delete", - "max.message.bytes": "1048588", - "unclean.leader.election.enable": "false" - }, - "name": "key_topic", - "tags": [] + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false" + }, + "name": "key_topic", + "tags": [] + } } - } - ] + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "Topic" - ] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "key_value_topic", - "platform": "urn:li:dataPlatform:kafka", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "a79a2fe3adab60b21d272a9cc3e93595", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", - "documentSchemaType": "AVRO", - "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", - "keySchemaType": "AVRO" - } - }, - "fields": [ - { - "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", - "nullable": false, - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.NumberType": {} - } - }, - "nativeDataType": "id", - "recursive": false, - "isPartOfKey": true + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" }, - { - "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", - "nullable": false, - "type": { + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "a79a2fe3adab60b21d272a9cc3e93595", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true }, - "nativeDataType": "namespace", - "recursive": false, - "isPartOfKey": true - }, - { - "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", - "nullable": false, - "type": { + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true }, - "nativeDataType": "email", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Email" - } - ] + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, - "isPartOfKey": false, - "jsonProps": "{\"tags\": [\"Email\"]}" + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false", + "SchemaName": "key_value_topic-value" + }, + "name": "key_value_topic", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "62c7c400ec5760797a59c45e59c2f2dc", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "\"string\"", + "keySchemaType": "AVRO" + } }, - { - "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", - "nullable": false, - "type": { + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=string]", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": true }, - "nativeDataType": "firstName", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Name" - } - ] + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, - "isPartOfKey": false, - "jsonProps": "{\"tags\": [\"Name\"]}" + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "Partitions": "1", + "Replication Factor": "1", + "min.insync.replicas": "1", + "retention.bytes": "-1", + "retention.ms": "604800000", + "cleanup.policy": "delete", + "max.message.bytes": "1048588", + "unclean.leader.election.enable": "false", + "SchemaName": "value_topic-value" + }, + "name": "value_topic", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Topic" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic-key,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_topic-key", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "5e468f7aa532c2f2ed9686ff3ec943ec", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } }, - { - "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", - "nullable": false, - "type": { + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true }, - "nativeDataType": "lastName", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Name" - } - ] + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true }, - "isPartOfKey": false, - "jsonProps": "{\"tags\": [\"Name\"]}" - } - ] - } - }, - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/prod/kafka" - ] - } - }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [ - { - "tag": "urn:li:tag:PII" - } - ] + { + "fieldPath": "[version=2.0].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "key_topic-key", + "description": "Key schema for kafka topic", + "tags": [] + } } - }, - { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": { - "Partitions": "1", - "Replication Factor": "1", - "min.insync.replicas": "1", - "retention.bytes": "-1", - "retention.ms": "604800000", - "cleanup.policy": "delete", - "max.message.bytes": "1048588", - "unclean.leader.election.enable": "false" - }, - "name": "key_value_topic", - "description": "Value schema for kafka topic", - "tags": [] + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic-key", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "5e468f7aa532c2f2ed9686ff3ec943ec", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "key_value_topic-key", + "description": "Key schema for kafka topic", + "tags": [] + } } - } - ] + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "Topic" - ] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "domains", - "aspect": { - "json": { - "domains": [ - "urn:li:domain:sales" - ] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "value_topic", - "platform": "urn:li:dataPlatform:kafka", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "62c7c400ec5760797a59c45e59c2f2dc", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", - "documentSchemaType": "AVRO", - "keySchema": "\"string\"", - "keySchemaType": "AVRO" - } - }, - "fields": [ - { - "fieldPath": "[version=2.0].[key=True].[type=string]", - "nullable": false, - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "string", - "recursive": false, - "isPartOfKey": true + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic-value", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" }, - { - "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", - "nullable": false, - "type": { + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "c9b692583e304b9cb703ffa748a9f37d", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].email", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "email", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Email" - } - ] + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Email\"]}" }, - "isPartOfKey": false, - "jsonProps": "{\"tags\": [\"Email\"]}" - }, - { - "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", - "nullable": false, - "type": { + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" }, - "nativeDataType": "firstName", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Name" - } - ] + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" }, - "isPartOfKey": false, - "jsonProps": "{\"tags\": [\"Name\"]}" - }, - { - "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", - "nullable": false, - "type": { + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" }, - "nativeDataType": "lastName", - "recursive": false, - "globalTags": { - "tags": [ - { - "tag": "urn:li:tag:Name" - } - ] + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, - "isPartOfKey": false, - "jsonProps": "{\"tags\": [\"Name\"]}" - } - ] - } - }, - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/prod/kafka" - ] - } - }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [ - { - "tag": "urn:li:tag:PII" - } - ] + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "key_value_topic-value", + "description": "Value schema for kafka topic", + "tags": [] + } } - }, - { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": { - "Partitions": "1", - "Replication Factor": "1", - "min.insync.replicas": "1", - "retention.bytes": "-1", - "retention.ms": "604800000", - "cleanup.policy": "delete", - "max.message.bytes": "1048588", - "unclean.leader.election.enable": "false" - }, - "name": "value_topic", - "description": "Value schema for kafka topic", - "tags": [] + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-key,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic-key", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "c088cd2eb2de57e32c00b32d4871ec72", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "\"string\"", + "documentSchemaType": "AVRO", + "keySchema": "\"string\"", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=string]", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=string]", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "value_topic-key", + "tags": [] + } } - } - ] + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "subTypes", - "aspect": { - "json": { - "typeNames": [ - "Topic" - ] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic,PROD)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [] + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:Email", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "Email" + { + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-value,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic-value", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "c9b692583e304b9cb703ffa748a9f37d", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "value_topic-value", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:Name", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "Name" + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:PII", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "PII" + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "kafka-test", - "lastRunId": "no-run-id-provided" + { + "entityType": "tag", + "entityUrn": "urn:li:tag:Email", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Email" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "tag", + "entityUrn": "urn:li:tag:Name", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Name" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "tag", + "entityUrn": "urn:li:tag:PII", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "PII" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } } -} -] \ No newline at end of file +] diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index 231e41c960db1c..b4e37d288a3041 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -334,6 +334,8 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: mock_kafka_consumer.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() + # Along with with 4 topics(3 with schema and 1 schemaless) which constitutes to 8 workunits, + # there will be 6 schemas (1 key and 1 value schema for 3 topics) which constitutes to 12 workunits assert len(workunits) == 20 i: int = -1 for wu in workunits: @@ -343,6 +345,8 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: mce: MetadataChangeEvent = wu.metadata i += 1 + # Only topic (named schema_less_topic) does not have schema metadata but other workunits (that are created + # for schema) will have corresponding SchemaMetadata aspect if i < len(topic_subject_schema_map.keys()): # First 3 workunits (topics) must have schemaMetadata aspect assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass) @@ -380,11 +384,18 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: ) # Make sure we have 2 fields, one from the key schema & one from the value schema. assert len(schemaMetadataAspect.fields) == 2 - else: + elif i == len(topic_subject_schema_map.keys()): # Last topic('schema_less_topic') has no schema defined in the registry. # The schemaMetadata aspect should not be present for this. for aspect in mce.proposedSnapshot.aspects: assert not isinstance(aspect, SchemaMetadataClass) + else: + # Last 2 workunits (schemas) must have schemaMetadata aspect + assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass) + schemaMetadataAspectObj: SchemaMetadataClass = mce.proposedSnapshot.aspects[ + 1 + ] + assert isinstance(schemaMetadataAspectObj.platformSchema, KafkaSchemaClass) @pytest.mark.parametrize( @@ -643,6 +654,8 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: }, ctx, ) + # Along with with 1 topics(and 5 meta mapping) it constitutes to 6 workunits, + # there will be 2 schemas which constitutes to 4 workunits (1 mce and 1 mcp each) workunits = [w for w in kafka_source.get_workunits()] assert len(workunits) == 10 mce = workunits[0].metadata @@ -677,11 +690,49 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: "urn:li:glossaryTerm:double_meta_property", ] ) - assert isinstance(workunits[2].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper) + mce = workunits[2].metadata + assert isinstance(mce, MetadataChangeEvent) assert isinstance(workunits[3].metadata, MetadataChangeProposalWrapper) - assert isinstance(workunits[4].metadata, MetadataChangeProposalWrapper) + + mce = workunits[4].metadata + assert isinstance(mce, MetadataChangeEvent) + ownership_aspect = [ + asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass) + ][0] + assert ownership_aspect == make_ownership_aspect_from_urn_list( + [ + make_owner_urn("charles", OwnerType.USER), + make_owner_urn("jdoe.last@gmail.com", OwnerType.USER), + ], + "SERVICE", + ) + + tags_aspect = [ + asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass) + ][0] + assert tags_aspect == make_global_tag_aspect_with_tag_list( + ["has_pii_test", "int_meta_property"] + ) + + terms_aspect = [ + asp + for asp in mce.proposedSnapshot.aspects + if isinstance(asp, GlossaryTermsClass) + ][0] + assert terms_aspect == make_glossary_terms_aspect_from_urn_list( + [ + "urn:li:glossaryTerm:Finance_test", + "urn:li:glossaryTerm:double_meta_property", + ] + ) + assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper) - assert workunits[2].metadata.aspectName == "glossaryTermKey" - assert workunits[3].metadata.aspectName == "glossaryTermKey" - assert workunits[4].metadata.aspectName == "tagKey" - assert workunits[5].metadata.aspectName == "tagKey" + assert isinstance(workunits[6].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[7].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[8].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[9].metadata, MetadataChangeProposalWrapper) + assert workunits[6].metadata.aspectName == "glossaryTermKey" + assert workunits[7].metadata.aspectName == "glossaryTermKey" + assert workunits[8].metadata.aspectName == "tagKey" + assert workunits[9].metadata.aspectName == "tagKey"