Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/kafka-perf-batch' into kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
otherpirate committed Aug 9, 2018
2 parents 2a4267e + 223a128 commit 8993c75
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
default:
topicName = k.Topic
}

return topicName
}

Expand Down Expand Up @@ -262,21 +263,37 @@ func (k *Kafka) Description() string {
}

func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
batches := make(map[string]map[string][]telegraf.Metric)
for _, metric := range metrics {
buf, err := k.serializer.Serialize(metric)
if err != nil {
return err
topicName := k.GetTopicName(metric)
if _, ok := batches[topicName]; !ok {
batches[topicName] = make(map[string][]telegraf.Metric)
}

m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
key, _ := metric.GetTag(k.RoutingTag)
if _, ok := batches[topicName][key]; !ok {
batches[topicName][key] = make([]telegraf.Metric, 0)
}
if h, ok := metric.GetTag(k.RoutingTag); ok {
m.Key = sarama.StringEncoder(h)
batches[topicName][key] = append(batches[topicName][key], metric)
}

msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for topicName, v := range batches {
for routingKey, metrics := range v {

buf, err := k.serializer.SerializeBatch(metrics)
if err != nil {
return err
}

m := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.ByteEncoder(buf),
Key: sarama.StringEncoder(routingKey),
}

msgs = append(msgs, m)
}
msgs = append(msgs, m)
}

err := k.producer.SendMessages(msgs)
Expand All @@ -289,7 +306,6 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
}
return err
}

return nil
}

Expand Down

0 comments on commit 8993c75

Please sign in to comment.