From b77b8ee04f99e543d0e4fa4e5c0f23468b3e149a Mon Sep 17 00:00:00 2001 From: Matej Gera <38492574+matej-g@users.noreply.github.com> Date: Tue, 21 Sep 2021 13:22:53 +0200 Subject: [PATCH] Fetcher: Fix data races (#4663) * Fix data race for cached map Signed-off-by: Matej Gera * Fix data race for ReplicaLabelRemover Signed-off-by: Matej Gera * Fix data race for IgnoreDeletionMarkFilter Signed-off-by: Matej Gera * Update CHANGELOG.md Signed-off-by: Matej Gera * Newline Signed-off-by: Matej Gera * Improve deletionMarkMap Signed-off-by: Matej Gera --- CHANGELOG.md | 5 ++++ pkg/block/fetcher.go | 55 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 18445979a5..9f071c41be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased +### Fixed + +- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races + ### Added + - [#4680](https://github.com/thanos-io/thanos/pull/4680) Query: add `exemplar.partial-response` flag to control partial response. ## v0.23.0 - In Progress diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index d4cbfc0bd2..f9f45202d9 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -162,9 +162,11 @@ type BaseFetcher struct { // Optional local directory to cache meta.json files. cacheDir string - cached map[ulid.ULID]*metadata.Meta syncs prometheus.Counter g singleflight.Group + + mtx sync.Mutex + cached map[ulid.ULID]*metadata.Meta } // NewBaseFetcher constructs BaseFetcher. @@ -386,7 +388,10 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { for id, m := range resp.metas { cached[id] = m } + + f.mtx.Lock() f.cached = cached + f.mtx.Unlock() // Best effort cleanup of disk-cached metas. if f.cacheDir != "" { @@ -473,10 +478,17 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view") } - level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial)) + level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", f.countCached(), "returned", len(metas), "partial", len(resp.partial)) return metas, resp.partial, nil } +func (f *BaseFetcher) countCached() int { + f.mtx.Lock() + defer f.mtx.Unlock() + + return len(f.cached) +} + type MetaFetcher struct { wrapped *BaseFetcher metrics *FetcherMetrics @@ -711,7 +723,11 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met } for u, meta := range metas { - l := meta.Thanos.Labels + l := make(map[string]string) + for n, v := range meta.Thanos.Labels { + l[n] = v + } + for _, replicaLabel := range r.replicaLabels { if _, exists := l[replicaLabel]; exists { level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel) @@ -723,7 +739,10 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met level.Warn(r.logger).Log("msg", "block has no labels left, creating one", r.replicaLabels[0], "deduped") l[r.replicaLabels[0]] = "deduped" } - metas[u].Thanos.Labels = l + + nm := *meta + nm.Thanos.Labels = l + metas[u] = &nm } return nil } @@ -778,10 +797,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL // Delay is not considered when computing DeletionMarkBlocks map. // Not go-routine safe. type IgnoreDeletionMarkFilter struct { - logger log.Logger - delay time.Duration - concurrency int - bkt objstore.InstrumentedBucketReader + logger log.Logger + delay time.Duration + concurrency int + bkt objstore.InstrumentedBucketReader + + mtx sync.Mutex deletionMarkMap map[ulid.ULID]*metadata.DeletionMark } @@ -797,13 +818,21 @@ func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBuc // DeletionMarkBlocks returns block ids that were marked for deletion. func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark { - return f.deletionMarkMap + f.mtx.Lock() + defer f.mtx.Unlock() + + deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark, len(f.deletionMarkMap)) + for id, meta := range f.deletionMarkMap { + deletionMarkMap[id] = meta + } + + return deletionMarkMap } // Filter filters out blocks that are marked for deletion after a given delay. // It also returns the blocks that can be deleted since they were uploaded delay duration before current time. func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { - f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark) + deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark) // Make a copy of block IDs to check, in order to avoid concurrency issues // between the scheduler and workers. @@ -839,7 +868,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL // Keep track of the blocks marked for deletion and filter them out if their // deletion time is greater than the configured delay. mtx.Lock() - f.deletionMarkMap[id] = m + deletionMarkMap[id] = m if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() { synced.WithLabelValues(MarkedForDeletionMeta).Inc() delete(metas, id) @@ -871,6 +900,10 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL return errors.Wrap(err, "filter blocks marked for deletion") } + f.mtx.Lock() + f.deletionMarkMap = deletionMarkMap + f.mtx.Unlock() + return nil }