Skip to content

Commit

Permalink
add profile tagging
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Welch <[email protected]>
  • Loading branch information
slim-bean committed Jun 17, 2024
1 parent eccf6a3 commit 34e775a
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path"
"path/filepath"
"runtime/pprof"
"sync"
"time"

Expand Down Expand Up @@ -869,6 +870,11 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
return nil, ErrReadOnly
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write", "tenant", instanceID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(instanceID)
if err != nil {
return &logproto.PushResponse{}, err
Expand All @@ -884,6 +890,11 @@ func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRe
defer sp.LogKV("event", "ingester finished handling GetStreamRates")
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write"))
pprof.SetGoroutineLabels(ctx)

allRates := i.streamRateCalculator.Rates()
rates := make([]*logproto.StreamRate, len(allRates))
for idx := range allRates {
Expand Down Expand Up @@ -934,6 +945,11 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "log", "tenant", instanceID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(instanceID)
if err != nil {
return err
Expand Down Expand Up @@ -996,6 +1012,11 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
return err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "metric", "tenant", instanceID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(instanceID)
if err != nil {
return err
Expand Down Expand Up @@ -1067,6 +1088,11 @@ func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID))
pprof.SetGoroutineLabels(ctx)

asyncStoreMaxLookBack := i.asyncStoreMaxLookBack()
if asyncStoreMaxLookBack == 0 {
return &logproto.GetChunkIDsResponse{}, nil
Expand Down Expand Up @@ -1111,6 +1137,11 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "labels", "tenant", userID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(userID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1187,6 +1218,11 @@ func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*lo
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "series", "tenant", instanceID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(instanceID)
if err != nil {
return nil, err
Expand All @@ -1202,6 +1238,11 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "stats", "tenant", user))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(user)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1259,6 +1300,11 @@ func (i *Ingester) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "volume", "tenant", user))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(user)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1455,6 +1501,11 @@ func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.Detected
return nil, err
}

// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "detectedLabels", "tenant", userID))
pprof.SetGoroutineLabels(ctx)

instance, err := i.GetOrCreateInstance(userID)
if err != nil {
return nil, err
Expand Down

0 comments on commit 34e775a

Please sign in to comment.