From ad22540792b5ddea9f0faa4416d3aa3a2465d65d Mon Sep 17 00:00:00 2001
From: Dimitar Dimitrov
Date: Fri, 20 Sep 2024 14:38:49 +0200
Subject: [PATCH] kafka replay speed: adjust batchingQueueCapacity (#9344)

* kafka replay speed: adjust batchingQueueCapacity

I made up 2000 when we were flushing individual series to the channel. Back
then 2000 might have made sense, but when flushing whole WriteRequests a
capacity of 1 should be sufficient.

Signed-off-by: Dimitar Dimitrov

* Increase errCh capacity

Signed-off-by: Dimitar Dimitrov

* Explain why +1

Signed-off-by: Dimitar Dimitrov

* Set capacity to 5

Signed-off-by: Dimitar Dimitrov

* Update pkg/storage/ingest/pusher.go

Co-authored-by: gotjosh

* Improve test

Signed-off-by: Dimitar Dimitrov

* Update pkg/storage/ingest/pusher.go

---------

Signed-off-by: Dimitar Dimitrov
Co-authored-by: gotjosh
---
 pkg/storage/ingest/pusher.go      | 11 ++++++---
 pkg/storage/ingest/pusher_test.go | 39 +++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go
index f67a5eed029..1a78ab25444 100644
--- a/pkg/storage/ingest/pusher.go
+++ b/pkg/storage/ingest/pusher.go
@@ -24,7 +24,12 @@ import (
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
-const shardForSeriesBuffer = 2000 // TODO dimitarvdimitrov 2000 is arbitrary; the idea is that we don't block the goroutine calling PushToStorage while we're flushing. A linked list with a sync.Cond or something different would also work
+// batchingQueueCapacity controls how many batches can be enqueued for flushing.
+// We don't want to push any batches in parallel and instead want to prepare the next one while the current one finishes, hence the small buffer.
+// For example, if we flush 1 batch/sec, then batching 2 batches/sec doesn't make us faster.
+// This is our initial assumption, and there's potential in testing with higher numbers if there's high variability in flush times - assuming we can preserve the order of the batches. For now, we'll stick to 5.
+// If there's high variability in the time to flush or in the time to batch, then this buffer might need to be increased.
+const batchingQueueCapacity = 5
 
 type Pusher interface {
 	PushToStorage(context.Context, *mimirpb.WriteRequest) error
@@ -332,7 +337,7 @@ func (c parallelStoragePusher) shardsFor(userID string) *parallelStorageShards {
 	}
 	// Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes.
 	hashLabels := labels.Labels.Hash
-	p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, shardForSeriesBuffer, c.upstreamPusher, hashLabels)
+	p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels)
 	c.pushers[userID] = p
 	return p
 }
@@ -462,7 +467,7 @@ type batchingQueue struct {
 func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
 	return &batchingQueue{
 		ch:           make(chan flushableWriteRequest, capacity),
-		errCh:        make(chan error, capacity),
+		errCh:        make(chan error, capacity+1), // We check for errors before pushing to the channel, so the buffer needs to be at least capacity+1 so that the consumer can push all of its errors without relying on the producer to unblock it.
 		done:         make(chan struct{}),
 		currentBatch: flushableWriteRequest{WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}},
 		batchSize:    batchSize,
diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go
index c23f2f445f2..8a3e104e373 100644
--- a/pkg/storage/ingest/pusher_test.go
+++ b/pkg/storage/ingest/pusher_test.go
@@ -6,6 +6,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -638,6 +639,44 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
 	}
 }
 
+func TestBatchingQueue_NoDeadlock(t *testing.T) {
+	capacity := 2
+	batchSize := 3
+	queue := newBatchingQueue(capacity, batchSize)
+
+	ctx := context.Background()
+	series := mockPreallocTimeseries("series_1")
+
+	// Start a goroutine to process the queue
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer queue.Done()
+		for range queue.Channel() {
+			// Simulate processing time
+			time.Sleep(50 * time.Millisecond)
+			queue.ErrorChannel() <- fmt.Errorf("mock error")
+		}
+	}()
+
+	// Add items to the queue
+	for i := 0; i < batchSize*(capacity+1); i++ {
+		require.NoError(t, queue.AddToBatch(ctx, series))
+	}
+
+	// Close the queue to signal no more items will be added
+	err := queue.Close()
+	require.ErrorContains(t, err, "mock error")
+
+	wg.Wait()
+
+	// Ensure the queue is empty and no deadlock occurred
+	require.Len(t, queue.ch, 0)
+	require.Len(t, queue.errCh, 0)
+	require.Len(t, queue.currentBatch.Timeseries, 0)
+}
+
 func TestBatchingQueue(t *testing.T) {
 	capacity := 5
 	batchSize := 3
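
For readers skimming the patch, the following is a minimal, standalone Go sketch of the batching-queue pattern being tuned here. It is not the Mimir code: the []int batches, the Add/push/Close/Done method shapes, and the main driver are illustrative stand-ins for flushableWriteRequest, AddToBatch, and the pusher. It shows the two capacity choices the patch makes: a small batch channel so the next batch is prepared while the current one flushes, and an error channel of capacity+1, because the producer only drains errors right before pushing while the consumer may need to report one error per buffered batch plus one for the batch it already holds.

// Standalone sketch, not the Mimir implementation: []int batches stand in for
// flushableWriteRequest, and the method names are illustrative only.
package main

import (
	"errors"
	"fmt"
)

type batchingQueue struct {
	ch        chan []int
	errCh     chan error
	done      chan struct{}
	current   []int
	batchSize int
}

func newBatchingQueue(capacity, batchSize int) *batchingQueue {
	return &batchingQueue{
		ch: make(chan []int, capacity),
		// Up to capacity batches can sit in ch and one more can already be in
		// the consumer's hands, so up to capacity+1 errors may be outstanding
		// before the producer drains errCh again.
		errCh:     make(chan error, capacity+1),
		done:      make(chan struct{}),
		batchSize: batchSize,
	}
}

// Add appends a value and flushes the current batch once it is full,
// returning any errors the consumer has reported so far.
func (q *batchingQueue) Add(v int) error {
	q.current = append(q.current, v)
	if len(q.current) >= q.batchSize {
		return q.push()
	}
	return nil
}

// push drains pending errors first and only then enqueues the batch; this
// ordering is why errCh needs the extra slot.
func (q *batchingQueue) push() error {
	errs := q.collectErrors()
	q.ch <- q.current
	q.current = nil
	return errs
}

// Close flushes the remainder, waits for the consumer and returns its errors.
func (q *batchingQueue) Close() error {
	var errs []error
	if len(q.current) > 0 {
		if err := q.push(); err != nil {
			errs = append(errs, err)
		}
	}
	close(q.ch)
	<-q.done // wait for the consumer to finish the buffered batches
	if err := q.collectErrors(); err != nil {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

// Done is called by the consumer once it has drained ch.
func (q *batchingQueue) Done() { close(q.done) }

func (q *batchingQueue) collectErrors() error {
	var errs []error
	for {
		select {
		case err := <-q.errCh:
			errs = append(errs, err)
		default:
			return errors.Join(errs...)
		}
	}
}

func main() {
	q := newBatchingQueue(5, 3)
	go func() {
		defer q.Done()
		for b := range q.ch {
			// Pretend every flush fails so errors accumulate in errCh.
			q.errCh <- fmt.Errorf("failed to flush batch of %d items", len(b))
		}
	}()
	for i := 0; i < 20; i++ {
		if err := q.Add(i); err != nil {
			fmt.Println("error while batching:", err)
		}
	}
	if err := q.Close(); err != nil {
		fmt.Println("error on close:", err)
	}
}

Under these assumptions the worst case between two drains is capacity batches buffered plus one in flight, i.e. capacity+1 outstanding errors, which is exactly the buffer size the patch gives errCh and the scenario TestBatchingQueue_NoDeadlock exercises by pushing batchSize*(capacity+1) items against a consumer that errors on every batch.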