diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java index 2af6d78b41612..e0685f329b0d5 100644 --- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java @@ -19,7 +19,6 @@ import io.opentelemetry.extension.annotations.WithSpan; import java.io.IOException; import java.util.Arrays; -import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -29,7 +28,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; - /** *

The topic names that this emits to can be controlled by constructing this with a {@link TopicConvention}. * If none is given, defaults to a {@link TopicConventionImpl} with the default delimiter of an underscore (_). @@ -38,32 +36,21 @@ public class KafkaEventProducer implements EventProducer { private final Producer _producer; - private final Optional _callback; private final TopicConvention _topicConvention; + private final KafkaHealthChecker _kafkaHealthChecker; /** * Constructor. * * @param producer The Kafka {@link Producer} to use * @param topicConvention the convention to use to get kafka topic names + * @param kafkaHealthChecker The {@link Callback} to invoke when the request is completed */ public KafkaEventProducer(@Nonnull final Producer producer, - @Nonnull final TopicConvention topicConvention) { - this(producer, topicConvention, null); - } - - /** - * Constructor. - * - * @param producer The Kafka {@link Producer} to use - * @param topicConvention the convention to use to get kafka topic names - * @param callback The {@link Callback} to invoke when the request is completed - */ - public KafkaEventProducer(@Nonnull final Producer producer, - @Nonnull final TopicConvention topicConvention, @Nullable final Callback callback) { + @Nonnull final TopicConvention topicConvention, @Nonnull final KafkaHealthChecker kafkaHealthChecker) { _producer = producer; - _callback = Optional.ofNullable(callback); _topicConvention = topicConvention; + _kafkaHealthChecker = kafkaHealthChecker; } @Override @@ -93,30 +80,16 @@ public void produceMetadataAuditEvent(@Nonnull Urn urn, @Nullable Snapshot oldSn try { log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataAuditEvent: %s", urn, - metadataAuditEvent.toString())); + metadataAuditEvent)); record = EventUtils.pegasusToAvroMAE(metadataAuditEvent); } catch (IOException e) { - log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent.toString()), e); + log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent), e); throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e); } - if (_callback.isPresent()) { - _producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record), - _callback.get()); - } else { - _producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record), - (metadata, e) -> { - if (e != null) { - log.error(String.format("Failed to emit MAE for entity with urn %s", urn), e); - } else { - log.debug(String.format("Successfully emitted MAE for entity with urn %s at offset %s, partition %s, topic %s", - urn, - metadata.offset(), - metadata.partition(), - metadata.topic())); - } - }); - } + String topic = _topicConvention.getMetadataAuditEventTopicName(); + _producer.send(new ProducerRecord(topic, urn.toString(), record), + _kafkaHealthChecker.getKafkaCallBack("MAE", urn.toString())); } @Override @@ -127,10 +100,10 @@ public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec try { log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataChangeLog: %s", urn, - metadataChangeLog.toString())); + metadataChangeLog)); record = EventUtils.pegasusToAvroMCL(metadataChangeLog); } catch (IOException e) { - log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog.toString()), e); + log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog), e); throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e); } @@ -138,22 +111,8 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog); if (aspectSpec.isTimeseries()) { topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName(); } - - if (_callback.isPresent()) { - _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get()); - } else { - _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> { - if (e != null) { - log.error(String.format("Failed to emit MCL for entity with urn %s", urn), e); - } else { - log.debug(String.format("Successfully emitted MCL for entity with urn %s at offset %s, partition %s, topic %s", - urn, - metadata.offset(), - metadata.partition(), - metadata.topic())); - } - }); - } + _producer.send(new ProducerRecord(topic, urn.toString(), record), + _kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString())); } @Override @@ -176,21 +135,8 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal); } String topic = _topicConvention.getMetadataChangeProposalTopicName(); - if (_callback.isPresent()) { - _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get()); - } else { - _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> { - if (e != null) { - log.error(String.format("Failed to emit MCP for entity with urn %s", urn), e); - } else { - log.debug(String.format("Successfully emitted MCP for entity with urn %s at offset %s, partition %s, topic %s", - urn, - metadata.offset(), - metadata.partition(), - metadata.topic())); - } - }); - } + _producer.send(new ProducerRecord(topic, urn.toString(), record), + _kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString())); } @Override @@ -199,25 +145,16 @@ public void producePlatformEvent(@Nonnull String name, @Nullable String key, @No try { log.debug(String.format("Converting Pegasus Event to Avro Event urn %s\nEvent: %s", name, - event.toString())); + event)); record = EventUtils.pegasusToAvroPE(event); } catch (IOException e) { - log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event.toString()), e); + log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event), e); throw new ModelConversionException("Failed to convert Pegasus Platform Event to Avro", e); } - final Callback callback = _callback.orElseGet(() -> (metadata, e) -> { - if (e != null) { - log.error(String.format("Failed to emit Platform Event for entity with name %s", name), e); - } else { - log.debug(String.format( - "Successfully emitted Platform Event for entity with name %s at offset %s, partition %s, topic %s", name, - metadata.offset(), metadata.partition(), metadata.topic())); - } - }); - final String topic = _topicConvention.getPlatformEventTopicName(); - _producer.send(new ProducerRecord(topic, key == null ? name : key, record), callback); + _producer.send(new ProducerRecord(topic, key == null ? name : key, record), + _kafkaHealthChecker.getKafkaCallBack("Platform Event", name)); } @VisibleForTesting diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaHealthChecker.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaHealthChecker.java new file mode 100644 index 0000000000000..9f956b399eaff --- /dev/null +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaHealthChecker.java @@ -0,0 +1,71 @@ +package com.linkedin.metadata.dao.producer; + +import com.codahale.metrics.MetricRegistry; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Callback; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.Set; + +@Slf4j +@EnableScheduling +@Component +public class KafkaHealthChecker { + + @Value("${kafka.producer.requestTimeout}") + private long kafkaProducerRequestTimeout; + + @Value("${kafka.producer.backoffTimeout}") + private long kafkaProducerBackOffTimeout; + + private static long lastMoment = 0L; + private Set messagesInProgress = new HashSet<>(); + + private synchronized long getNextUniqueMoment() { + long moment = System.currentTimeMillis(); + lastMoment = moment != lastMoment ? moment : ++lastMoment; + return lastMoment; + } + + public Callback getKafkaCallBack(String eventType, String entityDesc) { + long moment = getNextUniqueMoment(); + sendMessageStarted(moment); + return (metadata, e) -> { + sendMessageEnded(moment); + if (e != null) { + log.error(String.format("Failed to emit %s for entity %s", eventType, entityDesc), e); + MetricUtils.counter(this.getClass(), + MetricRegistry.name("producer_failed_count", eventType.replaceAll(" ", "_"))).inc(); + } else { + log.debug(String.format( + "Successfully emitted %s for entity %s at offset %s, partition %s, topic %s", + eventType, entityDesc, metadata.offset(), metadata.partition(), metadata.topic())); + } + }; + } + + private synchronized void sendMessageStarted(long uniqueMessageId) { + messagesInProgress.add(uniqueMessageId); + } + + private synchronized void sendMessageEnded(long uniqueMessageId) { + messagesInProgress.remove(uniqueMessageId); + } + + @Scheduled(cron = "0/15 * * * * ?") + private synchronized void periodicKafkaHealthChecker() { + long moment = System.currentTimeMillis(); + long count = messagesInProgress.stream() + .filter(item -> item + kafkaProducerRequestTimeout + kafkaProducerBackOffTimeout < moment) + .count(); + if (count > 0) { + log.error("Kafka Health Check Failed. %d message(s) is(are) waiting to be sent.", count); + } + } + +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java index a8f876dc9e9a9..01d4c15ba2d38 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java @@ -2,6 +2,7 @@ import com.linkedin.gms.factory.common.TopicConventionFactory; import com.linkedin.metadata.dao.producer.KafkaEventProducer; +import com.linkedin.metadata.dao.producer.KafkaHealthChecker; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -20,15 +21,17 @@ public class EntityServiceFactory { @Bean(name = "entityService") - @DependsOn({"entityAspectDao", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"}) + @DependsOn({"entityAspectDao", "kafkaEventProducer", "kafkaHealthChecker", + TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"}) @Nonnull protected EntityService createInstance( Producer producer, TopicConvention convention, + KafkaHealthChecker kafkaHealthChecker, @Qualifier("entityAspectDao") AspectDao aspectDao, EntityRegistry entityRegistry) { - final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention); + final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention, kafkaHealthChecker); return new EntityService(aspectDao, eventProducer, entityRegistry); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java index cdb230724b85f..5aa67ba5d2817 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java @@ -3,6 +3,7 @@ import com.linkedin.gms.factory.common.TopicConventionFactory; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; import com.linkedin.metadata.dao.producer.KafkaEventProducer; +import com.linkedin.metadata.dao.producer.KafkaHealthChecker; import com.linkedin.mxe.TopicConvention; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.Producer; @@ -30,10 +31,14 @@ public class DataHubKafkaEventProducerFactory { @Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN) private TopicConvention topicConvention; + @Autowired + private KafkaHealthChecker kafkaHealthChecker; + @Bean(name = "kafkaEventProducer") protected KafkaEventProducer createInstance() { return new KafkaEventProducer( kafkaProducer, - topicConvention); + topicConvention, + kafkaHealthChecker); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java index 0549ed2fc2c8b..47215dd494cf4 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java @@ -35,6 +35,18 @@ public class DataHubKafkaProducerFactory { @Value("${kafka.schemaRegistry.type}") private String schemaRegistryType; + @Value("${kafka.producer.retryCount}") + private String kafkaProducerRetryCount; + + @Value("${kafka.producer.deliveryTimeout}") + private String kafkaProducerDeliveryTimeout; + + @Value("${kafka.producer.requestTimeout}") + private String kafkaProducerRequestTimeout; + + @Value("${kafka.producer.backoffTimeout}") + private String kafkaProducerBackOffTimeout; + @Autowired @Lazy @Qualifier("kafkaSchemaRegistry") @@ -66,6 +78,11 @@ protected Producer createInstance(KafkaProperties propert props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName()); + props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetryCount); + props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeout); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeout); + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerBackOffTimeout); + // Override KafkaProperties with SchemaRegistryConfig only for non-empty values schemaRegistryConfig.getProperties().entrySet() .stream() diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index 85a1735d221e3..5ba34375d6cd3 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -167,6 +167,11 @@ kafka: listener: concurrency: ${KAFKA_LISTENER_CONCURRENCY:1} bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092} + producer: + retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3} + deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:15000} + requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000} + backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500} schemaRegistry: type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # KAFKA or AWS_GLUE url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081} # Application only for type = kafka