diff --git a/CHANGELOG.md b/CHANGELOG.md
index cab5a36852..56f8eb823e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,14 +10,22 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
 We use *breaking* word for marking changes that are not backward compatible (relates only to v0.y.z releases.)
 
 ## Unreleased
+
 ### Fixed
+- []() Compactor, Store: Significantly improved the synchronization process of meta files:
+  * Added metrics for the meta file synchronization process of the Store Gateway.
+  * Removed redundant `thanos_compact_sync_meta_total`, `thanos_compact_sync_meta_failures_total` and `thanos_compact_sync_meta_duration_seconds` metrics. All are now available under the `thanos_blocks_meta_*` metrics.
+  * Compactor: Mitigated potential data loss when very long uploads, or uploads of old blocks, were wrongly assumed to be aborted partial uploads.
+  * Store: Don't fail on a malformed disk cache for meta.json files.
+  * Store: Added more debug logs for the startup process.
 - [#1856](https://github.com/thanos-io/thanos/pull/1856) Receive: close DBReadOnly after flushing to fix a memory leak.
 - [#1882](https://github.com/thanos-io/thanos/pull/1882) Receive: upload to object storage as 'receive' rather than 'sidecar'.
 - [#1907](https://github.com/thanos-io/thanos/pull/1907) Store: Fixed the duration unit for the metric `thanos_bucket_store_series_gate_duration_seconds`.
 - [#1931](https://github.com/thanos-io/thanos/pull/1931) Compact: Fixed the compactor successfully exiting when actually an error occurred while compacting a blocks group.
 
 ### Added
+
 - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44
 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering.
 - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index 157706ef3a..8e398dac36 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -283,7 +283,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
 	ctx, cancel := context.WithTimeout(context.Background(), *timeout)
 	defer cancel()
 
-	// Getting Metas.
+	// Getting block metas.
 	var blockMetas []*metadata.Meta
 	if err = bkt.Iter(ctx, "", func(name string) error {
 		id, ok := block.IsBlockDir(name)
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 8d9c4e9222..66b159dd93 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -13,6 +13,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/oklog/run"
+	"github.com/oklog/ulid"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +24,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/extflag"
+	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
 	"github.com/thanos-io/thanos/pkg/prober"
@@ -31,6 +33,15 @@ import (
 	"gopkg.in/alecthomas/kingpin.v2"
)
 
+const (
+	metricIndexGenerateName = "thanos_compact_generated_index_total"
+	metricIndexGenerateHelp = "Total number of generated indexes."
+ + // partialUploadThresholdAge is a time after partial block is assumed aborted and ready to be cleaned. + // Keep it long as it is based on block creation time not upload start time. + partialUploadThresholdAge = time.Duration(2 * 24 * time.Hour) +) + var ( compactions = compactionSet{ 1 * time.Hour, @@ -85,7 +96,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", partialUploadThresholdAge)). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d")) @@ -162,21 +173,28 @@ func runCompact( ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", - Help: "Set to 1 if the compactor halted due to an unexpected error", + Help: "Set to 1 if the compactor halted due to an unexpected error.", }) + halted.Set(0) retried := prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compactor_retries_total", - Help: "Total number of retries after retriable compactor error", + Help: "Total number of retries after retriable compactor error.", }) iterations := prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compactor_iterations_total", - Help: "Total number of iterations that were executed successfully", + Help: "Total number of iterations that were executed successfully.", }) - halted.Set(0) - - reg.MustRegister(halted) - reg.MustRegister(retried) - reg.MustRegister(iterations) + consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_consistency_delay_seconds", + Help: "Configured consistency delay in seconds.", + }, func() float64 { + return consistencyDelay.Seconds() + }) + partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total", + Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.", + }) + reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts) downsampleMetrics := newDownsampleMetrics(reg) @@ -225,8 +243,12 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay, - blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig) + metaFetcher := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), + block.NewLabelShardedMetaFilter(relabelConfig).Filter, + (&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter, + ) + + sy, err := compact.NewSyncer(logger, reg, metaFetcher, bkt, blockSyncConcurrency, acceptMalformedIndex, false) if err != nil { return errors.Wrap(err, "create syncer") } @@ -276,13 +298,11 @@ func runCompact( level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) } - f := func() error { + compactMainFn := 
func() error { if err := compactor.Compact(ctx); err != nil { return errors.Wrap(err, "compaction failed") } - level.Info(logger).Log("msg", "compaction iterations done") - // TODO(bplotka): Remove "disableDownsampling" once https://github.com/thanos-io/thanos/issues/297 is fixed. if !disableDownsampling { // After all compactions are done, work down the downsampling backlog. // We run two passes of this to ensure that the 1h downsampling is generated @@ -306,6 +326,8 @@ func runCompact( if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution); err != nil { return errors.Wrap(err, fmt.Sprintf("retention failed")) } + + bestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts) return nil } @@ -320,12 +342,12 @@ func runCompact( } if !wait { - return f() + return compactMainFn() } // --wait=true is specified. return runutil.Repeat(5*time.Minute, ctx.Done(), func() error { - err := f() + err := compactMainFn() if err == nil { iterations.Inc() return nil @@ -363,10 +385,53 @@ func runCompact( return nil } -const ( - metricIndexGenerateName = "thanos_compact_generated_index_total" - metricIndexGenerateHelp = "Total number of generated indexes." -) +type consistencyDelayMetaFilter struct { + logger log.Logger + consistencyDelay time.Duration +} + +func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) { + for id, meta := range metas { + if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { + + level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) + synced.WithLabelValues(block.TooFreshMeta).Inc() + delete(metas, id) + } + } +} + +func bestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter) { + level.Info(logger).Log("msg", "started cleaning of aborted partial uploads") + _, partial, err := fetcher.Fetch(ctx) + if err != nil { + level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err) + } + + // Delete partial blocks that are older than partialUploadThresholdAge. + // TODO(bwplotka): This is can cause data loss if blocks are: + // * being uploaded longer than partialUploadThresholdAge + // * being uploaded and started after their partialUploadThresholdAge + // can be assumed in this case. Keep partialUploadThresholdAge long for now. + // Mitigate this by adding ModifiedTime to bkt and check that instead of ULID (block creation time). + for id := range partial { + if ulid.Now()-id.Time() <= uint64(partialUploadThresholdAge/time.Millisecond) { + // Minimum delay has not expired, ignore for now. + continue + } + + deleteAttempts.Inc() + if err := block.Delete(ctx, logger, bkt, id); err != nil { + level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", partialUploadThresholdAge, "err", err) + return + } + level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", partialUploadThresholdAge) + } + level.Info(logger).Log("msg", "cleaning of aborted partial uploads done") +} // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. 
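For context on the age checks used above: a block's ULID encodes its creation time in milliseconds, so both the consistency-delay filter and the aborted-partial-upload cleanup can compute a block's age locally, without any extra object-storage request. A minimal sketch of that check, assuming the ulid and time imports already present in this file (the helper name is illustrative, not part of this change):

	// isOldEnoughToDelete mirrors the threshold check in bestEffortCleanAbortedPartialUploads:
	// it is the ULID creation time, not the upload time, that ages the block.
	func isOldEnoughToDelete(id ulid.ULID, threshold time.Duration) bool {
		return ulid.Now()-id.Time() > uint64(threshold/time.Millisecond)
	}
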
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error { diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 1ec81d87b6..a0ab3a0ffb 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -26,6 +26,8 @@ import ( yaml "gopkg.in/yaml.v2" ) +const syncerConcurrency = 32 + // registerStore registers a store command. func registerStore(m map[string]setupFunc, app *kingpin.Application) { cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.") @@ -47,7 +49,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { Default("2GB").Bytes() maxSampleCount := cmd.Flag("store.grpc.series-sample-limit", - "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). + "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() @@ -57,7 +59,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). Default("3m").Duration() - blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage."). + blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). Default("20").Int() minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). @@ -128,7 +130,7 @@ func runStore( indexCacheSizeBytes uint64, chunkPoolSizeBytes uint64, maxSampleCount uint64, - maxConcurrent int, + maxConcurrency int, component component.Component, verbose bool, syncInterval time.Duration, @@ -210,7 +212,7 @@ func runStore( indexCache, chunkPoolSizeBytes, maxSampleCount, - maxConcurrent, + maxConcurrency, verbose, blockSyncConcurrency, filterConf, diff --git a/pkg/block/block.go b/pkg/block/block.go index 6fb43505cc..abe5f85b85 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -16,7 +16,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 51f07d13fe..18f71c982e 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -20,8 +20,6 @@ import ( "github.com/oklog/ulid" ) -// NOTE(bplotka): For block packages we cannot use testutil, because they import block package. Consider moving simple -// testutil methods to separate package. 
func TestIsBlockDir(t *testing.T) { for _, tc := range []struct { input string @@ -58,10 +56,7 @@ func TestIsBlockDir(t *testing.T) { } { t.Run(tc.input, func(t *testing.T) { id, ok := IsBlockDir(tc.input) - if ok != tc.bdir { - t.Errorf("expected block dir != %v", tc.bdir) - t.FailNow() - } + testutil.Equals(t, tc.bdir, ok) if id.Compare(tc.id) != 0 { t.Errorf("expected %s got %s", tc.id, id) diff --git a/pkg/block/syncer.go b/pkg/block/syncer.go new file mode 100644 index 0000000000..ecd4c0a1f8 --- /dev/null +++ b/pkg/block/syncer.go @@ -0,0 +1,375 @@ +package block + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + tsdberrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +type syncMetrics struct { + syncs prometheus.Counter + syncFailures prometheus.Counter + syncDuration prometheus.Histogram + + synced *extprom.TxGaugeVec +} + +const ( + syncMetricSubSys = "blocks_meta" + + corruptedMeta = "corrupted-meta-json" + noMeta = "no-meta-json" + loadedMeta = "loaded" + failedMeta = "failed" + + // Filter's label values. + labelExcludedMeta = "label-excluded" + timeExcludedMeta = "time-excluded" + TooFreshMeta = "too-fresh" +) + +func newSyncMetrics(r prometheus.Registerer) *syncMetrics { + var m syncMetrics + + m.syncs = prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: syncMetricSubSys, + Name: "syncs_total", + Help: "Total blocks metadata synchronization attempts", + }) + m.syncFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: syncMetricSubSys, + Name: "sync_failures_total", + Help: "Total blocks metadata synchronization failures", + }) + m.syncDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Subsystem: syncMetricSubSys, + Name: "sync_duration_seconds", + Help: "Duration of the blocks metadata synchronization in seconds", + Buckets: []float64{0.01, 1, 10, 100, 1000}, + }) + m.synced = extprom.NewTxGaugeVec(prometheus.GaugeOpts{ + Subsystem: syncMetricSubSys, + Name: "synced", + Help: "Number of block metadata synced", + }, []string{"state"}, []string{corruptedMeta, noMeta, loadedMeta, TooFreshMeta, failedMeta, labelExcludedMeta, timeExcludedMeta}) + if r != nil { + r.MustRegister( + m.syncs, + m.syncFailures, + m.syncDuration, + m.synced, + ) + } + return &m +} + +type MetadataFetcher interface { + Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) +} + +type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) + +// MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. +type MetaFetcher struct { + logger log.Logger + concurrency int + bkt objstore.BucketReader + + // Optional local directory to cache meta.json files. + cacheDir string + metrics *syncMetrics + + filters []MetaFetcherFilter + + cached map[ulid.ULID]*metadata.Meta +} + +// NewMetaFetcher constructs MetaFetcher. 
+func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters ...MetaFetcherFilter) *MetaFetcher { + return &MetaFetcher{ + logger: log.With(logger, "component", "block.MetaFetcher"), + concurrency: concurrency, + bkt: bkt, + cacheDir: filepath.Join(dir, "meta-syncer"), + metrics: newSyncMetrics(r), + filters: filters, + cached: map[ulid.ULID]*metadata.Meta{}, + } +} + +var ( + ErrorSyncMetaNotFound = errors.New("meta.json not found") + ErrorSyncMetaCorrupted = errors.New("meta.json corrupted") +) + +// loadMeta returns metadata from object storage or error. +// It returns `ErrorSyncMetaNotFound` and `ErrorSyncMetaCorrupted` sentinel errors in those cases. +func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { + var ( + metaFile = path.Join(id.String(), MetaFilename) + cachedBlockDir = filepath.Join(s.cacheDir, id.String()) + ) + + // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. + // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. + // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverwrite work) + ok, err := s.bkt.Exists(ctx, metaFile) + if err != nil { + return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile) + } + if !ok { + return nil, ErrorSyncMetaNotFound + } + + m, seen := s.cached[id] + if seen { + return m, nil + } + + // Best effort load from local dir. + if s.cacheDir != "" { + m, err := metadata.Read(cachedBlockDir) + if err == nil { + return m, nil + } + + level.Warn(s.logger).Log("msg", "best effort read of the local meta.json failed; removing cached block dir", "dir", cachedBlockDir, "err", err) + if err := os.RemoveAll(cachedBlockDir); err != nil { + level.Warn(s.logger).Log("msg", "best effort remove of cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + + level.Debug(s.logger).Log("msg", "download meta", "name", metaFile) + + r, err := s.bkt.Get(ctx, metaFile) + if s.bkt.IsObjNotFoundErr(err) { + // Meta.json was deleted between bkt.Exists and here. + return nil, errors.Wrapf(ErrorSyncMetaNotFound, "%v ", err) + } + if err != nil { + return nil, errors.Wrapf(err, "get meta file: %v", metaFile) + } + + defer runutil.CloseWithLogOnErr(s.logger, r, "close bkt meta get") + + metaContent, err := ioutil.ReadAll(r) + if err != nil { + return nil, errors.Wrapf(err, "read meta file: %v", metaFile) + } + + if err := json.Unmarshal(metaContent, m); err != nil { + return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v unmarshal: %v ", metaFile, err) + } + + if m.Version != metadata.MetaVersion1 { + return nil, errors.Wrapf(err, "unexpected meta file: %s version: %d", metaFile, m.Version) + } + + // Best effort cache in local dir. + if s.cacheDir != "" { + if err := metadata.Write(s.logger, cachedBlockDir, m); err != nil { + level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + + return m, nil +} + +// Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket. +// It's caller responsibility to not change the returned metadata files. Maps can be modified. +// +// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing. 
+func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { + start := time.Now() + defer func() { + s.metrics.syncDuration.Observe(time.Since(start).Seconds()) + if err != nil { + s.metrics.syncFailures.Inc() + } + }() + s.metrics.syncs.Inc() + + var ( + wg sync.WaitGroup + ch = make(chan ulid.ULID, s.concurrency) + mtx sync.Mutex + + metaErrs tsdberrors.MultiError + ) + + s.metrics.synced.ResetTx() + + for i := 0; i < s.concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for id := range ch { + meta, err := s.loadMeta(ctx, id) + if err == nil { + mtx.Lock() + metas[id] = meta + mtx.Unlock() + continue + } + + switch err { + default: + s.metrics.synced.WithLabelValues(failedMeta).Inc() + mtx.Lock() + metaErrs.Add(err) + mtx.Unlock() + continue + case ErrorSyncMetaNotFound: + s.metrics.synced.WithLabelValues(noMeta).Inc() + case ErrorSyncMetaCorrupted: + s.metrics.synced.WithLabelValues(corruptedMeta).Inc() + } + + mtx.Lock() + partial[id] = err + mtx.Unlock() + } + }() + } + + // Workers scheduled, distribute blocks. + err = s.bkt.Iter(ctx, "", func(name string) error { + id, ok := IsBlockDir(name) + if !ok { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- id: + } + + return nil + }) + close(ch) + + wg.Wait() + if err != nil { + return nil, nil, errors.Wrap(err, "MetaFetcher: iter bucket") + } + + incompleteView := len(metaErrs) > 0 + + // Only for complete view of blocks update the cache. + if !incompleteView { + cached := make(map[ulid.ULID]*metadata.Meta, len(metas)) + for id, m := range metas { + cached[id] = m + } + s.cached = cached + + // Best effort cleanup of disk-cached metas. + if s.cacheDir != "" { + names, err := fileutil.ReadDir(s.cacheDir) + if err != nil { + level.Warn(s.logger).Log("msg", "best effort remove of not needed cached dirs failed; ignoring", "err", err) + } else { + for _, n := range names { + id, ok := IsBlockDir(n) + if !ok { + continue + } + + if _, ok := metas[id]; ok { + continue + } + + cachedBlockDir := filepath.Join(s.cacheDir, id.String()) + + // No such block loaded, remove the local dir. + if err := os.RemoveAll(cachedBlockDir); err != nil { + level.Warn(s.logger).Log("msg", "best effort remove of not needed cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + } + } + } + + for _, f := range s.filters { + // NOTE: filter can update synced metric accordingly to the reason of the exclude. + f(metas, s.metrics.synced, incompleteView) + } + + s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) + s.metrics.synced.Submit() + + if incompleteView { + return metas, partial, errors.Wrap(metaErrs, "incomplete view") + } + + level.Info(s.logger).Log("successfully fetched block metadata", "duration", time.Since(start).String(), "cached", len(s.cached), "returned", len(metas), "partial", len(partial)) + return metas, partial, nil +} + +var _ MetaFetcherFilter = TimePartitionMetaFilter{}.Filter + +// TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of specified time range. +type TimePartitionMetaFilter struct { + minTime, maxTime model.TimeOrDurationValue +} + +// NewTimePartitionMetaFilter creates TimePartitionMetaFilter. +func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *TimePartitionMetaFilter { + return &TimePartitionMetaFilter{minTime: MinTime, maxTime: MaxTime} +} + +// Filter filters out blocks that are outside of specified time range. 
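A hedged sketch of how a caller is expected to consume the Fetch contract described above; the compactor's SyncMetas and the store gateway's SyncBlocks later in this diff follow the same shape (syncOnce and its arguments are illustrative, not part of the change):

	func syncOnce(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher) error {
		metas, partial, err := fetcher.Fetch(ctx)
		if err != nil && metas == nil {
			// Complete failure: nothing usable was returned.
			return err
		}
		for id, cause := range partial {
			// Blocks with a missing or corrupted meta.json; the caller decides whether to ignore or clean them up.
			level.Debug(logger).Log("msg", "partial block", "block", id, "cause", cause)
		}
		// metas holds only blocks that survived all configured filters; err may still be
		// non-nil to signal an incomplete view.
		return err
	}
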
+func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) { + for id, m := range metas { + if m.MaxTime > f.minTime.PrometheusTimestamp() && m.MinTime < f.maxTime.PrometheusTimestamp() { + continue + } + synced.WithLabelValues(timeExcludedMeta).Inc() + delete(metas, id) + } +} + +var _ MetaFetcherFilter = LabelShardedMetaFilter{}.Filter + +// LabelShardedMetaFilter is a MetaFetcher filter that filters out blocks that have no labels after relabelling. +type LabelShardedMetaFilter struct { + relabelConfig []*relabel.Config +} + +// NewLabelShardedMetaFilter creates LabelShardedMetaFilter. +func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMetaFilter { + return &LabelShardedMetaFilter{relabelConfig: relabelConfig} +} + +// Filter filters out blocks that filters blocks that have no labels after relabelling. +func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) { + for id, m := range metas { + if processedLabels := relabel.Process(labels.FromMap(m.Thanos.Labels), f.relabelConfig...); processedLabels != nil { + continue + } + synced.WithLabelValues(labelExcludedMeta).Inc() + delete(metas, id) + } +} diff --git a/pkg/block/syncer_test.go b/pkg/block/syncer_test.go new file mode 100644 index 0000000000..bd5a2ecd15 --- /dev/null +++ b/pkg/block/syncer_test.go @@ -0,0 +1 @@ +package block diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b853ff5be1..e53948de6a 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -5,7 +5,6 @@ import ( "fmt" "io/ioutil" "os" - "path" "path/filepath" "sort" "sync" @@ -17,7 +16,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block" @@ -32,19 +30,17 @@ const ( ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) - - MinimumAgeForRemoval = time.Duration(30 * time.Minute) ) var blockTooFreshSentinelError = errors.New("Block too fresh") -// Syncer syncronizes block metas from a bucket into a local directory. +// Syncer synchronizes block metas from a bucket into a local directory. // It sorts them into compaction groups based on equal label sets. 
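Because MetaFetcherFilter is a plain function type, downstream code can append its own rules after the built-in filters above. A small hypothetical example (the filter name and the external label it inspects are assumptions, not part of this change), following the same contract: record the reason on the synced gauge and delete the block from the map:

	// dropExcludedBlocks is a hypothetical MetaFetcherFilter that drops blocks carrying an
	// "exclude" external label.
	func dropExcludedBlocks(metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) {
		for id, m := range metas {
			if m.Thanos.Labels["exclude"] != "true" {
				continue
			}
			// Reuse an existing "state" value registered above for accounting.
			synced.WithLabelValues("label-excluded").Inc()
			delete(metas, id)
		}
	}
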
type Syncer struct { logger log.Logger reg prometheus.Registerer bkt objstore.Bucket - consistencyDelay time.Duration + fetcher block.MetadataFetcher mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta blocksMtx sync.Mutex @@ -52,13 +48,9 @@ type Syncer struct { metrics *syncerMetrics acceptMalformedIndex bool enableVerticalCompaction bool - relabelConfig []*relabel.Config } type syncerMetrics struct { - syncMetas prometheus.Counter - syncMetaFailures prometheus.Counter - syncMetaDuration prometheus.Histogram garbageCollectedBlocks prometheus.Counter garbageCollections prometheus.Counter garbageCollectionFailures prometheus.Counter @@ -73,20 +65,6 @@ type syncerMetrics struct { func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { var m syncerMetrics - m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_sync_meta_total", - Help: "Total number of sync meta operations.", - }) - m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_sync_meta_failures_total", - Help: "Total number of failed sync meta operations.", - }) - m.syncMetaDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "thanos_compact_sync_meta_duration_seconds", - Help: "Time it took to sync meta files.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - }) - m.garbageCollectedBlocks = prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_garbage_collected_blocks_total", Help: "Total number of deleted blocks by compactor.", @@ -128,9 +106,6 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { if reg != nil { reg.MustRegister( - m.syncMetas, - m.syncMetaFailures, - m.syncMetaDuration, m.garbageCollectedBlocks, m.garbageCollections, m.garbageCollectionFailures, @@ -145,22 +120,21 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { return &m } -// NewSyncer returns a new Syncer for the given Bucket and directory. +// NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool, relabelConfig []*relabel.Config) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, fetcher block.MetadataFetcher, bkt objstore.Bucket, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ logger: logger, reg: reg, - consistencyDelay: consistencyDelay, - blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, + fetcher: fetcher, + blocks: map[ulid.ULID]*metadata.Meta{}, metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, acceptMalformedIndex: acceptMalformedIndex, - relabelConfig: relabelConfig, // The syncer offers an option to enable vertical compaction, even if it's // not currently used by Thanos, because the compactor is also used by Cortex // which needs vertical compaction. @@ -168,24 +142,6 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket }, nil } -// SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. It removes any partial blocks older than the max of -// consistencyDelay and MinimumAgeForRemoval from the bucket. 
-func (c *Syncer) SyncMetas(ctx context.Context) error { - c.mtx.Lock() - defer c.mtx.Unlock() - - begin := time.Now() - - err := c.syncMetas(ctx) - if err != nil { - c.metrics.syncMetaFailures.Inc() - } - c.metrics.syncMetas.Inc() - c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds()) - return err -} - // UntilNextDownsampling calculates how long it will take until the next downsampling operation. // Returns an error if there will be no downsampling. func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { @@ -202,153 +158,15 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { } } -func (c *Syncer) syncMetas(ctx context.Context) error { - var wg sync.WaitGroup - defer wg.Wait() - - metaIDsChan := make(chan ulid.ULID) - errChan := make(chan error, c.blockSyncConcurrency) - - workCtx, cancel := context.WithCancel(ctx) - defer cancel() - for i := 0; i < c.blockSyncConcurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - for id := range metaIDsChan { - // Check if we already have this block cached locally. - c.blocksMtx.Lock() - _, seen := c.blocks[id] - c.blocksMtx.Unlock() - if seen { - continue - } - - meta, err := c.downloadMeta(workCtx, id) - if err == blockTooFreshSentinelError { - continue - } - - if err != nil { - if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored { - continue - } - errChan <- err - return - } - - // Check for block labels by relabeling. - // If output is empty, the block will be dropped. - lset := labels.FromMap(meta.Thanos.Labels) - processedLabels := relabel.Process(lset, c.relabelConfig...) - if processedLabels == nil { - level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) - continue - } - - c.blocksMtx.Lock() - c.blocks[id] = meta - c.blocksMtx.Unlock() - } - }() - } - - // Read back all block metas so we can detect deleted blocks. - remote := map[ulid.ULID]struct{}{} - - err := c.bkt.Iter(ctx, "", func(name string) error { - id, ok := block.IsBlockDir(name) - if !ok { - return nil - } - - remote[id] = struct{}{} - - select { - case <-ctx.Done(): - case metaIDsChan <- id: - } +func (s *Syncer) SyncMetas(ctx context.Context) error { + s.mtx.Lock() + defer s.mtx.Unlock() - return nil - }) - close(metaIDsChan) + metas, _, err := s.fetcher.Fetch(ctx) if err != nil { - return retry(errors.Wrap(err, "retrieve bucket block metas")) - } - - wg.Wait() - close(errChan) - - if err := <-errChan; err != nil { return retry(err) } - - // Delete all local block dirs that no longer exist in the bucket. - for id := range c.blocks { - if _, ok := remote[id]; !ok { - delete(c.blocks, id) - } - } - - return nil -} - -func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { - level.Debug(c.logger).Log("msg", "download meta", "block", id) - - meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) - if err != nil { - if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) { - level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) - return nil, blockTooFreshSentinelError - } - return nil, errors.Wrapf(err, "downloading meta.json for %s", id) - } - - // ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to - // avoid races when a block is only partially uploaded. This relates to all blocks, excluding: - // - repair created blocks - // - compactor created blocks - // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. 
Compactor needs to aware of ALL old blocks. - // TODO(bplotka): https://github.com/thanos-io/thanos/issues/377. - if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) && - meta.Thanos.Source != metadata.BucketRepairSource && - meta.Thanos.Source != metadata.CompactorSource && - meta.Thanos.Source != metadata.CompactorRepairSource { - - level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) - return nil, blockTooFreshSentinelError - } - - return &meta, nil -} - -// removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that -// are younger than MinimumAgeForRemoval. -func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) { - metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) - if err != nil { - level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) - return false - } - if metaExists { - // Meta exists, block is not malformed. - return false - } - - if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) { - // Minimum delay has not expired, ignore for now. - return true - } - - if err := block.Delete(ctx, c.logger, c.bkt, id); err != nil { - level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) - return false - } - level.Info(c.logger).Log("msg", "deleted malformed block", "block", id) - - return true + s.blocks = metas } // GroupKey returns a unique identifier for the group the block belongs to. It considers @@ -363,27 +181,27 @@ func groupKey(res int64, lbls labels.Labels) string { // Groups returns the compaction groups for all blocks currently known to the syncer. // It creates all groups from the scratch on every call. -func (c *Syncer) Groups() (res []*Group, err error) { - c.mtx.Lock() - defer c.mtx.Unlock() +func (s *Syncer) Groups() (res []*Group, err error) { + s.mtx.Lock() + defer s.mtx.Unlock() groups := map[string]*Group{} - for _, m := range c.blocks { + for _, m := range s.blocks { g, ok := groups[GroupKey(m.Thanos)] if !ok { g, err = newGroup( - log.With(c.logger, "compactionGroup", GroupKey(m.Thanos)), - c.bkt, + log.With(s.logger, "compactionGroup", GroupKey(m.Thanos)), + s.bkt, labels.FromMap(m.Thanos.Labels), m.Thanos.Downsample.Resolution, - c.acceptMalformedIndex, - c.enableVerticalCompaction, - c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.garbageCollectedBlocks, + s.acceptMalformedIndex, + s.enableVerticalCompaction, + s.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.garbageCollectedBlocks, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -403,9 +221,9 @@ func (c *Syncer) Groups() (res []*Group, err error) { // GarbageCollect deletes blocks from the bucket if their data is available as part of a // block with a higher compaction level. 
-func (c *Syncer) GarbageCollect(ctx context.Context) error { - c.mtx.Lock() - defer c.mtx.Unlock() +func (s *Syncer) GarbageCollect(ctx context.Context) error { + s.mtx.Lock() + defer s.mtx.Unlock() begin := time.Now() @@ -413,12 +231,12 @@ func (c *Syncer) GarbageCollect(ctx context.Context) error { for _, res := range []int64{ downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2, } { - err := c.garbageCollect(ctx, res) + err := s.garbageCollect(ctx, res) if err != nil { - c.metrics.garbageCollectionFailures.Inc() + s.metrics.garbageCollectionFailures.Inc() } - c.metrics.garbageCollections.Inc() - c.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) + s.metrics.garbageCollections.Inc() + s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) if err != nil { return errors.Wrapf(err, "garbage collect resolution %d", res) @@ -427,12 +245,12 @@ func (c *Syncer) GarbageCollect(ctx context.Context) error { return nil } -func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { +func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { // Map each block to its highest priority parent. Initial blocks have themselves // in their source section, i.e. are their own parent. parents := map[ulid.ULID]ulid.ULID{} - for id, meta := range c.blocks { + for id, meta := range s.blocks { // Skip any block that has a different resolution. if meta.Thanos.Downsample.Resolution != resolution { @@ -447,7 +265,7 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { parents[sid] = id continue } - pmeta, ok := c.blocks[pid] + pmeta, ok := s.blocks[pid] if !ok { return nil, errors.Errorf("previous parent block %s not found", pid) } @@ -473,7 +291,7 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { topParents[pid] = struct{}{} } - for id, meta := range c.blocks { + for id, meta := range s.blocks { // Skip any block that has a different resolution. if meta.Thanos.Downsample.Resolution != resolution { continue @@ -487,8 +305,8 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { return ids, nil } -func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { - garbageIds, err := c.GarbageBlocks(resolution) +func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { + garbageIds, err := s.GarbageBlocks(resolution) if err != nil { return err } @@ -501,9 +319,9 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { // Spawn a new context so we always delete a block in full on shutdown. delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - level.Info(c.logger).Log("msg", "deleting outdated block", "block", id) + level.Info(s.logger).Log("msg", "deleting outdated block", "block", id) - err := block.Delete(delCtx, c.logger, c.bkt, id) + err := block.Delete(delCtx, s.logger, s.bkt, id) cancel() if err != nil { return retry(errors.Wrapf(err, "delete block %s from bucket", id)) @@ -511,8 +329,8 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { // Immediately update our in-memory state so no further call to SyncMetas is needed // after running garbage collection. 
- delete(c.blocks, id) - c.metrics.garbageCollectedBlocks.Inc() + delete(s.blocks, id) + s.metrics.garbageCollectedBlocks.Inc() } return nil } @@ -1161,5 +979,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { break } } + level.Info(c.logger).Log("msg", "compaction iterations done") return nil } diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 3624d27fda..8506661f88 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -82,13 +82,13 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, false, relabelConfig) testutil.Ok(t, err) - // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. - shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) + // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. + shouldDeleteID, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) testutil.Ok(t, err) var fakeChunk bytes.Buffer fakeChunk.Write([]byte{0, 1, 2, 3}) - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"), &fakeChunk)) // Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk // data but no meta. Compactor should ignore it. @@ -99,7 +99,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { testutil.Ok(t, sy.SyncMetas(ctx)) - exists, err := bkt.Exists(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001")) + exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001")) testutil.Ok(t, err) testutil.Equals(t, false, exists) diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 0bc84e11c4..531ee6efa9 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -128,7 +128,7 @@ func NewStreamedBlockWriter( }, nil } -// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to +// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks MetasFetcher to indexWrites and adds label sets to // labelsValues sets and memPostings to be written on the finalize state in the end of downsampling process. 
func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Meta) error { if w.finalized || w.ignoreFinalize { diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index f06b4478c0..ba4dd29857 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -177,59 +177,36 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { } type lazyOverlapChecker struct { - synced bool logger log.Logger - bucket objstore.Bucket + bkt objstore.Bucket labels func() labels.Labels metas []tsdb.BlockMeta - lookupMetas map[ulid.ULID]struct{} + lookupMetas map[ulid.ULID]*metadata.Meta } func newLazyOverlapChecker(logger log.Logger, bucket objstore.Bucket, labels func() labels.Labels) *lazyOverlapChecker { return &lazyOverlapChecker{ logger: logger, - bucket: bucket, + bkt: bucket, labels: labels, - - lookupMetas: map[ulid.ULID]struct{}{}, } } -func (c *lazyOverlapChecker) sync(ctx context.Context) error { - if err := c.bucket.Iter(ctx, "", func(path string) error { - id, ok := block.IsBlockDir(path) - if !ok { - return nil - } - - m, err := block.DownloadMeta(ctx, c.logger, c.bucket, id) - if err != nil { - return err - } +func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.BlockMeta) error { + if c.lookupMetas == nil { + level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bkt for check", "id", newMeta.ULID.String()) + added, _, err := block.NewMetaSyncer(c.logger, 20, c.bkt, "", nil, func(meta *metadata.Meta, err error) error { + if err != nil { + return err + } - if !labels.Equal(labels.FromMap(m.Thanos.Labels), c.labels()) { + if !labels.Equal(labels.FromMap(meta.Thanos.Labels), c.labels()) { + return nil + } return nil - } - - c.metas = append(c.metas, m.BlockMeta) - c.lookupMetas[m.ULID] = struct{}{} - return nil - - }); err != nil { - return errors.Wrap(err, "get all block meta.") - } - - c.synced = true - return nil -} - -func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.BlockMeta) error { - if !c.synced { - level.Info(c.logger).Log("msg", "gathering all existing blocks from the remote bucket for check", "id", newMeta.ULID.String()) - if err := c.sync(ctx); err != nil { - return err - } + }).Update(ctx) + c.lookupMetas = added } // TODO(bwplotka) so confusing! we need to sort it first. Add comment to TSDB code. @@ -245,16 +222,16 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo } // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded -// to the object bucket once. +// to the object bkt once. // // If uploaded. // -// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). +// It is not concurrency-safe, however it is process-safe (running concurrently with compactor is ok). func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { meta, err := ReadMetaFile(s.dir) if err != nil { // If we encounter any error, proceed with an empty meta file and overwrite it later. - // The meta file is only used to avoid unnecessary bucket.Exists call, + // The meta file is only used to avoid unnecessary bkt.Exists call, // which are properly handled by the system if their occur anyway. if !os.IsNotExist(err) { level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err) @@ -277,7 +254,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { ) // Sync non compacted blocks first. 
if err := s.iterBlockMetas(func(m *metadata.Meta) error { - // Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket, + // Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bkt, // it was generally removed by the compaction process. if _, uploaded := hasUploaded[m.ULID]; uploaded { meta.Uploaded = append(meta.Uploaded, m.ULID) @@ -290,7 +267,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return nil } - // Check against bucket if the meta file for this block exists. + // Check against bkt if the meta file for this block exists. ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename)) if err != nil { return errors.Wrap(err, "check exists") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ddb9edb114..92df0f0345 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" @@ -33,7 +32,6 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" - "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/runutil" @@ -190,17 +188,13 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { return &m } -// FilterConfig is a configuration, which Store uses for filtering metrics. -type FilterConfig struct { - MinTime, MaxTime model.TimeOrDurationValue -} - // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. type BucketStore struct { logger log.Logger metrics *bucketStoreMetrics bucket objstore.BucketReader + fetcher block.MetadataFetcher dir string indexCache storecache.IndexCache chunkPool *pool.BytesPool @@ -222,9 +216,6 @@ type BucketStore struct { samplesLimiter *Limiter partitioner partitioner - filterConfig *FilterConfig - relabelConfig []*relabel.Config - advLabelSets []storepb.LabelSet enableCompatibilityLabel bool } @@ -235,6 +226,7 @@ func NewBucketStore( logger log.Logger, reg prometheus.Registerer, bucket objstore.BucketReader, + fetcher block.MetadataFetcher, dir string, indexCache storecache.IndexCache, maxChunkPoolBytes uint64, @@ -242,8 +234,6 @@ func NewBucketStore( maxConcurrent int, debugLogging bool, blockSyncConcurrency int, - filterConf *FilterConfig, - relabelConfig []*relabel.Config, enableCompatibilityLabel bool, ) (*BucketStore, error) { if logger == nil { @@ -265,6 +255,7 @@ func NewBucketStore( s := &BucketStore{ logger: logger, bucket: bucket, + fetcher: fetcher, dir: dir, indexCache: indexCache, chunkPool: chunkPool, @@ -278,8 +269,6 @@ func NewBucketStore( ), samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, - filterConfig: filterConf, - relabelConfig: relabelConfig, enableCompatibilityLabel: enableCompatibilityLabel, } s.metrics = metrics @@ -310,6 +299,12 @@ func (s *BucketStore) Close() (err error) { // SyncBlocks synchronizes the stores state with the Bucket bucket. // It will reuse disk space as persistent cache based on s.dir param. 
func (s *BucketStore) SyncBlocks(ctx context.Context) error { + metas, _, metaFetchErr := s.fetcher.Fetch(ctx) + // For partial view allow addition at least. + if metaFetchErr != nil && metas == nil { + return metaFetchErr + } + var wg sync.WaitGroup blockc := make(chan *metadata.Meta) @@ -318,7 +313,6 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { go func() { for meta := range blockc { if err := s.addBlock(ctx, meta); err != nil { - level.Warn(s.logger).Log("msg", "loading block failed", "id", meta.ULID, "err", err) continue } } @@ -326,65 +320,33 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { }() } - allIDs := map[ulid.ULID]struct{}{} - - err := s.bucket.Iter(ctx, "", func(name string) error { - // Strip trailing slash indicating a directory. - id, err := ulid.Parse(name[:len(name)-1]) - if err != nil { - return nil - } - - bdir := path.Join(s.dir, id.String()) - meta, err := loadMeta(ctx, s.logger, s.bucket, bdir, id) - if err != nil { - return errors.Wrap(err, "load meta") - } - - inRange, err := s.isBlockInMinMaxRange(ctx, meta) - if err != nil { - level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err) - return os.RemoveAll(bdir) - } - - if !inRange { - return os.RemoveAll(bdir) - } - - // Check for block labels by relabeling. - // If output is empty, the block will be dropped. - if processedLabels := relabel.Process(labels.FromMap(meta.Thanos.Labels), s.relabelConfig...); processedLabels == nil { - level.Debug(s.logger).Log("msg", "ignoring block (drop in relabeling)", "block", id) - return os.RemoveAll(bdir) - } - - allIDs[id] = struct{}{} - + for id, meta := range metas { if b := s.getBlock(id); b != nil { - return nil + continue } select { case <-ctx.Done(): case blockc <- meta: } - return nil - }) + } close(blockc) wg.Wait() - if err != nil { - return errors.Wrap(err, "iter") + if metaFetchErr != nil { + return metaFetchErr } + // Drop all blocks that are no longer present in the bucket. for id := range s.blocks { - if _, ok := allIDs[id]; ok { + if _, ok := metas[id]; ok { continue } if err := s.removeBlock(id); err != nil { - level.Warn(s.logger).Log("msg", "drop outdated block", "block", id, "err", err) + level.Warn(s.logger).Log("msg", "drop outdated block failed", "block", id, "err", err) s.metrics.blockDropFailures.Inc() } + level.Debug(s.logger).Log("msg", "dropped outdated block", "block", id) s.metrics.blockDrops.Inc() } @@ -442,19 +404,6 @@ func (s *BucketStore) numBlocks() int { return len(s.blocks) } -func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, meta *metadata.Meta) (bool, error) { - // We check for blocks in configured minTime, maxTime range. 
-	switch {
-	case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
-		return false, nil
-
-	case meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
-		return false, nil
-	}
-
-	return true, nil
-}
-
 func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
@@ -463,13 +412,18 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
 
 func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) {
 	dir := filepath.Join(s.dir, meta.ULID.String())
+	start := time.Now()
+
+	level.Debug(s.logger).Log("msg", "loading new block", "block", meta.ULID)
 	defer func() {
 		if err != nil {
 			s.metrics.blockLoadFailures.Inc()
 			if err2 := os.RemoveAll(dir); err2 != nil {
 				level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2)
 			}
+			level.Warn(s.logger).Log("msg", "loading block failed", "elapsed", time.Since(start), "id", meta.ULID, "err", err)
+		} else {
+			level.Debug(s.logger).Log("msg", "loaded block", "elapsed", time.Since(start), "block", meta.ULID)
+		}
 	}()
 	s.metrics.blockLoads.Inc()
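For completeness, a sketch of how the store gateway is expected to wire the new fetcher into NewBucketStore after this change. The full cmd/thanos/store.go wiring is not shown in this diff, so the variable names (dataDir, minTime, maxTime, chunkPoolSizeBytes, and so on) are assumptions based on the flags above:

	metaFetcher := block.NewMetaFetcher(logger, syncerConcurrency, bkt, dataDir,
		extprom.WrapRegistererWithPrefix("thanos_", reg),
		block.NewTimePartitionMetaFilter(minTime, maxTime).Filter,
		block.NewLabelShardedMetaFilter(relabelConfig).Filter,
	)

	bs, err := store.NewBucketStore(logger, reg, bkt, metaFetcher, dataDir, indexCache,
		chunkPoolSizeBytes, maxSampleCount, maxConcurrency, verbose, blockSyncConcurrency,
		enableCompatibilityLabel,
	)
	if err != nil {
		return errors.Wrap(err, "create bucket store")
	}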