From 7e604010f1a8f10809efe2fd7a686d1c1d085a0c Mon Sep 17 00:00:00 2001
From: aabharti-visa <145495867+aabharti-visa@users.noreply.github.com>
Date: Tue, 11 Jun 2024 17:30:12 +0530
Subject: [PATCH] feat(ingestion/kafka)-Add support for ingesting schemas from
 schema registry (#10612)

---
 .../source/confluent_schema_registry.py       |  58 +-
 .../src/datahub/ingestion/source/kafka.py     |  80 +-
 .../source/kafka_schema_registry_base.py      |  14 +-
 .../source/snowflake/snowflake_assertion.py   |   2 -
 .../source/snowflake/snowflake_query.py       |   1 -
 .../integration/kafka/kafka_mces_golden.json  | 889 +++++++++++++++++-
 .../tests/unit/test_kafka_source.py           |  71 +-
 7 files changed, 1059 insertions(+), 56 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
index 54475cb509621..fba71240282c4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
@@ -231,16 +231,25 @@ def get_schemas_from_confluent_ref_json(
         return all_schemas
 
     def _get_schema_and_fields(
-        self, topic: str, is_key_schema: bool
+        self, topic: str, is_key_schema: bool, is_subject: bool
     ) -> Tuple[Optional[Schema], List[SchemaField]]:
         schema: Optional[Schema] = None
-        schema_type_str: str = "key" if is_key_schema else "value"
-        topic_subject: Optional[str] = self._get_subject_for_topic(
-            topic=topic, is_key_schema=is_key_schema
-        )
+        kafka_entity = "subject" if is_subject else "topic"
+
+        # If a subject was passed in instead of a topic, assume it is a value subject.
+        schema_type_str: Optional[str] = "value"
+        topic_subject: Optional[str] = None
+        if not is_subject:
+            schema_type_str = "key" if is_key_schema else "value"
+            topic_subject = self._get_subject_for_topic(
+                topic=topic, is_key_schema=is_key_schema
+            )
+        else:
+            topic_subject = topic
+
         if topic_subject is not None:
             logger.debug(
-                f"The {schema_type_str} schema subject:'{topic_subject}' is found for topic:'{topic}'."
+                f"The {schema_type_str} schema subject:'{topic_subject}' is found for {kafka_entity}:'{topic}'."
             )
             try:
                 registered_schema = self.schema_registry_client.get_latest_version(
@@ -249,7 +258,7 @@ def _get_schema_and_fields(
                 schema = registered_schema.schema
             except Exception as e:
                 logger.warning(
-                    f"For topic: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
+                    f"For {kafka_entity}: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
                 )
                 self.report.report_warning(
                     topic,
@@ -257,21 +266,23 @@ def _get_schema_and_fields(
                 )
         else:
             logger.debug(
-                f"For topic: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
+                f"For {kafka_entity}: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
             )
             if not is_key_schema:
                 # Value schema is always expected. Report a warning.
                 self.report.report_warning(
                     topic,
                     f"The schema registry subject for the {schema_type_str} schema is not found."
-                    f" The topic is either schema-less, or no messages have been written to the topic yet.",
+                    f" The {kafka_entity} is either schema-less, or no messages have been written to the {kafka_entity} yet.",
                 )
 
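The subject resolution above leans on Confluent's default TopicNameStrategy, under which a topic's schemas are registered under the subjects `<topic>-key` and `<topic>-value`; a name that is already a subject therefore cannot be resolved further and is used as-is. A minimal sketch of that convention (the helper names below are illustrative only, not part of this patch):

from typing import Optional

def subject_name_for_topic(topic: str, is_key_schema: bool) -> str:
    # Default TopicNameStrategy: "<topic>-key" holds the key schema,
    # "<topic>-value" holds the value schema.
    return f"{topic}-{'key' if is_key_schema else 'value'}"

def topic_for_subject(subject: str) -> Optional[str]:
    # Reverse mapping; returns None for subjects registered under a
    # different strategy (e.g., RecordNameStrategy).
    for suffix in ("-key", "-value"):
        if subject.endswith(suffix):
            return subject[: -len(suffix)]
    return None

         # Obtain the schema fields from schema for the topic.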
fields: List[SchemaField] = [] if schema is not None: fields = self._get_schema_fields( - topic=topic, schema=schema, is_key_schema=is_key_schema + topic=topic, + schema=schema, + is_key_schema=is_key_schema, ) return (schema, fields) @@ -352,16 +363,21 @@ def _get_schema_fields( return fields def _get_schema_metadata( - self, topic: str, platform_urn: str + self, topic: str, platform_urn: str, is_subject: bool ) -> Optional[SchemaMetadata]: + # Process the value schema schema, fields = self._get_schema_and_fields( - topic=topic, is_key_schema=False + topic=topic, + is_key_schema=False, + is_subject=is_subject, ) # type: Tuple[Optional[Schema], List[SchemaField]] # Process the key schema key_schema, key_fields = self._get_schema_and_fields( - topic=topic, is_key_schema=True + topic=topic, + is_key_schema=True, + is_subject=is_subject, ) # type:Tuple[Optional[Schema], List[SchemaField]] # Create the schemaMetadata aspect. @@ -388,17 +404,22 @@ def _get_schema_metadata( return None def get_schema_metadata( - self, topic: str, platform_urn: str + self, topic: str, platform_urn: str, is_subject: bool ) -> Optional[SchemaMetadata]: - logger.debug(f"Inside _get_schema_metadata {topic} {platform_urn}") + logger.debug(f"Inside get_schema_metadata {topic} {platform_urn}") + # Process the value schema schema, fields = self._get_schema_and_fields( - topic=topic, is_key_schema=False + topic=topic, + is_key_schema=False, + is_subject=is_subject, ) # type: Tuple[Optional[Schema], List[SchemaField]] # Process the key schema key_schema, key_fields = self._get_schema_and_fields( - topic=topic, is_key_schema=True + topic=topic, + is_key_schema=True, + is_subject=is_subject, ) # type:Tuple[Optional[Schema], List[SchemaField]] # Create the schemaMetadata aspect. @@ -423,3 +444,6 @@ def get_schema_metadata( fields=key_fields + fields, ) return None + + def get_subjects(self) -> List[str]: + return self.known_schema_registry_subjects diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 99ef737206ab0..0d718e509d5c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -303,34 +303,63 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ).topics extra_topic_details = self.fetch_extra_topic_details(topics.keys()) - for t, t_detail in topics.items(): - self.report.report_topic_scanned(t) - if self.source_config.topic_patterns.allowed(t): + for topic, topic_detail in topics.items(): + self.report.report_topic_scanned(topic) + if self.source_config.topic_patterns.allowed(topic): try: yield from self._extract_record( - t, t_detail, extra_topic_details.get(t) + topic, False, topic_detail, extra_topic_details.get(topic) ) except Exception as e: - logger.warning(f"Failed to extract topic {t}", exc_info=True) + logger.warning(f"Failed to extract topic {topic}", exc_info=True) self.report.report_warning( - "topic", f"Exception while extracting topic {t}: {e}" + "topic", f"Exception while extracting topic {topic}: {e}" ) else: - self.report.report_dropped(t) + self.report.report_dropped(topic) + + # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes + for subject in self.schema_registry_client.get_subjects(): + try: + yield from self._extract_record( + subject, True, topic_detail=None, extra_topic_config=None + ) + except Exception as e: + logger.warning(f"Failed to extract subject {subject}", exc_info=True) + 
self.report.report_warning(
+                    "subject", f"Exception while extracting subject {subject}: {e}"
+                )
 
     def _extract_record(
         self,
         topic: str,
+        is_subject: bool,
         topic_detail: Optional[TopicMetadata],
         extra_topic_config: Optional[Dict[str, ConfigEntry]],
     ) -> Iterable[MetadataWorkUnit]:
-        logger.debug(f"topic = {topic}")
-
         AVRO = "AVRO"
 
-        # 1. Create the default dataset snapshot for the topic.
-        dataset_name = topic
+        kafka_entity = "subject" if is_subject else "topic"
+
+        logger.debug(f"Extracting schema metadata for {kafka_entity}: {topic}")
+
         platform_urn = make_data_platform_urn(self.platform)
+
+        # 1. Create schemaMetadata aspect (pass control to SchemaRegistry)
+        schema_metadata = self.schema_registry_client.get_schema_metadata(
+            topic, platform_urn, is_subject
+        )
+
+        # A topic may have no associated subject and can still be ingested without a
+        # schema; a subject, by contrast, is ingested only if it has a valid schema.
+        if is_subject:
+            if schema_metadata is None:
+                return
+            dataset_name = schema_metadata.schemaName
+        else:
+            dataset_name = topic
+
+        # 2. Create the default dataset snapshot for the topic.
         dataset_urn = make_dataset_urn_with_platform_instance(
             platform=self.platform,
             name=dataset_name,
@@ -342,10 +371,6 @@ def _extract_record(
             aspects=[Status(removed=False)],  # we append to this list later on
         )
 
-        # 2. Attach schemaMetadata aspect (pass control to SchemaRegistry)
-        schema_metadata = self.schema_registry_client.get_schema_metadata(
-            topic, platform_urn
-        )
         if schema_metadata is not None:
             dataset_snapshot.aspects.append(schema_metadata)
 
@@ -356,9 +381,19 @@ def _extract_record(
         browse_path = BrowsePathsClass([browse_path_str])
         dataset_snapshot.aspects.append(browse_path)
 
-        custom_props = self.build_custom_properties(
-            topic, topic_detail, extra_topic_config
-        )
+        # Build custom properties for the topic; schema-related properties can be
+        # added here as needed.
+        custom_props: Dict[str, str] = {}
+        if not is_subject:
+            custom_props = self.build_custom_properties(
+                topic, topic_detail, extra_topic_config
+            )
+            schema_name: Optional[
+                str
+            ] = self.schema_registry_client._get_subject_for_topic(
+                topic, is_key_schema=False
+            )
+            if schema_name is not None:
+                custom_props["Schema Name"] = schema_name
 
         # 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
         description: Optional[str] = None
@@ -414,7 +449,7 @@ def _extract_record(
         )
 
         dataset_properties = DatasetPropertiesClass(
-            name=topic, customProperties=custom_props, description=description
+            name=dataset_name, customProperties=custom_props, description=description
         )
         dataset_snapshot.aspects.append(dataset_properties)
 
@@ -431,12 +466,13 @@ def _extract_record(
 
         # 6. Emit the datasetSnapshot MCE
         mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
-        yield MetadataWorkUnit(id=f"kafka-{topic}", mce=mce)
+        yield MetadataWorkUnit(id=f"kafka-{dataset_name}", mce=mce)
 
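Concretely: in topic mode the dataset keeps the topic's own name, while in subject mode it takes the registry subject's name (`schema_metadata.schemaName`), so a topic with both key and value schemas now yields three datasets. A rough sketch of the URNs this produces, assuming the PROD env and no platform instance (these match the golden file below):

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

topic_urn = make_dataset_urn_with_platform_instance(
    platform="kafka", name="key_value_topic", platform_instance=None, env="PROD"
)
# -> urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic,PROD)

value_subject_urn = make_dataset_urn_with_platform_instance(
    platform="kafka", name="key_value_topic-value", platform_instance=None, env="PROD"
)
# -> urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)

-        # 7. Add the subtype aspect marking this as a "topic"
+        # 7. 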
Add the subtype aspect marking this as a "topic" or "schema" + typeName = DatasetSubTypes.SCHEMA if is_subject else DatasetSubTypes.TOPIC yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, - aspect=SubTypesClass(typeNames=[DatasetSubTypes.TOPIC]), + aspect=SubTypesClass(typeNames=[typeName]), ).as_workunit() domain_urn: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_schema_registry_base.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_schema_registry_base.py index 34ff76f44d1dd..59f174a9a5045 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_schema_registry_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_schema_registry_base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import List, Optional from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata @@ -7,6 +7,16 @@ class KafkaSchemaRegistryBase(ABC): @abstractmethod def get_schema_metadata( - self, topic: str, platform_urn: str + self, topic: str, platform_urn: str, is_subject: bool ) -> Optional[SchemaMetadata]: pass + + @abstractmethod + def get_subjects(self) -> List[str]: + pass + + @abstractmethod + def _get_subject_for_topic( + self, dataset_subtype: str, is_key_schema: bool + ) -> Optional[str]: + pass diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py index 8abb656e30e73..a28a81cc5b955 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_assertion.py @@ -59,7 +59,6 @@ def __init__( def get_assertion_workunits( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: - self.connection = self.create_connection() if self.connection is None: return @@ -80,7 +79,6 @@ def get_assertion_workunits( yield self._gen_platform_instance_wu(mcp.entityUrn) def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit: - # Construct a MetadataChangeProposalWrapper object for assertion platform return MetadataChangeProposalWrapper( entityUrn=urn, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 205490a6d29c6..8187fce78e5e4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -1019,7 +1019,6 @@ def table_upstreams_only( @staticmethod def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str: - pattern = r"datahub\\_\\_%" escape_pattern = r"\\" return f""" diff --git a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json index 7dd328168e84c..7df790b74e829 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json @@ -270,7 +270,8 @@ "retention.ms": "604800000", "cleanup.policy": "delete", "max.message.bytes": "1048588", - "unclean.leader.election.enable": "false" + "unclean.leader.election.enable": "false", + "Schema Name": "key_value_topic-value" }, "name": "key_value_topic", "description": "Value schema for kafka topic", @@ -472,7 +473,8 @@ "retention.ms": "604800000", "cleanup.policy": "delete", 
"max.message.bytes": "1048588", - "unclean.leader.election.enable": "false" + "unclean.leader.election.enable": "false", + "Schema Name": "value_topic-value" }, "name": "value_topic", "description": "Value schema for kafka topic", @@ -522,6 +524,889 @@ "lastRunId": "no-run-id-provided" } }, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic-key,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_topic-key", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "5e468f7aa532c2f2ed9686ff3ec943ec", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "key_topic-key", + "description": "Key schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { 
+ "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic-key", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "5e468f7aa532c2f2ed9686ff3ec943ec", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[key=True].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=long].id", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "id", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "[version=2.0].[type=UserKey].[type=string].namespace", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "namespace", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "key_value_topic-key", + "description": "Key schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 
1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "key_value_topic-value", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "c9b692583e304b9cb703ffa748a9f37d", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { 
+ "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "key_value_topic-value", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "json": { + "domains": [ + "urn:li:domain:sales" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,key_value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-key,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic-key", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "c088cd2eb2de57e32c00b32d4871ec72", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "\"string\"", + "documentSchemaType": "AVRO", + "keySchema": "\"string\"", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=string]", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": true + }, + { + "fieldPath": "[version=2.0].[type=string]", + "nullable": false, 
+ "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "value_topic-key", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-key,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-value,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "value_topic-value", + "platform": "urn:li:dataPlatform:kafka", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "c9b692583e304b9cb703ffa748a9f37d", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.KafkaSchema": { + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "documentSchemaType": "AVRO", + "keySchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", + "keySchemaType": "AVRO" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": 
"{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[key=True].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": true, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].email", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "email", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "firstName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + }, + { + "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", + "nullable": false, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "lastName", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/prod/kafka" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "name": "value_topic-value", + "description": "Value schema for kafka topic", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Schema" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,value_topic-value,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "tag", "entityUrn": "urn:li:tag:Email", diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index 5ad9ac45534aa..b4e37d288a304 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -334,7 +334,9 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: mock_kafka_consumer.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() - assert len(workunits) == 8 + # Along with with 4 topics(3 with schema and 1 schemaless) which constitutes to 8 workunits, + # there will be 6 schemas (1 key and 1 value schema for 3 topics) 
which add up to 12 more workunits.
+    assert len(workunits) == 20
     i: int = -1
     for wu in workunits:
         assert isinstance(wu, MetadataWorkUnit)
@@ -343,6 +345,8 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
         mce: MetadataChangeEvent = wu.metadata
 
         i += 1
+        # Only the schema-less topic ('schema_less_topic') lacks schema metadata; every
+        # other snapshot workunit (topics with schemas, plus the subject datasets)
+        # carries a SchemaMetadata aspect.
         if i < len(topic_subject_schema_map.keys()):
             # First 3 workunits (topics) must have schemaMetadata aspect
             assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
@@ -380,11 +384,18 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
             )
             # Make sure we have 2 fields, one from the key schema & one from the value schema.
             assert len(schemaMetadataAspect.fields) == 2
-        else:
+        elif i == len(topic_subject_schema_map.keys()):
             # Last topic('schema_less_topic') has no schema defined in the registry.
             # The schemaMetadata aspect should not be present for this.
             for aspect in mce.proposedSnapshot.aspects:
                 assert not isinstance(aspect, SchemaMetadataClass)
+        else:
+            # The remaining snapshot workunits are subject datasets and must have a
+            # schemaMetadata aspect.
+            assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
+            schemaMetadataAspectObj: SchemaMetadataClass = mce.proposedSnapshot.aspects[
+                1
+            ]
+            assert isinstance(schemaMetadataAspectObj.platformSchema, KafkaSchemaClass)
 
 
 @pytest.mark.parametrize(
@@ -465,7 +476,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
 
     workunits = list(kafka_source.get_workunits())
 
-    assert len(workunits) == 2
+    assert len(workunits) == 6
     if ignore_warnings_on_schema_type:
         assert not kafka_source.report.warnings
     else:
@@ -643,8 +654,10 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
         },
         ctx,
    )
+    # One topic (plus 5 meta-mapping workunits) accounts for 6 workunits; the 2 subject
+    # schemas add 4 more (1 MCE and 1 MCP each), bringing the total to 10.
     workunits = [w for w in kafka_source.get_workunits()]
-    assert len(workunits) == 6
+    assert len(workunits) == 10
 
     mce = workunits[0].metadata
     assert isinstance(mce, MetadataChangeEvent)
@@ -677,11 +690,49 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
                 "urn:li:glossaryTerm:double_meta_property",
             ]
         )
-    assert isinstance(workunits[2].metadata, MetadataChangeProposalWrapper)
+    assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper)
+    mce = workunits[2].metadata
+    assert isinstance(mce, MetadataChangeEvent)
     assert isinstance(workunits[3].metadata, MetadataChangeProposalWrapper)
-    assert isinstance(workunits[4].metadata, MetadataChangeProposalWrapper)
+
+    mce = workunits[4].metadata
+    assert isinstance(mce, MetadataChangeEvent)
+    ownership_aspect = [
+        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
+    ][0]
+    assert ownership_aspect == make_ownership_aspect_from_urn_list(
+        [
+            make_owner_urn("charles", OwnerType.USER),
+            make_owner_urn("jdoe.last@gmail.com", OwnerType.USER),
+        ],
+        "SERVICE",
+    )
+
+    tags_aspect = [
+        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass)
+    ][0]
+    assert tags_aspect == make_global_tag_aspect_with_tag_list(
+        ["has_pii_test", "int_meta_property"]
+    )
+
+    terms_aspect = [
+        asp
+        for asp in mce.proposedSnapshot.aspects
+        if isinstance(asp, GlossaryTermsClass)
+    ][0]
+    assert terms_aspect == make_glossary_terms_aspect_from_urn_list(
+        [
+            
"urn:li:glossaryTerm:Finance_test", + "urn:li:glossaryTerm:double_meta_property", + ] + ) + assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper) - assert workunits[2].metadata.aspectName == "glossaryTermKey" - assert workunits[3].metadata.aspectName == "glossaryTermKey" - assert workunits[4].metadata.aspectName == "tagKey" - assert workunits[5].metadata.aspectName == "tagKey" + assert isinstance(workunits[6].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[7].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[8].metadata, MetadataChangeProposalWrapper) + assert isinstance(workunits[9].metadata, MetadataChangeProposalWrapper) + assert workunits[6].metadata.aspectName == "glossaryTermKey" + assert workunits[7].metadata.aspectName == "glossaryTermKey" + assert workunits[8].metadata.aspectName == "tagKey" + assert workunits[9].metadata.aspectName == "tagKey"