Skip to content

Commit

Permalink
Improve configuration parameters for Kafka #1359
Browse files Browse the repository at this point in the history
Signed-off-by: chandresh-pancholi <[email protected]>
  • Loading branch information
chandresh-pancholi committed Sep 10, 2019
1 parent 464acd0 commit 0c22184
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 11 deletions.
10 changes: 8 additions & 2 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
ProtocolVersion string
Brokers []string
RequiredAcks sarama.RequiredAcks
Compression sarama.CompressionCodec
CompressionLevel int
ProtocolVersion string
auth.AuthenticationConfig
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = c.RequiredAcks
saramaConfig.Producer.Compression = c.Compression
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true
c.AuthenticationConfig.SetConfiguration(saramaConfig)
if len(c.ProtocolVersion) > 0 {
Expand Down
140 changes: 132 additions & 8 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package kafka
import (
"flag"
"fmt"
"log"
"strings"

"github.com/Shopify/sarama"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
Expand All @@ -33,19 +35,69 @@ const (
// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
EncodingZipkinThrift = "zipkin-thrift"

configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixProtocolVersion = ".protocol-version"
suffixEncoding = ".encoding"
defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
suffixRequiredAcks = ".required-acks"
suffixCompression = ".compression"
suffixCompressionLevel = ".compression-level"
suffixProtocolVersion = ".protocol-version"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultRequiredAcks = "local"
defaultCompression = "none"
defaultCompressionLevel = 0
)

var (
// AllEncodings is a list of all supported encodings.
AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}

//requiredAcks is mapping of sarama supported requiredAcks
requiredAcks = map[string]sarama.RequiredAcks{
"noack": sarama.NoResponse,
"local": sarama.WaitForLocal,
"all": sarama.WaitForAll,
}

// compressionModes is a mapping of supported CompressionType to compressionCodec along with default, min, max compression level
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Allow+fine-grained+configuration+for+compression
compressionModes = map[string]struct {
compressor sarama.CompressionCodec
defaultCompressionLevel int
minCompressionLevel int
maxCompressionLevel int
}{
"none": {
compressor: sarama.CompressionNone,
defaultCompressionLevel: 0,
},
"gzip": {
compressor: sarama.CompressionGZIP,
defaultCompressionLevel: 6,
minCompressionLevel: 1,
maxCompressionLevel: 9,
},
"snappy": {
compressor: sarama.CompressionSnappy,
defaultCompressionLevel: 0,
},
"lz4": {
compressor: sarama.CompressionLZ4,
defaultCompressionLevel: 9,
minCompressionLevel: 1,
maxCompressionLevel: 17,
},
"zstd": {
compressor: sarama.CompressionZSTD,
defaultCompressionLevel: 3,
minCompressionLevel: -131072,
maxCompressionLevel: 22,
},
}
)

// Options stores the configuration options for Kafka
Expand Down Expand Up @@ -74,15 +126,50 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
flagSet.String(
configPrefix+suffixRequiredAcks,
defaultRequiredAcks,
"(experimental) Required kafka broker acknowledgement. i.e. noack, local, all",
)
flagSet.String(
configPrefix+suffixCompression,
defaultCompression,
"(experimental) Type of compression (none, gzip, snappy, lz4, zstd) to use on messages",
)
flagSet.Int(
configPrefix+suffixCompressionLevel,
defaultCompressionLevel,
"(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)",
)
auth.AddFlags(configPrefix, flagSet)
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
authenticationOptions := auth.AuthenticationConfig{}
authenticationOptions.InitFromViper(configPrefix, v)

requiredAcks, err := getRequiredAcks(v.GetString(configPrefix + suffixRequiredAcks))
if err != nil {
log.Fatal(err)
}

compressionMode := strings.ToLower(v.GetString(configPrefix + suffixCompression))
compressionModeCodec, err := getCompressionMode(compressionMode)
if err != nil {
log.Fatal(err)
}

compressionLevel, err := getCompressionLevel(compressionMode, v.GetInt(configPrefix+suffixCompressionLevel))
if err != nil {
log.Fatal(err)
}

opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
RequiredAcks: requiredAcks,
Compression: compressionModeCodec,
CompressionLevel: compressionLevel,
ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion),
AuthenticationConfig: authenticationOptions,
}
Expand All @@ -94,3 +181,40 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
}

// getCompressionLevel to get compression level from compression type
func getCompressionLevel(mode string, compressionLevel int) (int, error) {
compressionModeData, ok := compressionModes[mode]
if !ok {
return 0, fmt.Errorf("cannot find compression mode for compressionMode %v", mode)
}

if compressionLevel == defaultCompressionLevel {
return compressionModeData.defaultCompressionLevel, nil
}

if compressionModeData.minCompressionLevel > compressionLevel || compressionModeData.maxCompressionLevel < compressionLevel {
return 0, fmt.Errorf("compression level %d for '%s' is not within valid range [%d, %d]", compressionLevel, mode, compressionModeData.minCompressionLevel, compressionModeData.maxCompressionLevel)
}

return compressionLevel, nil
}

//getCompressionMode maps input modes to sarama CompressionCodec
func getCompressionMode(mode string) (sarama.CompressionCodec, error) {
compressionMode, ok := compressionModes[mode]
if !ok {
return 0, fmt.Errorf("unknown compression mode: %v", mode)
}

return compressionMode.compressor, nil
}

//getRequiredAcks maps input ack values to sarama requiredAcks
func getRequiredAcks(acks string) (sarama.RequiredAcks, error) {
requiredAcks, ok := requiredAcks[strings.ToLower(acks)]
if !ok {
return 0, fmt.Errorf("unknown Required Ack: %s", acks)
}
return requiredAcks, nil
}
106 changes: 105 additions & 1 deletion plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package kafka
import (
"testing"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
)
Expand All @@ -28,12 +30,18 @@ func TestOptionsWithFlags(t *testing.T) {
command.ParseFlags([]string{
"--kafka.producer.topic=topic1",
"--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.producer.encoding=protobuf"})
"--kafka.producer.encoding=protobuf",
"--kafka.producer.required-acks=local",
"--kafka.producer.compression=gzip",
"--kafka.producer.compression-level=6"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression)
assert.Equal(t, 6, opts.config.CompressionLevel)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -45,4 +53,100 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, defaultTopic, opts.topic)
assert.Equal(t, []string{defaultBroker}, opts.config.Brokers)
assert.Equal(t, defaultEncoding, opts.encoding)
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionNone, opts.config.Compression)
assert.Equal(t, 0, opts.config.CompressionLevel)
}

func TestCompressionLevelDefaults(t *testing.T) {
compressionLevel, err := getCompressionLevel("none", defaultCompressionLevel)
require.NoError(t, err)
assert.Equal(t, compressionModes["none"].defaultCompressionLevel, compressionLevel)

compressionLevel, err = getCompressionLevel("gzip", defaultCompressionLevel)
require.NoError(t, err)
assert.Equal(t, compressionModes["gzip"].defaultCompressionLevel, compressionLevel)

compressionLevel, err = getCompressionLevel("snappy", defaultCompressionLevel)
require.NoError(t, err)
assert.Equal(t, compressionModes["snappy"].defaultCompressionLevel, compressionLevel)

compressionLevel, err = getCompressionLevel("lz4", defaultCompressionLevel)
require.NoError(t, err)
assert.Equal(t, compressionModes["lz4"].defaultCompressionLevel, compressionLevel)

compressionLevel, err = getCompressionLevel("zstd", defaultCompressionLevel)
require.NoError(t, err)
assert.Equal(t, compressionModes["zstd"].defaultCompressionLevel, compressionLevel)
}

func TestCompressionLevel(t *testing.T) {
compressionLevel, err := getCompressionLevel("none", 0)
require.NoError(t, err)
assert.Equal(t, compressionModes["none"].defaultCompressionLevel, compressionLevel)

compressionLevel, err = getCompressionLevel("gzip", 4)
require.NoError(t, err)
assert.Equal(t, 4, compressionLevel)

compressionLevel, err = getCompressionLevel("snappy", 0)
require.NoError(t, err)
assert.Equal(t, compressionModes["snappy"].defaultCompressionLevel, compressionLevel)

compressionLevel, err = getCompressionLevel("lz4", 10)
require.NoError(t, err)
assert.Equal(t, 10, compressionLevel)

compressionLevel, err = getCompressionLevel("zstd", 20)
require.NoError(t, err)
assert.Equal(t, 20, compressionLevel)
}

func TestFailedCompressionLevelScenario(t *testing.T) {
_, err := getCompressionLevel("gzip", 14)
assert.Error(t, err)

_, err = getCompressionLevel("lz4", 18)
assert.Error(t, err)

_, err = getCompressionLevel("zstd", 25)
assert.Error(t, err)
}

func TestCompressionModes(t *testing.T) {
compressionModes, err := getCompressionMode("gzip")
require.NoError(t, err)
assert.Equal(t, sarama.CompressionGZIP, compressionModes)

compressionModes, err = getCompressionMode("snappy")
require.NoError(t, err)
assert.Equal(t, sarama.CompressionSnappy, compressionModes)

compressionModes, err = getCompressionMode("none")
require.NoError(t, err)
assert.Equal(t, sarama.CompressionNone, compressionModes)
}

func TestCompressionModeFailures(t *testing.T) {
_, err := getCompressionMode("test")
assert.Error(t, err)
}

func TestRequiredAcks(t *testing.T) {
acks, err := getRequiredAcks("noack")
require.NoError(t, err)
assert.Equal(t, sarama.NoResponse, acks)

acks, err = getRequiredAcks("local")
require.NoError(t, err)
assert.Equal(t, sarama.WaitForLocal, acks)

acks, err = getRequiredAcks("all")
require.NoError(t, err)
assert.Equal(t, sarama.WaitForAll, acks)
}

func TestRequiredAcksFailures(t *testing.T) {
_, err := getRequiredAcks("test")
assert.Error(t, err)
}

0 comments on commit 0c22184

Please sign in to comment.