Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve pattern ingester tracing (backport k226) #14731

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) {
t.Cfg.Pattern,
t.Overrides,
t.PatternRingClient,
t.tenantConfigs,
t.Cfg.MetricsNamespace,
prometheus.DefaultRegisterer,
logger,
Expand Down
37 changes: 35 additions & 2 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -160,7 +162,13 @@ func (p *Push) Stop() {
}

// buildPayload creates the snappy compressed protobuf to send to Loki
func (p *Push) buildPayload() ([]byte, error) {
func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {
sp, _ := opentracing.StartSpanFromContext(
ctx,
"patternIngester.aggregation.Push.buildPayload",
)
defer sp.Finish()

entries := p.entries.reset()

entriesByStream := make(map[string][]logproto.Entry)
Expand All @@ -179,6 +187,14 @@ func (p *Push) buildPayload() ([]byte, error) {
}

streams := make([]logproto.Stream, 0, len(entriesByStream))

// limit the number of services to log to 1000
serviceLimit := len(entriesByStream)
if serviceLimit > 1000 {
serviceLimit = 1000
}

services := make([]string, 0, serviceLimit)
for s, entries := range entriesByStream {
lbls, err := syntax.ParseLabels(s)
if err != nil {
Expand All @@ -190,6 +206,10 @@ func (p *Push) buildPayload() ([]byte, error) {
Entries: entries,
Hash: lbls.Hash(),
})

if len(services) < serviceLimit {
services = append(services, lbls.Get(push.AggregatedMetricLabel))
}
}

req := &logproto.PushRequest{
Expand All @@ -202,6 +222,14 @@ func (p *Push) buildPayload() ([]byte, error) {

payload = snappy.Encode(nil, payload)

sp.LogKV(
"event", "build aggregated metrics payload",
"num_service", len(entriesByStream),
"first_1k_services", strings.Join(services, ","),
"num_streams", len(streams),
"num_entries", len(entries),
)

return payload, nil
}

Expand All @@ -221,7 +249,7 @@ func (p *Push) run(pushPeriod time.Duration) {
cancel()
return
case <-pushTicker.C:
payload, err := p.buildPayload()
payload, err := p.buildPayload(ctx)
if err != nil {
level.Error(p.logger).Log("msg", "failed to build payload", "err", err)
continue
Expand Down Expand Up @@ -265,9 +293,14 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
err error
resp *http.Response
)

// Set a timeout for the request
ctx, cancel := context.WithTimeout(ctx, p.httpClient.Timeout)
defer cancel()

sp, ctx := opentracing.StartSpanFromContext(ctx, "patternIngester.aggregation.Push.send")
defer sp.Finish()

req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload))
if err != nil {
return -1, fmt.Errorf("failed to create push request: %w", err)
Expand Down
17 changes: 15 additions & 2 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -95,7 +96,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
for _, reqStream := range req.Streams {
// All streams are observed for metrics
// TODO(twhitney): this would be better as a queue that drops in response to backpressure
i.Observe(reqStream.Labels, reqStream.Entries)
i.Observe(ctx, reqStream.Labels, reqStream.Entries)

// But only owned streamed are processed for patterns
ownedStream, err := i.isOwnedStream(i.ingesterID, reqStream.Labels)
Expand Down Expand Up @@ -252,10 +253,22 @@ func (i *instance) removeStream(s *stream) {
}
}

func (i *instance) Observe(stream string, entries []logproto.Entry) {
func (i *instance) Observe(ctx context.Context, stream string, entries []logproto.Entry) {
i.aggMetricsLock.Lock()
defer i.aggMetricsLock.Unlock()

sp, _ := opentracing.StartSpanFromContext(
ctx,
"patternIngester.Observe",
)
defer sp.Finish()

sp.LogKV(
"event", "observe stream for metrics",
"stream", stream,
"entries", len(entries),
)

for _, entry := range entries {
lvl := constants.LogLevelUnknown
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
Expand Down
43 changes: 42 additions & 1 deletion pkg/pattern/tee_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util/spanlogger"

ring_client "github.com/grafana/dskit/ring/client"
)

type TeeService struct {
cfg Config
limits Limits
tenantCfgs *runtime.TenantConfigs
logger log.Logger
ringClient RingClient
wg *sync.WaitGroup
Expand All @@ -51,6 +54,7 @@ func NewTeeService(
cfg Config,
limits Limits,
ringClient RingClient,
tenantCfgs *runtime.TenantConfigs,
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
Expand Down Expand Up @@ -86,6 +90,7 @@ func NewTeeService(
),
cfg: cfg,
limits: limits,
tenantCfgs: tenantCfgs,
ringClient: ringClient,

wg: &sync.WaitGroup{},
Expand Down Expand Up @@ -293,10 +298,11 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
// are gathered by this request
_ = instrument.CollectedRequest(
ctx,
"FlushTeedLogsToPatternIngested",
"FlushTeedLogsToPatternIngester",
ts.sendDuration,
instrument.ErrorCode,
func(ctx context.Context) error {
sp := spanlogger.FromContext(ctx)
client, err := ts.ringClient.GetClientFor(clientRequest.ingesterAddr)
if err != nil {
return err
Expand All @@ -313,6 +319,41 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
// Success here means the stream will be processed for both metrics and patterns
ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "success").Inc()
ts.ingesterMetricAppends.WithLabelValues("success").Inc()

// limit logged labels to 1000
labelsLimit := len(req.Streams)
if labelsLimit > 1000 {
labelsLimit = 1000
}

labels := make([]string, 0, labelsLimit)
for _, stream := range req.Streams {
if len(labels) >= 1000 {
break
}

labels = append(labels, stream.Labels)
}

sp.LogKV(
"event", "forwarded push request to pattern ingester",
"num_streams", len(req.Streams),
"first_1k_labels", strings.Join(labels, ", "),
"tenant", clientRequest.tenant,
)

// this is basically the same as logging push request streams,
// so put it behind the same flag
if ts.tenantCfgs.LogPushRequestStreams(clientRequest.tenant) {
level.Debug(ts.logger).
Log(
"msg", "forwarded push request to pattern ingester",
"num_streams", len(req.Streams),
"first_1k_labels", strings.Join(labels, ", "),
"tenant", clientRequest.tenant,
)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/pattern/tee_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"

"github.com/grafana/loki/pkg/push"
)
Expand Down Expand Up @@ -51,6 +52,7 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) {
metricAggregationEnabled: true,
},
ringClient,
runtime.DefaultTenantConfigs(),
"test",
nil,
log.NewNopLogger(),
Expand Down
Loading