From 8607bc7cb67074fccc59c9964420e18b4840239b Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Thu, 19 Jul 2018 00:36:34 -0400 Subject: [PATCH 01/12] Export consumer Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 106 +++++++++++++----- cmd/ingester/app/consumer/consumer_metrics.go | 4 +- cmd/ingester/app/consumer/consumer_test.go | 8 +- 3 files changed, 82 insertions(+), 36 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index eee1f6d3e5e..3d03ec21341 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -26,7 +26,31 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" ) -type consumer struct { +// SaramaConsumer is an interface to features of Sarama that we use +type SaramaConsumer interface { + Partitions() <-chan sc.PartitionConsumer + MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + io.Closer +} + +// Config stores the configuration for a Consumer +type Config struct { + Topic string `yaml:"topic"` + GroupID string `yaml:"group_id"` + Brokers []string `yaml:"brokers"` + Parallelism int `yaml:"parallelism"` +} + +// Params are the parameters of a Consumer +type Params struct { + Config Config + Processor processor.SpanProcessor + Factory metrics.Factory `name:"service_metrics"` + Logger *zap.Logger +} + +// Consumer uses sarama to consume messages from kafka and handle +type Consumer struct { metricsFactory metrics.Factory logger *zap.Logger processorFactory processorFactory @@ -37,34 +61,62 @@ type consumer struct { SaramaConsumer } -// SaramaConsumer is an interface to features of Sarama that we use -type SaramaConsumer interface { - Partitions() <-chan sc.PartitionConsumer - MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) - io.Closer +// New is a constructor for a Consumer +func New(params Params) (Consumer, error) { + saramaConfig := sc.NewConfig() + saramaConfig.Group.Mode = sc.ConsumerModePartitions + saramaConsumer, err := sc.NewConsumer(params.Config.Brokers, params.Config.GroupID, []string{params.Config.Topic}, saramaConfig) + if err != nil { + return Consumer{}, err + } + return Consumer{ + metricsFactory: params.Factory, + logger: params.Logger, + close: make(chan struct{}, 1), + isClosed: sync.WaitGroup{}, + SaramaConsumer: saramaConsumer, + processorFactory: processorFactory{ + topic: params.Config.Topic, + consumer: saramaConsumer, + metricsFactory: params.Factory, + logger: params.Logger, + baseProcessor: params.Processor, + parallelism: params.Config.Parallelism, + }, + }, nil } -func (c *consumer) mainLoop() { +// Start begins consuming messages in a go routine +func (c *Consumer) Start() { c.isClosed.Add(1) c.logger.Info("Starting main loop") - go func() { - for { - select { - case pc := <-c.Partitions(): - c.isClosed.Add(2) - - go c.handleMessages(pc) - go c.handleErrors(pc.Partition(), pc.Errors()) - - case <-c.close: - c.isClosed.Done() - return - } + go c.mainLoop() +} + +// Close closes the Consumer and underlying sarama consumer +func (c *Consumer) Close() error { + close(c.close) + c.isClosed.Wait() + return c.SaramaConsumer.Close() +} + +func (c *Consumer) mainLoop() { + for { + select { + case pc := <-c.Partitions(): + c.isClosed.Add(2) + + go c.handleMessages(pc) + go c.handleErrors(pc.Partition(), pc.Errors()) + + case <-c.close: + c.isClosed.Done() + return } - }() + } } -func (c *consumer) handleMessages(pc sc.PartitionConsumer) { +func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Info("Starting message handler") defer c.isClosed.Done() defer c.closePartition(pc) @@ -87,13 +139,13 @@ func (c *consumer) handleMessages(pc sc.PartitionConsumer) { } } -func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) { +func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) { c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition())) partitionConsumer.Close() // blocks until messages channel is drained c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition())) } -func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) { +func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) { c.logger.Info("Starting error handler") defer c.isClosed.Done() @@ -103,9 +155,3 @@ func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.Consumer c.logger.Error("Error consuming from Kafka", zap.Error(err)) } } - -func (c *consumer) Close() error { - close(c.close) - c.isClosed.Wait() - return c.SaramaConsumer.Close() -} diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go index 526856a2d54..4d760978e11 100644 --- a/cmd/ingester/app/consumer/consumer_metrics.go +++ b/cmd/ingester/app/consumer/consumer_metrics.go @@ -30,7 +30,7 @@ type errMetrics struct { errCounter metrics.Counter } -func (c *consumer) newMsgMetrics(partition int32) msgMetrics { +func (c *Consumer) newMsgMetrics(partition int32) msgMetrics { f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))}) return msgMetrics{ counter: f.Counter("messages", nil), @@ -39,7 +39,7 @@ func (c *consumer) newMsgMetrics(partition int32) msgMetrics { } } -func (c *consumer) newErrMetrics(partition int32) errMetrics { +func (c *Consumer) newErrMetrics(partition int32) errMetrics { f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))}) return errMetrics{errCounter: f.Counter("errors", nil)} diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index e85eb284234..39432c71e56 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -36,7 +36,7 @@ import ( type consumerTest struct { saramaConsumer *kmocks.SaramaConsumer - consumer *consumer + consumer *Consumer partitionConsumer *kmocks.PartitionConsumer } @@ -46,7 +46,7 @@ func withWrappedConsumer(fn func(c *consumerTest)) { metricsFactory := metrics.NewLocalFactory(0) c := &consumerTest{ saramaConsumer: sc, - consumer: &consumer{ + consumer: &Consumer{ metricsFactory: metricsFactory, logger: logger, close: make(chan struct{}), @@ -106,7 +106,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) { mp.On("Process", &saramaMessageWrapper{msg}).Return(nil) c.consumer.processorFactory.baseProcessor = mp - c.consumer.mainLoop() + c.consumer.Start() time.Sleep(100 * time.Millisecond) close(msgCh) close(errCh) @@ -149,7 +149,7 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) { c.partitionConsumer.On("Messages").Return((<-chan *sarama.ConsumerMessage)(msgCh)) c.partitionConsumer.On("Close").Return(nil) - c.consumer.mainLoop() + c.consumer.Start() time.Sleep(100 * time.Millisecond) close(msgCh) close(errCh) From ca26378bf0f6792f26a81936f120f5ab869b6e49 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Thu, 19 Jul 2018 12:39:38 -0400 Subject: [PATCH 02/12] Configure consumer with viper Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 20 ++----- cmd/ingester/app/consumer/options.go | 72 +++++++++++++++++++++++ cmd/ingester/app/consumer/options_test.go | 51 ++++++++++++++++ 3 files changed, 129 insertions(+), 14 deletions(-) create mode 100644 cmd/ingester/app/consumer/options.go create mode 100644 cmd/ingester/app/consumer/options_test.go diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 3d03ec21341..5502878be58 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -26,26 +26,18 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" ) -// SaramaConsumer is an interface to features of Sarama that we use +// SaramaConsumer is an interface to features of Sarama that are necessary for the consumer type SaramaConsumer interface { Partitions() <-chan sc.PartitionConsumer MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) io.Closer } -// Config stores the configuration for a Consumer -type Config struct { - Topic string `yaml:"topic"` - GroupID string `yaml:"group_id"` - Brokers []string `yaml:"brokers"` - Parallelism int `yaml:"parallelism"` -} - // Params are the parameters of a Consumer type Params struct { - Config Config + Options Options Processor processor.SpanProcessor - Factory metrics.Factory `name:"service_metrics"` + Factory metrics.Factory Logger *zap.Logger } @@ -65,7 +57,7 @@ type Consumer struct { func New(params Params) (Consumer, error) { saramaConfig := sc.NewConfig() saramaConfig.Group.Mode = sc.ConsumerModePartitions - saramaConsumer, err := sc.NewConsumer(params.Config.Brokers, params.Config.GroupID, []string{params.Config.Topic}, saramaConfig) + saramaConsumer, err := sc.NewConsumer(params.Options.Brokers, params.Options.GroupID, []string{params.Options.Topic}, saramaConfig) if err != nil { return Consumer{}, err } @@ -76,12 +68,12 @@ func New(params Params) (Consumer, error) { isClosed: sync.WaitGroup{}, SaramaConsumer: saramaConsumer, processorFactory: processorFactory{ - topic: params.Config.Topic, + topic: params.Options.Topic, consumer: saramaConsumer, metricsFactory: params.Factory, logger: params.Logger, baseProcessor: params.Processor, - parallelism: params.Config.Parallelism, + parallelism: params.Options.Parallelism, }, }, nil } diff --git a/cmd/ingester/app/consumer/options.go b/cmd/ingester/app/consumer/options.go new file mode 100644 index 00000000000..25bc6201625 --- /dev/null +++ b/cmd/ingester/app/consumer/options.go @@ -0,0 +1,72 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "flag" + "strconv" + "strings" + + "github.com/spf13/viper" +) + +const ( + configPrefix = "ingester-consumer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixGroupID = ".group-id" + suffixParallelism = ".parallelism" + + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-ingester-spans" + defaultGroupID = "jaeger-ingester" + defaultParallelism = 1000 +) + +// Options stores the configuration options for a Kafka consumer +type Options struct { + Topic string + GroupID string + Brokers []string + Parallelism int +} + +// AddFlags adds flags for Options +func (opt *Options) AddFlags(flagSet *flag.FlagSet) { + flagSet.String( + configPrefix+suffixBrokers, + defaultBroker, + "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") + flagSet.String( + configPrefix+suffixTopic, + defaultTopic, + "The name of the kafka topic to consume from") + flagSet.String( + configPrefix+suffixGroupID, + defaultGroupID, + "The Consumer Group that ingester will be consuming on behalf of") + flagSet.String( + configPrefix+suffixParallelism, + strconv.Itoa(defaultParallelism), + "The number of messages to process in parallel") +} + +// InitFromViper initializes Options with properties from viper +func (opt *Options) InitFromViper(v *viper.Viper) { + opt.Brokers = strings.Split(v.GetString(configPrefix+suffixBrokers), ",") + opt.Topic = v.GetString(configPrefix + suffixTopic) + opt.GroupID = v.GetString(configPrefix + suffixGroupID) + opt.Parallelism = v.GetInt(configPrefix + suffixParallelism) +} diff --git a/cmd/ingester/app/consumer/options_test.go b/cmd/ingester/app/consumer/options_test.go new file mode 100644 index 00000000000..61fe62208a8 --- /dev/null +++ b/cmd/ingester/app/consumer/options_test.go @@ -0,0 +1,51 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/pkg/config" +) + +func TestOptionsWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{ + "--ingester-consumer.topic=topic1", + "--ingester-consumer.brokers=127.0.0.1:9092,0.0.0:1234", + "--ingester-consumer.group-id=group1", + "--ingester-consumer.parallelism=5"}) + opts.InitFromViper(v) + + assert.Equal(t, "topic1", opts.Topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Brokers) + assert.Equal(t, "group1", opts.GroupID) + assert.Equal(t, 5, opts.Parallelism) +} + +func TestFlagDefaults(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{}) + opts.InitFromViper(v) + + assert.Equal(t, defaultTopic, opts.Topic) + assert.Equal(t, []string{defaultBroker}, opts.Brokers) + assert.Equal(t, defaultGroupID, opts.GroupID) + assert.Equal(t, defaultParallelism, opts.Parallelism) +} From 6c12667dc01d794e55a42ae9b9fd01d3b38a2296 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Fri, 20 Jul 2018 17:07:48 -0400 Subject: [PATCH 03/12] Add kafka ConsumerBuilder Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 26 ++++------ cmd/ingester/app/consumer/consumer_test.go | 50 ++++++++++++++++--- .../mocks/{SaramaConsumer.go => Consumer.go} | 12 ++--- cmd/ingester/app/consumer/options.go | 8 +-- cmd/ingester/app/consumer/options_test.go | 4 +- .../app/consumer/processor_factory.go | 3 +- .../app/consumer/processor_factory_test.go | 2 +- plugin/storage/kafka/factory_test.go | 2 +- 8 files changed, 67 insertions(+), 40 deletions(-) rename cmd/ingester/app/consumer/mocks/{SaramaConsumer.go => Consumer.go} (79%) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 5502878be58..23de2771452 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -15,7 +15,6 @@ package consumer import ( - "io" "sync" "github.com/Shopify/sarama" @@ -24,21 +23,16 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" + "github.com/jaegertracing/jaeger/pkg/kafka/config" ) -// SaramaConsumer is an interface to features of Sarama that are necessary for the consumer -type SaramaConsumer interface { - Partitions() <-chan sc.PartitionConsumer - MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) - io.Closer -} - // Params are the parameters of a Consumer type Params struct { Options Options Processor processor.SpanProcessor Factory metrics.Factory Logger *zap.Logger + config.ConsumerBuilder } // Consumer uses sarama to consume messages from kafka and handle @@ -50,23 +44,21 @@ type Consumer struct { close chan struct{} isClosed sync.WaitGroup - SaramaConsumer + config.Consumer } // New is a constructor for a Consumer -func New(params Params) (Consumer, error) { - saramaConfig := sc.NewConfig() - saramaConfig.Group.Mode = sc.ConsumerModePartitions - saramaConsumer, err := sc.NewConsumer(params.Options.Brokers, params.Options.GroupID, []string{params.Options.Topic}, saramaConfig) +func New(params Params) (*Consumer, error) { + saramaConsumer, err := params.ConsumerBuilder.NewConsumer() if err != nil { - return Consumer{}, err + return nil, err } - return Consumer{ + return &Consumer{ metricsFactory: params.Factory, logger: params.Logger, close: make(chan struct{}, 1), isClosed: sync.WaitGroup{}, - SaramaConsumer: saramaConsumer, + Consumer: saramaConsumer, processorFactory: processorFactory{ topic: params.Options.Topic, consumer: saramaConsumer, @@ -89,7 +81,7 @@ func (c *Consumer) Start() { func (c *Consumer) Close() error { close(c.close) c.isClosed.Wait() - return c.SaramaConsumer.Close() + return c.Consumer.Close() } func (c *Consumer) mainLoop() { diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 39432c71e56..721f45c0838 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -22,26 +22,61 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/uber/jaeger-lib/metrics" "github.com/uber/jaeger-lib/metrics/testutils" "go.uber.org/zap" kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" - "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" + pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" + "github.com/jaegertracing/jaeger/pkg/kafka/config" ) -//go:generate mockery -name SaramaConsumer +//go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer //go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer type consumerTest struct { - saramaConsumer *kmocks.SaramaConsumer + saramaConsumer *kmocks.Consumer consumer *Consumer partitionConsumer *kmocks.PartitionConsumer } +type mockConsumerConfiguration struct { + config.ConsumerConfiguration + err error +} + +func (m *mockConsumerConfiguration) NewConsumer() (config.Consumer, error) { + return &kmocks.Consumer{}, m.err +} + +func TestConstructor(t *testing.T) { + params := Params{ + Options: Options{ + Parallelism: 1, + ConsumerConfiguration: config.ConsumerConfiguration{ + Brokers: []string{"someBroker"}, + Topic: "someTopic", + GroupID: "someGroup", + }, + }, + } + params.ConsumerBuilder = &mockConsumerConfiguration{} + consumer, err := New(params) + assert.NoError(t, err) + assert.NotNil(t, consumer) + assert.NotNil(t, consumer.processorFactory) + + params.ConsumerBuilder = &mockConsumerConfiguration{ + err: errors.New("consumerBuilder error"), + } + _, err = New(params) + assert.Error(t, err, "consumerBuilder error") +} + func withWrappedConsumer(fn func(c *consumerTest)) { - sc := &kmocks.SaramaConsumer{} + sc := &kmocks.Consumer{} logger, _ := zap.NewDevelopment() metricsFactory := metrics.NewLocalFactory(0) c := &consumerTest{ @@ -51,13 +86,13 @@ func withWrappedConsumer(fn func(c *consumerTest)) { logger: logger, close: make(chan struct{}), isClosed: sync.WaitGroup{}, - SaramaConsumer: sc, + Consumer: sc, processorFactory: processorFactory{ topic: "topic", consumer: sc, metricsFactory: metricsFactory, logger: logger, - baseProcessor: &mocks.SpanProcessor{}, + baseProcessor: &pmocks.SpanProcessor{}, parallelism: 1, }, }, @@ -74,7 +109,6 @@ func withWrappedConsumer(fn func(c *consumerTest)) { } func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) { - withWrappedConsumer(func(c *consumerTest) { topic := "morekuzambu" partition := int32(316) @@ -102,7 +136,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) { c.partitionConsumer.On("HighWaterMarkOffset").Return(int64(1234)) c.partitionConsumer.On("Close").Return(nil) - mp := &mocks.SpanProcessor{} + mp := &pmocks.SpanProcessor{} mp.On("Process", &saramaMessageWrapper{msg}).Return(nil) c.consumer.processorFactory.baseProcessor = mp diff --git a/cmd/ingester/app/consumer/mocks/SaramaConsumer.go b/cmd/ingester/app/consumer/mocks/Consumer.go similarity index 79% rename from cmd/ingester/app/consumer/mocks/SaramaConsumer.go rename to cmd/ingester/app/consumer/mocks/Consumer.go index 5c3c0fdedbf..a40cf181b81 100644 --- a/cmd/ingester/app/consumer/mocks/SaramaConsumer.go +++ b/cmd/ingester/app/consumer/mocks/Consumer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. // Copyright (c) 2018 The Jaeger Authors. // @@ -20,13 +20,13 @@ import cluster "github.com/bsm/sarama-cluster" import mock "github.com/stretchr/testify/mock" -// SaramaConsumer is an autogenerated mock type for the SaramaConsumer type -type SaramaConsumer struct { +// Consumer is an autogenerated mock type for the Consumer type +type Consumer struct { mock.Mock } // Close provides a mock function with given fields: -func (_m *SaramaConsumer) Close() error { +func (_m *Consumer) Close() error { ret := _m.Called() var r0 error @@ -40,12 +40,12 @@ func (_m *SaramaConsumer) Close() error { } // MarkPartitionOffset provides a mock function with given fields: topic, partition, offset, metadata -func (_m *SaramaConsumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { +func (_m *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { _m.Called(topic, partition, offset, metadata) } // Partitions provides a mock function with given fields: -func (_m *SaramaConsumer) Partitions() <-chan cluster.PartitionConsumer { +func (_m *Consumer) Partitions() <-chan cluster.PartitionConsumer { ret := _m.Called() var r0 <-chan cluster.PartitionConsumer diff --git a/cmd/ingester/app/consumer/options.go b/cmd/ingester/app/consumer/options.go index 25bc6201625..6b8f72d51d0 100644 --- a/cmd/ingester/app/consumer/options.go +++ b/cmd/ingester/app/consumer/options.go @@ -20,6 +20,8 @@ import ( "strings" "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/pkg/kafka/config" ) const ( @@ -37,14 +39,12 @@ const ( // Options stores the configuration options for a Kafka consumer type Options struct { - Topic string - GroupID string - Brokers []string + config.ConsumerConfiguration Parallelism int } // AddFlags adds flags for Options -func (opt *Options) AddFlags(flagSet *flag.FlagSet) { +func AddFlags(flagSet *flag.FlagSet) { flagSet.String( configPrefix+suffixBrokers, defaultBroker, diff --git a/cmd/ingester/app/consumer/options_test.go b/cmd/ingester/app/consumer/options_test.go index 61fe62208a8..5bd38604937 100644 --- a/cmd/ingester/app/consumer/options_test.go +++ b/cmd/ingester/app/consumer/options_test.go @@ -24,7 +24,7 @@ import ( func TestOptionsWithFlags(t *testing.T) { opts := &Options{} - v, command := config.Viperize(opts.AddFlags) + v, command := config.Viperize(AddFlags) command.ParseFlags([]string{ "--ingester-consumer.topic=topic1", "--ingester-consumer.brokers=127.0.0.1:9092,0.0.0:1234", @@ -40,7 +40,7 @@ func TestOptionsWithFlags(t *testing.T) { func TestFlagDefaults(t *testing.T) { opts := &Options{} - v, command := config.Viperize(opts.AddFlags) + v, command := config.Viperize(AddFlags) command.ParseFlags([]string{}) opts.InitFromViper(v) diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index 34f7e1b37b5..b935a9a3dbc 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -23,11 +23,12 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator" + "github.com/jaegertracing/jaeger/pkg/kafka/config" ) type processorFactory struct { topic string - consumer SaramaConsumer + consumer config.Consumer metricsFactory metrics.Factory logger *zap.Logger baseProcessor processor.SpanProcessor diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go index 622460772e7..45b2c3c8854 100644 --- a/cmd/ingester/app/consumer/processor_factory_test.go +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -29,7 +29,7 @@ import ( func Test_new(t *testing.T) { - mockConsumer := &kmocks.SaramaConsumer{} + mockConsumer := &kmocks.Consumer{} mockConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) topic := "coelacanth" diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index 544c69e2c37..323ac4ff8ac 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -33,7 +33,7 @@ import ( var _ storage.Factory = new(Factory) type mockProducerBuilder struct { - kafkaConfig.Configuration + kafkaConfig.ProducerConfiguration err error t *testing.T } From b8d4194b105bf304a264afdccda5f7bb50391058 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 13:11:41 -0400 Subject: [PATCH 04/12] Remove ProcessorFactory initialization from Consumer Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 28 +++++++------------ cmd/ingester/app/consumer/consumer_test.go | 13 ++------- .../app/consumer/processor_factory.go | 27 ++++++++++++++++-- .../app/consumer/processor_factory_test.go | 2 +- 4 files changed, 38 insertions(+), 32 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 23de2771452..ec5cb929f39 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -28,10 +28,9 @@ import ( // Params are the parameters of a Consumer type Params struct { - Options Options - Processor processor.SpanProcessor - Factory metrics.Factory - Logger *zap.Logger + ProcessorFactory ProcessorFactory + Factory metrics.Factory + Logger *zap.Logger config.ConsumerBuilder } @@ -39,7 +38,7 @@ type Params struct { type Consumer struct { metricsFactory metrics.Factory logger *zap.Logger - processorFactory processorFactory + processorFactory ProcessorFactory close chan struct{} isClosed sync.WaitGroup @@ -54,19 +53,12 @@ func New(params Params) (*Consumer, error) { return nil, err } return &Consumer{ - metricsFactory: params.Factory, - logger: params.Logger, - close: make(chan struct{}, 1), - isClosed: sync.WaitGroup{}, - Consumer: saramaConsumer, - processorFactory: processorFactory{ - topic: params.Options.Topic, - consumer: saramaConsumer, - metricsFactory: params.Factory, - logger: params.Logger, - baseProcessor: params.Processor, - parallelism: params.Options.Parallelism, - }, + metricsFactory: params.Factory, + logger: params.Logger, + close: make(chan struct{}, 1), + isClosed: sync.WaitGroup{}, + Consumer: saramaConsumer, + processorFactory: params.ProcessorFactory, }, nil } diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 721f45c0838..0089b8c9891 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -52,16 +52,7 @@ func (m *mockConsumerConfiguration) NewConsumer() (config.Consumer, error) { } func TestConstructor(t *testing.T) { - params := Params{ - Options: Options{ - Parallelism: 1, - ConsumerConfiguration: config.ConsumerConfiguration{ - Brokers: []string{"someBroker"}, - Topic: "someTopic", - GroupID: "someGroup", - }, - }, - } + params := Params{} params.ConsumerBuilder = &mockConsumerConfiguration{} consumer, err := New(params) assert.NoError(t, err) @@ -87,7 +78,7 @@ func withWrappedConsumer(fn func(c *consumerTest)) { close: make(chan struct{}), isClosed: sync.WaitGroup{}, Consumer: sc, - processorFactory: processorFactory{ + processorFactory: ProcessorFactory{ topic: "topic", consumer: sc, metricsFactory: metricsFactory, diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index b935a9a3dbc..7623154e04d 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -26,7 +26,18 @@ import ( "github.com/jaegertracing/jaeger/pkg/kafka/config" ) -type processorFactory struct { +// FactoryParams are the parameters of a ProcessorFactory +type FactoryParams struct { + Parallelism int + Topic string + BaseProcessor processor.SpanProcessor + Consumer *Consumer + Factory metrics.Factory + Logger *zap.Logger +} + +// ProcessorFactory is a factory for creating startedProcessors +type ProcessorFactory struct { topic string consumer config.Consumer metricsFactory metrics.Factory @@ -35,7 +46,19 @@ type processorFactory struct { parallelism int } -func (c *processorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { +// NewFactory constructs a new ProcessorFactory +func (c *ProcessorFactory) NewFactory(params FactoryParams) *ProcessorFactory { + return &ProcessorFactory{ + topic: params.Topic, + consumer: params.Consumer, + metricsFactory: params.Factory, + logger: params.Logger, + baseProcessor: params.BaseProcessor, + parallelism: params.Parallelism, + } +} + +func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { c.logger.Info("Creating new processors", zap.Int32("partition", partition)) markOffset := func(offset int64) { diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go index 45b2c3c8854..e33e5f9ab38 100644 --- a/cmd/ingester/app/consumer/processor_factory_test.go +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -36,7 +36,7 @@ func Test_new(t *testing.T) { partition := int32(21) offset := int64(555) - pf := processorFactory{ + pf := ProcessorFactory{ topic: topic, consumer: mockConsumer, metricsFactory: metrics.NullFactory, From e4a5a40df1cde2c1bb4675233e991287b144397c Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 13:11:51 -0400 Subject: [PATCH 05/12] Move options from consumer to ingester Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/{consumer => }/options.go | 4 ++-- cmd/ingester/app/{consumer => }/options_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) rename cmd/ingester/app/{consumer => }/options.go (97%) rename cmd/ingester/app/{consumer => }/options_test.go (87%) diff --git a/cmd/ingester/app/consumer/options.go b/cmd/ingester/app/options.go similarity index 97% rename from cmd/ingester/app/consumer/options.go rename to cmd/ingester/app/options.go index 6b8f72d51d0..fce5bb4f9fc 100644 --- a/cmd/ingester/app/consumer/options.go +++ b/cmd/ingester/app/options.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package consumer +package app import ( "flag" @@ -25,7 +25,7 @@ import ( ) const ( - configPrefix = "ingester-consumer" + configPrefix = "ingester" suffixBrokers = ".brokers" suffixTopic = ".topic" suffixGroupID = ".group-id" diff --git a/cmd/ingester/app/consumer/options_test.go b/cmd/ingester/app/options_test.go similarity index 87% rename from cmd/ingester/app/consumer/options_test.go rename to cmd/ingester/app/options_test.go index 5bd38604937..272e6e52df5 100644 --- a/cmd/ingester/app/consumer/options_test.go +++ b/cmd/ingester/app/options_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package consumer +package app import ( "testing" @@ -26,10 +26,10 @@ func TestOptionsWithFlags(t *testing.T) { opts := &Options{} v, command := config.Viperize(AddFlags) command.ParseFlags([]string{ - "--ingester-consumer.topic=topic1", - "--ingester-consumer.brokers=127.0.0.1:9092,0.0.0:1234", - "--ingester-consumer.group-id=group1", - "--ingester-consumer.parallelism=5"}) + "--ingester.topic=topic1", + "--ingester.brokers=127.0.0.1:9092,0.0.0:1234", + "--ingester.group-id=group1", + "--ingester.parallelism=5"}) opts.InitFromViper(v) assert.Equal(t, "topic1", opts.Topic) From aa58856d774d6129863db4f4ad617e2f14ddc3b4 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 13:21:21 -0400 Subject: [PATCH 06/12] Consumer minor fixes Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index ec5cb929f39..fa33c327ef7 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -34,7 +34,7 @@ type Params struct { config.ConsumerBuilder } -// Consumer uses sarama to consume messages from kafka and handle +// Consumer uses sarama to consume and handle messages from kafka type Consumer struct { metricsFactory metrics.Factory logger *zap.Logger @@ -48,7 +48,7 @@ type Consumer struct { // New is a constructor for a Consumer func New(params Params) (*Consumer, error) { - saramaConsumer, err := params.ConsumerBuilder.NewConsumer() + saramaConsumer, err := params.NewConsumer() if err != nil { return nil, err } From f52ab9bb930a81f9f9f6138c61356c56764fc643 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 13:38:49 -0400 Subject: [PATCH 07/12] Split kafka config into consumer and producer Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 6 +-- cmd/ingester/app/consumer/consumer_test.go | 16 +++---- .../app/consumer/processor_factory.go | 4 +- cmd/ingester/app/options.go | 4 +- pkg/kafka/config/consumer/config.go | 48 +++++++++++++++++++ pkg/kafka/config/producer/.nocover | 1 + pkg/kafka/config/producer/config.go | 36 ++++++++++++++ 7 files changed, 100 insertions(+), 15 deletions(-) create mode 100644 pkg/kafka/config/consumer/config.go create mode 100644 pkg/kafka/config/producer/.nocover create mode 100644 pkg/kafka/config/producer/config.go diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index fa33c327ef7..11c035695f5 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" - "github.com/jaegertracing/jaeger/pkg/kafka/config" + "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" ) // Params are the parameters of a Consumer @@ -31,7 +31,7 @@ type Params struct { ProcessorFactory ProcessorFactory Factory metrics.Factory Logger *zap.Logger - config.ConsumerBuilder + consumer.Builder } // Consumer uses sarama to consume and handle messages from kafka @@ -43,7 +43,7 @@ type Consumer struct { close chan struct{} isClosed sync.WaitGroup - config.Consumer + consumer.Consumer } // New is a constructor for a Consumer diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 0089b8c9891..7d0384ea738 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -30,7 +30,7 @@ import ( kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" - "github.com/jaegertracing/jaeger/pkg/kafka/config" + "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" ) //go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer @@ -43,23 +43,23 @@ type consumerTest struct { } type mockConsumerConfiguration struct { - config.ConsumerConfiguration + consumer.Configuration err error } -func (m *mockConsumerConfiguration) NewConsumer() (config.Consumer, error) { +func (m *mockConsumerConfiguration) NewConsumer() (consumer.Consumer, error) { return &kmocks.Consumer{}, m.err } func TestConstructor(t *testing.T) { params := Params{} - params.ConsumerBuilder = &mockConsumerConfiguration{} - consumer, err := New(params) + params.Builder = &mockConsumerConfiguration{} + newConsumer, err := New(params) assert.NoError(t, err) - assert.NotNil(t, consumer) - assert.NotNil(t, consumer.processorFactory) + assert.NotNil(t, newConsumer) + assert.NotNil(t, newConsumer.processorFactory) - params.ConsumerBuilder = &mockConsumerConfiguration{ + params.Builder = &mockConsumerConfiguration{ err: errors.New("consumerBuilder error"), } _, err = New(params) diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index 7623154e04d..9ea465f5b6f 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -23,7 +23,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator" - "github.com/jaegertracing/jaeger/pkg/kafka/config" + "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" ) // FactoryParams are the parameters of a ProcessorFactory @@ -39,7 +39,7 @@ type FactoryParams struct { // ProcessorFactory is a factory for creating startedProcessors type ProcessorFactory struct { topic string - consumer config.Consumer + consumer consumer.Consumer metricsFactory metrics.Factory logger *zap.Logger baseProcessor processor.SpanProcessor diff --git a/cmd/ingester/app/options.go b/cmd/ingester/app/options.go index fce5bb4f9fc..f0699e200e7 100644 --- a/cmd/ingester/app/options.go +++ b/cmd/ingester/app/options.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/viper" - "github.com/jaegertracing/jaeger/pkg/kafka/config" + "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" ) const ( @@ -39,7 +39,7 @@ const ( // Options stores the configuration options for a Kafka consumer type Options struct { - config.ConsumerConfiguration + consumer.Configuration Parallelism int } diff --git a/pkg/kafka/config/consumer/config.go b/pkg/kafka/config/consumer/config.go new file mode 100644 index 00000000000..c1f542f0e8e --- /dev/null +++ b/pkg/kafka/config/consumer/config.go @@ -0,0 +1,48 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer + +import ( + "io" + + "github.com/bsm/sarama-cluster" +) + +// Consumer is an interface to features of Sarama that are necessary for the consumer +type Consumer interface { + Partitions() <-chan cluster.PartitionConsumer + MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + io.Closer +} + +// Builder builds a new kafka consumer +type Builder interface { + NewConsumer() (Consumer, error) +} + +// Configuration describes the configuration properties needed to create a Kafka consumer +type Configuration struct { + Brokers []string + Topic string + GroupID string + Consumer +} + +// NewConsumer creates a new kafka consumer +func (c *Configuration) NewConsumer() (Consumer, error) { + saramaConfig := cluster.NewConfig() + saramaConfig.Group.Mode = cluster.ConsumerModePartitions + return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) +} diff --git a/pkg/kafka/config/producer/.nocover b/pkg/kafka/config/producer/.nocover new file mode 100644 index 00000000000..98344a6f8ba --- /dev/null +++ b/pkg/kafka/config/producer/.nocover @@ -0,0 +1 @@ +requires connection to Kafka diff --git a/pkg/kafka/config/producer/config.go b/pkg/kafka/config/producer/config.go new file mode 100644 index 00000000000..5079ca8fef2 --- /dev/null +++ b/pkg/kafka/config/producer/config.go @@ -0,0 +1,36 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producer + +import ( + "github.com/Shopify/sarama" +) + +// Builder builds a new kafka producer +type Builder interface { + NewProducer() (sarama.AsyncProducer, error) +} + +// Configuration describes the configuration properties needed to create a Kafka producer +type Configuration struct { + Brokers []string +} + +// NewProducer creates a new asynchronous kafka producer +func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { + saramaConfig := sarama.NewConfig() + saramaConfig.Producer.Return.Successes = true + return sarama.NewAsyncProducer(c.Brokers, saramaConfig) +} From 9d6a4d7b056e19251dd846f434ca65b2bd324b76 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 13:44:14 -0400 Subject: [PATCH 08/12] Add test for ProcessorFactory Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer_test.go | 1 - cmd/ingester/app/consumer/processor_factory.go | 4 ++-- cmd/ingester/app/consumer/processor_factory_test.go | 7 +++++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 7d0384ea738..e9bc84ed211 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -57,7 +57,6 @@ func TestConstructor(t *testing.T) { newConsumer, err := New(params) assert.NoError(t, err) assert.NotNil(t, newConsumer) - assert.NotNil(t, newConsumer.processorFactory) params.Builder = &mockConsumerConfiguration{ err: errors.New("consumerBuilder error"), diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index 9ea465f5b6f..cc923448185 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -47,7 +47,7 @@ type ProcessorFactory struct { } // NewFactory constructs a new ProcessorFactory -func (c *ProcessorFactory) NewFactory(params FactoryParams) *ProcessorFactory { +func NewFactory(params FactoryParams) (*ProcessorFactory, error) { return &ProcessorFactory{ topic: params.Topic, consumer: params.Consumer, @@ -55,7 +55,7 @@ func (c *ProcessorFactory) NewFactory(params FactoryParams) *ProcessorFactory { logger: params.Logger, baseProcessor: params.BaseProcessor, parallelism: params.Parallelism, - } + }, nil } func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go index e33e5f9ab38..25b0099a909 100644 --- a/cmd/ingester/app/consumer/processor_factory_test.go +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -27,6 +27,13 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" ) +func Test_NewFactory(t *testing.T) { + params := FactoryParams{} + newFactory, err := NewFactory(params) + assert.NoError(t, err) + assert.NotNil(t, newFactory) +} + func Test_new(t *testing.T) { mockConsumer := &kmocks.Consumer{} From ef09c37e5000db51cbda1c27c1c64782f04451e6 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 13:56:22 -0400 Subject: [PATCH 09/12] Remove Options Removed to reduce the scope of this PR. Will be added once producer and consumer are merged. Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/options.go | 72 -------------------------------- cmd/ingester/app/options_test.go | 51 ---------------------- 2 files changed, 123 deletions(-) delete mode 100644 cmd/ingester/app/options.go delete mode 100644 cmd/ingester/app/options_test.go diff --git a/cmd/ingester/app/options.go b/cmd/ingester/app/options.go deleted file mode 100644 index f0699e200e7..00000000000 --- a/cmd/ingester/app/options.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) 2018 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "flag" - "strconv" - "strings" - - "github.com/spf13/viper" - - "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" -) - -const ( - configPrefix = "ingester" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixGroupID = ".group-id" - suffixParallelism = ".parallelism" - - defaultBroker = "127.0.0.1:9092" - defaultTopic = "jaeger-ingester-spans" - defaultGroupID = "jaeger-ingester" - defaultParallelism = 1000 -) - -// Options stores the configuration options for a Kafka consumer -type Options struct { - consumer.Configuration - Parallelism int -} - -// AddFlags adds flags for Options -func AddFlags(flagSet *flag.FlagSet) { - flagSet.String( - configPrefix+suffixBrokers, - defaultBroker, - "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") - flagSet.String( - configPrefix+suffixTopic, - defaultTopic, - "The name of the kafka topic to consume from") - flagSet.String( - configPrefix+suffixGroupID, - defaultGroupID, - "The Consumer Group that ingester will be consuming on behalf of") - flagSet.String( - configPrefix+suffixParallelism, - strconv.Itoa(defaultParallelism), - "The number of messages to process in parallel") -} - -// InitFromViper initializes Options with properties from viper -func (opt *Options) InitFromViper(v *viper.Viper) { - opt.Brokers = strings.Split(v.GetString(configPrefix+suffixBrokers), ",") - opt.Topic = v.GetString(configPrefix + suffixTopic) - opt.GroupID = v.GetString(configPrefix + suffixGroupID) - opt.Parallelism = v.GetInt(configPrefix + suffixParallelism) -} diff --git a/cmd/ingester/app/options_test.go b/cmd/ingester/app/options_test.go deleted file mode 100644 index 272e6e52df5..00000000000 --- a/cmd/ingester/app/options_test.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) 2018 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/jaegertracing/jaeger/pkg/config" -) - -func TestOptionsWithFlags(t *testing.T) { - opts := &Options{} - v, command := config.Viperize(AddFlags) - command.ParseFlags([]string{ - "--ingester.topic=topic1", - "--ingester.brokers=127.0.0.1:9092,0.0.0:1234", - "--ingester.group-id=group1", - "--ingester.parallelism=5"}) - opts.InitFromViper(v) - - assert.Equal(t, "topic1", opts.Topic) - assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Brokers) - assert.Equal(t, "group1", opts.GroupID) - assert.Equal(t, 5, opts.Parallelism) -} - -func TestFlagDefaults(t *testing.T) { - opts := &Options{} - v, command := config.Viperize(AddFlags) - command.ParseFlags([]string{}) - opts.InitFromViper(v) - - assert.Equal(t, defaultTopic, opts.Topic) - assert.Equal(t, []string{defaultBroker}, opts.Brokers) - assert.Equal(t, defaultGroupID, opts.GroupID) - assert.Equal(t, defaultParallelism, opts.Parallelism) -} From dd63ebd7a78c0edfbeb6970319136c3a6cffd0a3 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 23 Jul 2018 18:48:27 -0400 Subject: [PATCH 10/12] Take SaramaConsumer as parameter to Consumer Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 20 +++++++--------- cmd/ingester/app/consumer/consumer_test.go | 24 +++---------------- .../app/consumer/processor_factory.go | 18 +++++++------- 3 files changed, 20 insertions(+), 42 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 11c035695f5..c633f85344a 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -31,33 +31,29 @@ type Params struct { ProcessorFactory ProcessorFactory Factory metrics.Factory Logger *zap.Logger - consumer.Builder + SaramaConsumer consumer.Consumer } // Consumer uses sarama to consume and handle messages from kafka type Consumer struct { - metricsFactory metrics.Factory - logger *zap.Logger + metricsFactory metrics.Factory + logger *zap.Logger + + saramaConsumer consumer.Consumer processorFactory ProcessorFactory close chan struct{} isClosed sync.WaitGroup - - consumer.Consumer } // New is a constructor for a Consumer func New(params Params) (*Consumer, error) { - saramaConsumer, err := params.NewConsumer() - if err != nil { - return nil, err - } return &Consumer{ metricsFactory: params.Factory, logger: params.Logger, close: make(chan struct{}, 1), isClosed: sync.WaitGroup{}, - Consumer: saramaConsumer, + saramaConsumer: params.SaramaConsumer, processorFactory: params.ProcessorFactory, }, nil } @@ -73,13 +69,13 @@ func (c *Consumer) Start() { func (c *Consumer) Close() error { close(c.close) c.isClosed.Wait() - return c.Consumer.Close() + return c.saramaConsumer.Close() } func (c *Consumer) mainLoop() { for { select { - case pc := <-c.Partitions(): + case pc := <-c.saramaConsumer.Partitions(): c.isClosed.Add(2) go c.handleMessages(pc) diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index e9bc84ed211..36ef5765c81 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -30,7 +30,6 @@ import ( kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" - "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" ) //go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer @@ -42,27 +41,10 @@ type consumerTest struct { partitionConsumer *kmocks.PartitionConsumer } -type mockConsumerConfiguration struct { - consumer.Configuration - err error -} - -func (m *mockConsumerConfiguration) NewConsumer() (consumer.Consumer, error) { - return &kmocks.Consumer{}, m.err -} - func TestConstructor(t *testing.T) { - params := Params{} - params.Builder = &mockConsumerConfiguration{} - newConsumer, err := New(params) + newConsumer, err := New(Params{}) assert.NoError(t, err) assert.NotNil(t, newConsumer) - - params.Builder = &mockConsumerConfiguration{ - err: errors.New("consumerBuilder error"), - } - _, err = New(params) - assert.Error(t, err, "consumerBuilder error") } func withWrappedConsumer(fn func(c *consumerTest)) { @@ -76,7 +58,7 @@ func withWrappedConsumer(fn func(c *consumerTest)) { logger: logger, close: make(chan struct{}), isClosed: sync.WaitGroup{}, - Consumer: sc, + saramaConsumer: sc, processorFactory: ProcessorFactory{ topic: "topic", consumer: sc, @@ -106,7 +88,7 @@ func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) { metadata := "meatbag" c.saramaConsumer.On("MarkPartitionOffset", topic, partition, offset, metadata).Return() - c.consumer.MarkPartitionOffset(topic, partition, offset, metadata) + c.saramaConsumer.MarkPartitionOffset(topic, partition, offset, metadata) c.saramaConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset, metadata) }) diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index cc923448185..d3a7dab883d 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -28,12 +28,12 @@ import ( // FactoryParams are the parameters of a ProcessorFactory type FactoryParams struct { - Parallelism int - Topic string - BaseProcessor processor.SpanProcessor - Consumer *Consumer - Factory metrics.Factory - Logger *zap.Logger + Parallelism int + Topic string + BaseProcessor processor.SpanProcessor + SaramaConsumer consumer.Consumer + Factory metrics.Factory + Logger *zap.Logger } // ProcessorFactory is a factory for creating startedProcessors @@ -47,10 +47,10 @@ type ProcessorFactory struct { } // NewFactory constructs a new ProcessorFactory -func NewFactory(params FactoryParams) (*ProcessorFactory, error) { - return &ProcessorFactory{ +func NewFactory(params FactoryParams) (ProcessorFactory, error) { + return ProcessorFactory{ topic: params.Topic, - consumer: params.Consumer, + consumer: params.SaramaConsumer, metricsFactory: params.Factory, logger: params.Logger, baseProcessor: params.BaseProcessor, From 63b6772f2f7262f799fd737c6ce654966b5c2c36 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Thu, 26 Jul 2018 13:36:32 -0400 Subject: [PATCH 11/12] Move config consumer folder Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 2 +- .../app/consumer/processor_factory.go | 2 +- pkg/kafka/config/producer/.nocover | 1 - pkg/kafka/config/producer/config.go | 36 ------------------- pkg/kafka/{config => }/consumer/config.go | 0 plugin/storage/kafka/factory_test.go | 2 +- 6 files changed, 3 insertions(+), 40 deletions(-) delete mode 100644 pkg/kafka/config/producer/.nocover delete mode 100644 pkg/kafka/config/producer/config.go rename pkg/kafka/{config => }/consumer/config.go (100%) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index c633f85344a..eb354110bf3 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" - "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" + "github.com/jaegertracing/jaeger/pkg/kafka/consumer" ) // Params are the parameters of a Consumer diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index d3a7dab883d..97ea1573a42 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -23,7 +23,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator" - "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer" + "github.com/jaegertracing/jaeger/pkg/kafka/consumer" ) // FactoryParams are the parameters of a ProcessorFactory diff --git a/pkg/kafka/config/producer/.nocover b/pkg/kafka/config/producer/.nocover deleted file mode 100644 index 98344a6f8ba..00000000000 --- a/pkg/kafka/config/producer/.nocover +++ /dev/null @@ -1 +0,0 @@ -requires connection to Kafka diff --git a/pkg/kafka/config/producer/config.go b/pkg/kafka/config/producer/config.go deleted file mode 100644 index 5079ca8fef2..00000000000 --- a/pkg/kafka/config/producer/config.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2018 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package producer - -import ( - "github.com/Shopify/sarama" -) - -// Builder builds a new kafka producer -type Builder interface { - NewProducer() (sarama.AsyncProducer, error) -} - -// Configuration describes the configuration properties needed to create a Kafka producer -type Configuration struct { - Brokers []string -} - -// NewProducer creates a new asynchronous kafka producer -func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { - saramaConfig := sarama.NewConfig() - saramaConfig.Producer.Return.Successes = true - return sarama.NewAsyncProducer(c.Brokers, saramaConfig) -} diff --git a/pkg/kafka/config/consumer/config.go b/pkg/kafka/consumer/config.go similarity index 100% rename from pkg/kafka/config/consumer/config.go rename to pkg/kafka/consumer/config.go diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index 323ac4ff8ac..544c69e2c37 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -33,7 +33,7 @@ import ( var _ storage.Factory = new(Factory) type mockProducerBuilder struct { - kafkaConfig.ProducerConfiguration + kafkaConfig.Configuration err error t *testing.T } From 10eda2762da8155ea32413d18b432e546c1ef008 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Thu, 26 Jul 2018 14:16:42 -0400 Subject: [PATCH 12/12] Rename structs Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/consumer/consumer.go | 10 +++++----- cmd/ingester/app/consumer/consumer_test.go | 10 +++++----- cmd/ingester/app/consumer/processor_factory.go | 10 +++++----- cmd/ingester/app/consumer/processor_factory_test.go | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index eb354110bf3..433f37921bb 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -31,7 +31,7 @@ type Params struct { ProcessorFactory ProcessorFactory Factory metrics.Factory Logger *zap.Logger - SaramaConsumer consumer.Consumer + InternalConsumer consumer.Consumer } // Consumer uses sarama to consume and handle messages from kafka @@ -39,7 +39,7 @@ type Consumer struct { metricsFactory metrics.Factory logger *zap.Logger - saramaConsumer consumer.Consumer + internalConsumer consumer.Consumer processorFactory ProcessorFactory close chan struct{} @@ -53,7 +53,7 @@ func New(params Params) (*Consumer, error) { logger: params.Logger, close: make(chan struct{}, 1), isClosed: sync.WaitGroup{}, - saramaConsumer: params.SaramaConsumer, + internalConsumer: params.InternalConsumer, processorFactory: params.ProcessorFactory, }, nil } @@ -69,13 +69,13 @@ func (c *Consumer) Start() { func (c *Consumer) Close() error { close(c.close) c.isClosed.Wait() - return c.saramaConsumer.Close() + return c.internalConsumer.Close() } func (c *Consumer) mainLoop() { for { select { - case pc := <-c.saramaConsumer.Partitions(): + case pc := <-c.internalConsumer.Partitions(): c.isClosed.Add(2) go c.handleMessages(pc) diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 36ef5765c81..747b6f10c96 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -54,11 +54,11 @@ func withWrappedConsumer(fn func(c *consumerTest)) { c := &consumerTest{ saramaConsumer: sc, consumer: &Consumer{ - metricsFactory: metricsFactory, - logger: logger, - close: make(chan struct{}), - isClosed: sync.WaitGroup{}, - saramaConsumer: sc, + metricsFactory: metricsFactory, + logger: logger, + close: make(chan struct{}), + isClosed: sync.WaitGroup{}, + internalConsumer: sc, processorFactory: ProcessorFactory{ topic: "topic", consumer: sc, diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index 97ea1573a42..32cba107057 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -26,8 +26,8 @@ import ( "github.com/jaegertracing/jaeger/pkg/kafka/consumer" ) -// FactoryParams are the parameters of a ProcessorFactory -type FactoryParams struct { +// ProcessorFactoryParams are the parameters of a ProcessorFactory +type ProcessorFactoryParams struct { Parallelism int Topic string BaseProcessor processor.SpanProcessor @@ -46,9 +46,9 @@ type ProcessorFactory struct { parallelism int } -// NewFactory constructs a new ProcessorFactory -func NewFactory(params FactoryParams) (ProcessorFactory, error) { - return ProcessorFactory{ +// NewProcessorFactory constructs a new ProcessorFactory +func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) { + return &ProcessorFactory{ topic: params.Topic, consumer: params.SaramaConsumer, metricsFactory: params.Factory, diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go index 25b0099a909..e9cf040c4d1 100644 --- a/cmd/ingester/app/consumer/processor_factory_test.go +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -28,8 +28,8 @@ import ( ) func Test_NewFactory(t *testing.T) { - params := FactoryParams{} - newFactory, err := NewFactory(params) + params := ProcessorFactoryParams{} + newFactory, err := NewProcessorFactory(params) assert.NoError(t, err) assert.NotNil(t, newFactory) }