Skip to content

Commit

Permalink
[exporter/loadbalancing] Tests for shutdown/consume concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Mar 5, 2024
1 parent 0bd3c27 commit adf3474
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 21 deletions.
67 changes: 60 additions & 7 deletions exporter/loadbalancingexporter/log_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) {
assert.Len(t, sink.AllLogs(), 1)
}

// this test validates that exporter is can concurrently change the endpoints while consuming logs.
func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) {
consumeStarted := make(chan struct{})
consumeDone := make(chan struct{})

// imitate a slow exporter
te := &mockLogsExporter{Component: mockComponent{}}
te.consumelogsfn = func(ctx context.Context, td plog.Logs) error {
close(consumeStarted)
time.Sleep(50 * time.Millisecond)
return te.consumeErr
}
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return te, nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig())
require.NotNil(t, p)
require.NoError(t, err)

endpoints := []string{"endpoint-1"}
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(ctx context.Context) ([]string, error) {
return endpoints, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

go func() {
assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs()))
close(consumeDone)
}()

// update endpoint while consuming logs
<-consumeStarted
endpoints = []string{"endpoint-2"}
endpoint, err := lb.res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, endpoints, endpoint)
<-consumeDone
}

func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331")

Expand Down Expand Up @@ -458,15 +510,21 @@ func simpleLogWithoutID() plog.Logs {
type mockLogsExporter struct {
component.Component
consumelogsfn func(ctx context.Context, ld plog.Logs) error
consumeErr error
}

func (e *mockLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *mockLogsExporter) Shutdown(context.Context) error {
e.consumeErr = errors.New("exporter is shut down")
return nil
}

func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if e.consumelogsfn == nil {
return nil
return e.consumeErr
}
return e.consumelogsfn(ctx, ld)
}
Expand All @@ -484,10 +542,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e
}

func newNopMockLogsExporter() exporter.Logs {
return &mockLogsExporter{
Component: mockComponent{},
consumelogsfn: func(ctx context.Context, ld plog.Logs) error {
return nil
},
}
return &mockLogsExporter{Component: mockComponent{}}
}
67 changes: 60 additions & 7 deletions exporter/loadbalancingexporter/metrics_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,58 @@ func TestConsumeMetrics(t *testing.T) {

}

// this test validates that exporter is can concurrently change the endpoints while consuming metrics.
func TestConsumeMetrics_ConcurrentResolverChange(t *testing.T) {
consumeStarted := make(chan struct{})
consumeDone := make(chan struct{})

// imitate a slow exporter
te := &mockMetricsExporter{Component: mockComponent{}}
te.ConsumeMetricsFn = func(ctx context.Context, td pmetric.Metrics) error {
close(consumeStarted)
time.Sleep(50 * time.Millisecond)
return te.consumeErr
}
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return te, nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newMetricsExporter(exportertest.NewNopCreateSettings(), simpleConfig())
require.NotNil(t, p)
require.NoError(t, err)

endpoints := []string{"endpoint-1"}
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(ctx context.Context) ([]string, error) {
return endpoints, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

go func() {
assert.NoError(t, p.ConsumeMetrics(context.Background(), simpleMetricsWithResource()))
close(consumeDone)
}()

// update endpoint while consuming logs
<-consumeStarted
endpoints = []string{"endpoint-2"}
endpoint, err := lb.res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, endpoints, endpoint)
<-consumeDone
}

func TestConsumeMetricsServiceBased(t *testing.T) {
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return newNopMockMetricsExporter(), nil
Expand Down Expand Up @@ -849,6 +901,7 @@ func appendSimpleMetricWithID(dest pmetric.ResourceMetrics, id string) {
type mockMetricsExporter struct {
component.Component
ConsumeMetricsFn func(ctx context.Context, td pmetric.Metrics) error
consumeErr error
}

func newMockMetricsExporter(consumeMetricsFn func(ctx context.Context, td pmetric.Metrics) error) exporter.Metrics {
Expand All @@ -859,21 +912,21 @@ func newMockMetricsExporter(consumeMetricsFn func(ctx context.Context, td pmetri
}

func newNopMockMetricsExporter() exporter.Metrics {
return &mockMetricsExporter{
Component: mockComponent{},
ConsumeMetricsFn: func(ctx context.Context, md pmetric.Metrics) error {
return nil
},
}
return &mockMetricsExporter{Component: mockComponent{}}
}

func (e *mockMetricsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *mockMetricsExporter) Shutdown(context.Context) error {
e.consumeErr = errors.New("exporter is shut down")
return nil
}

func (e *mockMetricsExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if e.ConsumeMetricsFn == nil {
return nil
return e.consumeErr
}
return e.ConsumeMetricsFn(ctx, md)
}
68 changes: 61 additions & 7 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,59 @@ func TestConsumeTraces(t *testing.T) {
assert.Nil(t, res)
}

// This test validates that exporter is can concurrently change the endpoints while consuming traces.
func TestConsumeTraces_ConcurrentResolverChange(t *testing.T) {
consumeStarted := make(chan struct{})
consumeDone := make(chan struct{})

// imitate a slow exporter
te := &mockTracesExporter{Component: mockComponent{}}
te.ConsumeTracesFn = func(ctx context.Context, td ptrace.Traces) error {
close(consumeStarted)
time.Sleep(50 * time.Millisecond)
return te.consumeErr
}
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return te, nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, traceIDRouting)

endpoints := []string{"endpoint-1"}
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(ctx context.Context) ([]string, error) {
return endpoints, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

go func() {
assert.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces()))
close(consumeDone)
}()

// update endpoint while consuming traces
<-consumeStarted
endpoints = []string{"endpoint-2"}
endpoint, err := lb.res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, endpoints, endpoint)
<-consumeDone
}

func TestConsumeTracesServiceBased(t *testing.T) {
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return newNopMockTracesExporter(), nil
Expand Down Expand Up @@ -688,6 +741,7 @@ func serviceBasedRoutingConfig() *Config {
type mockTracesExporter struct {
component.Component
ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error
consumeErr error
}

func newMockTracesExporter(consumeTracesFn func(ctx context.Context, td ptrace.Traces) error) exporter.Traces {
Expand All @@ -698,12 +752,12 @@ func newMockTracesExporter(consumeTracesFn func(ctx context.Context, td ptrace.T
}

func newNopMockTracesExporter() exporter.Traces {
return &mockTracesExporter{
Component: mockComponent{},
ConsumeTracesFn: func(ctx context.Context, td ptrace.Traces) error {
return nil
},
}
return &mockTracesExporter{Component: mockComponent{}}
}

func (e *mockTracesExporter) Shutdown(context.Context) error {
e.consumeErr = errors.New("exporter is shut down")
return nil
}

func (e *mockTracesExporter) Capabilities() consumer.Capabilities {
Expand All @@ -712,7 +766,7 @@ func (e *mockTracesExporter) Capabilities() consumer.Capabilities {

func (e *mockTracesExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if e.ConsumeTracesFn == nil {
return nil
return e.consumeErr
}
return e.ConsumeTracesFn(ctx, td)
}

0 comments on commit adf3474

Please sign in to comment.