Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let store-gateways ignore blocks that are too young. #502

Merged
merged 9 commits into from
Nov 18, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
* [ENHANCEMENT] Querier&Ruler: reduce cpu usage, latency and peak memory consumption. #459 #463
* [ENHANCEMENT] Overrides Exporter: Add `max_fetched_chunks_per_query` limit to the default and per-tenant limits exported as metrics. #471
* [ENHANCEMENT] Compactor (blocks cleaner): Delete blocks marked for deletion faster. #490
* [ENHANCEMENT] Store-gateway: store-gateway can now ignore blocks with minimum time within `-blocks-storage.bucket-store.ignore-blocks-within` duration. Useful when used together with `-querier.query-store-after`. #502
* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #206
* [BUGFIX] Fixes a panic in the query-tee when comparing result. #207
* [BUGFIX] Upgrade Prometheus. TSDB now waits for pending readers before truncating Head block, fixing the `chunk not found` error and preventing wrong query results. #16
Expand Down
8 changes: 8 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,14 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# Blocks with minimum time within this duration are ignored, and not loaded
# by store-gateway. Useful when used together with
# -querier.query-store-after to prevent loading young blocks, because there
# are usually many of them (depending on number of ingesters) and they are
# not yet compacted. Negative values or 0 disable the filter.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
8 changes: 8 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,14 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# Blocks with minimum time within this duration are ignored, and not loaded
# by store-gateway. Useful when used together with
# -querier.query-store-after to prevent loading young blocks, because there
# are usually many of them (depending on number of ingesters) and they are
# not yet compacted. Negative values or 0 disable the filter.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4985,6 +4985,14 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# Blocks with minimum time within this duration are ignored, and not loaded by
# store-gateway. Useful when used together with -querier.query-store-after to
# prevent loading young blocks, because there are usually many of them
# (depending on number of ingesters) and they are not yet compacted. Negative
# values or 0 disable the filter.
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
[ignore_blocks_within: <duration> | default = 0s]

# Max size - in bytes - of a chunks pool, used to reduce memory allocations.
# The pool is shared across all tenants. 0 to disable the limit.
# CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type BucketStoreConfig struct {
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`

// Chunk pool.
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
Expand Down Expand Up @@ -305,6 +306,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.IgnoreDeletionMarksDelay, "blocks-storage.bucket-store.ignore-deletion-marks-delay", time.Hour*6, "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"Default is 6h, half of the default value for -compactor.deletion-delay.")
f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "Blocks with minimum time within this duration are ignored, and not loaded by store-gateway. Useful when used together with -querier.query-store-after to prevent loading young blocks, because there are usually many of them (depending on number of ingesters) and they are not yet compacted. Negative values or 0 disable the filter.")
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewBucketIndexMetadataFetcher(
logger: logger,
filters: filters,
modifiers: modifiers,
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil),
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}, {minTimeExcludedMeta}}, nil),
}
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -43,19 +44,22 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
block1 := &bucketindex.Block{ID: ulid.MustNew(1, nil)}
block2 := &bucketindex.Block{ID: ulid.MustNew(2, nil)}
block3 := &bucketindex.Block{ID: ulid.MustNew(3, nil)}
block4 := &bucketindex.Block{ID: ulid.MustNew(4, nil), MinTime: timestamp.FromTime(now.Add(-30 * time.Minute))} // Has most-recent data, to be ignored by minTimeMetaFilter.

mark1 := &bucketindex.BlockDeletionMark{ID: block1.ID, DeletionTime: now.Add(-time.Hour).Unix()} // Below the ignore delay threshold.
mark2 := &bucketindex.BlockDeletionMark{ID: block2.ID, DeletionTime: now.Add(-3 * time.Hour).Unix()} // Above the ignore delay threshold.

require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{
Version: bucketindex.IndexVersion1,
Blocks: bucketindex.Blocks{block1, block2, block3},
Blocks: bucketindex.Blocks{block1, block2, block3, block4},
BlockDeletionMarks: bucketindex.BlockDeletionMarks{mark1, mark2},
UpdatedAt: now.Unix(),
}))

// Create a metadata fetcher with filters.
filters := []block.MetadataFilter{
NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt, nil), 2*time.Hour, 1),
newMinTimeMetaFilter(1 * time.Hour),
}

fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, filters, nil)
Expand Down Expand Up @@ -90,6 +94,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 1
blocks_meta_synced{state="too-fresh"} 0

# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
Expand Down Expand Up @@ -141,6 +146,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
blocks_meta_synced{state="no-bucket-index"} 1
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0

# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
Expand Down Expand Up @@ -195,6 +201,7 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0

# HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts
Expand Down Expand Up @@ -241,6 +248,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))

Expand All @@ -265,6 +273,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 1
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))

Expand Down Expand Up @@ -297,6 +306,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))

Expand All @@ -323,6 +333,7 @@ func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T)
blocks_meta_synced{state="no-bucket-index"} 0
blocks_meta_synced{state="no-meta-json"} 0
blocks_meta_synced{state="time-excluded"} 0
blocks_meta_synced{state="min-time-excluded"} 0
blocks_meta_synced{state="too-fresh"} 0
`), "blocks_meta_synced"))
}
6 changes: 4 additions & 2 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,17 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
fetcherReg := prometheus.NewRegistry()

// The sharding strategy filter MUST be before the ones we create here (order matters).
filters := append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{
filters := []block.MetadataFilter{
NewShardingMetadataFilterAdapter(userID, u.shardingStrategy),
block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg),
newMinTimeMetaFilter(u.cfg.BucketStore.IgnoreBlocksWithin),
// Use our own custom implementation.
NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency),
// The duplicate filter has been intentionally omitted because it could cause troubles with
// the consistency check done on the querier. The duplicate filter removes redundant blocks
// but if the store-gateway removes redundant blocks before the querier discovers them, the
// consistency check on the querier will fail.
}...)
}

modifiers := []block.MetadataModifier{
// Remove Mimir external labels so that they're not injected when querying blocks.
Expand Down
30 changes: 30 additions & 0 deletions pkg/storegateway/metadata_fetcher_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -81,3 +82,32 @@ func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, meta

return nil
}

const minTimeExcludedMeta = "min-time-excluded"

// minTimeMetaFilter filters out blocks that contain the most recent data (based on block MinTime).
type minTimeMetaFilter struct {
limit time.Duration
}

func newMinTimeMetaFilter(limit time.Duration) *minTimeMetaFilter {
return &minTimeMetaFilter{limit: limit}
}

func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
if f.limit <= 0 {
return nil
}

limitTime := timestamp.FromTime(time.Now().Add(-f.limit))

for id, m := range metas {
if m.MinTime < limitTime {
continue
}

synced.WithLabelValues(minTimeExcludedMeta).Inc()
delete(metas, id)
}
return nil
}
38 changes: 38 additions & 0 deletions pkg/storegateway/metadata_fetcher_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -110,3 +112,39 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, expectedDeletionMarks, f.DeletionMarkBlocks())
}

func TestTimeMetaFilter(t *testing.T) {
now := time.Now()
limit := 10 * time.Minute
limitTime := now.Add(-limit)

ulid1 := ulid.MustNew(1, nil)
ulid2 := ulid.MustNew(2, nil)
ulid3 := ulid.MustNew(3, nil)
ulid4 := ulid.MustNew(4, nil)

inputMetas := map[ulid.ULID]*metadata.Meta{
ulid1: {BlockMeta: tsdb.BlockMeta{MinTime: 100}}, // Very old, keep it
ulid2: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(now)}}, // Fresh block, remove.
ulid3: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(limitTime.Add(time.Minute))}}, // Inside limit time, remove.
ulid4: {BlockMeta: tsdb.BlockMeta{MinTime: timestamp.FromTime(limitTime.Add(-time.Minute))}}, // Before limit time, keep.
}

expectedMetas := map[ulid.ULID]*metadata.Meta{}
expectedMetas[ulid1] = inputMetas[ulid1]
expectedMetas[ulid4] = inputMetas[ulid4]

synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{Name: "synced"}, []string{"state"})

// Test negative limit.
f := newMinTimeMetaFilter(-10 * time.Minute)
require.NoError(t, f.Filter(context.Background(), inputMetas, synced))
assert.Equal(t, inputMetas, inputMetas)
assert.Equal(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))

f = newMinTimeMetaFilter(limit)
require.NoError(t, f.Filter(context.Background(), inputMetas, synced))

assert.Equal(t, expectedMetas, inputMetas)
assert.Equal(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(minTimeExcludedMeta)))
}