Skip to content

Commit

Permalink
Add context.Context to WriteSpan (jaegertracing#2436)
Browse files Browse the repository at this point in the history
* Add context.Context to WriteSpan

Signed-off-by: Yuri Shkuro <[email protected]>

* fmt

Signed-off-by: Yuri Shkuro <[email protected]>

* Regen mocks and fix tests

Signed-off-by: Yuri Shkuro <[email protected]>

* Remove context field

Signed-off-by: Yuri Shkuro <[email protected]>

* add TODOs

Signed-off-by: Yuri Shkuro <[email protected]>

* remove Context field

Signed-off-by: Yuri Shkuro <[email protected]>
Signed-off-by: albertteoh <[email protected]>
  • Loading branch information
yurishkuro authored and albertteoh committed Sep 3, 2020
1 parent 6f5df2b commit 7d87736
Show file tree
Hide file tree
Showing 40 changed files with 166 additions and 144 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ install-mockery:
.PHONY: generate-mocks
generate-mocks: install-mockery
$(MOCKERY) -all -dir ./pkg/es/ -output ./pkg/es/mocks && rm pkg/es/mocks/ClientBuilder.go
$(MOCKERY) -all -dir ./storage/spanstore/ -output ./storage/spanstore/mocks

.PHONY: echo-version
echo-version:
Expand Down
4 changes: 3 additions & 1 deletion cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package app

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -140,7 +141,8 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
}

startTime := time.Now()
if err := sp.spanWriter.WriteSpan(span); err != nil {
// TODO context should be propagated from upstream components
if err := sp.spanWriter.WriteSpan(context.TODO(), span); err != nil {
sp.logger.Error("Failed to save span", zap.Error(err))
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
} else {
Expand Down
5 changes: 3 additions & 2 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package app

import (
"context"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -166,7 +167,7 @@ type fakeSpanWriter struct {
err error
}

func (n *fakeSpanWriter) WriteSpan(span *model.Span) error {
func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
return n.err
}

Expand Down Expand Up @@ -273,7 +274,7 @@ type blockingWriter struct {
sync.Mutex
}

func (w *blockingWriter) WriteSpan(span *model.Span) error {
func (w *blockingWriter) WriteSpan(ctx context.Context, span *model.Span) error {
w.Lock()
defer w.Unlock()
return nil
Expand Down
6 changes: 4 additions & 2 deletions cmd/ingester/app/processor/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package processor

import (
"context"
"fmt"
"io"

Expand Down Expand Up @@ -58,9 +59,10 @@ func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor {

// Process unmarshals and writes a single kafka message
func (s KafkaSpanProcessor) Process(message Message) error {
mSpan, err := s.unmarshaller.Unmarshal(message.Value())
span, err := s.unmarshaller.Unmarshal(message.Value())
if err != nil {
return fmt.Errorf("cannot unmarshall byte array into span: %w", err)
}
return s.writer.WriteSpan(mSpan)
// TODO context should be propagated from upstream components
return s.writer.WriteSpan(context.TODO(), span)
}
3 changes: 2 additions & 1 deletion cmd/ingester/app/processor/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package processor

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -45,7 +46,7 @@ func TestSpanProcessor_Process(t *testing.T) {

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(span, nil)
writer.On("WriteSpan", span).Return(nil)
writer.On("WriteSpan", context.Background(), span).Return(nil)

assert.Nil(t, processor.Process(message))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand All @@ -61,13 +62,15 @@ type storageWrapper struct {
writer *esSpanWriter
}

func (s storageWrapper) WriteSpan(span *model.Span) error {
var _ spanstore.Writer = (*storageWrapper)(nil)

func (s storageWrapper) WriteSpan(ctx context.Context, span *model.Span) error {
// This fails because there is no binary tag type in OTEL and also OTEL span's status code is always created
//traces := jaegertranslator.ProtoBatchesToInternalTraces([]*model.Batch{{Process: span.Process, Spans: []*model.Span{span}}})
//_, err := s.writer.WriteTraces(context.Background(), traces)
converter := dbmodel.FromDomain{}
dbSpan := converter.FromDomainEmbedProcess(span)
_, err := s.writer.writeSpans(context.Background(), []*dbmodel.Span{dbSpan})
_, err := s.writer.writeSpans(ctx, []*dbmodel.Span{dbSpan})
return err
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/opentelemetry/app/exporter/span_writer_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *store) traceDataPusher(ctx context.Context, td pdata.Traces) (droppedSp
for _, batch := range batches {
for _, span := range batch.Spans {
span.Process = batch.Process
err := s.Writer.WriteSpan(span)
err := s.Writer.WriteSpan(ctx, span)
if err != nil {
errs = append(errs, err)
dropped++
Expand Down
4 changes: 2 additions & 2 deletions cmd/opentelemetry/app/exporter/span_writer_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type spanWriter struct {
err error
}

func (w spanWriter) WriteSpan(span *model.Span) error {
func (w spanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
if span.GetOperationName() == "error" {
return w.err
}
Expand All @@ -139,7 +139,7 @@ func (spanWriter) Close() error {
type noClosableWriter struct {
}

func (noClosableWriter) WriteSpan(span *model.Span) error {
func (noClosableWriter) WriteSpan(ctx context.Context, span *model.Span) error {
return nil
}

Expand Down
50 changes: 26 additions & 24 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,27 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var (
_ spanstore.Writer = (*writer)(nil)
_ component.Receiver = (*kafkaReceiver)(nil)
)

type kafkaReceiver struct {
logger *zap.Logger
consumer *ingester.Consumer
}

type writer struct {
receiver string
nextConsumer consumer.TraceConsumer
}

func new(
config *Config,
nextConsumer consumer.TraceConsumer,
params component.ReceiverCreateParams,
) (component.TraceReceiver, error) {
ctx := obsreport.ReceiverContext(
context.Background(), config.Name(), "kafka", "kafka")
ctx = obsreport.StartTraceDataReceiveOp(
ctx, TypeStr, "kafka")
w := &writer{
ctx: ctx,
nextConsumer: nextConsumer,
}
w := &writer{receiver: config.Name(), nextConsumer: nextConsumer}
consumer, err := builder.CreateConsumer(
params.Logger,
metrics.NullFactory,
Expand All @@ -57,13 +65,6 @@ func new(
}, nil
}

type kafkaReceiver struct {
logger *zap.Logger
consumer *ingester.Consumer
}

var _ component.Receiver = (*kafkaReceiver)(nil)

// Start starts the receiver.
func (r kafkaReceiver) Start(_ context.Context, _ component.Host) error {
r.consumer.Start()
Expand All @@ -75,19 +76,20 @@ func (r kafkaReceiver) Shutdown(_ context.Context) error {
return r.consumer.Close()
}

type writer struct {
nextConsumer consumer.TraceConsumer
ctx context.Context
}

var _ spanstore.Writer = (*writer)(nil)

// WriteSpan writes a span to the next consumer.
func (w writer) WriteSpan(span *model.Span) error {
func (w writer) WriteSpan(ctx context.Context, span *model.Span) error {
batch := model.Batch{
Spans: []*model.Span{span},
Process: span.Process,
}
traces := jaegertranslator.ProtoBatchToInternalTraces(batch)
return w.nextConsumer.ConsumeTraces(w.ctx, traces)
return w.nextConsumer.ConsumeTraces(w.addContextMetrics(ctx), traces)
}

// addContextMetrics decorates the context with labels used in metrics later.
func (w writer) addContextMetrics(ctx context.Context) context.Context {
// TODO too many mallocs here, should be a cheaper way
ctx = obsreport.ReceiverContext(ctx, w.receiver, "kafka", "kafka")
ctx = obsreport.StartTraceDataReceiveOp(ctx, TypeStr, "kafka")
return ctx
}
4 changes: 2 additions & 2 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestArchiveTraceSuccessGRPC(t *testing.T) {
withServerAndClient(t, func(server *grpcServer, client *grpcClient) {
server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*model.Span")).
server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)

_, err := client.ArchiveTrace(context.Background(), &api_v2.ArchiveTraceRequest{
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestArchiveTraceFailureGRPC(t *testing.T) {

server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*model.Span")).
server.archiveSpanWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errStorageGRPC).Times(2)

_, err := client.ArchiveTrace(context.Background(), &api_v2.ArchiveTraceRequest{
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/handler_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestArchiveTrace_NoStorage(t *testing.T) {

func TestArchiveTrace_Success(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*model.Span")).
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Expand All @@ -116,7 +116,7 @@ func TestArchiveTrace_Success(t *testing.T) {

func TestArchiveTrace_WriteErrors(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*model.Span")).
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (qs QueryService) ArchiveTrace(ctx context.Context, traceID model.TraceID)

var writeErrors []error
for _, span := range trace.Spans {
err := qs.options.ArchiveSpanWriter.WriteSpan(span)
err := qs.options.ArchiveSpanWriter.WriteSpan(ctx, span)
if err != nil {
writeErrors = append(writeErrors, err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestArchiveTraceWithArchiveWriterError(t *testing.T) {
qs, readMock, _, _, writeMock := initializeTestServiceWithArchiveOptions()
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
writeMock.On("WriteSpan", mock.AnythingOfType("*model.Span")).
writeMock.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)

type contextKey string
Expand All @@ -228,7 +228,7 @@ func TestArchiveTraceSuccess(t *testing.T) {
qs, readMock, _, _, writeMock := initializeTestServiceWithArchiveOptions()
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
writeMock.On("WriteSpan", mock.AnythingOfType("*model.Span")).
writeMock.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)

type contextKey string
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/badger/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDependencyReader(t *testing.T) {
if j > 0 {
s.References = []model.SpanRef{model.NewChildOfRef(s.TraceID, model.SpanID(j-1))}
}
err := sw.WriteSpan(&s)
err := sw.WriteSpan(context.Background(), &s)
assert.NoError(t, err)
}
}
Expand Down
16 changes: 8 additions & 8 deletions plugin/storage/badger/spanstore/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestWriteReadBack(t *testing.T) {
},
},
}
err := sw.WriteSpan(&s)
err := sw.WriteSpan(context.Background(), &s)
assert.NoError(t, err)
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestIndexSeeks(t *testing.T) {
},
}

err := sw.WriteSpan(&s)
err := sw.WriteSpan(context.Background(), &s)
assert.NoError(t, err)
}
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestWriteDuplicates(t *testing.T) {
StartTime: tid.Add(time.Duration(10)),
Duration: time.Duration(i + j),
}
err := sw.WriteSpan(&s)
err := sw.WriteSpan(context.Background(), &s)
assert.NoError(t, err)
}
}
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestMenuSeeks(t *testing.T) {
StartTime: tid.Add(time.Duration(i)),
Duration: time.Duration(i + j),
}
err := sw.WriteSpan(&s)
err := sw.WriteSpan(context.Background(), &s)
assert.NoError(t, err)
}
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestPersist(t *testing.T) {
StartTime: time.Now(),
Duration: time.Duration(1 * time.Hour),
}
err := sw.WriteSpan(&s)
err := sw.WriteSpan(context.Background(), &s)
assert.NoError(t, err)
})

Expand Down Expand Up @@ -504,7 +504,7 @@ func writeSpans(sw spanstore.Writer, tags []model.KeyValue, services, operations
StartTime: tid.Add(time.Duration(time.Millisecond)),
Duration: time.Duration(time.Millisecond * time.Duration(i+j)),
}
_ = sw.WriteSpan(&s)
_ = sw.WriteSpan(context.Background(), &s)
}
}
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func TestRandomTraceID(t *testing.T) {
StartTime: time.Now(),
Duration: 1 * time.Second,
}
err := sw.WriteSpan(&s1)
err := sw.WriteSpan(context.Background(), &s1)
assert.NoError(t, err)

s2 := model.Span{
Expand All @@ -711,7 +711,7 @@ func TestRandomTraceID(t *testing.T) {
StartTime: time.Now(),
Duration: 1 * time.Second,
}
err = sw.WriteSpan(&s2)
err = sw.WriteSpan(context.Background(), &s2)
assert.NoError(t, err)

params := &spanstore.TraceQueryParameters{
Expand Down
Loading

0 comments on commit 7d87736

Please sign in to comment.