Skip to content

Commit

Permalink
feat(blooms): record time spent resolving shards (#12636)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Apr 17, 2024
1 parent 0ee2a61 commit 9c25985
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 105 deletions.
1 change: 1 addition & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func RecordRangeAndInstantQueryMetrics(
"index_total_chunks", stats.Index.TotalChunks,
"index_post_bloom_filter_chunks", stats.Index.PostFilterChunks,
"index_bloom_filter_ratio", fmt.Sprintf("%.2f", bloomRatio),
"index_shard_resolver_duration", time.Duration(stats.Index.ShardsDuration),
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)
Expand Down
15 changes: 1 addition & 14 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,6 @@ func (c *Context) Caches() Caches {
}
}

// Index returns the index statistics accumulated so far.
func (c *Context) Index() Index {
return c.index
}

// Reset clears the statistics.
func (c *Context) Reset() {
c.mtx.Lock()
Expand Down Expand Up @@ -170,15 +165,6 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
stats.ingester.Merge(inc)
}

// JoinIndex joins the index statistics in a concurrency-safe manner.
func JoinIndex(ctx context.Context, index Index) {
stats := FromContext(ctx)
stats.mtx.Lock()
defer stats.mtx.Unlock()

stats.index.Merge(index)
}

// ComputeSummary compute the summary of the statistics.
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
Expand Down Expand Up @@ -247,6 +233,7 @@ func (i *Ingester) Merge(m Ingester) {
func (i *Index) Merge(m Index) {
i.TotalChunks += m.TotalChunks
i.PostFilterChunks += m.PostFilterChunks
i.ShardsDuration += m.ShardsDuration
}

func (c *Caches) Merge(m Caches) {
Expand Down
217 changes: 130 additions & 87 deletions pkg/logqlmodel/stats/stats.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/logqlmodel/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ message Index {
int64 totalChunks = 1 [(gogoproto.jsontag) = "totalChunks"];
// Post-filtered chunks
int64 postFilterChunks = 2 [(gogoproto.jsontag) = "postFilterChunks"];
// Nanosecond duration spent fetching shards
int64 shardsDuration = 3 [(gogoproto.jsontag) = "shardsDuration"];
}

message Querier {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,8 @@ var (
},
"index": {
"postFilterChunks": 0,
"totalChunks": 0
"totalChunks": 0,
"shardsDuration": 0
},
"cache": {
"chunk": {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/queryrange/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
var emptyStats = `"stats": {
"index": {
"postFilterChunks": 0,
"totalChunks": 0
"totalChunks": 0,
"shardsDuration": 0
},
"ingester" : {
"store": {
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/stores/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package index

import (
"context"
"time"

"github.com/grafana/dskit/instrument"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -160,7 +161,18 @@ func (m MonitoredReaderWriter) GetShards(
var shards *logproto.ShardsResponse
if err := loki_instrument.TimeRequest(ctx, "shards", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
start := time.Now()
shards, err = m.rw.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)

if err == nil {
// record duration here from caller to avoid needing to do this in two separate places:
// 1) when we resolve shards from the index alone
// 2) when we resolve shards from the index + blooms
// NB(owen-d): since this is measured by the callee, it does not include time in queue,
// over the wire, etc.
shards.Statistics.Index.ShardsDuration = int64(time.Since(start))
}

return err
}); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/marshal/legacy/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ var queryTests = []struct {
"stats" : {
"index": {
"postFilterChunks": 0,
"totalChunks": 0
"totalChunks": 0,
"shardsDuration": 0
},
"ingester" : {
"store": {
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/marshal/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
const emptyStats = `{
"index": {
"postFilterChunks": 0,
"totalChunks": 0
"totalChunks": 0,
"shardsDuration": 0
},
"ingester" : {
"store": {
Expand Down

0 comments on commit 9c25985

Please sign in to comment.