diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
index cfa199314fc07..fba71240282c4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py
@@ -76,10 +76,7 @@ def create(
     ) -> "ConfluentSchemaRegistry":
         return cls(source_config, report)
 
-    def _get_subject_for_topic(
-        self, dataset_subtype: str, is_key_schema: bool
-    ) -> Optional[str]:
-        topic: str = dataset_subtype
+    def _get_subject_for_topic(self, topic: str, is_key_schema: bool) -> Optional[str]:
         subject_key_suffix: str = "-key" if is_key_schema else "-value"
         # For details on schema registry subject name strategy,
         # see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work
@@ -234,24 +231,25 @@ def get_schemas_from_confluent_ref_json(
         return all_schemas
 
     def _get_schema_and_fields(
-        self, dataset_subtype: str, is_key_schema: bool, is_subject: bool
+        self, topic: str, is_key_schema: bool, is_subject: bool
     ) -> Tuple[Optional[Schema], List[SchemaField]]:
         schema: Optional[Schema] = None
+        kafka_entity = "subject" if is_subject else "topic"
 
-        # if provided schema as dataset_subtype, assuming it as value subject
+        # if provided schema as topic, assuming it as value subject
         schema_type_str: Optional[str] = "value"
         topic_subject: Optional[str] = None
         if not is_subject:
             schema_type_str = "key" if is_key_schema else "value"
             topic_subject = self._get_subject_for_topic(
-                dataset_subtype=dataset_subtype, is_key_schema=is_key_schema
+                topic=topic, is_key_schema=is_key_schema
             )
         else:
-            topic_subject = dataset_subtype
+            topic_subject = topic
 
         if topic_subject is not None:
             logger.debug(
-                f"The {schema_type_str} schema subject:'{topic_subject}' is found for dataset_subtype:'{dataset_subtype}'."
+                f"The {schema_type_str} schema subject:'{topic_subject}' is found for {kafka_entity}:'{topic}'."
             )
             try:
                 registered_schema = self.schema_registry_client.get_latest_version(
@@ -260,29 +258,29 @@ def _get_schema_and_fields(
                 schema = registered_schema.schema
             except Exception as e:
                 logger.warning(
-                    f"For dataset_subtype: {dataset_subtype}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
+                    f"For {kafka_entity}: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
                 )
                 self.report.report_warning(
-                    dataset_subtype,
+                    topic,
                     f"failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}.",
                 )
         else:
             logger.debug(
-                f"For dataset_subtype: {dataset_subtype}, the schema registry subject for the {schema_type_str} schema is not found."
+                f"For {kafka_entity}: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
             )
             if not is_key_schema:
                 # Value schema is always expected. Report a warning.
                 self.report.report_warning(
-                    dataset_subtype,
+                    topic,
                     f"The schema registry subject for the {schema_type_str} schema is not found."
-                    f" The dataset_subtype is either schema-less, or no messages have been written to the dataset_subtype yet.",
+                    f" The {kafka_entity} is either schema-less, or no messages have been written to the {kafka_entity} yet.",
                 )
 
-        # Obtain the schema fields from schema for the dataset_subtype.
+        # Obtain the schema fields from schema for the topic.
         fields: List[SchemaField] = []
         if schema is not None:
             fields = self._get_schema_fields(
-                dataset_subtype=dataset_subtype,
+                topic=topic,
                 schema=schema,
                 is_key_schema=is_key_schema,
             )
@@ -308,7 +306,7 @@ def _load_json_schema_with_resolved_references(
         return jsonref_schema
 
     def _get_schema_fields(
-        self, dataset_subtype: str, schema: Schema, is_key_schema: bool
+        self, topic: str, schema: Schema, is_key_schema: bool
     ) -> List[SchemaField]:
         # Parse the schema and convert it to SchemaFields.
         fields: List[SchemaField] = []
@@ -331,7 +329,7 @@ def _get_schema_fields(
             imported_schemas: List[
                 ProtobufSchema
             ] = self.get_schemas_from_confluent_ref_protobuf(schema)
-            base_name: str = dataset_subtype.replace(".", "_")
+            base_name: str = topic.replace(".", "_")
             fields = protobuf_util.protobuf_schema_to_mce_fields(
                 ProtobufSchema(
                     f"{base_name}-key.proto"
@@ -343,16 +341,14 @@ def _get_schema_fields(
                     is_key_schema=is_key_schema,
                 )
         elif schema.schema_type == "JSON":
-            base_name = dataset_subtype.replace(".", "_")
+            base_name = topic.replace(".", "_")
             canonical_name = (
                 f"{base_name}-key" if is_key_schema else f"{base_name}-value"
             )
             jsonref_schema = self._load_json_schema_with_resolved_references(
                 schema=schema,
                 name=canonical_name,
-                subject=f"{dataset_subtype}-key"
-                if is_key_schema
-                else f"{dataset_subtype}-value",
+                subject=f"{topic}-key" if is_key_schema else f"{topic}-value",
             )
             fields = list(
                 JsonSchemaTranslator.get_fields_from_schema(
@@ -361,25 +357,25 @@ def _get_schema_fields(
             )
         elif not self.source_config.ignore_warnings_on_schema_type:
             self.report.report_warning(
-                dataset_subtype,
+                topic,
                 f"Parsing kafka schema type {schema.schema_type} is currently not implemented",
             )
         return fields
 
     def _get_schema_metadata(
-        self, dataset_subtype: str, platform_urn: str, is_subject: bool
+        self, topic: str, platform_urn: str, is_subject: bool
     ) -> Optional[SchemaMetadata]:
 
         # Process the value schema
         schema, fields = self._get_schema_and_fields(
-            dataset_subtype=dataset_subtype,
+            topic=topic,
             is_key_schema=False,
             is_subject=is_subject,
         )  # type: Tuple[Optional[Schema], List[SchemaField]]
 
         # Process the key schema
         key_schema, key_fields = self._get_schema_and_fields(
-            dataset_subtype=dataset_subtype,
+            topic=topic,
            is_key_schema=True,
            is_subject=is_subject,
        )  # type:Tuple[Optional[Schema], List[SchemaField]]
@@ -393,7 +389,7 @@ def _get_schema_metadata(
             md5_hash: str = md5(schema_as_string.encode()).hexdigest()
 
             return SchemaMetadata(
-                schemaName=dataset_subtype,
+                schemaName=topic,
                 version=0,
                 hash=md5_hash,
                 platform=platform_urn,
@@ -408,20 +404,20 @@ def _get_schema_metadata(
         return None
 
     def get_schema_metadata(
-        self, dataset_subtype: str, platform_urn: str, is_subject: bool
+        self, topic: str, platform_urn: str, is_subject: bool
     ) -> Optional[SchemaMetadata]:
-        logger.debug(f"Inside get_schema_metadata {dataset_subtype} {platform_urn}")
+        logger.debug(f"Inside get_schema_metadata {topic} {platform_urn}")
 
         # Process the value schema
         schema, fields = self._get_schema_and_fields(
-            dataset_subtype=dataset_subtype,
+            topic=topic,
             is_key_schema=False,
             is_subject=is_subject,
         )  # type: Tuple[Optional[Schema], List[SchemaField]]
 
         # Process the key schema
         key_schema, key_fields = self._get_schema_and_fields(
-            dataset_subtype=dataset_subtype,
+            topic=topic,
             is_key_schema=True,
             is_subject=is_subject,
         )  # type:Tuple[Optional[Schema], List[SchemaField]]
@@ -435,7 +431,7 @@ def get_schema_metadata(
             md5_hash = md5(schema_as_string.encode()).hexdigest()
 
             return SchemaMetadata(
-                schemaName=dataset_subtype,
+                schemaName=topic,
                 version=0,
                 hash=md5_hash,
                 platform=platform_urn,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
index ae055c51bb6be..0d718e509d5c5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -308,7 +308,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             if self.source_config.topic_patterns.allowed(topic):
                 try:
                     yield from self._extract_record(
-                        topic, "", topic_detail, extra_topic_details.get(topic)
+                        topic, False, topic_detail, extra_topic_details.get(topic)
                     )
                 except Exception as e:
                     logger.warning(f"Failed to extract topic {topic}", exc_info=True)
@@ -322,7 +322,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         for subject in self.schema_registry_client.get_subjects():
             try:
                 yield from self._extract_record(
-                    "", subject, topic_detail=None, extra_topic_config=None
+                    subject, True, topic_detail=None, extra_topic_config=None
                 )
             except Exception as e:
                 logger.warning(f"Failed to extract subject {subject}", exc_info=True)
@@ -333,25 +333,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
     def _extract_record(
         self,
         topic: str,
-        subject: str,
+        is_subject: bool,
         topic_detail: Optional[TopicMetadata],
         extra_topic_config: Optional[Dict[str, ConfigEntry]],
     ) -> Iterable[MetadataWorkUnit]:
         AVRO = "AVRO"
 
-        kafka_entity = topic if len(topic) != 0 else subject
-        is_subject = False if len(topic) != 0 else True
+        kafka_entity = "subject" if is_subject else "topic"
 
-        logger.debug(f"kafka entity name = {kafka_entity}")
+        logger.debug(f"extracting schema metadata from kafka entity = {kafka_entity}")
 
         platform_urn = make_data_platform_urn(self.platform)
 
         # 1. Create schemaMetadata aspect (pass control to SchemaRegistry)
         schema_metadata = self.schema_registry_client.get_schema_metadata(
-            kafka_entity, platform_urn, is_subject
+            topic, platform_urn, is_subject
         )
 
         # topic can have no associated subject, but still it can be ingested without schema
+        # for schema ingestion, ingest only if it has valid schema
         if is_subject:
             if schema_metadata is None:
                 return
@@ -359,10 +359,7 @@ def _extract_record(
         else:
             dataset_name = topic
 
-        # dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
         # 2. Create the default dataset snapshot for the topic.
-        # if schema_metadata is not None:
-        #     dataset_name = schema_metadata.schemaName if len(topic) == 0 else topic
         dataset_urn = make_dataset_urn_with_platform_instance(
             platform=self.platform,
             name=dataset_name,
@@ -386,17 +383,17 @@ def _extract_record(
 
         # build custom properties for topic, schema properties may be added as needed
         custom_props: Dict[str, str] = {}
-        if len(topic) != 0:
+        if not is_subject:
             custom_props = self.build_custom_properties(
                 topic, topic_detail, extra_topic_config
             )
-            schemaName: Optional[
+            schema_name: Optional[
                 str
             ] = self.schema_registry_client._get_subject_for_topic(
-                dataset_subtype=topic, is_key_schema=False
+                topic, is_key_schema=False
            )
-            if schemaName is not None:
-                custom_props["Schema Name"] = schemaName
+            if schema_name is not None:
+                custom_props["Schema Name"] = schema_name
 
         # 4. Set dataset's description, tags, ownership, etc, if topic schema type is avro
         description: Optional[str] = None
@@ -472,7 +469,7 @@ def _extract_record(
         yield MetadataWorkUnit(id=f"kafka-{kafka_entity}", mce=mce)
 
         # 7. Add the subtype aspect marking this as a "topic" or "schema"
-        typeName = DatasetSubTypes.TOPIC if len(topic) != 0 else DatasetSubTypes.SCHEMA
+        typeName = DatasetSubTypes.SCHEMA if is_subject else DatasetSubTypes.TOPIC
         yield MetadataChangeProposalWrapper(
            entityUrn=dataset_urn,
            aspect=SubTypesClass(typeNames=[typeName]),
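
Reviewer note, not part of the patch: a minimal sketch of the default TopicNameStrategy convention that _get_subject_for_topic keys off (per the Confluent docs linked in the first hunk), where a topic's schemas live under the "<topic>-value" and "<topic>-key" subjects. The standalone helper name subject_for_topic below is hypothetical and only mirrors the suffix logic shown above.

    # Sketch only: hypothetical helper, not part of ConfluentSchemaRegistry.
    def subject_for_topic(topic: str, is_key_schema: bool) -> str:
        # Same "-key"/"-value" suffix rule as _get_subject_for_topic.
        subject_key_suffix = "-key" if is_key_schema else "-value"
        return f"{topic}{subject_key_suffix}"

    assert subject_for_topic("orders", is_key_schema=False) == "orders-value"
    assert subject_for_topic("orders", is_key_schema=True) == "orders-key"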