From 0c22184722bcfb1de955d95fdc25090c169496e9 Mon Sep 17 00:00:00 2001 From: chandresh-pancholi Date: Wed, 11 Sep 2019 01:19:45 +0530 Subject: [PATCH] Improve configuration parameters for Kafka #1359 Signed-off-by: chandresh-pancholi --- pkg/kafka/producer/config.go | 10 +- plugin/storage/kafka/options.go | 140 +++++++++++++++++++++++++-- plugin/storage/kafka/options_test.go | 106 +++++++++++++++++++- 3 files changed, 245 insertions(+), 11 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 4e254e12b1a..729332ba716 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -27,14 +27,20 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string - ProtocolVersion string + Brokers []string + RequiredAcks sarama.RequiredAcks + Compression sarama.CompressionCodec + CompressionLevel int + ProtocolVersion string auth.AuthenticationConfig } // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() + saramaConfig.Producer.RequiredAcks = c.RequiredAcks + saramaConfig.Producer.Compression = c.Compression + saramaConfig.Producer.CompressionLevel = c.CompressionLevel saramaConfig.Producer.Return.Successes = true c.AuthenticationConfig.SetConfiguration(saramaConfig) if len(c.ProtocolVersion) > 0 { diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 8b8ff0760fa..0fa1867e3e4 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -17,8 +17,10 @@ package kafka import ( "flag" "fmt" + "log" "strings" + "github.com/Shopify/sarama" "github.com/spf13/viper" "github.com/jaegertracing/jaeger/pkg/kafka/auth" @@ -33,19 +35,69 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka.producer" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixProtocolVersion = ".protocol-version" - suffixEncoding = ".encoding" - defaultBroker = "127.0.0.1:9092" - defaultTopic = "jaeger-spans" - defaultEncoding = EncodingProto + configPrefix = "kafka.producer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixEncoding = ".encoding" + suffixRequiredAcks = ".required-acks" + suffixCompression = ".compression" + suffixCompressionLevel = ".compression-level" + suffixProtocolVersion = ".protocol-version" + + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-spans" + defaultEncoding = EncodingProto + defaultRequiredAcks = "local" + defaultCompression = "none" + defaultCompressionLevel = 0 ) var ( // AllEncodings is a list of all supported encodings. AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift} + + //requiredAcks is mapping of sarama supported requiredAcks + requiredAcks = map[string]sarama.RequiredAcks{ + "noack": sarama.NoResponse, + "local": sarama.WaitForLocal, + "all": sarama.WaitForAll, + } + + // compressionModes is a mapping of supported CompressionType to compressionCodec along with default, min, max compression level + // https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Allow+fine-grained+configuration+for+compression + compressionModes = map[string]struct { + compressor sarama.CompressionCodec + defaultCompressionLevel int + minCompressionLevel int + maxCompressionLevel int + }{ + "none": { + compressor: sarama.CompressionNone, + defaultCompressionLevel: 0, + }, + "gzip": { + compressor: sarama.CompressionGZIP, + defaultCompressionLevel: 6, + minCompressionLevel: 1, + maxCompressionLevel: 9, + }, + "snappy": { + compressor: sarama.CompressionSnappy, + defaultCompressionLevel: 0, + }, + "lz4": { + compressor: sarama.CompressionLZ4, + defaultCompressionLevel: 9, + minCompressionLevel: 1, + maxCompressionLevel: 17, + }, + "zstd": { + compressor: sarama.CompressionZSTD, + defaultCompressionLevel: 3, + minCompressionLevel: -131072, + maxCompressionLevel: 22, + }, + } ) // Options stores the configuration options for Kafka @@ -74,6 +126,21 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) + flagSet.String( + configPrefix+suffixRequiredAcks, + defaultRequiredAcks, + "(experimental) Required kafka broker acknowledgement. i.e. noack, local, all", + ) + flagSet.String( + configPrefix+suffixCompression, + defaultCompression, + "(experimental) Type of compression (none, gzip, snappy, lz4, zstd) to use on messages", + ) + flagSet.Int( + configPrefix+suffixCompressionLevel, + defaultCompressionLevel, + "(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)", + ) auth.AddFlags(configPrefix, flagSet) } @@ -81,8 +148,28 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { func (opt *Options) InitFromViper(v *viper.Viper) { authenticationOptions := auth.AuthenticationConfig{} authenticationOptions.InitFromViper(configPrefix, v) + + requiredAcks, err := getRequiredAcks(v.GetString(configPrefix + suffixRequiredAcks)) + if err != nil { + log.Fatal(err) + } + + compressionMode := strings.ToLower(v.GetString(configPrefix + suffixCompression)) + compressionModeCodec, err := getCompressionMode(compressionMode) + if err != nil { + log.Fatal(err) + } + + compressionLevel, err := getCompressionLevel(compressionMode, v.GetInt(configPrefix+suffixCompressionLevel)) + if err != nil { + log.Fatal(err) + } + opt.config = producer.Configuration{ Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + RequiredAcks: requiredAcks, + Compression: compressionModeCodec, + CompressionLevel: compressionLevel, ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, } @@ -94,3 +181,40 @@ func (opt *Options) InitFromViper(v *viper.Viper) { func stripWhiteSpace(str string) string { return strings.Replace(str, " ", "", -1) } + +// getCompressionLevel to get compression level from compression type +func getCompressionLevel(mode string, compressionLevel int) (int, error) { + compressionModeData, ok := compressionModes[mode] + if !ok { + return 0, fmt.Errorf("cannot find compression mode for compressionMode %v", mode) + } + + if compressionLevel == defaultCompressionLevel { + return compressionModeData.defaultCompressionLevel, nil + } + + if compressionModeData.minCompressionLevel > compressionLevel || compressionModeData.maxCompressionLevel < compressionLevel { + return 0, fmt.Errorf("compression level %d for '%s' is not within valid range [%d, %d]", compressionLevel, mode, compressionModeData.minCompressionLevel, compressionModeData.maxCompressionLevel) + } + + return compressionLevel, nil +} + +//getCompressionMode maps input modes to sarama CompressionCodec +func getCompressionMode(mode string) (sarama.CompressionCodec, error) { + compressionMode, ok := compressionModes[mode] + if !ok { + return 0, fmt.Errorf("unknown compression mode: %v", mode) + } + + return compressionMode.compressor, nil +} + +//getRequiredAcks maps input ack values to sarama requiredAcks +func getRequiredAcks(acks string) (sarama.RequiredAcks, error) { + requiredAcks, ok := requiredAcks[strings.ToLower(acks)] + if !ok { + return 0, fmt.Errorf("unknown Required Ack: %s", acks) + } + return requiredAcks, nil +} diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index c54a767dd71..624bb3f8256 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -17,7 +17,9 @@ package kafka import ( "testing" + "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" ) @@ -28,12 +30,18 @@ func TestOptionsWithFlags(t *testing.T) { command.ParseFlags([]string{ "--kafka.producer.topic=topic1", "--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234", - "--kafka.producer.encoding=protobuf"}) + "--kafka.producer.encoding=protobuf", + "--kafka.producer.required-acks=local", + "--kafka.producer.compression=gzip", + "--kafka.producer.compression-level=6"}) opts.InitFromViper(v) assert.Equal(t, "topic1", opts.topic) assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers) assert.Equal(t, "protobuf", opts.encoding) + assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) + assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression) + assert.Equal(t, 6, opts.config.CompressionLevel) } func TestFlagDefaults(t *testing.T) { @@ -45,4 +53,100 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, defaultTopic, opts.topic) assert.Equal(t, []string{defaultBroker}, opts.config.Brokers) assert.Equal(t, defaultEncoding, opts.encoding) + assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) + assert.Equal(t, sarama.CompressionNone, opts.config.Compression) + assert.Equal(t, 0, opts.config.CompressionLevel) +} + +func TestCompressionLevelDefaults(t *testing.T) { + compressionLevel, err := getCompressionLevel("none", defaultCompressionLevel) + require.NoError(t, err) + assert.Equal(t, compressionModes["none"].defaultCompressionLevel, compressionLevel) + + compressionLevel, err = getCompressionLevel("gzip", defaultCompressionLevel) + require.NoError(t, err) + assert.Equal(t, compressionModes["gzip"].defaultCompressionLevel, compressionLevel) + + compressionLevel, err = getCompressionLevel("snappy", defaultCompressionLevel) + require.NoError(t, err) + assert.Equal(t, compressionModes["snappy"].defaultCompressionLevel, compressionLevel) + + compressionLevel, err = getCompressionLevel("lz4", defaultCompressionLevel) + require.NoError(t, err) + assert.Equal(t, compressionModes["lz4"].defaultCompressionLevel, compressionLevel) + + compressionLevel, err = getCompressionLevel("zstd", defaultCompressionLevel) + require.NoError(t, err) + assert.Equal(t, compressionModes["zstd"].defaultCompressionLevel, compressionLevel) +} + +func TestCompressionLevel(t *testing.T) { + compressionLevel, err := getCompressionLevel("none", 0) + require.NoError(t, err) + assert.Equal(t, compressionModes["none"].defaultCompressionLevel, compressionLevel) + + compressionLevel, err = getCompressionLevel("gzip", 4) + require.NoError(t, err) + assert.Equal(t, 4, compressionLevel) + + compressionLevel, err = getCompressionLevel("snappy", 0) + require.NoError(t, err) + assert.Equal(t, compressionModes["snappy"].defaultCompressionLevel, compressionLevel) + + compressionLevel, err = getCompressionLevel("lz4", 10) + require.NoError(t, err) + assert.Equal(t, 10, compressionLevel) + + compressionLevel, err = getCompressionLevel("zstd", 20) + require.NoError(t, err) + assert.Equal(t, 20, compressionLevel) +} + +func TestFailedCompressionLevelScenario(t *testing.T) { + _, err := getCompressionLevel("gzip", 14) + assert.Error(t, err) + + _, err = getCompressionLevel("lz4", 18) + assert.Error(t, err) + + _, err = getCompressionLevel("zstd", 25) + assert.Error(t, err) +} + +func TestCompressionModes(t *testing.T) { + compressionModes, err := getCompressionMode("gzip") + require.NoError(t, err) + assert.Equal(t, sarama.CompressionGZIP, compressionModes) + + compressionModes, err = getCompressionMode("snappy") + require.NoError(t, err) + assert.Equal(t, sarama.CompressionSnappy, compressionModes) + + compressionModes, err = getCompressionMode("none") + require.NoError(t, err) + assert.Equal(t, sarama.CompressionNone, compressionModes) +} + +func TestCompressionModeFailures(t *testing.T) { + _, err := getCompressionMode("test") + assert.Error(t, err) +} + +func TestRequiredAcks(t *testing.T) { + acks, err := getRequiredAcks("noack") + require.NoError(t, err) + assert.Equal(t, sarama.NoResponse, acks) + + acks, err = getRequiredAcks("local") + require.NoError(t, err) + assert.Equal(t, sarama.WaitForLocal, acks) + + acks, err = getRequiredAcks("all") + require.NoError(t, err) + assert.Equal(t, sarama.WaitForAll, acks) +} + +func TestRequiredAcksFailures(t *testing.T) { + _, err := getRequiredAcks("test") + assert.Error(t, err) }