Skip to content

Commit

Permalink
feat(kafka): expose default kafka producer mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
djordje-mijatovic committed Dec 5, 2022
1 parent 7d32deb commit e328b16
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.metadata.dao.producer;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -37,6 +39,8 @@ public Callback getKafkaCallBack(String eventType, String entityDesc) {
sendMessageEnded(moment);
if (e != null) {
log.error(String.format("Failed to emit %s for entity %s", eventType, entityDesc), e);
MetricUtils.counter(this.getClass(),
MetricRegistry.name("producer_failed_count", eventType.replaceAll(" ", "_"))).inc();
} else {
log.debug(String.format(
"Successfully emitted %s for entity %s at offset %s, partition %s, topic %s",
Expand Down
6 changes: 3 additions & 3 deletions metadata-service/factories/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ kafka:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
producer:
retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:20}
deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:300000}
requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:30000}
retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3}
deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:15000}
requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000}
backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500}
schemaRegistry:
type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # KAFKA or AWS_GLUE
Expand Down

0 comments on commit e328b16

Please sign in to comment.