feat(kafka): expose default kafka producer mechanism #6381

Merged
KafkaEventProducer.java
@@ -19,7 +19,6 @@
 import io.opentelemetry.extension.annotations.WithSpan;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Optional;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
@@ -29,7 +28,6 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-
 /**
  * <p>The topic names that this emits to can be controlled by constructing this with a {@link TopicConvention}.
  * If none is given, defaults to a {@link TopicConventionImpl} with the default delimiter of an underscore (_).
@@ -38,32 +36,21 @@
 public class KafkaEventProducer implements EventProducer {
 
   private final Producer<String, ? extends IndexedRecord> _producer;
-  private final Optional<Callback> _callback;
   private final TopicConvention _topicConvention;
+  private final KafkaHealthChecker _kafkaHealthChecker;
 
   /**
    * Constructor.
    *
    * @param producer The Kafka {@link Producer} to use
    * @param topicConvention the convention to use to get kafka topic names
-   */
-  public KafkaEventProducer(@Nonnull final Producer<String, ? extends IndexedRecord> producer,
-      @Nonnull final TopicConvention topicConvention) {
-    this(producer, topicConvention, null);
-  }
-
-  /**
-   * Constructor.
-   *
-   * @param producer The Kafka {@link Producer} to use
-   * @param topicConvention the convention to use to get kafka topic names
-   * @param callback The {@link Callback} to invoke when the request is completed
+   * @param kafkaHealthChecker The {@link Callback} to invoke when the request is completed
    */
   public KafkaEventProducer(@Nonnull final Producer<String, ? extends IndexedRecord> producer,
-      @Nonnull final TopicConvention topicConvention, @Nullable final Callback callback) {
+      @Nonnull final TopicConvention topicConvention, @Nonnull final KafkaHealthChecker kafkaHealthChecker) {
     _producer = producer;
-    _callback = Optional.ofNullable(callback);
     _topicConvention = topicConvention;
+    _kafkaHealthChecker = kafkaHealthChecker;
   }
 
   @Override
@@ -93,30 +80,16 @@ public void produceMetadataAuditEvent(@Nonnull Urn urn, @Nullable Snapshot oldSn
     try {
       log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataAuditEvent: %s",
           urn,
-          metadataAuditEvent.toString()));
+          metadataAuditEvent));
       record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);
     } catch (IOException e) {
-      log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent.toString()), e);
+      log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent), e);
       throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
     }
 
-    if (_callback.isPresent()) {
-      _producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
-          _callback.get());
-    } else {
-      _producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
-          (metadata, e) -> {
-            if (e != null) {
-              log.error(String.format("Failed to emit MAE for entity with urn %s", urn), e);
-            } else {
-              log.debug(String.format("Successfully emitted MAE for entity with urn %s at offset %s, partition %s, topic %s",
-                  urn,
-                  metadata.offset(),
-                  metadata.partition(),
-                  metadata.topic()));
-            }
-          });
-    }
+    String topic = _topicConvention.getMetadataAuditEventTopicName();
+    _producer.send(new ProducerRecord(topic, urn.toString(), record),
+        _kafkaHealthChecker.getKafkaCallBack("MAE", urn.toString()));
   }
 
   @Override
@@ -127,33 +100,19 @@ public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec
     try {
       log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataChangeLog: %s",
           urn,
-          metadataChangeLog.toString()));
+          metadataChangeLog));
      record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
     } catch (IOException e) {
-      log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog.toString()), e);
+      log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog), e);
       throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
     }
 
     String topic = _topicConvention.getMetadataChangeLogVersionedTopicName();
     if (aspectSpec.isTimeseries()) {
       topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName();
     }
 
-    if (_callback.isPresent()) {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
-    } else {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> {
-        if (e != null) {
-          log.error(String.format("Failed to emit MCL for entity with urn %s", urn), e);
-        } else {
-          log.debug(String.format("Successfully emitted MCL for entity with urn %s at offset %s, partition %s, topic %s",
-              urn,
-              metadata.offset(),
-              metadata.partition(),
-              metadata.topic()));
-        }
-      });
-    }
+    _producer.send(new ProducerRecord(topic, urn.toString(), record),
+        _kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString()));
   }
 
   @Override
@@ -176,21 +135,8 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal);
     }
 
     String topic = _topicConvention.getMetadataChangeProposalTopicName();
-    if (_callback.isPresent()) {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
-    } else {
-      _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> {
-        if (e != null) {
-          log.error(String.format("Failed to emit MCP for entity with urn %s", urn), e);
-        } else {
-          log.debug(String.format("Successfully emitted MCP for entity with urn %s at offset %s, partition %s, topic %s",
-              urn,
-              metadata.offset(),
-              metadata.partition(),
-              metadata.topic()));
-        }
-      });
-    }
+    _producer.send(new ProducerRecord(topic, urn.toString(), record),
+        _kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString()));
   }
 
   @Override
@@ -199,25 +145,16 @@ public void producePlatformEvent(@Nonnull String name, @Nullable String key, @No
     try {
       log.debug(String.format("Converting Pegasus Event to Avro Event urn %s\nEvent: %s",
           name,
-          event.toString()));
+          event));
       record = EventUtils.pegasusToAvroPE(event);
     } catch (IOException e) {
-      log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event.toString()), e);
+      log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event), e);
       throw new ModelConversionException("Failed to convert Pegasus Platform Event to Avro", e);
     }
 
-    final Callback callback = _callback.orElseGet(() -> (metadata, e) -> {
-      if (e != null) {
-        log.error(String.format("Failed to emit Platform Event for entity with name %s", name), e);
-      } else {
-        log.debug(String.format(
-            "Successfully emitted Platform Event for entity with name %s at offset %s, partition %s, topic %s", name,
-            metadata.offset(), metadata.partition(), metadata.topic()));
-      }
-    });
-
     final String topic = _topicConvention.getPlatformEventTopicName();
-    _producer.send(new ProducerRecord(topic, key == null ? name : key, record), callback);
+    _producer.send(new ProducerRecord(topic, key == null ? name : key, record),
+        _kafkaHealthChecker.getKafkaCallBack("Platform Event", name));
   }
 
   @VisibleForTesting
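For orientation, the change above replaces the per-method inline lambdas with a single Callback produced by KafkaHealthChecker. A minimal, self-contained sketch of that callback shape against plain kafka-clients follows; the broker address, topic name, key, and payload are illustrative assumptions, not values taken from this PR:

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // The same (metadata, exception) shape that KafkaHealthChecker.getKafkaCallBack returns.
    Callback callback = (metadata, e) -> {
      if (e != null) {
        System.err.println("send failed: " + e.getMessage());
      } else {
        System.out.printf("acked %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
      }
    };

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("MetadataAuditEvent_v4", "urn:li:corpuser:datahub", "{}"), callback);
    }
  }
}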
KafkaHealthChecker.java (new file)
@@ -0,0 +1,71 @@
package com.linkedin.metadata.dao.producer;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

@Slf4j
@EnableScheduling
@Component
public class KafkaHealthChecker {

  @Value("${kafka.producer.requestTimeout}")
  private long kafkaProducerRequestTimeout;

  @Value("${kafka.producer.backoffTimeout}")
  private long kafkaProducerBackOffTimeout;

  private static long lastMoment = 0L;
  private Set<Long> messagesInProgress = new HashSet<>();

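  // A millisecond timestamp doubles as the message id; ties are broken by incrementing past the last value.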
  private synchronized long getNextUniqueMoment() {
    long moment = System.currentTimeMillis();
    lastMoment = moment != lastMoment ? moment : ++lastMoment;
    return lastMoment;
  }

  public Callback getKafkaCallBack(String eventType, String entityDesc) {
    long moment = getNextUniqueMoment();
    sendMessageStarted(moment);
    return (metadata, e) -> {
      sendMessageEnded(moment);
      if (e != null) {
        log.error(String.format("Failed to emit %s for entity %s", eventType, entityDesc), e);
        MetricUtils.counter(this.getClass(),
            MetricRegistry.name("producer_failed_count", eventType.replaceAll(" ", "_"))).inc();
      } else {
        log.debug(String.format(
            "Successfully emitted %s for entity %s at offset %s, partition %s, topic %s",
            eventType, entityDesc, metadata.offset(), metadata.partition(), metadata.topic()));
      }
    };
  }

  private synchronized void sendMessageStarted(long uniqueMessageId) {
    messagesInProgress.add(uniqueMessageId);
  }

  private synchronized void sendMessageEnded(long uniqueMessageId) {
    messagesInProgress.remove(uniqueMessageId);
  }

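  // Every 15 seconds, flag any message that has been in flight longer than requestTimeout + backoffTimeout.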
  @Scheduled(cron = "0/15 * * * * ?")
  private synchronized void periodicKafkaHealthChecker() {
    long moment = System.currentTimeMillis();
    long count = messagesInProgress.stream()
        .filter(item -> item + kafkaProducerRequestTimeout + kafkaProducerBackOffTimeout < moment)
        .count();
    if (count > 0) {
      log.error("Kafka Health Check Failed. {} message(s) still waiting to be sent.", count);
    }
  }

}
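A minimal sketch of the checker's lifecycle, invoking the returned callback by hand where the Kafka client would normally deliver the acknowledgement; constructing KafkaHealthChecker directly (which leaves its @Value timeouts unset) and the sample urn are assumptions for illustration, not how GMS wires it:

import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.errors.TimeoutException;

public class HealthCheckerSketch {
  public static void main(String[] args) {
    KafkaHealthChecker checker = new KafkaHealthChecker(); // normally Spring-managed

    // Registers the message as in-progress until the callback fires.
    Callback callback = checker.getKafkaCallBack("MCP", "urn:li:corpuser:datahub");

    // Failure path: logs the error and increments the producer_failed_count metric.
    callback.onCompletion(null, new TimeoutException("broker unreachable"));
  }
}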
EntityServiceFactory.java
@@ -2,6 +2,7 @@
 
 import com.linkedin.gms.factory.common.TopicConventionFactory;
 import com.linkedin.metadata.dao.producer.KafkaEventProducer;
+import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
 import com.linkedin.metadata.entity.AspectDao;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -20,15 +21,17 @@
 public class EntityServiceFactory {
 
   @Bean(name = "entityService")
-  @DependsOn({"entityAspectDao", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"})
+  @DependsOn({"entityAspectDao", "kafkaEventProducer", "kafkaHealthChecker",
+      TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"})
   @Nonnull
   protected EntityService createInstance(
       Producer<String, ? extends IndexedRecord> producer,
       TopicConvention convention,
+      KafkaHealthChecker kafkaHealthChecker,
       @Qualifier("entityAspectDao") AspectDao aspectDao,
       EntityRegistry entityRegistry) {
 
-    final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention);
+    final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention, kafkaHealthChecker);
     return new EntityService(aspectDao, eventProducer, entityRegistry);
   }
 }
DataHubKafkaEventProducerFactory.java
@@ -3,6 +3,7 @@
 import com.linkedin.gms.factory.common.TopicConventionFactory;
 import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
 import com.linkedin.metadata.dao.producer.KafkaEventProducer;
+import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
 import com.linkedin.mxe.TopicConvention;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.kafka.clients.producer.Producer;
@@ -30,10 +31,14 @@ public class DataHubKafkaEventProducerFactory {
   @Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
   private TopicConvention topicConvention;
 
+  @Autowired
+  private KafkaHealthChecker kafkaHealthChecker;
+
   @Bean(name = "kafkaEventProducer")
   protected KafkaEventProducer createInstance() {
     return new KafkaEventProducer(
         kafkaProducer,
-        topicConvention);
+        topicConvention,
+        kafkaHealthChecker);
   }
 }
DataHubKafkaProducerFactory.java
@@ -35,6 +35,18 @@ public class DataHubKafkaProducerFactory {
   @Value("${kafka.schemaRegistry.type}")
   private String schemaRegistryType;
 
+  @Value("${kafka.producer.retryCount}")
+  private String kafkaProducerRetryCount;
+
+  @Value("${kafka.producer.deliveryTimeout}")
+  private String kafkaProducerDeliveryTimeout;
+
+  @Value("${kafka.producer.requestTimeout}")
+  private String kafkaProducerRequestTimeout;
+
+  @Value("${kafka.producer.backoffTimeout}")
+  private String kafkaProducerBackOffTimeout;
+
   @Autowired
   @Lazy
   @Qualifier("kafkaSchemaRegistry")
@@ -66,6 +78,11 @@ protected Producer<String, IndexedRecord> createInstance(KafkaProperties propert
 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());
 
+    props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetryCount);
+    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeout);
+    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeout);
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerBackOffTimeout);
+
     // Override KafkaProperties with SchemaRegistryConfig only for non-empty values
     schemaRegistryConfig.getProperties().entrySet()
         .stream()
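The four @Value fields map one-to-one onto kafka-clients producer configs. A small sketch of the resolved defaults; note that the Kafka client itself rejects configurations where delivery.timeout.ms < request.timeout.ms + linger.ms, a constraint the defaults (15000 >= 3000 + 0) satisfy:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTuningSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.RETRIES_CONFIG, "3");                  // kafka.producer.retryCount
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000");  // kafka.producer.deliveryTimeout
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");    // kafka.producer.requestTimeout
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500");       // kafka.producer.backoffTimeout
    props.forEach((k, v) -> System.out.println(k + "=" + v));       // e.g. delivery.timeout.ms=15000
  }
}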
metadata-service/factories/src/main/resources/application.yml (5 additions, 0 deletions)
@@ -167,6 +167,11 @@ kafka:
   listener:
     concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
   bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
+  producer:
+    retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3}
+    deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:15000}
+    requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000}
+    backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500}
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081} # Application only for type = kafka
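Each new key uses Spring's ${ENV_VAR:default} placeholder syntax, so the producer can be tuned per deployment through the corresponding environment variables (for example KAFKA_PRODUCER_REQUEST_TIMEOUT) without editing this file.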