From 48bbf5ca0526e1f864dfb737454575f0b883c722 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 5 Jun 2024 12:01:22 +0100 Subject: [PATCH] Add backoff to flush op This commit adds a configurable backoff to flush ops in the ingester. This is to prevent situations where the store put operation fails fast (i.e. 401 Unauthorized) and can cause ingesters to be rate limited. --- pkg/ingester/flush.go | 41 ++++++++++++++-- pkg/ingester/flush_test.go | 98 ++++++++++++++++++++++++++++++++++++++ pkg/ingester/ingester.go | 16 ++++++- 3 files changed, 149 insertions(+), 6 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 00aad05475495..a31aa91fac225 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -2,12 +2,14 @@ package ingester import ( "bytes" + "errors" "fmt" "net/http" "sync" "time" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" @@ -147,7 +149,7 @@ func (i *Ingester) flushLoop(j int) { } op := o.(*flushOp) - err := i.flushUserSeries(op.userID, op.fp, op.immediate) + err := i.flushOp(op) if err != nil { level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err) } @@ -161,7 +163,39 @@ func (i *Ingester) flushLoop(j int) { } } -func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { +func (i *Ingester) flushOp(op *flushOp) error { + // A flush is retried until either it is successful, the maximum number + // of retries is exceeded, or the timeout has expired. The context is + // used to cancel the backoff should the latter happen. + ctx, cancelFunc := context.WithTimeout(context.Background(), i.cfg.FlushOpTimeout) + defer cancelFunc() + + b := backoff.New(ctx, i.cfg.FlushOpBackoff) + for b.Ongoing() { + err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate) + if err == nil { + break + } + level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err) + b.Wait() + } + + if err := b.Err(); err != nil { + // If we got here then either the maximum number of retries have + // been exceeded or the timeout expired. We do not need to check + // ctx.Err() as it is checked in b.Err(). + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("timed out after %s: %w", i.cfg.FlushOpTimeout, err) + } + return err + } + + return nil +} + +func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error { + ctx = user.InjectOrgID(context.Background(), userID) + instance, ok := i.getInstanceByID(userID) if !ok { return nil @@ -175,9 +209,6 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat lbs := labels.String() level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs) - ctx := user.InjectOrgID(context.Background(), userID) - ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) - defer cancel() err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx) if err != nil { return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 6fd52bafa066f..81c741826d11a 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -1,6 +1,7 @@ package ingester import ( + "errors" "fmt" "os" "sort" @@ -102,6 +103,99 @@ func Benchmark_FlushLoop(b *testing.B) { } } +func Test_FlushOp(t *testing.T) { + t.Run("no error", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushCheckPeriod = 100 * time.Millisecond + + _, ing := newTestStore(t, cfg, nil) + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.NoError(t, ing.flushOp(&flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + })) + }) + + t.Run("max retries exceeded", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushCheckPeriod = 100 * time.Millisecond + + store, ing := newTestStore(t, cfg, nil) + store.onPut = func(_ context.Context, _ []chunk.Chunk) error { + return errors.New("failed to write chunks") + } + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.EqualError(t, ing.flushOp(&flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + }), "terminated after 1 retries") + }) + + t.Run("timeout expired", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushOpTimeout = time.Second + cfg.FlushCheckPeriod = 100 * time.Millisecond + + store, ing := newTestStore(t, cfg, nil) + store.onPut = func(_ context.Context, _ []chunk.Chunk) error { + return errors.New("store is unavailable") + } + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.EqualError(t, ing.flushOp(&flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + }), "timed out after 1s: context deadline exceeded") + }) +} + func Test_Flush(t *testing.T) { var ( store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil) @@ -297,6 +391,10 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg := Config{} flagext.DefaultValues(&cfg) + cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushOpTimeout = 15 * time.Second cfg.FlushCheckPeriod = 99999 * time.Hour cfg.MaxChunkIdle = 99999 * time.Hour cfg.ConcurrentFlushes = 1 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6d27d349c93f4..b837d28342fa9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" @@ -81,6 +82,7 @@ type Config struct { ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` + FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` RetainPeriod time.Duration `yaml:"chunk_retain_period"` MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` @@ -126,7 +128,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.") - f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.") + f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.") + f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.") + f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes. Is canceled when `ingester.flush-op-timeout` is exceeded.") + f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is canceled.") f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.") f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.") f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.") @@ -154,6 +159,15 @@ func (cfg *Config) Validate() error { return err } + if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff { + return errors.New("invalid flush op min backoff: cannot be larger than max backoff") + } + if cfg.FlushOpBackoff.MaxRetries <= 0 { + return fmt.Errorf("invalid flush op max retries: %s", cfg.FlushOpBackoff.MaxRetries) + } + if cfg.FlushOpTimeout <= 0 { + return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout) + } if cfg.IndexShards <= 0 { return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards) }