diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index e121e9d408..ee23a69124 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -30,7 +30,6 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/fileutil" @@ -468,45 +467,20 @@ func (s *BucketStore) blockSeries( matchers []labels.Matcher, req *storepb.SeriesRequest, ) (storepb.SeriesSet, *queryStats, error) { - stats := &queryStats{} - - // The postings to preload are registered within the call to PostingsForMatchers, - // when it invokes indexr.Postings for each underlying postings list. - // They are ready to use ONLY after preloadPostings was called successfully. - lazyPostings, err := tsdb.PostingsForMatchers(indexr, matchers...) + ps, err := indexr.ExpandedPostings(matchers) if err != nil { - return nil, stats, errors.Wrap(err, "get postings for matchers") - } - // If the tree was reduced to the empty postings list, don't preload the registered - // leaf postings and return early with an empty result. - if lazyPostings == index.EmptyPostings() { - return storepb.EmptySeriesSet(), stats, nil + return nil, nil, errors.Wrap(err, "expanded matching posting") } - if err := indexr.preloadPostings(); err != nil { - return nil, stats, errors.Wrap(err, "preload postings") - } - - // Get result postings list by resolving the postings tree. - // TODO(bwplotka): Users are seeing panics here, because of lazyPosting being not loaded by preloadPostings. - ps, err := index.ExpandPostings(lazyPostings) - if err != nil { - return nil, stats, errors.Wrap(err, "expand postings") - } - - // As of version two all series entries are 16 byte padded. All references - // we get have to account for that to get the correct offset. - // We do it right at the beginning as it's easier than doing it more fine-grained - // at the loading level. - if indexr.block.indexVersion >= 2 { - for i, id := range ps { - ps[i] = id * 16 - } + if len(ps) == 0 { + return storepb.EmptySeriesSet(), indexr.stats, nil } - // Preload all series index data - if err := indexr.preloadSeries(ps); err != nil { - return nil, stats, errors.Wrap(err, "preload series") + // Preload all series index data. + // TODO(bwplotka): Consider not keeping all series in memory all the time. + // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. + if err := indexr.PreloadSeries(ps); err != nil { + return nil, nil, errors.Wrap(err, "preload series") } // Transform all series into the response types and mark their relevant chunks @@ -517,8 +491,8 @@ func (s *BucketStore) blockSeries( chks []chunks.Meta ) for _, id := range ps { - if err := indexr.Series(id, &lset, &chks); err != nil { - return nil, stats, errors.Wrap(err, "read series") + if err := indexr.LoadedSeries(id, &lset, &chks); err != nil { + return nil, nil, errors.Wrap(err, "read series") } s := seriesEntry{ lset: make([]storepb.Label, 0, len(lset)), @@ -555,7 +529,7 @@ func (s *BucketStore) blockSeries( } if err := chunkr.addPreload(meta.Ref); err != nil { - return nil, stats, errors.Wrap(err, "add chunk preload") + return nil, nil, errors.Wrap(err, "add chunk preload") } s.chks = append(s.chks, storepb.AggrChunk{ MinTime: meta.MinTime, @@ -570,7 +544,7 @@ func (s *BucketStore) blockSeries( // Preload all chunks that were marked in the previous stage. 
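A minimal sketch of the call order `blockSeries` follows after this refactor: expand matched postings up front, bulk-preload the series index entries, then decode each series from the already-fetched buffers. The `seriesIndexReader` interface and `collectSeries` helper below are hypothetical stand-ins for illustration only; the method names mirror the diff, everything else is assumed.

```go
package main

import (
	"fmt"

	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/labels"
)

// seriesIndexReader is a hypothetical interface capturing the three calls that
// blockSeries now makes against the (unexported) bucketIndexReader.
type seriesIndexReader interface {
	// Resolve matchers to a fully expanded, sorted list of series references.
	ExpandedPostings(ms []labels.Matcher) ([]uint64, error)
	// Fetch the index entries for all referenced series in bulk.
	PreloadSeries(ids []uint64) error
	// Decode one already-fetched series entry.
	LoadedSeries(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
}

func collectSeries(r seriesIndexReader, ms []labels.Matcher) ([]labels.Labels, error) {
	ps, err := r.ExpandedPostings(ms)
	if err != nil {
		return nil, err
	}
	if len(ps) == 0 {
		// Nothing matched: no series or chunk I/O happens at all.
		return nil, nil
	}
	// One bulk fetch for all series entries instead of per-series reads.
	if err := r.PreloadSeries(ps); err != nil {
		return nil, err
	}
	res := make([]labels.Labels, 0, len(ps))
	for _, id := range ps {
		var (
			lset labels.Labels
			chks []chunks.Meta
		)
		if err := r.LoadedSeries(id, &lset, &chks); err != nil {
			return nil, err
		}
		res = append(res, lset)
	}
	return res, nil
}

func main() {
	fmt.Println("collectSeries mirrors blockSeries: expand, preload, decode")
}
```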
if err := chunkr.preload(); err != nil { - return nil, stats, errors.Wrap(err, "preload chunks") + return nil, nil, errors.Wrap(err, "preload chunks") } // Transform all chunks into the response format. @@ -578,18 +552,15 @@ func (s *BucketStore) blockSeries( for i, ref := range s.refs { chk, err := chunkr.Chunk(ref) if err != nil { - return nil, stats, errors.Wrap(err, "get chunk") + return nil, nil, errors.Wrap(err, "get chunk") } if err := populateChunk(&s.chks[i], chk, req.Aggregates); err != nil { - return nil, stats, errors.Wrap(err, "populate chunk") + return nil, nil, errors.Wrap(err, "populate chunk") } } } - stats = stats.merge(indexr.stats) - stats = stats.merge(chunkr.stats) - - return newBucketSeriesSet(res), stats, nil + return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error { @@ -839,19 +810,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR g.Go(func() error { defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values") - tpls, err := indexr.LabelValues(req.Label) - if err != nil { - return errors.Wrap(err, "lookup label values") - } - res := make([]string, 0, tpls.Len()) - - for i := 0; i < tpls.Len(); i++ { - e, err := tpls.At(i) - if err != nil { - return errors.Wrap(err, "get string tuple entry") - } - res = append(res, e[0]) - } + res := indexr.LabelValues(req.Label) mtx.Lock() sets = append(sets, res) @@ -1161,6 +1120,8 @@ func (b *bucketBlock) Close() error { return nil } +// bucketIndexReader is a custom index reader (not conforming index.Reader interface) that gets postings +// by type bucketIndexReader struct { logger log.Logger ctx context.Context @@ -1169,9 +1130,8 @@ type bucketIndexReader struct { stats *queryStats cache *indexCache - mtx sync.Mutex - loadedPostings []*lazyPostings - loadedSeries map[uint64][]byte + mtx sync.Mutex + loadedSeries map[uint64][]byte } func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *indexCache) *bucketIndexReader { @@ -1188,89 +1148,202 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB return r } -func (r *bucketIndexReader) preloadPostings() error { - const maxGapSize = 512 * 1024 +func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) { + s, ok := r.block.symbols[o] + if !ok { + return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o) + } + return s, nil +} - ps := r.loadedPostings +// Postings returns postings in expanded list instead of index.Postings. +// This is because we need to have them buffered anyway to perform efficient lookup +// on object storage. +// Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during +// background garbage collections. +// +// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first +// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by +// single label name=value. +func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) { + var postingsToIntersect []index.Postings + + // NOTE: Derived from tsdb.PostingsForMatchers. 
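A purely illustrative example of what `ExpandedPostings` produces, using the in-memory `index.MemPostings` type rather than the object-storage reader: per-label-value postings lists are unioned (all values a matcher selects), the results are intersected across matchers, and the final iterator is expanded into the flat `[]uint64` that the caller works with. The series IDs and labels below are made up.

```go
package main

import (
	"fmt"

	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	mp := index.NewMemPostings()
	mp.Add(1, labels.FromStrings("a", "1", "b", "1"))
	mp.Add(2, labels.FromStrings("a", "1", "b", "2"))
	mp.Add(3, labels.FromStrings("a", "2", "b", "2"))
	mp.Add(4, labels.FromStrings("a", "3", "b", "2"))

	// A matcher such as a=~"1|2" touches two postings lists; their union is
	// what a single matcher contributes.
	aPostings := index.Merge(mp.Get("a", "1"), mp.Get("a", "2"))

	// Different matchers are ANDed together via Intersect, then expanded into
	// a concrete slice of series references.
	ps, err := index.ExpandPostings(index.Intersect(aPostings, mp.Get("b", "2")))
	if err != nil {
		panic(err)
	}
	fmt.Println(ps) // [2 3]
}
```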
+ for _, m := range ms { + matching, err := matchingLabels(r.LabelValues, m) + if err != nil { + return nil, errors.Wrap(err, "match labels") + } + if len(matching) == 0 { + continue + } - sort.Slice(ps, func(i, j int) bool { - return ps[i].ptr.Start < ps[j].ptr.Start - }) - parts := partitionRanges(len(ps), func(i int) (start, end uint64) { - return uint64(ps[i].ptr.Start), uint64(ps[i].ptr.End) - }, maxGapSize) - var g run.Group + // We need to load all matching postings to tell what postings are intersecting with what. + postings, err := r.fetchPostings(matching) + if err != nil { + return nil, errors.Wrap(err, "get postings") + } - for _, p := range parts { - ctx, cancel := context.WithCancel(r.ctx) - i, j := p[0], p[1] + postingsToIntersect = append(postingsToIntersect, postings) + } - g.Add(func() error { - return r.loadPostings(ctx, ps[i:j], ps[i].ptr.Start, ps[j-1].ptr.End) - }, func(err error) { - if err != nil { - cancel() - } - }) + if len(postingsToIntersect) == 0 { + return nil, nil } - if err := g.Run(); err != nil { - return err + ps, err := index.ExpandPostings(index.Intersect(postingsToIntersect...)) + if err != nil { + return nil, errors.Wrap(err, "expand") } - // TODO(bwplotka): Users are seeing lazyPostings not fully loaded. Double checking it here and printing more info - // on failure case. - for _, postings := range ps { - if postings.Postings != nil { - continue + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + if r.block.indexVersion >= 2 { + for i, id := range ps { + ps[i] = id * 16 } + } - msg := fmt.Sprintf("found parts: %v bases on:", parts) - for _, p := range ps { - msg += fmt.Sprintf(" [start: %v; end: %v]", p.ptr.Start, p.ptr.End) - } + return ps, nil +} - return errors.Errorf("expected all postings to be loaded but spotted malformed one with key: %v; start: %v; end: %v. Additional info: %s", - postings.key, postings.ptr.Start, postings.ptr.End, msg) +// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. +func matchingLabels(lvalsFn func(name string) []string, m labels.Matcher) (labels.Labels, error) { + // If the matcher selects an empty value, it selects all the series which don't + // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 + // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + if m.Matches("") { + // We don't support tsdb.postingsForUnsetLabelMatcher. + // This is because it requires fetching all postings for index. + // This requires additional logic to avoid fetching big bytes range (todo: how big?). See https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + // to what it blocks. + return nil, errors.Errorf("support for <> != matcher is not implemented; empty matcher for label name %s", m.Name()) } - return nil + // Fast-path for equal matching. + if em, ok := m.(*labels.EqualMatcher); ok { + return labels.Labels{{Name: em.Name(), Value: em.Value()}}, nil + } + + var matchingLabels labels.Labels + for _, val := range lvalsFn(m.Name()) { + if m.Matches(val) { + matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val}) + } + } + + return matchingLabels, nil } -// loadPostings loads given postings using given start + length. It is expected to have given postings data within given range. 
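A hypothetical same-package test sketch (for example in a `_test.go` file in `pkg/store`) of how `matchingLabels` resolves a matcher against a block's known label values. The label values are made up; it shows the equality fast path, the scan over stored values for other matchers, and the rejection of matchers that also select the empty value.

```go
func TestMatchingLabelsSketch(t *testing.T) {
	lvals := func(name string) []string {
		if name == "a" {
			return []string{"1", "2", "3"}
		}
		return nil
	}

	// Equality matchers take the fast path: no scan over stored label values.
	eq, err := matchingLabels(lvals, labels.NewEqualMatcher("a", "1"))
	testutil.Ok(t, err)
	testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}}, eq)

	// Any other matcher is evaluated against every stored value for that name.
	rm, err := labels.NewRegexpMatcher("a", "^(1|2)$")
	testutil.Ok(t, err)
	re, err := matchingLabels(lvals, rm)
	testutil.Ok(t, err)
	testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, re)

	// Matchers that also select the empty value (a != "x") are rejected, since
	// resolving them would require fetching all postings of the block.
	_, err = matchingLabels(lvals, labels.Not(labels.NewEqualMatcher("a", "x")))
	testutil.NotOk(t, err)
}
```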
-func (r *bucketIndexReader) loadPostings(ctx context.Context, postings []*lazyPostings, start, end int64) error { - begin := time.Now() +type postingPtr struct { + key labels.Label + ptr index.Range +} - b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start)) - if err != nil { - return errors.Wrap(err, "read postings range") - } +// fetchPostings returns sorted slice of postings that match the selected labels. +func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) { + const maxGapSize = 512 * 1024 - r.mtx.Lock() - defer r.mtx.Unlock() + var ( + ptrs []postingPtr + postings = make([]index.Postings, 0, len(keys)) + ) - r.stats.postingsFetchCount++ - r.stats.postingsFetched += len(postings) - r.stats.postingsFetchDurationSum += time.Since(begin) - r.stats.postingsFetchedSizeSum += int(end - start) + // TODO(bwplotka): sort postings? - for _, p := range postings { - c := b[p.ptr.Start-start : p.ptr.End-start] + for _, k := range keys { + // Get postings for given key from cache first. + if b, ok := r.cache.postings(r.block.meta.ULID, k); ok { + r.stats.postingsTouched++ + r.stats.postingsTouchedSizeSum += len(b) - _, l, err := r.dec.Postings(c) - if err != nil { - return errors.Wrap(err, "read postings list") + _, l, err := r.dec.Postings(b) + if err != nil { + return nil, errors.Wrap(err, "decode postings") + } + postings = append(postings, l) + continue } - p.set(l) - r.cache.setPostings(r.block.meta.ULID, p.key, c) - // If we just fetched it we still have to update the stats for touched postings. - r.stats.postingsTouched++ - r.stats.postingsTouchedSizeSum += len(c) + + // Cache miss; save pointer for actual posting in index stored in object store. + ptr, ok := r.block.postings[k] + if !ok { + // Index malformed? Should not happen. + continue + } + + ptrs = append(ptrs, postingPtr{ptr: ptr, key: k}) } - return nil + + sort.Slice(ptrs, func(i, j int) bool { + return ptrs[i].ptr.Start < ptrs[j].ptr.Start + }) + + // TODO(bwplotka): Asses how large in worst case scenario this can be. (e.g fetch for AllPostingsKeys) + // Consider sub split if too big. + parts := partitionRanges(len(ptrs), func(i int) (start, end uint64) { + return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End) + }, maxGapSize) + + var g run.Group + for _, p := range parts { + ctx, cancel := context.WithCancel(r.ctx) + i, j := p.elemRng[0], p.elemRng[1] + + start := int64(p.start) + // We assume index does not have any ptrs that has 0 length. + length := int64(p.end) - start + + // Fetch from object storage concurrently and update stats and posting list. + g.Add(func() error { + begin := time.Now() + + b, err := r.block.readIndexRange(ctx, start, length) + if err != nil { + return errors.Wrap(err, "read postings range") + } + fetchTime := time.Since(begin) + + r.mtx.Lock() + defer r.mtx.Unlock() + + r.stats.postingsFetchCount++ + r.stats.postingsFetched += j - i + r.stats.postingsFetchDurationSum += fetchTime + r.stats.postingsFetchedSizeSum += int(length) + + for _, p := range ptrs[i:j] { + c := b[p.ptr.Start-start : p.ptr.End-start] + + _, fetchedPostings, err := r.dec.Postings(c) + if err != nil { + return errors.Wrap(err, "read postings list") + } + + // Return postings and fill LRU cache. + postings = append(postings, fetchedPostings) + r.cache.setPostings(r.block.meta.ULID, p.key, c) + + // If we just fetched it we still have to update the stats for touched postings. 
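A hypothetical same-package sketch of how `partitionRanges` coalesces the sorted posting pointers into object-storage reads: nearby ranges are merged into one read, while anything further than `maxGapSize` away starts a new part. The offsets are made up; the expectations follow the algorithm as written in this diff.

```go
func TestPartitionRangesSketch(t *testing.T) {
	const maxGapSize = 512 * 1024

	// Made-up index offsets: two nearby ranges and one far away.
	ranges := [][2]uint64{{0, 100}, {200, 500}, {2 << 20, 2<<20 + 100}}

	parts := partitionRanges(len(ranges), func(i int) (start, end uint64) {
		return ranges[i][0], ranges[i][1]
	}, maxGapSize)

	testutil.Equals(t, []part{
		// One read covers the first two ranges; the 100-byte gap is below maxGapSize.
		{start: 0, end: 500, elemRng: [2]int{0, 2}},
		// The roughly 2 MiB gap exceeds maxGapSize, so the last range becomes its own read.
		{start: 2 << 20, end: 2<<20 + 100, elemRng: [2]int{2, 3}},
	}, parts)
}
```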
+ r.stats.postingsTouched++ + r.stats.postingsTouchedSizeSum += len(c) + } + return nil + }, func(err error) { + if err != nil { + cancel() + } + }) + } + + if err := g.Run(); err != nil { + return nil, err + } + + return index.Merge(postings...), nil } -func (r *bucketIndexReader) preloadSeries(ids []uint64) error { +func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { const maxSeriesSize = 64 * 1024 const maxGapSize = 512 * 1024 @@ -1292,10 +1365,10 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error { for _, p := range parts { ctx, cancel := context.WithCancel(r.ctx) - i, j := p[0], p[1] + i, j := p.elemRng[0], p.elemRng[1] g.Add(func() error { - return r.loadSeries(ctx, ids[i:j], ids[i], ids[j-1]+maxSeriesSize) + return r.loadSeries(ctx, ids[i:j], p.start, p.end+maxSeriesSize) }, func(err error) { if err != nil { cancel() @@ -1338,90 +1411,50 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, return nil } +type part struct { + start uint64 + end uint64 + + elemRng [2]int +} + // partitionRanges partitions length entries into n <= length ranges that cover all // input ranges. // It combines entries that are separated by reasonably small gaps. -func partitionRanges(length int, rng func(int) (uint64, uint64), maxGapSize uint64) (parts [][2]int) { +// It supports overlapping ranges. +// NOTE: It expects range to be sorted by start time. +func partitionRanges(length int, rng func(int) (uint64, uint64), maxGapSize uint64) (parts []part) { j := 0 k := 0 for k < length { j = k k++ - _, end := rng(j) + p := part{} + p.start, p.end = rng(j) // Keep growing the range until the end or we encounter a large gap. for ; k < length; k++ { s, e := rng(k) - if end+maxGapSize < s { + if p.end+maxGapSize < s { break } - end = e - } - parts = append(parts, [2]int{j, k}) - } - return parts -} - -func (r *bucketIndexReader) Symbols() (map[string]struct{}, error) { - return nil, errors.New("not implemented") -} - -// LabelValues returns the possible label values. -func (r *bucketIndexReader) LabelValues(names ...string) (index.StringTuples, error) { - if len(names) != 1 { - return nil, errors.New("label value lookups only supported for single name") - } - return index.NewStringTuples(r.block.lvals[names[0]], 1) -} - -type lazyPostings struct { - index.Postings - key labels.Label - ptr index.Range -} - -func (p *lazyPostings) set(v index.Postings) { - p.Postings = v -} - -// Postings returns the postings list iterator for the label pair. -// The Postings here contain the offsets to the series inside the index. -// Found IDs are not strictly required to point to a valid Series, e.g. during -// background garbage collections. -func (r *bucketIndexReader) Postings(name, value string) (index.Postings, error) { - l := labels.Label{Name: name, Value: value} - ptr, ok := r.block.postings[l] - if !ok { - return index.EmptyPostings(), nil - } - if b, ok := r.cache.postings(r.block.meta.ULID, l); ok { - r.stats.postingsTouched++ - r.stats.postingsTouchedSizeSum += len(b) - _, p, err := r.dec.Postings(b) - if err != nil { - return nil, errors.Wrap(err, "decode postings") + if p.end <= e { + p.end = e + } } - return p, nil + p.elemRng = [2]int{j, k} + parts = append(parts, p) } - // The stats for touched postings are updated as they are loaded. 
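All three preload paths (`fetchPostings`, `PreloadSeries`, and `bucketChunkReader.preload`) use the same oklog/run pattern: one actor per part, with an interrupt that cancels that part's context so the first failure aborts the remaining fetches. A standalone sketch of just that wiring is below; `fetchPart` merely simulates a ranged object-storage read and is not part of the change.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/oklog/run"
)

// fetchPart stands in for a ranged object-storage read.
func fetchPart(ctx context.Context, i int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		fmt.Println("fetched part", i)
		return nil
	}
}

func main() {
	var g run.Group
	for i := 0; i < 3; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		i := i
		g.Add(func() error {
			return fetchPart(ctx, i)
		}, func(err error) {
			// Run invokes every interrupt once the first actor returns; cancelling
			// here aborts the in-flight reads of the other parts on failure.
			if err != nil {
				cancel()
			}
		})
	}
	if err := g.Run(); err != nil {
		fmt.Println("preload failed:", err)
	}
}
```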
- p := &lazyPostings{key: l, ptr: ptr} - r.loadedPostings = append(r.loadedPostings, p) - return p, nil -} - -// SortedPostings returns a postings list that is reordered to be sorted -// by the label set of the underlying series. -func (r *bucketIndexReader) SortedPostings(p index.Postings) index.Postings { - return p + return parts } -// Series populates the given labels and chunk metas for the series identified +// LoadedSeries populates the given labels and chunk metas for the series identified // by the reference. // Returns ErrNotFound if the ref does not resolve to a known series. -func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { +func (r *bucketIndexReader) LoadedSeries(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { b, ok := r.loadedSeries[ref] if !ok { return errors.Errorf("series %d not found", ref) @@ -1433,9 +1466,13 @@ func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chun return r.dec.Series(b, lset, chks) } -// LabelIndices returns the label pairs for which indices exist. -func (r *bucketIndexReader) LabelIndices() ([][]string, error) { - return nil, errors.New("not implemented") +// LabelValues returns label values for single name. +func (r *bucketIndexReader) LabelValues(name string) []string { + res := make([]string, 0, len(r.block.lvals[name])) + for _, v := range r.block.lvals[name] { + res = append(res, v) + } + return res } // Close released the underlying resources of the reader. @@ -1500,10 +1537,10 @@ func (r *bucketChunkReader) preload() error { for _, p := range parts { ctx, cancel := context.WithCancel(r.ctx) - m, n := p[0], p[1] + m, n := p.elemRng[0], p.elemRng[1] g.Add(func() error { - return r.loadChunks(ctx, offsets[m:n], seq, offsets[m], offsets[n-1]+maxChunkSize) + return r.loadChunks(ctx, offsets[m:n], seq, uint32(p.start), uint32(p.end)+maxChunkSize) }, func(err error) { if err != nil { cancel() diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index b5bbd6692a..c5528121ac 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -20,6 +20,82 @@ import ( "github.com/prometheus/tsdb/labels" ) +func prepareStoreWithTestBlocks(t testing.TB, ctx context.Context, dir string, bkt objstore.Bucket) (store *BucketStore, minTime, maxTime int64) { + series := []labels.Labels{ + labels.FromStrings("a", "1", "b", "1"), + labels.FromStrings("a", "1", "b", "2"), + labels.FromStrings("a", "2", "b", "1"), + labels.FromStrings("a", "2", "b", "2"), + labels.FromStrings("a", "1", "c", "1"), + labels.FromStrings("a", "1", "c", "2"), + labels.FromStrings("a", "2", "c", "1"), + labels.FromStrings("a", "2", "c", "2"), + } + extLset := labels.FromStrings("ext1", "value1") + + start := time.Now() + now := start + + minTime = int64(0) + maxTime = int64(0) + blocks := 0 + for i := 0; i < 3; i++ { + mint := timestamp.FromTime(now) + now = now.Add(2 * time.Hour) + maxt := timestamp.FromTime(now) + + if minTime == 0 { + minTime = mint + } + maxTime = maxt + + // Create two blocks per time slot. Only add 10 samples each so only one chunk + // gets created each. This way we can easily verify we got 10 chunks per series below. 
+ id1, err := testutil.CreateBlock(dir, series[:4], 10, mint, maxt, extLset, 0) + testutil.Ok(t, err) + id2, err := testutil.CreateBlock(dir, series[4:], 10, mint, maxt, extLset, 0) + testutil.Ok(t, err) + + dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) + + // Add labels to the meta of the second block. + meta, err := block.ReadMetaFile(dir2) + testutil.Ok(t, err) + meta.Thanos.Labels = map[string]string{"ext2": "value2"} + testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta)) + + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1)) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2)) + blocks += 2 + + testutil.Ok(t, os.RemoveAll(dir1)) + testutil.Ok(t, os.RemoveAll(dir2)) + } + + store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false) + testutil.Ok(t, err) + + go func() { + if err := runutil.Repeat(100*time.Millisecond, ctx.Done(), func() error { + return store.SyncBlocks(ctx) + }); err != nil && errors.Cause(err) != context.Canceled { + t.Error(err) + t.FailNow() + } + }() + + rctx, rcancel := context.WithTimeout(ctx, 30*time.Second) + defer rcancel() + testutil.Ok(t, runutil.Retry(100*time.Millisecond, rctx.Done(), func() error { + if store.numBlocks() < blocks { + return errors.New("not all blocks loaded") + } + return nil + })) + + return store, minTime, maxTime +} + func TestBucketStore_e2e(t *testing.T) { objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { ctx, cancel := context.WithCancel(context.Background()) @@ -29,75 +105,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - series := []labels.Labels{ - labels.FromStrings("a", "1", "b", "1"), - labels.FromStrings("a", "1", "b", "2"), - labels.FromStrings("a", "2", "b", "1"), - labels.FromStrings("a", "2", "b", "2"), - labels.FromStrings("a", "1", "c", "1"), - labels.FromStrings("a", "1", "c", "2"), - labels.FromStrings("a", "2", "c", "1"), - labels.FromStrings("a", "2", "c", "2"), - } - extLset := labels.FromStrings("ext1", "value1") - - start := time.Now() - now := start - - minTime := int64(0) - maxTime := int64(0) - for i := 0; i < 3; i++ { - mint := timestamp.FromTime(now) - now = now.Add(2 * time.Hour) - maxt := timestamp.FromTime(now) - - if minTime == 0 { - minTime = mint - } - maxTime = maxt - - // Create two blocks per time slot. Only add 10 samples each so only one chunk - // gets created each. This way we can easily verify we got 10 chunks per series below. - id1, err := testutil.CreateBlock(dir, series[:4], 10, mint, maxt, extLset, 0) - testutil.Ok(t, err) - id2, err := testutil.CreateBlock(dir, series[4:], 10, mint, maxt, extLset, 0) - testutil.Ok(t, err) - - dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) - - // Add labels to the meta of the second block. 
- meta, err := block.ReadMetaFile(dir2) - testutil.Ok(t, err) - meta.Thanos.Labels = map[string]string{"ext2": "value2"} - testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta)) - - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1)) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2)) - - testutil.Ok(t, os.RemoveAll(dir1)) - testutil.Ok(t, os.RemoveAll(dir2)) - } - - store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false) - testutil.Ok(t, err) - - go func() { - if err := runutil.Repeat(100*time.Millisecond, ctx.Done(), func() error { - return store.SyncBlocks(ctx) - }); err != nil && errors.Cause(err) != context.Canceled { - t.Error(err) - t.FailNow() - } - }() - - rctx, rcancel := context.WithTimeout(ctx, 30*time.Second) - defer rcancel() - testutil.Ok(t, runutil.Retry(100*time.Millisecond, rctx.Done(), func() error { - if store.numBlocks() < 6 { - return errors.New("not all blocks loaded") - } - return nil - })) + store, minTime, maxTime := prepareStoreWithTestBlocks(t, ctx, dir, bkt) mint, maxt := store.TimeRange() testutil.Equals(t, minTime, mint) @@ -123,8 +131,8 @@ func TestBucketStore_e2e(t *testing.T) { Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"}, }, - MinTime: timestamp.FromTime(start), - MaxTime: timestamp.FromTime(now), + MinTime: mint, + MaxTime: maxt, }, srv)) testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) @@ -143,8 +151,8 @@ func TestBucketStore_e2e(t *testing.T) { Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "b", Value: "2"}, }, - MinTime: timestamp.FromTime(start), - MaxTime: timestamp.FromTime(now), + MinTime: mint, + MaxTime: maxt, }, srv)) testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) @@ -165,8 +173,8 @@ func TestBucketStore_e2e(t *testing.T) { {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, {Type: storepb.LabelMatcher_EQ, Name: "ext2", Value: "value2"}, }, - MinTime: timestamp.FromTime(start), - MaxTime: timestamp.FromTime(now), + MinTime: mint, + MaxTime: maxt, }, srv)) testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) @@ -181,8 +189,8 @@ func TestBucketStore_e2e(t *testing.T) { {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, {Type: storepb.LabelMatcher_EQ, Name: "ext2", Value: "wrong-value"}, }, - MinTime: timestamp.FromTime(start), - MaxTime: timestamp.FromTime(now), + MinTime: mint, + MaxTime: maxt, }, srv)) testutil.Equals(t, 0, len(srv.SeriesSet)) }) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index fa23dfc6f8..06d9d2f98b 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -183,6 +183,27 @@ func TestBucketBlockSet_labelMatchers(t *testing.T) { }, match: true, }, + // Those are matchers mentioned here: https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + // We want to provide explicit tests that says when Thanos supports its and when not. We don't support it here in + // external labelset level. + { + in: []labels.Matcher{ + labels.Not(labels.NewEqualMatcher("", "x")), + }, + res: []labels.Matcher{ + labels.Not(labels.NewEqualMatcher("", "x")), + }, + match: true, + }, + { + in: []labels.Matcher{ + labels.Not(labels.NewEqualMatcher("", "d")), + }, + res: []labels.Matcher{ + labels.Not(labels.NewEqualMatcher("", "d")), + }, + match: true, + }, } for _, c := range cases { res, ok := set.labelMatchers(c.in...) 
@@ -198,15 +219,15 @@ func TestPartitionRanges(t *testing.T) { for _, c := range []struct { input [][2]int - expected [][2]int + expected []part }{ { input: [][2]int{{1, 10}}, - expected: [][2]int{{0, 1}}, + expected: []part{{start: 1, end: 10, elemRng: [2]int{0, 1}}}, }, { input: [][2]int{{1, 2}, {3, 5}, {7, 10}}, - expected: [][2]int{{0, 3}}, + expected: []part{{start: 1, end: 10, elemRng: [2]int{0, 3}}}, }, { input: [][2]int{ @@ -215,18 +236,33 @@ func TestPartitionRanges(t *testing.T) { {20, 30}, {maxGapSize + 31, maxGapSize + 32}, }, - expected: [][2]int{{0, 3}, {3, 4}}, + expected: []part{ + {start: 1, end: 30, elemRng: [2]int{0, 3}}, + {start: maxGapSize + 31, end: maxGapSize + 32, elemRng: [2]int{3, 4}}, + }, }, // Overlapping ranges. { input: [][2]int{ {1, 30}, - {3, 28}, {1, 4}, + {3, 28}, {maxGapSize + 31, maxGapSize + 32}, {maxGapSize + 31, maxGapSize + 40}, }, - expected: [][2]int{{0, 3}, {3, 5}}, + expected: []part{ + {start: 1, end: 30, elemRng: [2]int{0, 3}}, + {start: maxGapSize + 31, end: maxGapSize + 40, elemRng: [2]int{3, 5}}, + }, + }, + { + input: [][2]int{ + // Mimick AllPostingsKey, where range specified whole range. + {1, 15}, + {1, maxGapSize + 100}, + {maxGapSize + 31, maxGapSize + 40}, + }, + expected: []part{{start: 1, end: maxGapSize + 100, elemRng: [2]int{0, 3}}}, }, } { res := partitionRanges(len(c.input), func(i int) (uint64, uint64) {
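The extracted `prepareStoreWithTestBlocks` helper added earlier in this diff makes it easy to write further e2e tests against the same fixture. A hypothetical follow-up test in the style of `TestBucketStore_e2e` is sketched below; the test name, the `LabelValues` response shape, and the expected values are assumptions, not part of the change.

```go
func TestBucketStore_LabelValues_e2e(t *testing.T) {
	objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		dir, err := ioutil.TempDir("", "test_bucketstore_labelvalues_e2e")
		testutil.Ok(t, err)
		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

		store, _, _ := prepareStoreWithTestBlocks(t, ctx, dir, bkt)

		// Every uploaded block carries a="1" and a="2" series, so the merged
		// label values are expected to come back sorted and deduplicated.
		resp, err := store.LabelValues(ctx, &storepb.LabelValuesRequest{Label: "a"})
		testutil.Ok(t, err)
		testutil.Equals(t, []string{"1", "2"}, resp.Values)
	})
}
```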