diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 464204334afc..1f7b2483f0fb 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -870,16 +870,16 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro return nil, ErrReadOnly } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write", "tenant", instanceID)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(instanceID) if err != nil { return &logproto.PushResponse{}, err } - - pprof.Do(ctx, pprof.Labels("path", "write", "tenant", instanceID), func(c context.Context) { - err = instance.Push(ctx, req) - }) - - return &logproto.PushResponse{}, err + return &logproto.PushResponse{}, instance.Push(ctx, req) } // GetStreamRates returns a response containing all streams and their current rate @@ -890,11 +890,12 @@ func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRe defer sp.LogKV("event", "ingester finished handling GetStreamRates") } - var allRates []logproto.StreamRate - pprof.Do(ctx, pprof.Labels("path", "write"), func(c context.Context) { - allRates = i.streamRateCalculator.Rates() - }) + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write")) + pprof.SetGoroutineLabels(ctx) + allRates := i.streamRateCalculator.Rates() rates := make([]*logproto.StreamRate, len(allRates)) for idx := range allRates { rates[idx] = &allRates[idx] @@ -944,49 +945,48 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie return err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "log", "tenant", instanceID)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(instanceID) if err != nil { return err } + it, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) + if err != nil { + return err + } - pprof.Do(ctx, pprof.Labels("path", "read", "type", "log", "tenant", instanceID), func(c context.Context) { - var it iter.EntryIterator - it, err = instance.Query(ctx, logql.SelectLogParams{QueryRequest: req}) + if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { + storeReq := logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{ + Selector: req.Selector, + Direction: req.Direction, + Start: start, + End: end, + Limit: req.Limit, + Shards: req.Shards, + Deletes: req.Deletes, + Plan: req.Plan, + }} + storeItr, err := i.store.SelectLogs(ctx, storeReq) if err != nil { - return - } - - if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { - storeReq := logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{ - Selector: req.Selector, - Direction: req.Direction, - Start: start, - End: end, - Limit: req.Limit, - Shards: req.Shards, - Deletes: req.Deletes, - Plan: req.Plan, - }} - var storeItr iter.EntryIterator - storeItr, err = i.store.SelectLogs(ctx, storeReq) - if err != nil { - util.LogErrorWithContext(ctx, "closing iterator", it.Close) - return - } - it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction) + util.LogErrorWithContext(ctx, "closing iterator", it.Close) + return err } + it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction) + } - defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) + defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) - // sendBatches uses -1 to specify no limit. - batchLimit := int32(req.Limit) - if batchLimit == 0 { - batchLimit = -1 - } - err = sendBatches(ctx, it, queryServer, batchLimit) - }) + // sendBatches uses -1 to specify no limit. + batchLimit := int32(req.Limit) + if batchLimit == 0 { + batchLimit = -1 + } - return err + return sendBatches(ctx, it, queryServer, batchLimit) } // QuerySample the ingesters for series from logs matching a set of matchers. @@ -1012,46 +1012,45 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log return err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "metric", "tenant", instanceID)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(instanceID) if err != nil { return err } - pprof.Do(ctx, pprof.Labels("path", "read", "type", "metric", "tenant", instanceID), func(c context.Context) { - var it iter.SampleIterator - it, err = instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) - if err != nil { - return - } - if sp != nil { - sp.LogKV("event", "finished instance query sample", "selector", req.Selector, "start", req.Start, "end", req.End) - } - - if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { - storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{ - Start: start, - End: end, - Selector: req.Selector, - Shards: req.Shards, - Deletes: req.Deletes, - Plan: req.Plan, - }} - var storeItr iter.SampleIterator - storeItr, err = i.store.SelectSamples(ctx, storeReq) - if err != nil { - util.LogErrorWithContext(ctx, "closing iterator", it.Close) - return - } + it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) + if err != nil { + return err + } + if sp != nil { + sp.LogKV("event", "finished instance query sample", "selector", req.Selector, "start", req.Start, "end", req.End) + } - it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) + if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { + storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{ + Start: start, + End: end, + Selector: req.Selector, + Shards: req.Shards, + Deletes: req.Deletes, + Plan: req.Plan, + }} + storeItr, err := i.store.SelectSamples(ctx, storeReq) + if err != nil { + util.LogErrorWithContext(ctx, "closing iterator", it.Close) + return err } - defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) + it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) + } - err = sendSampleBatches(ctx, it, queryServer) - }) + defer util.LogErrorWithContext(ctx, "closing iterator", it.Close) - return err + return sendSampleBatches(ctx, it, queryServer) } // asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`. @@ -1089,6 +1088,11 @@ func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq return nil, err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID)) + pprof.SetGoroutineLabels(ctx) + asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() if asyncStoreMaxLookBack == 0 { return &logproto.GetChunkIDsResponse{}, nil @@ -1104,27 +1108,24 @@ func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq return nil, err } - var resp logproto.GetChunkIDsResponse - pprof.Do(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID), func(c context.Context) { - // get chunk references - chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil) - if err != nil { - return - } + // get chunk references + chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil) + if err != nil { + return nil, err + } - // todo (Callum) ingester should maybe store the whole schema config? - s := config.SchemaConfig{ - Configs: i.periodicConfigs, - } + // todo (Callum) ingester should maybe store the whole schema config? + s := config.SchemaConfig{ + Configs: i.periodicConfigs, + } - // build the response - resp = logproto.GetChunkIDsResponse{ChunkIDs: []string{}} - for _, chunks := range chunksGroups { - for _, chk := range chunks { - resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef)) - } + // build the response + resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}} + for _, chunks := range chunksGroups { + for _, chk := range chunks { + resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef)) } - }) + } return &resp, nil } @@ -1136,6 +1137,11 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp return nil, err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "labels", "tenant", userID)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(userID) if err != nil { return nil, err @@ -1149,59 +1155,49 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp } } - var resp *logproto.LabelResponse - var storeValues []string - pprof.Do(ctx, pprof.Labels("path", "read", "type", "labels", "tenant", userID), func(c context.Context) { - resp, err = instance.Label(ctx, req, matchers...) - if err != nil { - return - } - if req.Start == nil { - return - } + resp, err := instance.Label(ctx, req, matchers...) + if err != nil { + return nil, err + } - // Only continue if the active index type is one of async index store types or QueryStore flag is true. - asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() - if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore { - return - } + if req.Start == nil { + return resp, nil + } - var cs storage.Store - var ok bool - if cs, ok = i.store.(storage.Store); !ok { - return - } + // Only continue if the active index type is one of async index store types or QueryStore flag is true. + asyncStoreMaxLookBack := i.asyncStoreMaxLookBack() + if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore { + return resp, nil + } - maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod - if asyncStoreMaxLookBack != 0 { - maxLookBackPeriod = asyncStoreMaxLookBack - } - // Adjust the start time based on QueryStoreMaxLookBackPeriod. - start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now()) - if start.After(*req.End) { - // The request is older than we are allowed to query the store, just return what we have. - return - } - from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) + var cs storage.Store + var ok bool + if cs, ok = i.store.(storage.Store); !ok { + return resp, nil + } - if req.Values { - storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...) - if err != nil { - return - } - } else { - storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...) - if err != nil { - return - } + maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod + if asyncStoreMaxLookBack != 0 { + maxLookBackPeriod = asyncStoreMaxLookBack + } + // Adjust the start time based on QueryStoreMaxLookBackPeriod. + start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now()) + if start.After(*req.End) { + // The request is older than we are allowed to query the store, just return what we have. + return resp, nil + } + from, through := model.TimeFromUnixNano(start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) + var storeValues []string + if req.Values { + storeValues, err = cs.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...) + if err != nil { + return nil, err + } + } else { + storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...) + if err != nil { + return nil, err } - }) - - // When wrapping the work above in the pprof.Do function we created a possible scenario where resp could - // be populated with values but an error occurred later on, prior to this profiling wrapper we would have - // always exited with a nil response and the error message, this is here to keep that behavior. - if err != nil { - return nil, err } return &logproto.LabelResponse{ @@ -1222,17 +1218,16 @@ func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*lo return nil, err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "series", "tenant", instanceID)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(instanceID) if err != nil { return nil, err } - - var series *logproto.SeriesResponse - pprof.Do(ctx, pprof.Labels("path", "read", "type", "series", "tenant", instanceID), func(c context.Context) { - series, err = instance.Series(ctx, req) - }) - - return series, err + return instance.Series(ctx, req) } func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { @@ -1243,6 +1238,11 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest return nil, err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "stats", "tenant", user)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(user) if err != nil { return nil, err @@ -1253,47 +1253,43 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest return nil, err } - var merged logproto.IndexStatsResponse - pprof.Do(ctx, pprof.Labels("path", "read", "type", "stats", "tenant", user), func(c context.Context) { - - type f func() (*logproto.IndexStatsResponse, error) - jobs := []f{ - f(func() (*logproto.IndexStatsResponse, error) { - return instance.GetStats(ctx, req) - }), - f(func() (*logproto.IndexStatsResponse, error) { - return i.store.Stats(ctx, user, req.From, req.Through, matchers...) - }), - } - resps := make([]*logproto.IndexStatsResponse, len(jobs)) - - if err := concurrency.ForEachJob( - ctx, - len(jobs), - 2, - func(_ context.Context, idx int) error { - res, err := jobs[idx]() - resps[idx] = res - return err - }, - ); err != nil { - return - } + type f func() (*logproto.IndexStatsResponse, error) + jobs := []f{ + f(func() (*logproto.IndexStatsResponse, error) { + return instance.GetStats(ctx, req) + }), + f(func() (*logproto.IndexStatsResponse, error) { + return i.store.Stats(ctx, user, req.From, req.Through, matchers...) + }), + } + resps := make([]*logproto.IndexStatsResponse, len(jobs)) + + if err := concurrency.ForEachJob( + ctx, + len(jobs), + 2, + func(_ context.Context, idx int) error { + res, err := jobs[idx]() + resps[idx] = res + return err + }, + ); err != nil { + return nil, err + } - merged = index_stats.MergeStats(resps...) - if sp != nil { - sp.LogKV( - "user", user, - "from", req.From.Time(), - "through", req.Through.Time(), - "matchers", syntax.MatchersString(matchers), - "streams", merged.Streams, - "chunks", merged.Chunks, - "bytes", merged.Bytes, - "entries", merged.Entries, - ) - } - }) + merged := index_stats.MergeStats(resps...) + if sp != nil { + sp.LogKV( + "user", user, + "from", req.From.Time(), + "through", req.Through.Time(), + "matchers", syntax.MatchersString(matchers), + "streams", merged.Streams, + "chunks", merged.Chunks, + "bytes", merged.Bytes, + "entries", merged.Entries, + ) + } return &merged, nil } @@ -1304,6 +1300,11 @@ func (i *Ingester) GetVolume(ctx context.Context, req *logproto.VolumeRequest) ( return nil, err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "volume", "tenant", user)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(user) if err != nil { return nil, err @@ -1314,33 +1315,31 @@ func (i *Ingester) GetVolume(ctx context.Context, req *logproto.VolumeRequest) ( return nil, err } - var merged *logproto.VolumeResponse - pprof.Do(ctx, pprof.Labels("path", "read", "type", "volume", "tenant", user), func(c context.Context) { - type f func() (*logproto.VolumeResponse, error) - jobs := []f{ - f(func() (*logproto.VolumeResponse, error) { - return instance.GetVolume(ctx, req) - }), - f(func() (*logproto.VolumeResponse, error) { - return i.store.Volume(ctx, user, req.From, req.Through, req.Limit, req.TargetLabels, req.AggregateBy, matchers...) - }), - } - resps := make([]*logproto.VolumeResponse, len(jobs)) - - if err := concurrency.ForEachJob( - ctx, - len(jobs), - 2, - func(_ context.Context, idx int) error { - res, err := jobs[idx]() - resps[idx] = res - return err - }, - ); err != nil { - return - } - merged = seriesvolume.Merge(resps, req.Limit) - }) + type f func() (*logproto.VolumeResponse, error) + jobs := []f{ + f(func() (*logproto.VolumeResponse, error) { + return instance.GetVolume(ctx, req) + }), + f(func() (*logproto.VolumeResponse, error) { + return i.store.Volume(ctx, user, req.From, req.Through, req.Limit, req.TargetLabels, req.AggregateBy, matchers...) + }), + } + resps := make([]*logproto.VolumeResponse, len(jobs)) + + if err := concurrency.ForEachJob( + ctx, + len(jobs), + 2, + func(_ context.Context, idx int) error { + res, err := jobs[idx]() + resps[idx] = res + return err + }, + ); err != nil { + return nil, err + } + + merged := seriesvolume.Merge(resps, req.Limit) return merged, nil } @@ -1502,6 +1501,11 @@ func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.Detected return nil, err } + // Set profiling tags + defer pprof.SetGoroutineLabels(ctx) + ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "detectedLabels", "tenant", userID)) + pprof.SetGoroutineLabels(ctx) + instance, err := i.GetOrCreateInstance(userID) if err != nil { return nil, err @@ -1514,22 +1518,19 @@ func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.Detected } } - var result map[string]*logproto.UniqueLabelValues - pprof.Do(ctx, pprof.Labels("path", "read", "type", "detectedLabels", "tenant", userID), func(c context.Context) { - labelMap, err := instance.LabelsWithValues(ctx, req.Start, matchers...) - if err != nil { - return - } - result = make(map[string]*logproto.UniqueLabelValues) - for label, values := range labelMap { - var uniqueValues []string - for v := range values { - uniqueValues = append(uniqueValues, v) - } + labelMap, err := instance.LabelsWithValues(ctx, req.Start, matchers...) - result[label] = &logproto.UniqueLabelValues{Values: uniqueValues} + if err != nil { + return nil, err + } + result := make(map[string]*logproto.UniqueLabelValues) + for label, values := range labelMap { + var uniqueValues []string + for v := range values { + uniqueValues = append(uniqueValues, v) } - }) + result[label] = &logproto.UniqueLabelValues{Values: uniqueValues} + } return &logproto.LabelToValuesResponse{Labels: result}, nil }