diff --git a/modules/generator/config.go b/modules/generator/config.go index d705ada2108..2f101a405bc 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -29,6 +29,7 @@ type Config struct { // MetricsIngestionSlack is the max amount of time passed since a span's start time // for the span to be considered in metrics generation MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"` + SummaryTimeout time.Duration `yaml:"summary_timeout"` } // RegisterFlagsAndApplyDefaults registers the flags. diff --git a/modules/generator/generator.go b/modules/generator/generator.go index 6e12b5a1e91..2b170ce7a53 100644 --- a/modules/generator/generator.go +++ b/modules/generator/generator.go @@ -283,3 +283,17 @@ func (g *Generator) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) { // OnRingInstanceHeartbeat implements ring.BasicLifecyclerDelegate func (g *Generator) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) { } + +func (q *Generator) SpanSummary(ctx context.Context, req *tempopb.SpanSummaryRequest) (*tempopb.SpanSummaryResponse, error) { + instanceID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + instance, err := q.getOrCreateInstance(instanceID) + if err != nil { + return nil, err + } + + return instance.spanSummary(ctx, req) +} diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 9f0e7efb8ba..722406eff02 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -349,6 +349,22 @@ func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { i.updatePushMetrics(size, spanCount, expiredSpanCount) } +func (i *instance) spanSummary(ctx context.Context, req *tempopb.SpanSummaryRequest) (resp *tempopb.SpanSummaryResponse, err error) { + select { + case <-ctx.Done(): + return nil, nil + // case i.spanSummaryCh <- req: + default: + // for _, processor := range i.processors { + // switch p := processor.(type) { + // case *localblocks.Processor: + // processor.GetMetrics(ctx, req) + // } + // } + return nil, fmt.Errorf("not implemented yet") + } +} + func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) { metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(bytesIngested)) metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount))