Skip to content

Commit

Permalink
Add backoff to flush op
Browse files Browse the repository at this point in the history
This commit adds a configurable backoff to flush ops in the ingester.
This is to prevent situations where the store put operation fails fast
(i.e. 401 Unauthorized) and can cause ingesters to be rate limited.
  • Loading branch information
grobinson-grafana committed Jun 5, 2024
1 parent 21dd4af commit 48bbf5c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 6 deletions.
41 changes: 36 additions & 5 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ingester

import (
"bytes"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -147,7 +149,7 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
err := i.flushOp(op)
if err != nil {
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
}
Expand All @@ -161,7 +163,39 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
func (i *Ingester) flushOp(op *flushOp) error {
// A flush is retried until either it is successful, the maximum number
// of retries is exceeded, or the timeout has expired. The context is
// used to cancel the backoff should the latter happen.
ctx, cancelFunc := context.WithTimeout(context.Background(), i.cfg.FlushOpTimeout)
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
if err == nil {
break
}
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}

if err := b.Err(); err != nil {
// If we got here then either the maximum number of retries have
// been exceeded or the timeout expired. We do not need to check
// ctx.Err() as it is checked in b.Err().
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("timed out after %s: %w", i.cfg.FlushOpTimeout, err)
}
return err
}

return nil
}

func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {

Check failure on line 196 in pkg/ingester/flush.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

SA4009: argument ctx is overwritten before first use (staticcheck)
ctx = user.InjectOrgID(context.Background(), userID)

Check failure on line 197 in pkg/ingester/flush.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

SA4009(related information): assignment to ctx (staticcheck)

instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
Expand All @@ -175,9 +209,6 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
lbs := labels.String()
level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
if err != nil {
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
Expand Down
98 changes: 98 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"errors"
"fmt"
"os"
"sort"
Expand Down Expand Up @@ -102,6 +103,99 @@ func Benchmark_FlushLoop(b *testing.B) {
}
}

func Test_FlushOp(t *testing.T) {
t.Run("no error", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

_, ing := newTestStore(t, cfg, nil)

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.NoError(t, ing.flushOp(&flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}))
})

t.Run("max retries exceeded", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
return errors.New("failed to write chunks")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(&flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "terminated after 1 retries")
})

t.Run("timeout expired", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = time.Second
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
return errors.New("store is unavailable")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(&flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "timed out after 1s: context deadline exceeded")
})
}

func Test_Flush(t *testing.T) {
var (
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
Expand Down Expand Up @@ -297,6 +391,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = 15 * time.Second
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.MaxChunkIdle = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
Expand Down
16 changes: 15 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Config struct {

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
Expand Down Expand Up @@ -126,7 +128,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes. Is canceled when `ingester.flush-op-timeout` is exceeded.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is canceled.")
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
Expand Down Expand Up @@ -154,6 +159,15 @@ func (cfg *Config) Validate() error {
return err
}

if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
}
if cfg.FlushOpBackoff.MaxRetries <= 0 {
return fmt.Errorf("invalid flush op max retries: %s", cfg.FlushOpBackoff.MaxRetries)
}

Check failure on line 167 in pkg/ingester/ingester.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

printf: fmt.Errorf format %s has arg cfg.FlushOpBackoff.MaxRetries of wrong type int (govet)
if cfg.FlushOpTimeout <= 0 {
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
}
if cfg.IndexShards <= 0 {
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
}
Expand Down

0 comments on commit 48bbf5c

Please sign in to comment.