Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use queue retry per exporter #2444

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/opentelemetry/app/defaultconfig/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/processor/queuedprocessor"
"go.opentelemetry.io/collector/processor/resourceprocessor"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
Expand Down Expand Up @@ -107,9 +106,6 @@ func createProcessors(factories component.Factories) (configmodels.Processors, [
batch := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config)
processors[batch.Name()] = batch
names = append(names, batch.Name())
queuedRetry := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config)
processors[queuedRetry.Name()] = queuedRetry
names = append(names, queuedRetry.Name())
return processors, names
}

Expand Down
10 changes: 5 additions & 5 deletions cmd/opentelemetry/app/defaultconfig/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{"jaeger"},
},
},
Expand All @@ -71,7 +71,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"resource", "batch", "queued_retry"},
Processors: []string{"resource", "batch"},
Exporters: []string{elasticsearchexporter.TypeStr, kafkaexporter.TypeStr, memoryexporter.TypeStr},
},
},
Expand All @@ -88,7 +88,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
Expand All @@ -105,7 +105,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr},
},
},
Expand All @@ -123,7 +123,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger", "zipkin"},
Processors: []string{"batch", "queued_retry"},
Processors: []string{"batch"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
Expand Down
9 changes: 7 additions & 2 deletions cmd/opentelemetry/app/exporter/badgerexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package badgerexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config holds configuration of Jaeger Badger exporter/storage.
type Config struct {
badger.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

badger.Options `mapstructure:",squash"`
}
26 changes: 16 additions & 10 deletions cmd/opentelemetry/app/exporter/badgerexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package badgerexporter

import (
"context"
"fmt"
"sync"

"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand Down Expand Up @@ -76,13 +76,19 @@ func (f Factory) Type() configmodels.Type {
// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL component.ExporterFactoryBase interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
// TODO: Enable the queued settings by default.
qs := exporterhelper.CreateDefaultQueueSettings()
qs.Enabled = false
opts := f.optionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: qs,
Options: *opts,
}
}

Expand All @@ -93,11 +99,15 @@ func (f Factory) CreateTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
factory, err := f.createStorageFactory(params, cfg)
config := cfg.(*Config)
factory, err := f.createStorageFactory(params, config)
if err != nil {
return nil, err
}
return exporter.NewSpanWriterExporter(cfg, factory)
return exporter.NewSpanWriterExporter(cfg, factory,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings))
}

// CreateMetricsExporter is not implemented.
Expand All @@ -110,17 +120,13 @@ func (f Factory) CreateMetricsExporter(
return nil, configerror.ErrDataTypeIsNotSupported
}

func (f Factory) createStorageFactory(params component.ExporterCreateParams, cfg configmodels.Exporter) (storage.Factory, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
func (f Factory) createStorageFactory(params component.ExporterCreateParams, cfg *Config) (storage.Factory, error) {
f.mutex.Lock()
if instance != nil {
return instance, nil
}
factory := badger.NewFactory()
factory.InitFromOptions(config.Options)
factory.InitFromOptions(cfg.Options)
err := factory.Initialize(metrics.NullFactory, params.Logger)
if err != nil {
return nil, err
Expand Down
7 changes: 0 additions & 7 deletions cmd/opentelemetry/app/exporter/badgerexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ func TestCreateTraceExporter(t *testing.T) {
assert.NotNil(t, exporter)
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, nil)
require.Nil(t, exporter)
assert.Contains(t, err.Error(), "could not cast configuration to jaeger_badger")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory(DefaultOptions)
cfg := factory.CreateDefaultConfig()
Expand Down
9 changes: 7 additions & 2 deletions cmd/opentelemetry/app/exporter/cassandraexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package cassandraexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)

// Config holds configuration of Jaeger Cassandra exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
cassandra.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

cassandra.Options `mapstructure:",squash"`
}
6 changes: 5 additions & 1 deletion cmd/opentelemetry/app/exporter/cassandraexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cassandraexporter
import (
"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
Expand All @@ -31,5 +32,8 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace
if err != nil {
return nil, err
}
return exporter.NewSpanWriterExporter(config, f)
return exporter.NewSpanWriterExporter(config, f,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings))
}
16 changes: 10 additions & 6 deletions cmd/opentelemetry/app/exporter/cassandraexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package cassandraexporter

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
)
Expand Down Expand Up @@ -53,13 +53,20 @@ func (Factory) Type() configmodels.Type {
// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL component.ExporterFactoryBase interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
// TODO: Enable the queued settings by default.
qs := exporterhelper.CreateDefaultQueueSettings()
qs.Enabled = false

opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: qs,
Options: *opts,
}
}

Expand All @@ -70,10 +77,7 @@ func (f Factory) CreateTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
config := cfg.(*Config)
return new(config, params)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ func TestCreateTraceExporter(t *testing.T) {
assert.Contains(t, err.Error(), "gocql: unable to create session")
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, nil)
require.Nil(t, exporter)
assert.Contains(t, err.Error(), "could not cast configuration to jaeger_cassandra")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{OptionsFactory: DefaultOptions}
cfg := factory.CreateDefaultConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package elasticsearchexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/es"
)

// Config holds configuration of Jaeger Elasticsearch exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
es.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

es.Options `mapstructure:",squash"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func new(ctx context.Context, config *Config, params component.ExporterCreatePar
return exporterhelper.NewTraceExporter(
config,
w.WriteTraces,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings),
exporterhelper.WithShutdown(func(ctx context.Context) error {
return esCfg.TLS.Close()
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/plugin/storage/es"
)
Expand Down Expand Up @@ -53,13 +54,19 @@ var _ component.ExporterFactory = (*Factory)(nil)
// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL component.ExporterFactoryBase interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
// TODO: Enable the queued settings by default.
qs := exporterhelper.CreateDefaultQueueSettings()
qs.Enabled = false
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: qs,
Options: *opts,
}
}

Expand Down
9 changes: 7 additions & 2 deletions cmd/opentelemetry/app/exporter/grpcpluginexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package grpcpluginexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

grpcStorage "github.com/jaegertracing/jaeger/plugin/storage/grpc"
)

// Config holds configuration of Jaeger gRPC exporter/storage.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
grpcStorage.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

grpcStorage.Options `mapstructure:",squash"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpcpluginexporter
import (
"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"

storageOtelExporter "github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter"
storageGrpc "github.com/jaegertracing/jaeger/plugin/storage/grpc"
Expand All @@ -30,5 +31,8 @@ func new(config *Config, params component.ExporterCreateParams) (component.Trace
if err != nil {
return nil, err
}
return storageOtelExporter.NewSpanWriterExporter(&config.ExporterSettings, factory)
return storageOtelExporter.NewSpanWriterExporter(&config.ExporterSettings, factory,
exporterhelper.WithTimeout(config.TimeoutSettings),
exporterhelper.WithQueue(config.QueueSettings),
exporterhelper.WithRetry(config.RetrySettings))
}
Loading