Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new metrics API to generator and protobuf definition #2424

Merged
merged 31 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ecfcdd6
Add proto for new API span-summary
zalegrala May 3, 2023
c26bc47
Add breadcumbs for HTTP
zalegrala May 3, 2023
15cd408
Breadcrumbs for generator to serve a span summary
zalegrala May 3, 2023
f6b664c
Convert the HTTP request into a tempopb request
zalegrala May 3, 2023
a9f7e63
Whitespace
zalegrala May 3, 2023
7c6040d
Fix variable reference
zalegrala May 3, 2023
32d4e88
Update proto for generator specific API, final results to be handled …
zalegrala May 4, 2023
7e2e99c
Update http api for proto
zalegrala May 4, 2023
68c7d6c
Breadcumbs and a rename
zalegrala May 9, 2023
c43b357
Enable API
zalegrala May 10, 2023
02ef5fc
Return on http error
zalegrala May 10, 2023
1587842
Update proto
zalegrala May 10, 2023
b73536d
Begin handling series in API
zalegrala May 10, 2023
e2c872c
Update proto and avoid empty buckets
zalegrala May 10, 2023
e1e3b6d
Adhere to spanlimit only when non zero
zalegrala May 10, 2023
b7042bf
Accept the limit param
zalegrala May 10, 2023
5ad71fa
Whitespace
zalegrala May 10, 2023
8c9c4c5
Allocate early
zalegrala May 10, 2023
c21a4e6
Avoid wishful data types
zalegrala May 10, 2023
90d44ab
Adjust estimated for spanlimit unset
zalegrala May 11, 2023
b6fbaaf
Add error count to metrics
zalegrala May 11, 2023
0801ca7
Include doc
zalegrala May 11, 2023
e6de31f
Avoid creating a generator instance
zalegrala May 11, 2023
9946301
Reintroduce query timeout
zalegrala May 11, 2023
9c036cb
Fix instance creation
zalegrala May 12, 2023
07a5a1b
Adjust API path
zalegrala May 12, 2023
1a34efa
Rename handler
zalegrala May 12, 2023
9fb409a
Align function names
zalegrala May 12, 2023
1e4a3bd
Update proto for interface rename
zalegrala May 12, 2023
48f6420
Align function names
zalegrala May 12, 2023
76a606f
Fix instance creation return values
zalegrala May 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ func (t *App) initGenerator() (services.Service, error) {
}
t.generator = genSvc

spanStatsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.SpanMetricsHandler))
t.Server.HTTP.Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetrics)), spanStatsHandler)

tempopb.RegisterMetricsGeneratorServer(t.Server.GRPC, t.generator)

return t.generator, nil
Expand Down
2 changes: 2 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Config struct {
// MetricsIngestionSlack is the max amount of time passed since a span's start time
// for the span to be considered in metrics generation
MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"`
QueryTimeout time.Duration `yaml:"query_timeout"`
}

// RegisterFlagsAndApplyDefaults registers the flags.
Expand All @@ -45,6 +46,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
cfg.QueryTimeout = 30 * time.Second
}

type ProcessorConfig struct {
Expand Down
15 changes: 15 additions & 0 deletions modules/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,18 @@ func (g *Generator) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {
// OnRingInstanceHeartbeat implements ring.BasicLifecyclerDelegate
func (g *Generator) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
}

func (g *Generator) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequest) (*tempopb.SpanMetricsResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

// return empty if we don't have an instance
instance, ok := g.getInstanceByID(instanceID)
if !ok || instance == nil {
return &tempopb.SpanMetricsResponse{}, nil
}

return instance.GetMetrics(ctx, req)
}
43 changes: 43 additions & 0 deletions modules/generator/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package generator

import (
"context"
"net/http"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
)

func (g *Generator) SpanMetricsHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(g.cfg.QueryTimeout))
defer cancel()

span, ctx := opentracing.StartSpanFromContext(ctx, "Generator.SpanMetricsHandler")
defer span.Finish()

span.SetTag("requestURI", r.RequestURI)

req, err := api.ParseSpanMetricsRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var resp *tempopb.SpanMetricsResponse
resp, err = g.GetMetrics(ctx, req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(w, resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set(api.HeaderContentType, api.HeaderAcceptJSON)
}
12 changes: 12 additions & 0 deletions modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,18 @@ func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) {
i.updatePushMetrics(size, spanCount, expiredSpanCount)
}

func (i *instance) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequest) (resp *tempopb.SpanMetricsResponse, err error) {
for _, processor := range i.processors {
switch p := processor.(type) {
case *localblocks.Processor:
return p.GetMetrics(ctx, req)
default:
}
}

return nil, fmt.Errorf("localblocks processor not found")
}

func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) {
metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(bytesIngested))
metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount))
Expand Down
66 changes: 66 additions & 0 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/traceqlmetrics"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
Expand Down Expand Up @@ -242,6 +244,57 @@ func (p *Processor) completeBlock() error {
return nil
}

func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequest) (*tempopb.SpanMetricsResponse, error) {

fetcher := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
if p.headBlock == nil {
return traceql.FetchSpansResponse{}, fmt.Errorf("head block is nil")
}
return p.headBlock.Fetch(ctx, req, common.DefaultSearchOptions())
})

m, err := traceqlmetrics.GetMetrics(ctx, req.Query, req.GroupBy, int(req.Limit), fetcher)
if err != nil {
return nil, errors.Wrap(err, "failed to get metrics")
}

resp := &tempopb.SpanMetricsResponse{
SpanCount: uint64(m.SpanCount),
Estimated: m.Estimated,
Metrics: make([]*tempopb.SpanMetrics, 0, len(m.Series)),
}

var rawHistorgram *tempopb.RawHistogram
var errCount int
for static, series := range m.Series {
h := []*tempopb.RawHistogram{}

for bucket, count := range series.Buckets() {
if count != 0 {
rawHistorgram = &tempopb.RawHistogram{
Bucket: uint64(bucket),
Count: uint64(count),
}

h = append(h, rawHistorgram)
}
}

errCount = 0
if errs, ok := m.Errors[static]; ok {
errCount = errs
}

resp.Metrics = append(resp.Metrics, &tempopb.SpanMetrics{
LatencyHistogram: h,
Static: toStaticProto(static),
Errors: uint64(errCount),
})
}

return resp, nil
}

func (p *Processor) deleteOldBlocks() (err error) {
p.blocksMtx.Lock()
defer p.blocksMtx.Unlock()
Expand Down Expand Up @@ -490,3 +543,16 @@ func filterBatch(batch *v1.ResourceSpans) *v1.ResourceSpans {

return nil
}

func toStaticProto(static traceql.Static) *tempopb.TraceQLStatic {
return &tempopb.TraceQLStatic{
Type: int32(static.Type),
N: int64(static.N),
F: static.F,
S: static.S,
B: static.B,
D: uint64(static.D),
Status: int32(static.Status),
Kind: int32(static.Kind),
}
}
28 changes: 27 additions & 1 deletion pkg/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,24 @@ const (
// search tags
urlParamScope = "scope"

// generator summary
urlParamGroupBy = "groupBy"

HeaderAccept = "Accept"
HeaderContentType = "Content-Type"
HeaderAcceptProtobuf = "application/protobuf"
HeaderAcceptJSON = "application/json"

PathPrefixQuerier = "/querier"
PathPrefixQuerier = "/querier"
PathPrefixGenerator = "/generator"

PathTraces = "/api/traces/{traceID}"
PathSearch = "/api/search"
PathSearchTags = "/api/search/tags"
PathSearchTagValues = "/api/search/tag/{" + muxVarTagName + "}/values"
PathEcho = "/api/echo"
PathUsageStats = "/status/usage-stats"
PathSpanMetrics = "/api/metrics"

PathSearchTagValuesV2 = "/api/v2/search/tag/{" + muxVarTagName + "}/values"
PathSearchTagsV2 = "/api/v2/search/tags"
Expand Down Expand Up @@ -313,6 +318,27 @@ func ParseSearchBlockRequest(r *http.Request) (*tempopb.SearchBlockRequest, erro
return req, nil
}

func ParseSpanMetricsRequest(r *http.Request) (*tempopb.SpanMetricsRequest, error) {
req := &tempopb.SpanMetricsRequest{}

groupBy := r.URL.Query().Get(urlParamGroupBy)
req.GroupBy = groupBy

query := r.URL.Query().Get(urlParamQuery)
req.Query = query

l := r.URL.Query().Get(urlParamLimit)
if l != "" {
limit, err := strconv.Atoi(l)
if err != nil {
return nil, fmt.Errorf("invalid limit: %w", err)
}
req.Limit = uint64(limit)
}

return req, nil
}

// BuildSearchRequest takes a tempopb.SearchRequest and populates the passed http.Request
// with the appropriate params. If no http.Request is provided a new one is created.
func BuildSearchRequest(req *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) {
Expand Down
Loading