diff --git a/CHANGELOG.md b/CHANGELOG.md index 93650e7b8f..87d8c82db3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4508](https://github.com/thanos-io/thanos/pull/4508) Adjust and rename `ThanosSidecarUnhealthy` to `ThanosSidecarNoConnectionToStartedPrometheus`; Remove `ThanosSidecarPrometheusDown` alert; Remove unused `thanos_sidecar_last_heartbeat_success_time_seconds` metrics. - [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races. - [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint. +- [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: validate block sync concurrency parameter ## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1 diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 19b6dbf837..25310f9ef3 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -113,7 +113,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). Default("3m").DurationVar(&sc.syncInterval) - cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). + cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1."). Default("20").IntVar(&sc.blockSyncConcurrency) cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). diff --git a/docs/components/store.md b/docs/components/store.md index 3ae037beb9..cdc8605052 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -34,6 +34,7 @@ Flags: --block-sync-concurrency=20 Number of goroutines to use when constructing index-cache.json blocks from object storage. + Must be equal or greater than 1. --chunk-pool-size=2GB Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index f9f45202d9..77091ecbff 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -660,9 +660,9 @@ func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { func addNodeBySources(root, add *Node) bool { var rootNode *Node + childSources := add.Compaction.Sources for _, node := range root.Children { parentSources := node.Compaction.Sources - childSources := add.Compaction.Sources // Block exists with same sources, add as child. if contains(parentSources, childSources) && contains(childSources, parentSources) { diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 6659d20cc7..25a0af5ae9 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -247,6 +247,84 @@ func TestReloader_DirectoriesApply(t *testing.T) { testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml"))) testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm)) + stepFunc := func(rel int) { + t.Log("Performing step number", rel) + switch rel { + case 0: + // Create rule2.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // └─ rule2.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) + case 1: + // Update rule1.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml (*) + // └─ rule2.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml"))) + case 2: + // Create dir/rule3.yaml (symlink to rule3-001.yaml). + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + case 3: + // Update the symlinked file and replace the symlink file to trigger fsnotify. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-002.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-002.yaml -> rule3-source.yaml (*) + // └─ rule3-source.yaml (*) + testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml"))) + case 4: + // Update rule4.yaml in the symlinked directory. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> rule3-source.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml (*) + // └─ rule3-source.yaml + testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml"))) + } + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) g := sync.WaitGroup{} g.Add(1) @@ -267,90 +345,21 @@ func TestReloader_DirectoriesApply(t *testing.T) { reloadsMtx.Lock() rel := reloads + reloadsMtx.Unlock() if init && rel <= reloadsSeen { - reloadsMtx.Unlock() continue } - reloadsMtx.Unlock() - init = true - reloadsSeen = rel - t.Log("Performing step number", rel) - switch rel { - case 0: - // Create rule2.yaml. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // └─ rule2.yaml (*) - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-001.yaml -> rule3-source.yaml - // └─ rule3-source.yaml - testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) - case 1: - // Update rule1.yaml. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml (*) - // └─ rule2.yaml - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-001.yaml -> rule3-source.yaml - // └─ rule3-source.yaml - testutil.Ok(t, os.Rename(tempRule1File, path.Join(dir, "rule1.yaml"))) - case 2: - // Create dir/rule3.yaml (symlink to rule3-001.yaml). - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // ├─ rule2.yaml - // └─ rule3.yaml -> dir2/rule3-001.yaml (*) - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-001.yaml -> rule3-source.yaml - // └─ rule3-source.yaml - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) - testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) - case 3: - // Update the symlinked file and replace the symlink file to trigger fsnotify. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // ├─ rule2.yaml - // └─ rule3.yaml -> dir2/rule3-002.yaml (*) - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml - // ├─ rule3-002.yaml -> rule3-source.yaml (*) - // └─ rule3-source.yaml (*) - testutil.Ok(t, os.Rename(tempRule3File, path.Join(dir2, "rule3-source.yaml"))) - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml"))) - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml"))) - testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) - testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml"))) - case 4: - // Update rule4.yaml in the symlinked directory. - // - // dir - // ├─ rule-dir -> dir2/rule-dir - // ├─ rule1.yaml - // ├─ rule2.yaml - // └─ rule3.yaml -> rule3-source.yaml - // dir2 - // ├─ rule-dir - // │ └─ rule4.yaml (*) - // └─ rule3-source.yaml - testutil.Ok(t, os.Rename(tempRule4File, path.Join(dir2, "rule-dir", "rule4.yaml"))) + // Catch up if reloader is step(s) ahead. + for skipped := rel - reloadsSeen - 1; skipped > 0; skipped-- { + stepFunc(rel - skipped) } + stepFunc(rel) + + init = true + reloadsSeen = rel + if rel > 4 { // All good. return diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 39d78b257f..3af0c179ec 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -92,6 +92,12 @@ const ( // Labels for metrics. labelEncode = "encode" labelDecode = "decode" + + minBlockSyncConcurrency = 1 +) + +var ( + errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") ) type bucketStoreMetrics struct { @@ -298,6 +304,13 @@ type BucketStore struct { enableSeriesResponseHints bool } +func (b *BucketStore) validate() error { + if b.blockSyncConcurrency < minBlockSyncConcurrency { + return errBlockSyncConcurrencyNotValid + } + return nil +} + type noopCache struct{} func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {} @@ -407,6 +420,10 @@ func NewBucketStore( s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too + if err := s.validate(); err != nil { + return nil, errors.Wrap(err, "validate config") + } + if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e635ab22c1..adc5701740 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -556,6 +556,32 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { } } +func TestBucketStoreConfig_validate(t *testing.T) { + tests := map[string]struct { + config *BucketStore + expected error + }{ + "should pass on valid config": { + config: &BucketStore{ + blockSyncConcurrency: 1, + }, + expected: nil, + }, + "should fail on blockSyncConcurrency < 1": { + config: &BucketStore{ + blockSyncConcurrency: 0, + }, + expected: errBlockSyncConcurrencyNotValid, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + testutil.Equals(t, testData.expected, testData.config.validate()) + }) + } +} + func TestBucketStore_Info(t *testing.T) { defer testutil.TolerantVerifyLeak(t)