From 64403a66838736b9b50f67fcdbba4404e023b964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Fri, 3 Jul 2020 15:41:46 +0200 Subject: [PATCH] Handle errors in exporters (#1259) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- exporter/jaegerexporter/exporter.go | 5 +++-- exporter/opencensusexporter/opencensus.go | 8 ++++---- exporter/otlpexporter/otlp.go | 6 +++--- exporter/zipkinexporter/zipkin.go | 13 ++++++------- processor/batchprocessor/batch_processor.go | 6 ++++-- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/exporter/jaegerexporter/exporter.go b/exporter/jaegerexporter/exporter.go index 0409ca5c054..ac4944cde0f 100644 --- a/exporter/jaegerexporter/exporter.go +++ b/exporter/jaegerexporter/exporter.go @@ -16,6 +16,7 @@ package jaegerexporter import ( "context" + "fmt" jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2" "google.golang.org/grpc" @@ -70,7 +71,7 @@ func (s *protoGRPCSender) pushTraceData( batches, err := jaegertranslator.InternalTracesToJaegerProto(td) if err != nil { - return td.SpanCount(), consumererror.Permanent(err) + return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)) } if s.metadata.Len() > 0 { @@ -83,7 +84,7 @@ func (s *protoGRPCSender) pushTraceData( ctx, &jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady)) if err != nil { - return td.SpanCount() - sentSpans, err + return td.SpanCount() - sentSpans, fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err) } sentSpans += len(batch.Spans) } diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index d25b88ff4f6..b1ee31a2a6a 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -156,7 +156,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T code: errAlreadyStopped, msg: "OpenCensus exporter was already stopped.", } - return len(td.Spans), err + return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err) } err := exporter.ExportTraceServiceRequest( @@ -168,7 +168,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T ) oce.exporters <- exporter if err != nil { - return len(td.Spans), err + return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err) } return 0, nil } @@ -181,7 +181,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata code: errAlreadyStopped, msg: "OpenCensus exporter was already stopped.", } - return exporterhelper.NumTimeSeries(md), err + return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err) } req := &agentmetricspb.ExportMetricsServiceRequest{ @@ -192,7 +192,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata err := exporter.ExportMetricsServiceRequest(req) oce.exporters <- exporter if err != nil { - return exporterhelper.NumTimeSeries(md), err + return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err) } return 0, nil } diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 7bd20608018..0a3e43aab92 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -158,7 +158,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in err := oce.exporter.exportTrace(ctx, request) if err != nil { - return td.SpanCount(), err + return td.SpanCount(), fmt.Errorf("failed to push trace data via OTLP exporter: %w", err) } return 0, nil } @@ -171,7 +171,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics) err := oce.exporter.exportMetrics(ctx, request) if err != nil { - return imd.MetricCount(), err + return imd.MetricCount(), fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err) } return 0, nil } @@ -183,7 +183,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int, err := oce.exporter.exportLogs(ctx, request) if err != nil { - return logs.LogRecordCount(), err + return logs.LogRecordCount(), fmt.Errorf("failed to push log data via OTLP exporter: %w", err) } return 0, nil } diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index defd3c8cd77..61dbc536520 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -83,33 +83,32 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) { return ze, nil } -func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) { +func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) { tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans)) - var resource *resourcepb.Resource = td.Resource for _, span := range td.Spans { zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName) if err != nil { - return len(td.Spans), consumererror.Permanent(err) + return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) } tbatch = append(tbatch, zs) } body, err := ze.serializer.Serialize(tbatch) if err != nil { - return len(td.Spans), consumererror.Permanent(err) + return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) } - req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body)) + req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body)) if err != nil { - return len(td.Spans), err + return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) } req.Header.Set("Content-Type", ze.serializer.ContentType()) resp, err := ze.client.Do(req) if err != nil { - return len(td.Spans), err + return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) } _ = resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index fe10d3d2301..87488596442 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -171,9 +171,11 @@ func (bp *batchTraceProcessor) resetTimer() { func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) { // Add that it came form the trace pipeline? statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)} - _ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1)) + stats.RecordWithTags(context.Background(), statsTags, measure.M(1)) - _ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()) + if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil { + bp.logger.Warn("Sender failed", zap.Error(err)) + } bp.batchTraces.reset() }