diff --git a/consumer/consumererror/partialerror.go b/consumer/consumererror/partialerror.go index c2356e63a52..254af292021 100644 --- a/consumer/consumererror/partialerror.go +++ b/consumer/consumererror/partialerror.go @@ -20,7 +20,9 @@ import "go.opentelemetry.io/collector/consumer/pdata" // The preceding components in the pipeline can use this information for partial retries. type PartialError struct { error - failed pdata.Traces + failed pdata.Traces + failedLogs pdata.Logs + failedMetrics pdata.Metrics } // PartialTracesError creates PartialError for failed traces. @@ -36,3 +38,31 @@ func PartialTracesError(err error, failed pdata.Traces) error { func (err PartialError) GetTraces() pdata.Traces { return err.failed } + +// PartialLogsError creates PartialError for failed logs. +// Use this error type only when a subset of received data set failed to be processed or sent. +func PartialLogsError(err error, failedLogs pdata.Logs) error { + return PartialError{ + error: err, + failedLogs: failedLogs, + } +} + +// GetLogs returns failed logs. +func (err PartialError) GetLogs() pdata.Logs { + return err.failedLogs +} + +// PartialMetricsError creates PartialError for failed metrics. +// Use this error type only when a subset of received data set failed to be processed or sent. +func PartialMetricsError(err error, failedMetrics pdata.Metrics) error { + return PartialError{ + error: err, + failedMetrics: failedMetrics, + } +} + +// GetMetrics returns failed metrics. +func (err PartialError) GetMetrics() pdata.Metrics { + return err.failedMetrics +} diff --git a/consumer/consumererror/partialerror_test.go b/consumer/consumererror/partialerror_test.go index 4143724053a..9a293bcfde8 100644 --- a/consumer/consumererror/partialerror_test.go +++ b/consumer/consumererror/partialerror_test.go @@ -30,3 +30,19 @@ func TestPartialError(t *testing.T) { assert.Equal(t, err.Error(), partialErr.Error()) assert.Equal(t, td, partialErr.(PartialError).failed) } + +func TestPartialErrorLogs(t *testing.T) { + td := testdata.GenerateLogDataOneLog() + err := fmt.Errorf("some error") + partialErr := PartialLogsError(err, td) + assert.Equal(t, err.Error(), partialErr.Error()) + assert.Equal(t, td, partialErr.(PartialError).failedLogs) +} + +func TestPartialErrorMetrics(t *testing.T) { + td := testdata.GenerateMetricsOneMetric() + err := fmt.Errorf("some error") + partialErr := PartialMetricsError(err, td) + assert.Equal(t, err.Error(), partialErr.Error()) + assert.Equal(t, td, partialErr.(PartialError).failedMetrics) +} diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index af78f0bb921..8aa7a764546 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -44,9 +44,8 @@ func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher PushLogsData) req } } -func (req *logsRequest) onPartialError(consumererror.PartialError) request { - // TODO: Implement this - return req +func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) request { + return newLogsRequest(req.ctx, partialErr.GetLogs(), req.pusher) } func (req *logsRequest) export(ctx context.Context) (int, error) { diff --git a/exporter/exporterhelper/logshelper_test.go b/exporter/exporterhelper/logshelper_test.go index 7466b3e5a1b..6df45fea65b 100644 --- a/exporter/exporterhelper/logshelper_test.go +++ b/exporter/exporterhelper/logshelper_test.go @@ -46,10 +46,14 @@ var ( ) func TestLogsRequest(t *testing.T) { - mr := newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil) - - partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) - assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) + mr := newLogsRequest(context.Background(), testdata.GenerateLogDataOneLog(), nil) + + partialErr := consumererror.PartialLogsError(errors.New("some error"), testdata.GenerateLogDataEmpty()) + assert.EqualValues( + t, + newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil), + mr.onPartialError(partialErr.(consumererror.PartialError)), + ) } func TestLogsExporter_InvalidName(t *testing.T) { diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index cb5d511357d..c26b9fd338f 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -54,9 +54,8 @@ func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher PushMetrics } } -func (req *metricsRequest) onPartialError(consumererror.PartialError) request { - // TODO: implement this. - return req +func (req *metricsRequest) onPartialError(partialErr consumererror.PartialError) request { + return newMetricsRequest(req.ctx, partialErr.GetMetrics(), req.pusher) } func (req *metricsRequest) export(ctx context.Context) (int, error) { diff --git a/exporter/exporterhelper/metricshelper_test.go b/exporter/exporterhelper/metricshelper_test.go index 16032c491ad..a9bbbeab5d2 100644 --- a/exporter/exporterhelper/metricshelper_test.go +++ b/exporter/exporterhelper/metricshelper_test.go @@ -46,11 +46,14 @@ var ( ) func TestMetricsRequest(t *testing.T) { - mr := newMetricsRequest(context.Background(), testdata.GenerateMetricsEmpty(), nil) - - partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) - assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) - assert.Equal(t, 0, mr.count()) + mr := newMetricsRequest(context.Background(), testdata.GenerateMetricsOneMetric(), nil) + + partialErr := consumererror.PartialMetricsError(errors.New("some error"), testdata.GenerateMetricsEmpty()) + assert.EqualValues( + t, + newMetricsRequest(context.Background(), testdata.GenerateMetricsEmpty(), nil), + mr.onPartialError(partialErr.(consumererror.PartialError)), + ) } func TestMetricsExporter_InvalidName(t *testing.T) {