Skip to content

Commit

Permalink
fix (v2): add support for ProfileTypes requests (#3541)
Browse files Browse the repository at this point in the history
* Add support for the profile types request in v2

* Fix query backend concurrency check

* Retrieve profile types from block metadata instead

* Add test for ProfileTypes in v2
  • Loading branch information
aleks-p authored Sep 5, 2024
1 parent 4bddf2c commit 26210f5
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ packages:
github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect:
interfaces:
QuerierServiceClient:
github.com/grafana/pyroscope/pkg/frontend:
interfaces:
Limits:
2 changes: 1 addition & 1 deletion pkg/experiment/query_backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ func (q *QueryBackend) read(
}

func (q *QueryBackend) withThrottling(fn func() (*queryv1.InvokeResponse, error)) (*queryv1.InvokeResponse, error) {
defer q.running.Dec()
if q.running.Inc() > q.concurrency {
return nil, status.Error(codes.ResourceExhausted, "all minions are busy, please try later")
}
defer q.running.Dec()
return fn()
}
5 changes: 2 additions & 3 deletions pkg/frontend/read_path/query_frontend/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
)

Expand Down Expand Up @@ -55,7 +54,7 @@ func isProfileTypeQuery(labels, matchers []string) bool {

func listProfileTypesFromMetadataAsSeriesLabels(
ctx context.Context,
client *metastoreclient.Client,
client metastorev1.MetastoreServiceClient,
tenants []string,
startTime int64,
endTime int64,
Expand All @@ -72,7 +71,7 @@ func listProfileTypesFromMetadataAsSeriesLabels(

func listProfileTypesFromMetadata(
ctx context.Context,
client *metastoreclient.Client,
client metastorev1.MetastoreServiceClient,
tenants []string,
startTime int64,
endTime int64,
Expand Down
5 changes: 2 additions & 3 deletions pkg/frontend/read_path/query_frontend/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect"
queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1"
metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client"
querybackend "github.com/grafana/pyroscope/pkg/experiment/query_backend"
querybackendclient "github.com/grafana/pyroscope/pkg/experiment/query_backend/client"
queryplan "github.com/grafana/pyroscope/pkg/experiment/query_backend/query_plan"
Expand All @@ -23,14 +22,14 @@ var _ querierv1connect.QuerierServiceClient = (*QueryFrontend)(nil)
type QueryFrontend struct {
logger log.Logger
limits frontend.Limits
metastore *metastoreclient.Client
metastore metastorev1.MetastoreServiceClient
querybackend *querybackendclient.Client
}

func NewQueryFrontend(
logger log.Logger,
limits frontend.Limits,
metastore *metastoreclient.Client,
metastore metastorev1.MetastoreServiceClient,
querybackend *querybackendclient.Client,
) *QueryFrontend {
return &QueryFrontend{
Expand Down
72 changes: 72 additions & 0 deletions pkg/frontend/read_path/query_frontend/query_profile_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package query_frontend

import (
"context"
"sort"

"connectrpc.com/connect"
"github.com/grafana/dskit/tenant"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/validation"
)

func (q *QueryFrontend) ProfileTypes(
ctx context.Context,
req *connect.Request[querierv1.ProfileTypesRequest],
) (*connect.Response[querierv1.ProfileTypesResponse], error) {

tenants, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
empty, err := validation.SanitizeTimeRange(q.limits, tenants, &req.Msg.Start, &req.Msg.End)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
if empty {
return connect.NewResponse(&querierv1.ProfileTypesResponse{}), nil
}

md, err := q.metastore.QueryMetadata(ctx, &metastorev1.QueryMetadataRequest{
TenantId: tenants,
StartTime: req.Msg.Start,
EndTime: req.Msg.End,
Query: "{}",
})

if err != nil {
return nil, err
}

pTypesFromMetadata := make(map[string]*typesv1.ProfileType)
for _, b := range md.Blocks {
for _, d := range b.Datasets {
for _, pType := range d.ProfileTypes {
if _, ok := pTypesFromMetadata[pType]; !ok {
profileType, err := phlaremodel.ParseProfileTypeSelector(pType)
if err != nil {
return nil, err
}
pTypesFromMetadata[pType] = profileType
}
}
}
}

var profileTypes []*typesv1.ProfileType
for _, pType := range pTypesFromMetadata {
profileTypes = append(profileTypes, pType)
}

sort.Slice(profileTypes, func(i, j int) bool {
return profileTypes[i].ID < profileTypes[j].ID
})

return connect.NewResponse(&querierv1.ProfileTypesResponse{
ProfileTypes: profileTypes,
}), nil
}
72 changes: 72 additions & 0 deletions pkg/frontend/read_path/query_frontend/query_profile_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package query_frontend

import (
"context"
"testing"
"time"

"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
"github.com/grafana/pyroscope/pkg/tenant"
"github.com/grafana/pyroscope/pkg/test/mocks/mockfrontend"
"github.com/grafana/pyroscope/pkg/test/mocks/mockmetastorev1"
)

func TestQueryFrontend_ProfileTypes(t *testing.T) {
metaClient := mockmetastorev1.NewMockMetastoreServiceClient(t)
limits := mockfrontend.NewMockLimits(t)
f := NewQueryFrontend(log.NewNopLogger(), limits, metaClient, nil)
require.NotNil(t, f)

limits.On("MaxQueryLookback", mock.Anything).Return(24 * time.Hour)
limits.On("MaxQueryLength", mock.Anything).Return(2 * time.Hour)
metaClient.On("QueryMetadata", mock.Anything, mock.Anything).Maybe().Return(&metastorev1.QueryMetadataResponse{
Blocks: []*metastorev1.BlockMeta{
{
Datasets: []*metastorev1.Dataset{
{
ProfileTypes: []string{
"memory:inuse_space:bytes:space:byte",
"process_cpu:cpu:nanoseconds:cpu:nanoseconds",
"mutex:delay:nanoseconds:mutex:count",
},
},
{
ProfileTypes: []string{
"memory:alloc_in_new_tlab_objects:count:space:bytes",
"process_cpu:cpu:nanoseconds:cpu:nanoseconds",
},
},
},
},
{
Datasets: []*metastorev1.Dataset{
{
ProfileTypes: []string{
"mutex:contentions:count:mutex:count",
"mutex:delay:nanoseconds:mutex:count",
},
},
},
},
},
}, nil)

ctx := tenant.InjectTenantID(context.Background(), "tenant")
types, err := f.ProfileTypes(ctx, connect.NewRequest(&querierv1.ProfileTypesRequest{
Start: time.Now().Add(-time.Hour).UnixMilli(),
End: time.Now().UnixMilli(),
}))
require.NoError(t, err)
require.Equal(t, 5, len(types.Msg.ProfileTypes))
require.Equal(t, "memory:alloc_in_new_tlab_objects:count:space:bytes", types.Msg.ProfileTypes[0].ID)
require.Equal(t, "memory:inuse_space:bytes:space:byte", types.Msg.ProfileTypes[1].ID)
require.Equal(t, "mutex:contentions:count:mutex:count", types.Msg.ProfileTypes[2].ID)
require.Equal(t, "mutex:delay:nanoseconds:mutex:count", types.Msg.ProfileTypes[3].ID)
require.Equal(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds", types.Msg.ProfileTypes[4].ID)
}
7 changes: 0 additions & 7 deletions pkg/frontend/read_path/query_frontend/query_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,3 @@ func (q *QueryFrontend) GetProfileStats(
) (*connect.Response[typesv1.GetProfileStatsResponse], error) {
return connect.NewResponse(&typesv1.GetProfileStatsResponse{}), nil
}

func (q *QueryFrontend) ProfileTypes(
context.Context,
*connect.Request[querierv1.ProfileTypesRequest],
) (*connect.Response[querierv1.ProfileTypesResponse], error) {
return connect.NewResponse(&querierv1.ProfileTypesResponse{}), nil
}
17 changes: 12 additions & 5 deletions pkg/frontend/read_path/query_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package read_path

import (
"context"
"slices"

"connectrpc.com/connect"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -203,10 +204,16 @@ func (r *Router) GetProfileStats(

func (r *Router) ProfileTypes(
ctx context.Context,
req *connect.Request[querierv1.ProfileTypesRequest],
c *connect.Request[querierv1.ProfileTypesRequest],
) (*connect.Response[querierv1.ProfileTypesResponse], error) {
if r.frontend != nil {
return r.frontend.ProfileTypes(ctx, req)
}
return connect.NewResponse(&querierv1.ProfileTypesResponse{}), nil
return Query[querierv1.ProfileTypesRequest, querierv1.ProfileTypesResponse](ctx, r, c,
func(a, b *querierv1.ProfileTypesResponse) (*querierv1.ProfileTypesResponse, error) {
pTypes := a.ProfileTypes
for _, pType := range b.ProfileTypes {
if !slices.Contains(pTypes, pType) {
pTypes = append(pTypes, pType)
}
}
return &querierv1.ProfileTypesResponse{ProfileTypes: pTypes}, nil
})
}
Loading

0 comments on commit 26210f5

Please sign in to comment.