Skip to content

Commit

Permalink
Handle errors in exporters (#1259)
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling authored Jul 3, 2020
1 parent 117cf7a commit 64403a6
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
5 changes: 3 additions & 2 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jaegerexporter

import (
"context"
"fmt"

jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
"google.golang.org/grpc"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *protoGRPCSender) pushTraceData(

batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err))
}

if s.metadata.Len() > 0 {
Expand All @@ -83,7 +84,7 @@ func (s *protoGRPCSender) pushTraceData(
ctx,
&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))
if err != nil {
return td.SpanCount() - sentSpans, err
return td.SpanCount() - sentSpans, fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)
}
sentSpans += len(batch.Spans)
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err)
}

err := exporter.ExportTraceServiceRequest(
Expand All @@ -168,7 +168,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
)
oce.exporters <- exporter
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err)
}
return 0, nil
}
Expand All @@ -181,7 +181,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err)
}

req := &agentmetricspb.ExportMetricsServiceRequest{
Expand All @@ -192,7 +192,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
err := exporter.ExportMetricsServiceRequest(req)
oce.exporters <- exporter
if err != nil {
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err)
}
return 0, nil
}
6 changes: 3 additions & 3 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
err := oce.exporter.exportTrace(ctx, request)

if err != nil {
return td.SpanCount(), err
return td.SpanCount(), fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
}
return 0, nil
}
Expand All @@ -171,7 +171,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
err := oce.exporter.exportMetrics(ctx, request)

if err != nil {
return imd.MetricCount(), err
return imd.MetricCount(), fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err)
}
return 0, nil
}
Expand All @@ -183,7 +183,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
err := oce.exporter.exportLogs(ctx, request)

if err != nil {
return logs.LogRecordCount(), err
return logs.LogRecordCount(), fmt.Errorf("failed to push log data via OTLP exporter: %w", err)
}
return 0, nil
}
13 changes: 6 additions & 7 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,33 +83,32 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) {
return ze, nil
}

func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) {
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) {
tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans))

var resource *resourcepb.Resource = td.Resource

for _, span := range td.Spans {
zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
tbatch = append(tbatch, zs)
}

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}

req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
req.Header.Set("Content-Type", ze.serializer.ContentType())

resp, err := ze.client.Do(req)
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
Expand Down
6 changes: 4 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ func (bp *batchTraceProcessor) resetTimer() {
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
stats.RecordWithTags(context.Background(), statsTags, measure.M(1))

_ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData())
if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
}
bp.batchTraces.reset()
}

Expand Down

0 comments on commit 64403a6

Please sign in to comment.