diff --git a/pkg/kafka/config/.nocover b/pkg/kafka/producer/.nocover similarity index 100% rename from pkg/kafka/config/.nocover rename to pkg/kafka/producer/.nocover diff --git a/pkg/kafka/config/config.go b/pkg/kafka/producer/config.go similarity index 92% rename from pkg/kafka/config/config.go rename to pkg/kafka/producer/config.go index ed45f640650..5079ca8fef2 100644 --- a/pkg/kafka/config/config.go +++ b/pkg/kafka/producer/config.go @@ -12,22 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package config +package producer import ( "github.com/Shopify/sarama" ) +// Builder builds a new kafka producer +type Builder interface { + NewProducer() (sarama.AsyncProducer, error) +} + // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { Brokers []string } -// ProducerBuilder builds a new kafka producer -type ProducerBuilder interface { - NewProducer() (sarama.AsyncProducer, error) -} - // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() diff --git a/plugin/storage/kafka/factory.go b/plugin/storage/kafka/factory.go index 7ec6aca5d1a..7d1ffe91908 100644 --- a/plugin/storage/kafka/factory.go +++ b/plugin/storage/kafka/factory.go @@ -23,7 +23,7 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/pkg/kafka/config" + "github.com/jaegertracing/jaeger/pkg/kafka/producer" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -35,9 +35,9 @@ type Factory struct { metricsFactory metrics.Factory logger *zap.Logger - config config.ProducerBuilder producer sarama.AsyncProducer marshaller Marshaller + producer.Builder } // NewFactory creates a new Factory. @@ -53,16 +53,16 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper) { f.options.InitFromViper(v) - f.config = &f.options.config + f.Builder = &f.options.config } // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - logger.Info("Kafka storage configuration", - zap.Any("producer config", f.config), + logger.Info("Kafka factory", + zap.Any("producer builder", f.Builder), zap.Any("topic", f.options.topic)) - p, err := f.config.NewProducer() + p, err := f.NewProducer() if err != nil { return err } diff --git a/plugin/storage/kafka/factory_test.go b/plugin/storage/kafka/factory_test.go index 054e4af5496..544c69e2c37 100644 --- a/plugin/storage/kafka/factory_test.go +++ b/plugin/storage/kafka/factory_test.go @@ -25,7 +25,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/config" - kafkaConfig "github.com/jaegertracing/jaeger/pkg/kafka/config" + kafkaConfig "github.com/jaegertracing/jaeger/pkg/kafka/producer" "github.com/jaegertracing/jaeger/storage" ) @@ -51,13 +51,13 @@ func TestKafkaFactory(t *testing.T) { command.ParseFlags([]string{}) f.InitFromViper(v) - f.config = &mockProducerBuilder{ + f.Builder = &mockProducerBuilder{ err: errors.New("made-up error"), t: t, } assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - f.config = &mockProducerBuilder{t: t} + f.Builder = &mockProducerBuilder{t: t} assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) assert.IsType(t, &protobufMarshaller{}, f.marshaller) @@ -86,7 +86,7 @@ func TestKafkaFactoryEncoding(t *testing.T) { command.ParseFlags([]string{"--kafka.encoding=" + test.encoding}) f.InitFromViper(v) - f.config = &mockProducerBuilder{t: t} + f.Builder = &mockProducerBuilder{t: t} assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) assert.IsType(t, test.marshaller, f.marshaller) }) @@ -99,6 +99,6 @@ func TestKafkaFactoryMarshallerErr(t *testing.T) { command.ParseFlags([]string{"--kafka.encoding=bad-input"}) f.InitFromViper(v) - f.config = &mockProducerBuilder{t: t} + f.Builder = &mockProducerBuilder{t: t} assert.Error(t, f.Initialize(metrics.NullFactory, zap.NewNop())) } diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index a2d14183100..e93ea8a75f9 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -21,7 +21,7 @@ import ( "github.com/spf13/viper" - "github.com/jaegertracing/jaeger/pkg/kafka/config" + "github.com/jaegertracing/jaeger/pkg/kafka/producer" ) const ( @@ -40,7 +40,7 @@ const ( // Options stores the configuration options for Kafka type Options struct { - config config.Configuration + config producer.Configuration topic string encoding string } @@ -64,7 +64,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { - opt.config = config.Configuration{ + opt.config = producer.Configuration{ Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","), } opt.topic = v.GetString(configPrefix + suffixTopic)