Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic tag options to kafka output #7142

Merged
merged 2 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"log"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -83,6 +85,7 @@ const (
defaultMaxUndeliveredMessages = 1000
defaultMaxMessageLen = 1000000
defaultConsumerGroup = "telegraf_metrics_consumers"
reconnectDelay = 5 * time.Second
)

type empty struct{}
Expand Down Expand Up @@ -259,6 +262,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
err := k.consumer.Consume(ctx, k.Topics, handler)
if err != nil {
acc.AddError(err)
internal.SleepContext(ctx, reconnectDelay)
}
}
err = k.consumer.Close()
Expand Down
7 changes: 7 additions & 0 deletions plugins/outputs/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## Kafka topic for producer messages
topic = "telegraf"

## The value of this tag will be used as the topic. If not set the 'topic'
## option is used.
# topic_tag = ""

## If true, the 'topic_tag' will be removed from to the metric.
# exclude_topic_tag = false

## Optional Client id
# client_id = "Telegraf"

Expand Down
57 changes: 43 additions & 14 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ var zeroTime = time.Unix(0, 0)

type (
Kafka struct {
Brokers []string
Topic string
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
TopicTag string `toml:"topic_tag"`
ExcludeTopicTag bool `toml:"exclude_topic_tag"`
ClientID string `toml:"client_id"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
CompressionCodec int
RequiredAcks int
MaxRetry int
MaxMessageBytes int `toml:"max_message_bytes"`
CompressionCodec int `toml:"compression_codec"`
RequiredAcks int `toml:"required_acks"`
MaxRetry int `toml:"max_retry"`
MaxMessageBytes int `toml:"max_message_bytes"`

Version string `toml:"version"`

Expand All @@ -57,7 +59,9 @@ type (
Log telegraf.Logger `toml:"-"`

tlsConfig tls.Config
producer sarama.SyncProducer

producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error)
producer sarama.SyncProducer

serializer serializers.Serializer
}
Expand Down Expand Up @@ -94,6 +98,13 @@ var sampleConfig = `
## Kafka topic for producer messages
topic = "telegraf"

## The value of this tag will be used as the topic. If not set the 'topic'
## option is used.
# topic_tag = ""

## If true, the 'topic_tag' will be removed from to the metric.
# exclude_topic_tag = false

## Optional Client id
# client_id = "Telegraf"

Expand Down Expand Up @@ -212,14 +223,29 @@ func ValidateTopicSuffixMethod(method string) error {
return fmt.Errorf("Unknown topic suffix method provided: %s", method)
}

func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) {
topic := k.Topic
if k.TopicTag != "" {
if t, ok := metric.GetTag(k.TopicTag); ok {
topic = t

// If excluding the topic tag, a copy is required to avoid modifying
// the metric buffer.
if k.ExcludeTopicTag {
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(k.TopicTag)
}
}
}

var topicName string
switch k.TopicSuffix.Method {
case "measurement":
topicName = k.Topic + k.TopicSuffix.Separator + metric.Name()
topicName = topic + k.TopicSuffix.Separator + metric.Name()
case "tags":
var topicNameComponents []string
topicNameComponents = append(topicNameComponents, k.Topic)
topicNameComponents = append(topicNameComponents, topic)
for _, tag := range k.TopicSuffix.Keys {
tagValue := metric.Tags()[tag]
if tagValue != "" {
Expand All @@ -228,9 +254,9 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string {
}
topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator)
default:
topicName = k.Topic
topicName = topic
}
return topicName
return metric, topicName
}

func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
Expand Down Expand Up @@ -306,7 +332,7 @@ func (k *Kafka) Connect() error {
config.Net.SASL.Version = version
}

producer, err := sarama.NewSyncProducer(k.Brokers, config)
producer, err := k.producerFunc(k.Brokers, config)
if err != nil {
return err
}
Expand Down Expand Up @@ -348,14 +374,16 @@ func (k *Kafka) routingKey(metric telegraf.Metric) (string, error) {
func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for _, metric := range metrics {
metric, topic := k.GetTopicName(metric)

buf, err := k.serializer.Serialize(metric)
if err != nil {
k.Log.Debugf("Could not serialize metric: %v", err)
continue
}

m := &sarama.ProducerMessage{
Topic: k.GetTopicName(metric),
Topic: topic,
Value: sarama.ByteEncoder(buf),
}

Expand Down Expand Up @@ -403,6 +431,7 @@ func init() {
return &Kafka{
MaxRetry: 3,
RequiredAcks: -1,
producerFunc: sarama.NewSyncProducer,
}
})
}
146 changes: 145 additions & 1 deletion plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestTopicSuffixes(t *testing.T) {
TopicSuffix: topicSuffix,
}

topic := k.GetTopicName(metric)
_, topic := k.GetTopicName(metric)
require.Equal(t, expectedTopic, topic)
}
}
Expand Down Expand Up @@ -156,3 +157,146 @@ func TestRoutingKey(t *testing.T) {
})
}
}

type MockProducer struct {
sent []*sarama.ProducerMessage
}

func (p *MockProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
p.sent = append(p.sent, msg)
return 0, 0, nil
}

func (p *MockProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
p.sent = append(p.sent, msgs...)
return nil
}

func (p *MockProducer) Close() error {
return nil
}

func NewMockProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) {
return &MockProducer{}, nil
}

func TestTopicTag(t *testing.T) {
tests := []struct {
name string
plugin *Kafka
input []telegraf.Metric
topic string
value string
}{
{
name: "static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "telegraf",
value: "cpu time_idle=42 0\n",
},
{
name: "topic tag overrides static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "xyzzy",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "xyzzy",
value: "cpu,topic=xyzzy time_idle=42 0\n",
},
{
name: "missing topic tag falls back to static topic",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "telegraf",
value: "cpu time_idle=42 0\n",
},
{
name: "exclude topic tag removes tag",
plugin: &Kafka{
Brokers: []string{"127.0.0.1"},
Topic: "telegraf",
TopicTag: "topic",
ExcludeTopicTag: true,
producerFunc: NewMockProducer,
},
input: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "xyzzy",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
},
topic: "xyzzy",
value: "cpu time_idle=42 0\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
tt.plugin.SetSerializer(s)

err = tt.plugin.Connect()
require.NoError(t, err)

producer := &MockProducer{}
tt.plugin.producer = producer

err = tt.plugin.Write(tt.input)
require.NoError(t, err)

require.Equal(t, tt.topic, producer.sent[0].Topic)

encoded, err := producer.sent[0].Value.Encode()
require.NoError(t, err)
require.Equal(t, tt.value, string(encoded))
})
}
}