Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of partial requests for logs and metrics to the exporterhelper #2059

Merged
merged 8 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion consumer/consumererror/partialerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import "go.opentelemetry.io/collector/consumer/pdata"
// The preceding components in the pipeline can use this information for partial retries.
type PartialError struct {
error
failed pdata.Traces
failed pdata.Traces
failedLogs pdata.Logs
failedMetrics pdata.Metrics
}

// PartialTracesError creates PartialError for failed traces.
Expand All @@ -36,3 +38,31 @@ func PartialTracesError(err error, failed pdata.Traces) error {
func (err PartialError) GetTraces() pdata.Traces {
return err.failed
}

// PartialLogsError creates PartialError for failed logs.
// Use this error type only when a subset of received data set failed to be processed or sent.
func PartialLogsError(err error, failedLogs pdata.Logs) error {
return PartialError{
error: err,
failedLogs: failedLogs,
}
}

// GetLogs returns failed logs.
func (err PartialError) GetLogs() pdata.Logs {
return err.failedLogs
}

// PartialMetricsError creates PartialError for failed metrics.
// Use this error type only when a subset of received data set failed to be processed or sent.
func PartialMetricsError(err error, failedMetrics pdata.Metrics) error {
return PartialError{
error: err,
failedMetrics: failedMetrics,
}
}

// GetMetrics returns failed metrics.
func (err PartialError) GetMetrics() pdata.Metrics {
return err.failedMetrics
}
16 changes: 16 additions & 0 deletions consumer/consumererror/partialerror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,19 @@ func TestPartialError(t *testing.T) {
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failed)
}

func TestPartialErrorLogs(t *testing.T) {
td := testdata.GenerateLogDataOneLog()
err := fmt.Errorf("some error")
partialErr := PartialLogsError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failedLogs)
}

func TestPartialErrorMetrics(t *testing.T) {
td := testdata.GenerateMetricsOneMetric()
err := fmt.Errorf("some error")
partialErr := PartialMetricsError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failedMetrics)
}
5 changes: 2 additions & 3 deletions exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher PushLogsData) req
}
}

func (req *logsRequest) onPartialError(consumererror.PartialError) request {
// TODO: Implement this
return req
func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) request {
return newLogsRequest(req.ctx, partialErr.GetLogs(), req.pusher)
}

func (req *logsRequest) export(ctx context.Context) (int, error) {
Expand Down
12 changes: 8 additions & 4 deletions exporter/exporterhelper/logshelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ var (
)

func TestLogsRequest(t *testing.T) {
mr := newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil)

partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan())
assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError)))
mr := newLogsRequest(context.Background(), testdata.GenerateLogDataOneLog(), nil)

partialErr := consumererror.PartialLogsError(errors.New("some error"), testdata.GenerateLogDataEmpty())
assert.EqualValues(
t,
newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil),
mr.onPartialError(partialErr.(consumererror.PartialError)),
)
}

func TestLogsExporter_InvalidName(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher PushMetrics
}
}

func (req *metricsRequest) onPartialError(consumererror.PartialError) request {
// TODO: implement this.
return req
func (req *metricsRequest) onPartialError(partialErr consumererror.PartialError) request {
return newMetricsRequest(req.ctx, partialErr.GetMetrics(), req.pusher)
}

func (req *metricsRequest) export(ctx context.Context) (int, error) {
Expand Down
13 changes: 8 additions & 5 deletions exporter/exporterhelper/metricshelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ var (
)

func TestMetricsRequest(t *testing.T) {
mr := newMetricsRequest(context.Background(), testdata.GenerateMetricsEmpty(), nil)

partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan())
assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError)))
assert.Equal(t, 0, mr.count())
mr := newMetricsRequest(context.Background(), testdata.GenerateMetricsOneMetric(), nil)

partialErr := consumererror.PartialMetricsError(errors.New("some error"), testdata.GenerateMetricsEmpty())
assert.EqualValues(
t,
newMetricsRequest(context.Background(), testdata.GenerateMetricsEmpty(), nil),
mr.onPartialError(partialErr.(consumererror.PartialError)),
)
}

func TestMetricsExporter_InvalidName(t *testing.T) {
Expand Down