Skip to content

Commit

Permalink
Fixed bug that blocks cannot be fully deleted from TSDB (cortexprojec…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexqyle authored Sep 25, 2024
1 parent 409f065 commit f21e06d
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ type userTSDB struct {
// Used to dedup strings and keep a single reference in memory
labelsStringInterningEnabled bool
interner util.Interner

blockRetentionPeriod int64
}

// Explicitly wrapping the tsdb.DB functions that we use.
Expand Down Expand Up @@ -470,6 +472,14 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
return nil
}
deletable := tsdb.DefaultBlocksToDelete(u.db)(blocks)

now := time.Now().UnixMilli()
for _, b := range blocks {
if now-b.MaxTime() >= u.blockRetentionPeriod {
deletable[b.Meta().ULID] = struct{}{}
}
}

if u.shipper == nil {
return deletable
}
Expand Down Expand Up @@ -2152,6 +2162,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
instanceSeriesCount: &i.TSDBState.seriesCount,
interner: util.NewInterner(),
labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled,

blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),
}

enableExemplars := false
Expand Down
225 changes: 225 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
Expand Down Expand Up @@ -5327,6 +5328,194 @@ func Test_Ingester_ModeHandler(t *testing.T) {
}
}

// TestIngester_UserTSDB_BlocksToDelete checks which blocks
// userTSDB.blocksToDelete reports as deletable for different combinations of
// block age, shipping state and shipper presence.
//
// Every case runs with a 2h block retention period and four blocks. Block
// time ranges are expressed as "how long ago" offsets from the moment the
// subtest starts.
func TestIngester_UserTSDB_BlocksToDelete(t *testing.T) {
	tempDir := t.TempDir()
	db, err := tsdb.Open(tempDir, log.NewNopLogger(), prometheus.NewPedanticRegistry(), &tsdb.Options{}, nil)
	require.NoError(t, err)
	// Release the DB before the temp dir cleanup (cleanups run last-in,
	// first-out, and t.TempDir registered its own cleanup earlier).
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	// blockAge describes a block spanning [now-minAgo, now-maxAgo].
	type blockAge struct {
		minAgo, maxAgo time.Duration
	}

	// allExpired holds four blocks whose max time is at least 2h in the past.
	allExpired := [4]blockAge{
		{3 * time.Hour, 2 * time.Hour},
		{4 * time.Hour, 3 * time.Hour},
		{5 * time.Hour, 4 * time.Hour},
		{6 * time.Hour, 5 * time.Hour},
	}

	tests := []struct {
		name string
		// ages are the time ranges of the four blocks handed to blocksToDelete.
		ages [4]blockAge
		// hasShipper controls whether userTSDB gets a shipper (and a
		// shippedBlocks map) at all.
		hasShipper bool
		// shipped lists the indexes (into ages) of blocks recorded as shipped.
		shipped []int
		// wantDeleted lists the indexes (into ages) of blocks expected in the
		// result; every other block must be absent from it.
		wantDeleted []int
	}{
		{
			name:        "should delete all block beyond block retention period and were shipped",
			ages:        allExpired,
			hasShipper:  true,
			shipped:     []int{0, 1, 2, 3},
			wantDeleted: []int{0, 1, 2, 3},
		},
		{
			name:        "should not delete not-shipped block even it is beyond block retention period",
			ages:        allExpired,
			hasShipper:  true,
			shipped:     []int{0, 1, 2},
			wantDeleted: []int{0, 1, 2},
		},
		{
			name: "should not delete any block not reaching block retention period and not shipped",
			ages: [4]blockAge{
				{1 * time.Hour, 0},
				{1 * time.Hour, 0},
				{2 * time.Hour, 1 * time.Hour},
				{2 * time.Hour, 1 * time.Hour},
			},
			hasShipper:  true,
			shipped:     nil,
			wantDeleted: nil,
		},
		{
			name: "should not delete block not reaching block retention period",
			ages: [4]blockAge{
				{1 * time.Hour, 0},
				{4 * time.Hour, 3 * time.Hour},
				{5 * time.Hour, 4 * time.Hour},
				{6 * time.Hour, 5 * time.Hour},
			},
			hasShipper:  true,
			shipped:     []int{1, 2, 3},
			wantDeleted: []int{1, 2, 3},
		},
		{
			name: "should not delete block not reaching block retention period even it is shipped",
			ages: [4]blockAge{
				{2 * time.Hour, 1 * time.Hour},
				{4 * time.Hour, 3 * time.Hour},
				{5 * time.Hour, 4 * time.Hour},
				{6 * time.Hour, 5 * time.Hour},
			},
			hasShipper:  true,
			shipped:     []int{0, 1, 2, 3},
			wantDeleted: []int{1, 2, 3},
		},
		{
			name:        "should delete all block beyond block retention period if there is no shipper",
			ages:        allExpired,
			hasShipper:  false,
			wantDeleted: []int{0, 1, 2, 3},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			currentTime := time.Now()

			blocks := make([]*tsdb.Block, 0, len(tc.ages))
			for _, age := range tc.ages {
				block := CreateBlock(t, context.Background(), tempDir,
					currentTime.Add(-age.minAgo).UnixMilli(),
					currentTime.Add(-age.maxAgo).UnixMilli())
				// The test owns the opened block; close it once the subtest is done.
				t.Cleanup(func() {
					require.NoError(t, block.Close())
				})
				blocks = append(blocks, block)
			}

			userDB := &userTSDB{
				db:                   db,
				blockRetentionPeriod: 2 * time.Hour.Milliseconds(),
			}
			// Only assign the shipper when the case asks for one, so the
			// "no shipper" case leaves the field at its zero value.
			if tc.hasShipper {
				shippedBlocks := make(map[ulid.ULID]struct{}, len(tc.shipped))
				for _, idx := range tc.shipped {
					shippedBlocks[blocks[idx].Meta().ULID] = struct{}{}
				}
				userDB.shipper = &shipperMock{}
				userDB.shippedBlocks = shippedBlocks
			}

			want := make(map[int]struct{}, len(tc.wantDeleted))
			for _, idx := range tc.wantDeleted {
				want[idx] = struct{}{}
			}

			blocksToDelete := userDB.blocksToDelete(blocks)
			require.Equal(t, len(tc.wantDeleted), len(blocksToDelete))
			for idx, block := range blocks {
				if _, ok := want[idx]; ok {
					require.Contains(t, blocksToDelete, block.Meta().ULID)
				} else {
					require.NotContains(t, blocksToDelete, block.Meta().ULID)
				}
			}
		})
	}
}

// mockTenantLimits exposes per-tenant limits based on a provided map
type mockTenantLimits struct {
limits map[string]*validation.Limits
Expand Down Expand Up @@ -5358,3 +5547,39 @@ func (l *mockTenantLimits) setLimits(userID string, limits *validation.Limits) {
defer l.m.Unlock()
l.limits[userID] = limits
}

// CreateBlock writes a TSDB block for the range [mint, maxt] into dir and
// returns it opened.
//
// The block is produced by appending a single sample (label
// test_label="test_value", value 1) at the midpoint of the range to a
// temporary head and compacting that head with a leveled compactor. The
// head's chunk directory (dir/chunks) is removed before returning. Any
// failure aborts the calling test via require.
//
// The caller is responsible for closing the returned block.
func CreateBlock(t *testing.T, ctx context.Context, dir string, mint, maxt int64) *tsdb.Block {
	t.Helper()

	logger := log.NewNopLogger()

	headOpts := tsdb.DefaultHeadOptions()
	headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
	headOpts.ChunkRange = 10000000000
	h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
	require.NoError(t, err)
	defer func() {
		// Capture the close error in its own variable and assert on it;
		// writing it into the function-local err would drop it silently
		// because nothing reads err after the deferred call runs.
		var closeErr error
		runutil.CloseWithErrCapture(&closeErr, h, "TSDB Head")
		// Always attempt the directory removal, even if closing failed.
		removeErr := os.RemoveAll(headOpts.ChunkDirRoot)
		require.NoError(t, closeErr)
		require.NoError(t, removeErr)
	}()

	app := h.Appender(ctx)

	// A zero series reference makes the appender resolve the series by labels.
	var ref storage.SeriesRef
	// Place the single sample in the middle of the requested range.
	start := (maxt-mint)/2 + mint
	_, err = app.Append(ref, labels.FromStrings("test_label", "test_value"), start, float64(1))
	require.NoError(t, err)
	require.NoError(t, app.Commit())

	c, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{maxt - mint}, nil, nil)
	require.NoError(t, err)

	ids, err := c.Write(dir, h, mint, maxt, nil)
	require.NoError(t, err)
	// Guard the index below: fail with a clear message instead of panicking
	// if the compactor did not produce a block.
	require.NotEmpty(t, ids, "compactor wrote no block for range [%d, %d]", mint, maxt)
	blockID := ids[0]

	block, err := tsdb.OpenBlock(logger, filepath.Join(dir, blockID.String()), nil)
	require.NoError(t, err)

	return block
}

0 comments on commit f21e06d

Please sign in to comment.