Skip to content

Commit

Permalink
store: Preallocate output buffer when encoding postings. (#2812)
Browse files Browse the repository at this point in the history
* Preallocate output buffer when encoding postings.

Signed-off-by: Peter Štibraný <[email protected]>

* Rename values to length.

Signed-off-by: Peter Štibraný <[email protected]>

* Renamed count method to length.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Jun 26, 2020
1 parent a7bb287 commit fc27af4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 7 deletions.
8 changes: 7 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,8 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:]))
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
Expand Down Expand Up @@ -1803,6 +1804,11 @@ func (it *bigEndianPostings) Err() error {
return nil
}

// length returns the number of posting values remaining in the iterator.
func (it *bigEndianPostings) length() int {
	const postingWidth = 4 // each posting is stored as a big-endian uint32
	return len(it.list) / postingWidth
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package store
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -1922,3 +1923,21 @@ func mustMarshalAny(pb proto.Message) *types.Any {
}
return out
}

// TestBigEndianPostingsCount verifies that length() matches the number of
// values actually produced by iterating the postings.
func TestBigEndianPostingsCount(t *testing.T) {
	const count = 1000

	// Fill a raw buffer with `count` random big-endian uint32 postings.
	raw := make([]byte, 4*count)
	for i := 0; i < count; i++ {
		binary.BigEndian.PutUint32(raw[4*i:], rand.Uint32())
	}

	p := newBigEndianPostings(raw)
	testutil.Equals(t, count, p.length())

	// Iteration must yield exactly as many values as length() reported.
	iterated := 0
	for p.Next() {
		iterated++
	}
	testutil.Equals(t, count, iterated)
}
14 changes: 11 additions & 3 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func isDiffVarintSnappyEncodedPostings(input []byte) bool {
// diffVarintSnappyEncode encodes postings into diff+varint representation,
// and applies snappy compression on the result.
// Returned byte slice starts with codecHeaderSnappy header.
func diffVarintSnappyEncode(p index.Postings) ([]byte, error) {
buf, err := diffVarintEncodeNoHeader(p)
// Length argument is expected number of postings, used for preallocating buffer.
func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) {
buf, err := diffVarintEncodeNoHeader(p, length)
if err != nil {
return nil, err
}
Expand All @@ -52,9 +53,16 @@ func diffVarintSnappyEncode(p index.Postings) ([]byte, error) {

// diffVarintEncodeNoHeader encodes postings into diff+varint representation.
// It doesn't add any header to the output bytes.
func diffVarintEncodeNoHeader(p index.Postings) ([]byte, error) {
// Length argument is expected number of postings, used for preallocating buffer.
func diffVarintEncodeNoHeader(p index.Postings, length int) ([]byte, error) {
buf := encoding.Encbuf{}

// This encoding uses around ~1 bytes per posting, but let's use
// conservative 1.25 bytes per posting to avoid extra allocations.
if length > 0 {
buf.B = make([]byte, 0, 5*length/4)
}

prev := uint64(0)
for p.Next() {
v := p.At()
Expand Down
37 changes: 34 additions & 3 deletions pkg/store/postings_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ package store

import (
"io/ioutil"
"math"
"math/rand"
"os"
"strconv"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -49,7 +52,7 @@ func TestDiffVarintCodec(t *testing.T) {
}

codecs := map[string]struct {
codingFunction func(index.Postings) ([]byte, error)
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (index.Postings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }},
Expand All @@ -68,11 +71,11 @@ func TestDiffVarintCodec(t *testing.T) {
t.Log("original size (4*entries):", 4*p.len(), "bytes")
p.reset() // We reuse postings between runs, so we need to reset iterator.

data, err := codec.codingFunction(p)
data, err := codec.codingFunction(p, p.len())
testutil.Ok(t, err)

t.Log("encoded size", len(data), "bytes")
t.Logf("ratio: %0.3f", (float64(len(data)) / float64(4*p.len())))
t.Logf("ratio: %0.3f", float64(len(data))/float64(4*p.len()))

decodedPostings, err := codec.decodingFunction(data)
testutil.Ok(t, err)
Expand Down Expand Up @@ -188,3 +191,31 @@ func (p *uint64Postings) reset() {
// len returns the total number of values held by this postings list.
func (p *uint64Postings) len() int {
return len(p.vals)
}

// BenchmarkEncodePostings measures diff+varint encoding throughput for
// posting lists of increasing size, using a fixed-seed random series so
// runs are comparable.
func BenchmarkEncodePostings(b *testing.B) {
	// Named numPostings rather than `max` to avoid shadowing the
	// predeclared `max` builtin (Go 1.21+).
	const numPostings = 1000000
	r := rand.New(rand.NewSource(0))

	p := make([]uint64, numPostings)

	for ix := 1; ix < len(p); ix++ {
		// Use normal distribution, with stddev=64 (i.e. most values are < 64).
		// This is a very rough approximation of experiments with real blocks.
		d := math.Abs(r.NormFloat64()*64) + 1

		p[ix] = p[ix-1] + uint64(d)
	}

	for _, count := range []int{10000, 100000, 1000000} {
		b.Run(strconv.Itoa(count), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				ps := &uint64Postings{vals: p[:count]}

				if _, err := diffVarintEncodeNoHeader(ps, ps.len()); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}

0 comments on commit fc27af4

Please sign in to comment.