Skip to content

Commit

Permalink
Major changes to LogQL data structure and refactoring (#52)
Browse files Browse the repository at this point in the history
* Follow the LogQL data structure so that when we write span data, Loki is able to read it and
  run complex queries
* refactor writer to use only one type of data instead
  of separating between operations, services and spans
  • Loading branch information
muhammadn authored Nov 20, 2021
1 parent f5f6a8f commit 4bb034c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 71 deletions.
52 changes: 36 additions & 16 deletions s3store/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
var _ spanstore.Reader = (*Reader)(nil)

var (
userCtx = user.InjectOrgID(context.Background(), "data")
userCtx = user.InjectOrgID(context.Background(), "fake")
)

// Reader can query for and load traces from your object store.
Expand All @@ -52,33 +52,53 @@ func (r *Reader) GetServices(ctx context.Context) ([]string, error) {
r.logger.Warn("GetServices called")

//var fooLabelsWithName = "{__name__=\"service\", env=\"prod\"}"
var fooLabelsWithName = "{env=\"prod\", __name__=\"services\"}"
var fooLabelsWithName = "{env=\"prod\", __name__=\"spans\"}"

chunks, err := r.store.Get(userCtx, "data", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...)
chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...)
//log.Println("chunks get: %s", chunks)
/* for i := 0; i < len(chunks); i++ {
log.Println(chunks[i].Metric[9].Value)
} */

ret := removeDuplicateValues(chunks, "service_name")
ret := removeServiceDuplicateValues(chunks, "service_name")

return ret, err
}

func removeDuplicateValues(a []chunk.Chunk, b string) []string {
// removeServiceDuplicateValues collects the distinct values of the label
// named labelName (e.g. "service_name") across the given chunks, preserving
// first-seen order.
//
// Previously this indexed entry.Metric[8] directly, which panics when a
// chunk carries fewer than nine labels and silently returns wrong results
// whenever the label sort order shifts. Scanning the label set by name is
// both safe and order-independent.
func removeServiceDuplicateValues(chunks []chunk.Chunk, labelName string) []string {
	seen := make(map[string]bool)
	list := []string{}

	for _, entry := range chunks {
		// Look the label up by name instead of by positional index.
		for _, lbl := range entry.Metric {
			if lbl.Name != labelName {
				continue
			}
			// Append each value only the first time it is encountered.
			if !seen[lbl.Value] {
				seen[lbl.Value] = true
				list = append(list, lbl.Value)
			}
		}
	}
	return list
}

func removeOperationDuplicateValues(a []chunk.Chunk, b string) []string {
keys := make(map[string]bool)
list := []string{}

// If the key(values of the slice) is not equal
// to the already present value in new slice (list)
// then we append it. else we jump on another element.
for _, entry := range a {
if _, value := keys[entry.Metric[2].Value]; !value {
if _, value := keys[entry.Metric[5].Value]; !value {
// data type: service_name, operation_name, etc
if entry.Metric[2].Name == b {
if entry.Metric[5].Name == b {
// assign key value to list
keys[entry.Metric[2].Value] = true
list = append(list, entry.Metric[2].Value)
keys[entry.Metric[5].Value] = true
list = append(list, entry.Metric[5].Value)
}
}
}
Expand All @@ -88,10 +108,10 @@ func removeDuplicateValues(a []chunk.Chunk, b string) []string {
// GetOperations returns all operations for a specific service traced by Jaeger
func (r *Reader) GetOperations(ctx context.Context, param spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {

var fooLabelsWithName = "{env=\"prod\", __name__=\"operations\"}"
var fooLabelsWithName = "{env=\"prod\", __name__=\"spans\"}"

chunks, err := r.store.Get(userCtx, "data", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...)
operations := removeDuplicateValues(chunks, "operation_name")
chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(time.Now().Add(-1 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...)
operations := removeOperationDuplicateValues(chunks, "operation_name")

ret := make([]spanstore.Operation, 0, len(operations))
for _, operation := range operations {
Expand All @@ -110,7 +130,7 @@ func (r *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Tr
//var fooLabelsWithName = fmt.Sprintf("{env=\"prod\", __name__=\"spans\", trace_id_low=\"%d\", trace_id_high=\"%d\"}", traceID.Low, traceID.Low)
var fooLabelsWithName = fmt.Sprintf("{env=\"prod\", __name__=\"spans\", trace_id_low=\"%d\"}", traceID.Low)

chunks, err := r.store.Get(userCtx, "data", timeToModelTime(time.Now().Add(-24 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...)
chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(time.Now().Add(-24 * time.Hour)), timeToModelTime(time.Now()), newMatchers(fooLabelsWithName)...)

ret := make([]*model.Span, 0, len(chunks))
ret2 := make([]model.Trace_ProcessMapping, 0, len(chunks))
Expand Down Expand Up @@ -199,7 +219,7 @@ func (r *Reader) FindTraces(ctx context.Context, query *spanstore.TraceQueryPara
builder, _, _ := buildTraceWhere(query)
var fooLabelsWithName = builder

chunks, err := r.store.Get(userCtx, "data", timeToModelTime(query.StartTimeMin), timeToModelTime(query.StartTimeMax), newMatchers(fooLabelsWithName)...)
chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(query.StartTimeMin), timeToModelTime(query.StartTimeMax), newMatchers(fooLabelsWithName)...)
ret := make([]*model.Trace, 0, len(chunks))
if err != nil {
return ret, err
Expand Down Expand Up @@ -262,7 +282,7 @@ func (r *Reader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryPa

var fooLabelsWithName = builder

chunks, err := r.store.Get(userCtx, "data", timeToModelTime(timeMin), timeToModelTime(timeMax), newMatchers(fooLabelsWithName)...)
chunks, err := r.store.Get(userCtx, "fake", timeToModelTime(timeMin), timeToModelTime(timeMax), newMatchers(fooLabelsWithName)...)
if err != nil {
log.Println("store error: %s", err)
}
Expand Down
71 changes: 16 additions & 55 deletions s3store/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ io.Closer = (*Writer)(nil)
//var fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}"

var (
ctx = user.InjectOrgID(context.Background(), "data")
ctx = user.InjectOrgID(context.Background(), "fake")
)

// Writer handles all writes to object store for the Jaeger data model
Expand All @@ -52,7 +52,7 @@ type timeRange struct {
from, to time.Time
}

func buildTestStreams(labels string, tr timeRange) logproto.Stream {
func buildTestStreams(labels string, tr timeRange, line string) logproto.Stream {
stream := logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{},
Expand All @@ -61,7 +61,7 @@ func buildTestStreams(labels string, tr timeRange) logproto.Stream {
for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) {
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: from,
Line: "Hello there! I'm Jack Sparrow",
Line: line,
})
}

Expand All @@ -75,7 +75,8 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
}
if !lbs.Has(labels.MetricName) {
builder := labels.NewBuilder(lbs)
builder.Set(labels.MetricName, "logs")
//builder.Set(labels.MetricName, "logs")
builder.Set(labels.MetricName, "spans")
lbs = builder.Labels()
}
from, through := pmodel.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), pmodel.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano())
Expand All @@ -90,7 +91,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
_ = chk.Append(&e)
}
chk.Close()
c := chunk.NewChunk("data", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through)
c := chunk.NewChunk("fake", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through)
// force the checksum creation
if err := c.Encode(); err != nil {
panic(err)
Expand Down Expand Up @@ -143,61 +144,21 @@ func (w *Writer) WriteSpan(span *model.Span) error {
},
}

pchk := []chunk.Chunk{}
addedSpansChunkIDs := map[string]struct{}{}
addedChunkIDs := map[string]struct{}{}
plogline := fmt.Sprintf("level=info caller=jaeger component=chunks latency=\"%s\"", span.Duration)
for _, tr := range chunksToBuildForTimeRanges {

// span chunk
spanChk := newChunk(buildTestStreams(spanLabelsWithName, tr))
addedSpansChunkIDs[spanChk.ExternalKey()] = struct{}{}
pchk = append(pchk, spanChk)
}

// upload the span chunks
err := w.store.Put(ctx, pchk)
if err != nil {
log.Println("store spans Put error: %s", err)
}

// services label
var serviceLabelsWithName = fmt.Sprintf("{__name__=\"services\", env=\"prod\", service_name=\"%s\"}", span.Process.ServiceName)

// services chunk
schk := []chunk.Chunk{}
addedServicesChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {

// add slice to services chunk
serviceChk := newChunk(buildTestStreams(serviceLabelsWithName, tr))
addedServicesChunkIDs[serviceChk.ExternalKey()] = struct{}{}
schk = append(schk, serviceChk)
}

// upload the service chunks
err = w.store.Put(ctx, schk)
if err != nil {
log.Println("store services Put error: %s", err)
}

// operations label
var operationLabelsWithName = fmt.Sprintf("{__name__=\"operations\", env=\"prod\", operation_name=\"%s\"}", span.OperationName)
chk := newChunk(buildTestStreams(spanLabelsWithName, tr, plogline))

// services chunk
ochk := []chunk.Chunk{}
addedOperationsChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {

// add slice to services chunk
operationChk := newChunk(buildTestStreams(operationLabelsWithName, tr))
addedOperationsChunkIDs[operationChk.ExternalKey()] = struct{}{}
ochk = append(ochk, operationChk)
}
// upload the span chunks
err := w.store.PutOne(ctx, chk.From, chk.Through, chk)
if err != nil {
log.Println("store spans Put error: %s", err)
}

// upload the operation chunks
err = w.store.Put(ctx, ochk)
if err != nil {
log.Println("store operations Put error: %s", err)
}
addedChunkIDs[chk.ExternalKey()] = struct{}{}
}

return nil
}
Expand Down

0 comments on commit 4bb034c

Please sign in to comment.