diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 1bad45f773..deed0ba576 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -293,7 +293,7 @@ func runCompact( // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, deleteDelay/2) duplicateBlocksFilter := block.NewDeduplicateFilter() baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7f716562ba..a771d3823a 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -251,7 +251,7 @@ func runStore( return errors.Wrap(err, "create index cache") } - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, ignoreDeletionMarksDelay) metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 7f21d4d801..40f59ae66d 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "io/ioutil" + "math/rand" "os" "path" "path/filepath" @@ -26,12 +27,13 @@ import ( "github.com/prometheus/prometheus/tsdb" tsdberrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" + "golang.org/x/sync/errgroup" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" - "golang.org/x/sync/errgroup" ) type fetcherMetrics struct { @@ -72,6 +74,9 @@ const ( // Modified label values. replicaRemovedMeta = "replica-label-removed" + + // Default value for caching deletion marks in memory. + defaultDeletionMarkCacheEntryTTL = 1 * time.Hour ) func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { @@ -130,13 +135,18 @@ type MetadataFetcher interface { } type MetadataFilter interface { - Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error + Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, deletionMarks map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, incompleteView bool) error } type MetadataModifier interface { Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, incompleteView bool) error } +type cachedDeletionMark struct { + nextCheck time.Time + mark metadata.DeletionMark +} + // BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. // Go-routine safe. type BaseFetcher struct { @@ -144,9 +154,14 @@ type BaseFetcher struct { concurrency int bkt objstore.InstrumentedBucketReader - // Optional local directory to cache meta.json files. + // How long to cache deletion mark cache entries. + // Note that next check time is computed as: TTL/2 + random(TTL), so this is an average TTL, not max. + deletionMarkCacheEntryTTL time.Duration + + // Optional local directory to cache meta.json and deletion mark files. cacheDir string cached map[ulid.ULID]*metadata.Meta + marks map[ulid.ULID]*cachedDeletionMark syncs prometheus.Counter g singleflight.Group } @@ -176,6 +191,8 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente Name: "base_syncs_total", Help: "Total blocks metadata synchronization attempts by base Fetcher", }), + // TODO: configurable? + deletionMarkCacheEntryTTL: defaultDeletionMarkCacheEntryTTL, }, nil } @@ -274,8 +291,84 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met return m, nil } +func (f *BaseFetcher) newCachedDeletionMark(m metadata.DeletionMark, now time.Time) *cachedDeletionMark { + ttl := f.deletionMarkCacheEntryTTL + + return &cachedDeletionMark{ + nextCheck: now.Add(ttl/2 + time.Duration(rand.Int63n(ttl.Nanoseconds()))), + mark: m, + } +} + +// loadDeletionMark returns (possibly cached) deletion mark from object storage or error. +// Result is a entry that can be stored into a cache, or error. +// Missing deletion mark is not considered to be an error, and simply returns nil. +func (f *BaseFetcher) loadDeletionMark(ctx context.Context, id ulid.ULID, now time.Time) (*cachedDeletionMark, error) { + var ( + markFile = path.Join(id.String(), metadata.DeletionMarkFilename) + cachedBlockDir = filepath.Join(f.cacheDir, id.String()) + ) + + if m := f.marks[id]; m != nil { + if now.Before(m.nextCheck) { + return m, nil + } + } + + ok, err := f.bkt.Exists(ctx, markFile) + if err != nil { + return nil, errors.Wrapf(err, "deletion mark file exists: %v", markFile) + } + if !ok { + return nil, nil + } + + if f.cacheDir != "" { + m, err := metadata.ReadDeletionMarkFromLocalDir(cachedBlockDir) + if err == nil { + return f.newCachedDeletionMark(*m, now), nil + } + + if !errors.Is(err, metadata.ErrorDeletionMarkNotFound) { + level.Warn(f.logger).Log("msg", "loading locally-cached deletion mark file failed, ignoring and deleting", "dir", cachedBlockDir, "err", err) + if err := metadata.DeleteDeletionMarkFromLocalDir(cachedBlockDir); err != nil { + level.Warn(f.logger).Log("msg", "best-effort deletion of locally cached deletion mark failed", "dir", cachedBlockDir, "err", err) + } + } + } + + m, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String()) + if err != nil { + if errors.Is(err, metadata.ErrorDeletionMarkNotFound) { + return nil, nil + } + + if errors.Is(err, metadata.ErrorUnmarshalDeletionMark) { + level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err) + // Report non-existent mark. + return nil, nil + } + + return nil, err + } + + // Best effort cache in local dir. + if f.cacheDir != "" { + if err := os.MkdirAll(cachedBlockDir, os.ModePerm); err != nil { + level.Warn(f.logger).Log("msg", "best effort mkdir of the deletion dir block dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + + if err := metadata.WriteDeletionMarkToLocalDir(f.logger, cachedBlockDir, m); err != nil { + level.Warn(f.logger).Log("msg", "best effort save of the deletion mark to local dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + + return f.newCachedDeletionMark(*m, now), nil +} + type response struct { metas map[ulid.ULID]*metadata.Meta + marks map[ulid.ULID]*cachedDeletionMark partial map[ulid.ULID]error // If metaErr > 0 it means incomplete view, so some metas, failed to be loaded. metaErrs tsdberrors.MultiError @@ -286,12 +379,30 @@ type response struct { incompleteView bool } -func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { +func (r response) metasCopy() map[ulid.ULID]*metadata.Meta { + metas := make(map[ulid.ULID]*metadata.Meta, len(r.metas)) + for id, m := range r.metas { + metas[id] = m + } + return metas +} + +// deletionMarksCopyForFilter makes a copy of deletion marks map, suitable for passing to Filter method. +func (r response) deletionMarksCopyForFilter() map[ulid.ULID]*metadata.DeletionMark { + marks := make(map[ulid.ULID]*metadata.DeletionMark, len(r.marks)) + for id, m := range r.marks { + marks[id] = &m.mark + } + return marks +} + +func (f *BaseFetcher) fetchMetadata(ctx context.Context, now time.Time) (response, error) { f.syncs.Inc() var ( resp = response{ metas: make(map[ulid.ULID]*metadata.Meta), + marks: make(map[ulid.ULID]*cachedDeletionMark), partial: make(map[ulid.ULID]error), } eg errgroup.Group @@ -301,33 +412,42 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { for i := 0; i < f.concurrency; i++ { eg.Go(func() error { for id := range ch { - meta, err := f.loadMeta(ctx, id) - if err == nil { - mtx.Lock() - resp.metas[id] = meta - mtx.Unlock() - continue + meta, metaErr := f.loadMeta(ctx, id) + + var ( + mark *cachedDeletionMark + markErr error + ) + if metaErr == nil { + mark, markErr = f.loadDeletionMark(ctx, id, now) } - switch errors.Cause(err) { - default: - mtx.Lock() - resp.metaErrs.Add(err) - mtx.Unlock() - continue - case ErrorSyncMetaNotFound: - mtx.Lock() - resp.noMetas++ - mtx.Unlock() - case ErrorSyncMetaCorrupted: + func() { mtx.Lock() - resp.corruptedMetas++ - mtx.Unlock() - } - - mtx.Lock() - resp.partial[id] = err - mtx.Unlock() + defer mtx.Unlock() + + // Handle meta result. + switch { + case metaErr == nil: + resp.metas[id] = meta + case errors.Cause(metaErr) == ErrorSyncMetaNotFound: + resp.noMetas++ + resp.partial[id] = metaErr + case errors.Cause(metaErr) == ErrorSyncMetaCorrupted: + resp.corruptedMetas++ + resp.partial[id] = metaErr + default: + resp.metaErrs.Add(metaErr) + } + + // Handle deletion mark. + if mark != nil { + resp.marks[id] = mark + } + if markErr != nil { + resp.metaErrs.Add(markErr) + } + }() } return nil }) @@ -353,7 +473,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { }) if err := eg.Wait(); err != nil { - return nil, errors.Wrap(err, "BaseFetcher: iter bucket") + return resp, errors.Wrap(err, "BaseFetcher: iter bucket") } if len(resp.metaErrs) > 0 { @@ -361,11 +481,8 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { } // Only for complete view of blocks update the cache. - cached := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) - for id, m := range resp.metas { - cached[id] = m - } - f.cached = cached + f.cached = resp.metasCopy() + f.marks = resp.marks // no need to copy, as it's not going to be modified // Best effort cleanup of disk-cached metas. if f.cacheDir != "" { @@ -379,13 +496,15 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { continue } - if _, ok := resp.metas[id]; ok { + _, metaOk := resp.metas[id] + _, markOk := resp.marks[id] + if metaOk || markOk { continue } cachedBlockDir := filepath.Join(f.cacheDir, id.String()) - // No such block loaded, remove the local dir. + // No meta or mark exists, remove the local dir. if err := os.RemoveAll(cachedBlockDir); err != nil { level.Warn(f.logger).Log("msg", "best effort remove of not needed cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) } @@ -395,7 +514,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { return resp, nil } -func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) { +func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]*metadata.DeletionMark, _ map[ulid.ULID]error, err error) { start := time.Now() defer func() { metrics.syncDuration.Observe(time.Since(start).Seconds()) @@ -410,18 +529,16 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter // TODO(bwplotka): Consider custom singleflight with ttl. v, err := f.g.Do("", func() (i interface{}, err error) { // NOTE: First go routine context will go through. - return f.fetchMetadata(ctx) + return f.fetchMetadata(ctx, time.Now()) }) if err != nil { - return nil, nil, err + return nil, nil, nil, err } resp := v.(response) // Copy as same response might be reused by different goroutines. - metas := make(map[ulid.ULID]*metadata.Meta, len(resp.metas)) - for id, m := range resp.metas { - metas[id] = m - } + metas := resp.metasCopy() + marks := resp.deletionMarksCopyForFilter() metrics.synced.WithLabelValues(failedMeta).Set(float64(len(resp.metaErrs))) metrics.synced.WithLabelValues(noMeta).Set(resp.noMetas) @@ -429,15 +546,15 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter for _, filter := range filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - if err := filter.Filter(ctx, metas, metrics.synced, resp.incompleteView); err != nil { - return nil, nil, errors.Wrap(err, "filter metas") + if err := filter.Filter(ctx, metas, marks, metrics.synced, resp.incompleteView); err != nil { + return nil, nil, nil, errors.Wrap(err, "filter metas") } } for _, m := range modifiers { // NOTE: modifier can update modified metric accordingly to the reason of the modification. if err := m.Modify(ctx, metas, metrics.modified, resp.incompleteView); err != nil { - return nil, nil, errors.Wrap(err, "modify metas") + return nil, nil, nil, errors.Wrap(err, "modify metas") } } @@ -445,11 +562,11 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter metrics.submit() if len(resp.metaErrs) > 0 { - return metas, resp.partial, errors.Wrap(resp.metaErrs, "incomplete view") + return metas, marks, resp.partial, errors.Wrap(resp.metaErrs, "incomplete view") } level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial)) - return metas, resp.partial, nil + return metas, marks, resp.partial, nil } type MetaFetcher struct { @@ -469,7 +586,13 @@ type MetaFetcher struct { // // Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing. func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { - metas, partial, err = f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers) + metas, _, partial, err = f.fetchWithDeletionMarks(ctx) + return metas, partial, err +} + +// This method returns all block metas, deletion marks, and partial blocks (without meta or with corrupted meta file). +func (f *MetaFetcher) fetchWithDeletionMarks(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, marks map[ulid.ULID]*metadata.DeletionMark, partial map[ulid.ULID]error, err error) { + metas, marks, partial, err = f.wrapped.fetch(ctx, f.metrics, f.filters, f.modifiers) if f.listener != nil { blocks := make([]metadata.Meta, 0, len(metas)) for _, meta := range metas { @@ -477,7 +600,7 @@ func (f *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. } f.listener(blocks, err) } - return metas, partial, err + return metas, marks, partial, err } // UpdateOnChange allows to add listener that will be update on every change. @@ -499,7 +622,7 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim } // Filter filters out blocks that are outside of specified time range. -func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { +func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, _ bool) error { for id, m := range metas { if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() { continue @@ -527,7 +650,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet const blockIDLabel = "__block_id" // Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels. -func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { +func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, _ bool) error { var lbls labels.Labels for id, m := range metas { lbls = lbls[:0] @@ -559,7 +682,7 @@ func NewDeduplicateFilter() *DeduplicateFilter { // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. -func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { +func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, _ bool) error { var wg sync.WaitGroup metasByResolution := make(map[int64][]*metadata.Meta) @@ -716,7 +839,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura } // Filter filters out blocks that filters blocks that have are created before a specified consistency delay. -func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { +func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, _ bool) error { for id, meta := range metas { // TODO(khyatisoneji): Remove the checks about Thanos Source // by implementing delete delay to fetch metas. @@ -742,15 +865,13 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL type IgnoreDeletionMarkFilter struct { logger log.Logger delay time.Duration - bkt objstore.InstrumentedBucketReader deletionMarkMap map[ulid.ULID]*metadata.DeletionMark } // NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter. -func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration) *IgnoreDeletionMarkFilter { +func NewIgnoreDeletionMarkFilter(logger log.Logger, delay time.Duration) *IgnoreDeletionMarkFilter { return &IgnoreDeletionMarkFilter{ logger: logger, - bkt: bkt, delay: delay, } } @@ -762,21 +883,14 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata. // Filter filters out blocks that are marked for deletion after a given delay. // It also returns the blocks that can be deleted since they were uploaded delay duration before current time. -func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { +func (f *IgnoreDeletionMarkFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, deletionMarks map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, _ bool) error { f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark) for id := range metas { - deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String()) - if err == metadata.ErrorDeletionMarkNotFound { + deletionMark := deletionMarks[id] + if deletionMark == nil { continue } - if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark { - level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err) - continue - } - if err != nil { - return err - } f.deletionMarkMap[id] = deletionMark if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() { synced.WithLabelValues(markedForDeletionMeta).Inc() diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 094c5a6147..e6b16daf61 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -45,7 +45,7 @@ type ulidFilter struct { ulidToDelete *ulid.ULID } -func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error { +func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]*metadata.DeletionMark, synced *extprom.TxGaugeVec, incompleteView bool) error { if _, ok := metas[*f.ulidToDelete]; ok { synced.WithLabelValues("filtered").Inc() delete(metas, *f.ulidToDelete) @@ -89,6 +89,7 @@ func TestMetaFetcher_Fetch(t *testing.T) { expectedMetas []ulid.ULID expectedCorruptedMeta []ulid.ULID expectedNoMeta []ulid.ULID + expectedMarks []ulid.ULID expectedFiltered int expectedMetaErr error }{ @@ -235,12 +236,46 @@ func TestMetaFetcher_Fetch(t *testing.T) { expectedNoMeta: ULIDs(4), expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), }, + { + name: "delete markers", + do: func() { + markBlock := func(id ulid.ULID, deletionTime time.Time) { + buf := bytes.Buffer{} + + m := &metadata.DeletionMark{ + ID: id, + DeletionTime: deletionTime.Unix(), + Version: 1, + } + + testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), &buf)) + } + + markBlock(ULID(1), time.Now()) + markBlock(ULID(2), time.Now()) // No meta, deletion marker will not be reported. + markBlock(ULID(5), time.Now()) // Corrupted meta, deletion marker will not be reported. + markBlock(ULID(6), time.Now()) + + // This mark will be ignored. + testutil.Ok(t, bkt.Upload(ctx, path.Join(ULID(3).String(), metadata.DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) + }, + + // from previous tests... + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5), + expectedNoMeta: ULIDs(4, 2), + expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), + + // deletion marker files. + expectedMarks: ULIDs(1, 6), + }, } { if ok := t.Run(tcase.name, func(t *testing.T) { tcase.do() ulidToDelete = tcase.filterULID - metas, partial, err := fetcher.Fetch(ctx) + metas, marks, partial, err := fetcher.fetchWithDeletionMarks(ctx) if tcase.expectedMetaErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tcase.expectedMetaErr.Error(), err.Error()) @@ -254,12 +289,23 @@ func TestMetaFetcher_Fetch(t *testing.T) { testutil.Assert(t, m != nil, "meta is nil") metasSlice = append(metasSlice, id) } - sort.Slice(metasSlice, func(i, j int) bool { - return metasSlice[i].Compare(metasSlice[j]) < 0 - }) + sort.Sort(sortedULIDs(metasSlice)) testutil.Equals(t, tcase.expectedMetas, metasSlice) } + { + marksSlice := make([]ulid.ULID, 0, len(marks)) + for id, m := range marks { + testutil.Assert(t, m != nil, "mark is nil") + marksSlice = append(marksSlice, id) + } + sort.Sort(sortedULIDs(marksSlice)) + if tcase.expectedMarks == nil { + tcase.expectedMarks = ULIDs() + } + testutil.Equals(t, tcase.expectedMarks, marksSlice) + } + { partialSlice := make([]ulid.ULID, 0, len(partial)) for id := range partial { @@ -355,7 +401,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Ok(t, f.Filter(ctx, input, nil, m.synced, false)) testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) testutil.Equals(t, expected, input) @@ -453,7 +499,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { deleted := len(input) - len(expected) m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Ok(t, f.Filter(ctx, input, nil, m.synced, false)) testutil.Equals(t, expected, input) testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) @@ -517,7 +563,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Ok(t, f.Filter(ctx, input, nil, m.synced, false)) testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta))) testutil.Equals(t, expected, input) @@ -868,7 +914,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, } } - testutil.Ok(t, f.Filter(ctx, metas, m.synced, false)) + testutil.Ok(t, f.Filter(ctx, metas, nil, m.synced, false)) compareSliceWithMapKeys(t, metas, tcase.expected) testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta))) }); !ok { @@ -1026,7 +1072,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Ok(t, f.Filter(ctx, input, nil, m.synced, false)) testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) @@ -1051,61 +1097,134 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Ok(t, f.Filter(ctx, input, nil, m.synced, false)) testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) } func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) { - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() - now := time.Now() - f := &IgnoreDeletionMarkFilter{ - logger: log.NewNopLogger(), - bkt: objstore.WithNoopInstr(bkt), - delay: 48 * time.Hour, - } + now := time.Now() + f := &IgnoreDeletionMarkFilter{ + logger: log.NewNopLogger(), + delay: 48 * time.Hour, + } - shouldFetch := &metadata.DeletionMark{ + marks := map[ulid.ULID]*metadata.DeletionMark{ + // Should fetch. + ULID(1): { ID: ULID(1), DeletionTime: now.Add(-15 * time.Hour).Unix(), Version: 1, - } - - shouldIgnore := &metadata.DeletionMark{ + }, + // Should ignore. + ULID(2): { ID: ULID(2), DeletionTime: now.Add(-60 * time.Hour).Unix(), Version: 1, + }, + } + + input := map[ulid.ULID]*metadata.Meta{ + ULID(1): {}, + ULID(2): {}, + ULID(3): {}, + ULID(4): {}, + } + + expected := map[ulid.ULID]*metadata.Meta{ + ULID(1): {}, + ULID(3): {}, + ULID(4): {}, + } + + m := newTestFetcherMetrics() + testutil.Ok(t, f.Filter(ctx, input, marks, m.synced, false)) + testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta))) + testutil.Equals(t, expected, input) +} + +func TestMetaFetcher_FetchDeletionMarkerCache(t *testing.T) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + dir, err := ioutil.TempDir("", "test-meta-fetcher") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + r := prometheus.NewRegistry() + baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, objstore.WithNoopInstr(bkt), dir, r) + testutil.Ok(t, err) + + id := ULID(1) + + // Prepare files. + meta := &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + Version: 1, + ULID: id, + }, } + { + buf := bytes.Buffer{} - var buf bytes.Buffer - testutil.Ok(t, json.NewEncoder(&buf).Encode(&shouldFetch)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldFetch.ID.String(), metadata.DeletionMarkFilename), &buf)) + testutil.Ok(t, json.NewEncoder(&buf).Encode(meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + } - testutil.Ok(t, json.NewEncoder(&buf).Encode(&shouldIgnore)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnore.ID.String(), metadata.DeletionMarkFilename), &buf)) + now := time.Now() - testutil.Ok(t, bkt.Upload(ctx, path.Join(ULID(3).String(), metadata.DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) + { + resp, err := baseFetcher.fetchMetadata(ctx, now) + testutil.Ok(t, err) + testutil.Equals(t, meta, resp.metas[id]) + testutil.Equals(t, (*cachedDeletionMark)(nil), resp.marks[id]) + } - input := map[ulid.ULID]*metadata.Meta{ - ULID(1): {}, - ULID(2): {}, - ULID(3): {}, - ULID(4): {}, + // Write deletion mark. + mark := &metadata.DeletionMark{ + ID: id, + DeletionTime: time.Now().Unix(), + Version: 1, } - expected := map[ulid.ULID]*metadata.Meta{ - ULID(1): {}, - ULID(3): {}, - ULID(4): {}, + { + buf := bytes.Buffer{} + testutil.Ok(t, json.NewEncoder(&buf).Encode(&mark)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(mark.ID.String(), metadata.DeletionMarkFilename), &buf)) } - m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) - testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta))) - testutil.Equals(t, expected, input) + resp, err := baseFetcher.fetchMetadata(ctx, now) + testutil.Ok(t, err) + testutil.Equals(t, meta, resp.metas[id]) + testutil.Equals(t, *mark, resp.marks[id].mark) + + // Delete marker from bucket, and try to fetch metadata again, with the same timestamp. + // It should hit the cache, and keep using deletion marker. + testutil.Ok(t, bkt.Delete(ctx, path.Join(id.String(), metadata.DeletionMarkFilename))) + + { + resp2, err := baseFetcher.fetchMetadata(ctx, now) + testutil.Ok(t, err) + testutil.Equals(t, resp.marks[id], resp2.marks[id]) + } + + // Try again, with time in the future -- mark is no longer available. + now = now.Add(3 * defaultDeletionMarkCacheEntryTTL) + { + resp3, err := baseFetcher.fetchMetadata(ctx, now) + testutil.Ok(t, err) + testutil.Equals(t, (*cachedDeletionMark)(nil), resp3.marks[id]) + } }) } + +type sortedULIDs []ulid.ULID + +func (s sortedULIDs) Len() int { return len(s) } +func (s sortedULIDs) Less(i, j int) bool { return s[i].Compare(s[j]) < 0 } +func (s sortedULIDs) Swap(i, j int) { s[i], s[j] = s[j], s[i] } diff --git a/pkg/block/metadata/deletionmark.go b/pkg/block/metadata/deletionmark.go index 5f2a9f04ad..def5d6b5f9 100644 --- a/pkg/block/metadata/deletionmark.go +++ b/pkg/block/metadata/deletionmark.go @@ -7,11 +7,14 @@ import ( "context" "encoding/json" "io/ioutil" + "os" "path" + "path/filepath" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -63,6 +66,10 @@ func ReadDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucketReader return nil, errors.Wrapf(err, "read file: %s", deletionMarkFile) } + return unmarshalDeletionMark(metaContent, deletionMarkFile) +} + +func unmarshalDeletionMark(metaContent []byte, deletionMarkFile string) (*DeletionMark, error) { deletionMark := DeletionMark{} if err := json.Unmarshal(metaContent, &deletionMark); err != nil { return nil, errors.Wrapf(ErrorUnmarshalDeletionMark, "file: %s; err: %v", deletionMarkFile, err.Error()) @@ -74,3 +81,39 @@ func ReadDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucketReader return &deletionMark, nil } + +func WriteDeletionMarkToLocalDir(logger log.Logger, dir string, mark *DeletionMark) error { + data, err := json.Marshal(mark) + if err != nil { + return errors.Wrap(err, "json encode deletion mark") + } + + p := filepath.Join(dir, DeletionMarkFilename) + tmp := p + ".tmp" + + err = ioutil.WriteFile(tmp, data, 0666) + if err != nil { + return err + } + return renameFile(logger, tmp, p) +} + +// ReadDeletionMarkFromLocalDir from /deletion-mark.json in the local filesystem. +// Returns ErrorDeletionMarkNotFound if file doesn't exist, ErrorUnmarshalDeletionMark if file is corrupted. +func ReadDeletionMarkFromLocalDir(dir string) (*DeletionMark, error) { + deletionMarkFile := filepath.Join(dir, DeletionMarkFilename) + + b, err := ioutil.ReadFile(deletionMarkFile) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, ErrorDeletionMarkNotFound + } + return nil, errors.Wrapf(err, "read file: %s", deletionMarkFile) + } + + return unmarshalDeletionMark(b, deletionMarkFile) +} + +func DeleteDeletionMarkFromLocalDir(dir string) error { + return os.Remove(filepath.Join(dir, DeletionMarkFilename)) +} diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index a6650fec0f..47343466ba 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -96,7 +96,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, 48*time.Hour) sy, err := NewSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, 1, false, false) testutil.Ok(t, err) @@ -176,7 +176,7 @@ func TestGroup_Compact_e2e(t *testing.T) { reg := prometheus.NewRegistry() - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, 48*time.Hour) duplicateBlocksFilter := block.NewDeduplicateFilter() metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter,