diff --git a/s3store/reader.go b/s3store/reader.go index dc558b41..005c0b91 100644 --- a/s3store/reader.go +++ b/s3store/reader.go @@ -27,7 +27,7 @@ import ( var _ spanstore.Reader = (*Reader)(nil) var ( - userCtx = user.InjectOrgID(context.Background(), "data") + userCtx = user.InjectOrgID(context.Background(), "fake") ) // Reader can query for and load traces from your object store. @@ -52,33 +52,53 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) { r.logger.Warn("GetServices called") //var fooLabelsWithName = "{__name__=\"service\", env=\"prod\"}" - var fooLabelsWithName = "{env=\"prod\", __name__=\"services\"}" + var fooLabelsWithName = "{env=\"prod\", __name__=\"spans\"}" - chunks, err := r.store.Get(userCtx, "data", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...) + chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...) //log.Println("chunks get: %s", chunks) /* for i := 0; i < len(chunks); i++ { log.Println(chunks[i].Metric[9].Value) } */ - ret := removeDuplicateValues(chunks, "service_name") + ret := removeServiceDuplicateValues(chunks, "service_name") return ret, err } -func removeDuplicateValues(a []chunk.Chunk, b string) []string { +func removeServiceDuplicateValues(a []chunk.Chunk, b string) []string { keys := make(map[string]bool) list := []string{} - + + // If the key(values of the slice) is not equal + // to the already present value in new slice (list) + // then we append it. else we jump on another element. + for _, entry := range a { + if _, value := keys[entry.Metric[8].Value]; !value { + // data type: service_name, operation_name, etc + if entry.Metric[8].Name == b { + // assign key value to list + keys[entry.Metric[8].Value] = true + list = append(list, entry.Metric[8].Value) + } + } + } + return list +} + +func removeOperationDuplicateValues(a []chunk.Chunk, b string) []string { + keys := make(map[string]bool) + list := []string{} + // If the key(values of the slice) is not equal // to the already present value in new slice (list) // then we append it. else we jump on another element. for _, entry := range a { - if _, value := keys[entry.Metric[2].Value]; !value { + if _, value := keys[entry.Metric[5].Value]; !value { // data type: service_name, operation_name, etc - if entry.Metric[2].Name == b { + if entry.Metric[5].Name == b { // assign key value to list - keys[entry.Metric[2].Value] = true - list = append(list, entry.Metric[2].Value) + keys[entry.Metric[5].Value] = true + list = append(list, entry.Metric[5].Value) } } } @@ -88,10 +108,10 @@ func removeDuplicateValues(a []chunk.Chunk, b string) []string { // GetOperations returns all operations for a specific service traced by Jaeger func (r *Reader) GetOperations(ctx context.Context, param spanstore.OperationQueryParameters) ([]spanstore.Operation, error) { - var fooLabelsWithName = "{env=\"prod\", __name__=\"operations\"}" + var fooLabelsWithName = "{env=\"prod\", __name__=\"spans\"}" - chunks, err := r.store.Get(userCtx, "data", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...) - operations := removeDuplicateValues(chunks, "operation_name") + chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...) + operations := removeOperationDuplicateValues(chunks, "operation_name") ret := make([]spanstore.Operation, 0, len(operations)) for _, operation := range operations { @@ -110,7 +130,7 @@ func (r *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Tr //var fooLabelsWithName = fmt.Sprintf("{env=\"prod\", __name__=\"spans\", trace_id_low=\"%d\", trace_id_high=\"%d\"}", traceID.Low, traceID.Low) var fooLabelsWithName = fmt.Sprintf("{env=\"prod\", __name__=\"spans\", trace_id_low=\"%d\"}", traceID.Low) - chunks, err := r.store.Get(userCtx, "data", timeToModelTime(time.Now().Add(-24 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...) + chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(time.Now().Add(-24 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...) ret := make([]*model.Span, 0, len(chunks)) ret2 := make([]model.Trace_ProcessMapping, 0, len(chunks)) @@ -199,7 +219,7 @@ func (r *Reader) FindTraces(ctx context.Context, query *spanstore.TraceQueryPara builder, _, _ := buildTraceWhere(query) var fooLabelsWithName = builder - chunks, err := r.store.Get(userCtx, "data", timeToModelTime(query.StartTimeMin), timeToModelTime(query.StartTimeMax), newMatchers(fooLabelsWithName)...) + chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(query.StartTimeMin), timeToModelTime(query.StartTimeMax), newMatchers(fooLabelsWithName)...) ret := make([]*model.Trace, 0, len(chunks)) if err != nil { return ret, err @@ -262,7 +282,7 @@ func (r *Reader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa var fooLabelsWithName = builder - chunks, err := r.store.Get(userCtx, "data", timeToModelTime(timeMin), timeToModelTime(timeMax), newMatchers(fooLabelsWithName)...) + chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(timeMin), timeToModelTime(timeMax), newMatchers(fooLabelsWithName)...) if err != nil { log.Println("store error: %s", err) } diff --git a/s3store/writer.go b/s3store/writer.go index fd027860..b1f200b7 100644 --- a/s3store/writer.go +++ b/s3store/writer.go @@ -34,7 +34,7 @@ var _ io.Closer = (*Writer)(nil) //var fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}" var ( - ctx = user.InjectOrgID(context.Background(), "data") + ctx = user.InjectOrgID(context.Background(), "fake") ) // Writer handles all writes to object store for the Jaeger data model @@ -52,7 +52,7 @@ type timeRange struct { from, to time.Time } -func buildTestStreams(labels string, tr timeRange) logproto.Stream { +func buildTestStreams(labels string, tr timeRange, line string) logproto.Stream { stream := logproto.Stream{ Labels: labels, Entries: []logproto.Entry{}, @@ -61,7 +61,7 @@ func buildTestStreams(labels string, tr timeRange) logproto.Stream { for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) { stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: from, - Line: "Hello there! I'm Jack Sparrow", + Line: line, }) } @@ -75,7 +75,8 @@ func newChunk(stream logproto.Stream) chunk.Chunk { } if !lbs.Has(labels.MetricName) { builder := labels.NewBuilder(lbs) - builder.Set(labels.MetricName, "logs") + //builder.Set(labels.MetricName, "logs") + builder.Set(labels.MetricName, "spans") lbs = builder.Labels() } from, through := pmodel.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), pmodel.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()) @@ -90,7 +91,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk { _ = chk.Append(&e) } chk.Close() - c := chunk.NewChunk("data", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through) + c := chunk.NewChunk("fake", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through) // force the checksum creation if err := c.Encode(); err != nil { panic(err) @@ -143,61 +144,21 @@ func (w *Writer) WriteSpan(span *model.Span) error { }, } - pchk := []chunk.Chunk{} - addedSpansChunkIDs := map[string]struct{}{} + addedChunkIDs := map[string]struct{}{} + plogline := fmt.Sprintf("level=info caller=jaeger component=chunks latency=\"%s\"", span.Duration) for _, tr := range chunksToBuildForTimeRanges { // span chunk - spanChk := newChunk(buildTestStreams(spanLabelsWithName, tr)) - addedSpansChunkIDs[spanChk.ExternalKey()] = struct{}{} - pchk = append(pchk, spanChk) - } - - // upload the span chunks - err := w.store.Put(ctx, pchk) - if err != nil { - log.Println("store spans Put error: %s", err) - } - - // services label - var serviceLabelsWithName = fmt.Sprintf("{__name__=\"services\", env=\"prod\", service_name=\"%s\"}", span.Process.ServiceName) - - // services chunk - schk := []chunk.Chunk{} - addedServicesChunkIDs := map[string]struct{}{} - for _, tr := range chunksToBuildForTimeRanges { - - // add slice to services chunk - serviceChk := newChunk(buildTestStreams(serviceLabelsWithName, tr)) - addedServicesChunkIDs[serviceChk.ExternalKey()] = struct{}{} - schk = append(schk, serviceChk) - } - - // upload the service chunks - err = w.store.Put(ctx, schk) - if err != nil { - log.Println("store services Put error: %s", err) - } - - // operations label - var operationLabelsWithName = fmt.Sprintf("{__name__=\"operations\", env=\"prod\", operation_name=\"%s\"}", span.OperationName) + chk := newChunk(buildTestStreams(spanLabelsWithName, tr, plogline)) - // services chunk - ochk := []chunk.Chunk{} - addedOperationsChunkIDs := map[string]struct{}{} - for _, tr := range chunksToBuildForTimeRanges { - - // add slice to services chunk - operationChk := newChunk(buildTestStreams(operationLabelsWithName, tr)) - addedOperationsChunkIDs[operationChk.ExternalKey()] = struct{}{} - ochk = append(ochk, operationChk) - } + // upload the span chunks + err := w.store.PutOne(ctx, chk.From, chk.Through, chk) + if err != nil { + log.Println("store spans Put error: %s", err) + } - // upload the operation chunks - err = w.store.Put(ctx, ochk) - if err != nil { - log.Println("store operations Put error: %s", err) - } + addedChunkIDs[chk.ExternalKey()] = struct{}{} + } return nil }