Skip to content

Commit

Permalink
Use queue retry per exporter (#2444)
Browse files Browse the repository at this point in the history
* Use queue retry per exporter

Signed-off-by: Pavol Loffay <[email protected]>

* enable qretry by default

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Sep 2, 2020
1 parent 8630a31 commit e558711
Show file tree
Hide file tree
Showing 17 changed files with 85 additions and 70 deletions.
4 changes: 0 additions & 4 deletions cmd/opentelemetry/app/defaultconfig/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/processor/queuedprocessor"
"go.opentelemetry.io/collector/processor/resourceprocessor"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
Expand Down Expand Up @@ -107,9 +106,6 @@ func createProcessors(factories component.Factories) (configmodels.Processors, [
batch := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config)
processors[batch.Name()] = batch
names = append(names, batch.Name())
queuedRetry := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config)
processors[queuedRetry.Name()] = queuedRetry
names = append(names, queuedRetry.Name())
return processors, names
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/opentelemetry/app/defaultconfig/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{"jaeger"},
},
},
Expand All @@ -71,7 +71,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"resource", "batch", "queued_retry"},
Processors: []string{"resource", "batch"},
Exporters: []string{elasticsearchexporter.TypeStr, kafkaexporter.TypeStr, memoryexporter.TypeStr},
},
},
Expand All @@ -88,7 +88,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
Expand All @@ -105,7 +105,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr},
},
},
Expand All @@ -123,7 +123,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger", "zipkin"},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
Expand Down
9 changes: 7 additions & 2 deletions cmd/opentelemetry/app/exporter/badgerexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package badgerexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config holds configuration of Jaeger Badger exporter/storage.
type Config struct {
badger.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

badger.Options `mapstructure:",squash"`
}
23 changes: 13 additions & 10 deletions cmd/opentelemetry/app/exporter/badgerexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package badgerexporter

import (
"context"
"fmt"
"sync"

"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand Down Expand Up @@ -78,11 +78,14 @@ func (f Factory) Type() configmodels.Type {
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.optionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
Options: *opts,
}
}

Expand All @@ -93,11 +96,15 @@ func (f Factory) CreateTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
factory, err := f.createStorageFactory(params, cfg)
config := cfg.(*Config)
factory, err := f.createStorageFactory(params, config)
if err != nil {
return nil, err
}
return exporter.NewSpanWriterExporter(cfg, factory)
return exporter.NewSpanWriterExporter(cfg, factory,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings))
}

// CreateMetricsExporter is not implemented.
Expand All @@ -110,18 +117,14 @@ func (f Factory) CreateMetricsExporter(
return nil, configerror.ErrDataTypeIsNotSupported
}

func (f Factory) createStorageFactory(params component.ExporterCreateParams, cfg configmodels.Exporter) (storage.Factory, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
func (f Factory) createStorageFactory(params component.ExporterCreateParams, cfg *Config) (storage.Factory, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if instance != nil {
return instance, nil
}
factory := badger.NewFactory()
factory.InitFromOptions(config.Options)
factory.InitFromOptions(cfg.Options)
err := factory.Initialize(metrics.NullFactory, params.Logger)
if err != nil {
return nil, err
Expand Down
7 changes: 0 additions & 7 deletions cmd/opentelemetry/app/exporter/badgerexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ func TestCreateTraceExporter(t *testing.T) {
assert.NotNil(t, exporter)
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, nil)
require.Nil(t, exporter)
assert.Contains(t, err.Error(), "could not cast configuration to jaeger_badger")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory(DefaultOptions)
cfg := factory.CreateDefaultConfig()
Expand Down
9 changes: 7 additions & 2 deletions cmd/opentelemetry/app/exporter/cassandraexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package cassandraexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// Config holds configuration of Jaeger Cassandra exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
cassandra.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

cassandra.Options `mapstructure:",squash"`
}
6 changes: 5 additions & 1 deletion cmd/opentelemetry/app/exporter/cassandraexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cassandraexporter
import (
"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
Expand All @@ -31,5 +32,8 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace
if err != nil {
return nil, err
}
return exporter.NewSpanWriterExporter(config, f)
return exporter.NewSpanWriterExporter(config, f,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings))
}
12 changes: 6 additions & 6 deletions cmd/opentelemetry/app/exporter/cassandraexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package cassandraexporter

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)
Expand Down Expand Up @@ -55,11 +55,14 @@ func (Factory) Type() configmodels.Type {
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
Options: *opts,
}
}

Expand All @@ -70,10 +73,7 @@ func (f Factory) CreateTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
config := cfg.(*Config)
return new(config, params)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ func TestCreateTraceExporter(t *testing.T) {
assert.Contains(t, err.Error(), "gocql: unable to create session")
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, nil)
require.Nil(t, exporter)
assert.Contains(t, err.Error(), "could not cast configuration to jaeger_cassandra")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package elasticsearchexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/es"
)

// Config holds configuration of Jaeger Elasticsearch exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
es.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

es.Options `mapstructure:",squash"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func new(ctx context.Context, config *Config, params component.ExporterCreatePar
return exporterhelper.NewTraceExporter(
config,
w.WriteTraces,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings),
exporterhelper.WithShutdown(func(ctx context.Context) error {
return esCfg.TLS.Close()
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/es"
)
Expand Down Expand Up @@ -55,11 +56,14 @@ var _ component.ExporterFactory = (*Factory)(nil)
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
Options: *opts,
}
}

Expand Down
9 changes: 7 additions & 2 deletions cmd/opentelemetry/app/exporter/grpcpluginexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package grpcpluginexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

grpcStorage "github.com/jaegertracing/jaeger/plugin/storage/grpc"
)

// Config holds configuration of Jaeger gRPC exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
grpcStorage.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

grpcStorage.Options `mapstructure:",squash"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpcpluginexporter
import (
"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
storageGrpc "github.com/jaegertracing/jaeger/plugin/storage/grpc"
Expand All @@ -30,5 +31,8 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(&config.ExporterSettings, factory)
return storageOtelExporter.NewSpanWriterExporter(&config.ExporterSettings, factory,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings))
}
13 changes: 7 additions & 6 deletions cmd/opentelemetry/app/exporter/grpcpluginexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package grpcpluginexporter

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

storageGrpc "github.com/jaegertracing/jaeger/plugin/storage/grpc"
)
Expand Down Expand Up @@ -53,11 +53,15 @@ func (f Factory) Type() configmodels.Type {
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),

Options: *opts,
}
}

Expand All @@ -68,10 +72,7 @@ func (f Factory) CreateTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
grpcCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
grpcCfg := cfg.(*Config)
return new(grpcCfg, params)
}

Expand Down
Loading

0 comments on commit e558711

Please sign in to comment.