Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
More efficient Merge implementation. (#486)
Browse files Browse the repository at this point in the history
Avoid a tree of merge objects, which can result in
what I suspect is n^2 calls to Seek when using Without.

With 100k metrics, and a regex of ^$ in BenchmarkHeadPostingForMatchers:

Before:
BenchmarkHeadPostingForMatchers-8              1        51633185216 ns/op      29745528 B/op      200357 allocs/op

After:
BenchmarkHeadPostingForMatchers-8             10         108924996 ns/op 25715025 B/op     101748 allocs/op

Signed-off-by: Brian Brazil <[email protected]>
  • Loading branch information
brian-brazil authored Jan 3, 2019
1 parent b2d7bbd commit 296f943
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 75 deletions.
89 changes: 17 additions & 72 deletions index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,80 +366,25 @@ func Merge(its ...Postings) Postings {
if len(its) == 1 {
return its[0]
}
l := len(its) / 2
return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...))
}

type mergedPostings struct {
a, b Postings
initialized bool
aok, bok bool
cur uint64
}

func newMergedPostings(a, b Postings) *mergedPostings {
return &mergedPostings{a: a, b: b}
}

func (it *mergedPostings) At() uint64 {
return it.cur
}

func (it *mergedPostings) Next() bool {
if !it.initialized {
it.aok = it.a.Next()
it.bok = it.b.Next()
it.initialized = true
}

if !it.aok && !it.bok {
return false
}

if !it.aok {
it.cur = it.b.At()
it.bok = it.b.Next()
return true
}
if !it.bok {
it.cur = it.a.At()
it.aok = it.a.Next()
return true
}

acur, bcur := it.a.At(), it.b.At()

if acur < bcur {
it.cur = acur
it.aok = it.a.Next()
} else if acur > bcur {
it.cur = bcur
it.bok = it.b.Next()
} else {
it.cur = acur
it.aok = it.a.Next()
it.bok = it.b.Next()
}
return true
}

func (it *mergedPostings) Seek(id uint64) bool {
if it.cur >= id {
return true
// All the uses of this function immediately expand it, so
// collect everything in a map. This is more efficient
// when there's 100ks of postings, compared to
// having a tree of merge objects.
pm := make(map[uint64]struct{}, len(its))
for _, it := range its {
for it.Next() {
pm[it.At()] = struct{}{}
}
if it.Err() != nil {
return ErrPostings(it.Err())
}
}

it.aok = it.a.Seek(id)
it.bok = it.b.Seek(id)
it.initialized = true

return it.Next()
}

func (it *mergedPostings) Err() error {
if it.a.Err() != nil {
return it.a.Err()
pl := make([]uint64, 0, len(pm))
for p := range pm {
pl = append(pl, p)
}
return it.b.Err()
sort.Slice(pl, func(i, j int) bool { return pl[i] < pl[j] })
return newListPostings(pl)
}

// Without returns a new postings list that contains all elements from the full list that
Expand Down
6 changes: 3 additions & 3 deletions index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestMergedPostings(t *testing.T) {
a := newListPostings(c.a)
b := newListPostings(c.b)

res, err := ExpandPostings(newMergedPostings(a, b))
res, err := ExpandPostings(Merge(a, b))
testutil.Ok(t, err)
testutil.Equals(t, c.res, res)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestMergedPostingsSeek(t *testing.T) {
a := newListPostings(c.a)
b := newListPostings(c.b)

p := newMergedPostings(a, b)
p := Merge(a, b)

testutil.Equals(t, c.success, p.Seek(c.seek))

Expand Down Expand Up @@ -546,7 +546,7 @@ func TestIntersectWithMerge(t *testing.T) {
// https://github.com/prometheus/prometheus/issues/2616
a := newListPostings([]uint64{21, 22, 23, 24, 25, 30})

b := newMergedPostings(
b := Merge(
newListPostings([]uint64{10, 20, 30}),
newListPostings([]uint64{15, 26, 30}),
)
Expand Down

0 comments on commit 296f943

Please sign in to comment.