Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Vertical query merging and compaction #370

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
d14623d
Vertical series iterator
codesome Sep 1, 2018
bea7a5d
Select overlapped blocks first in compactor Plan()
codesome Sep 2, 2018
4afa1f0
Added vertical compaction
codesome Sep 3, 2018
f11143c
Code cleanup and comments
codesome Sep 4, 2018
55b13c0
Fix review comments
codesome Oct 6, 2018
f880bc9
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Oct 6, 2018
7de67b3
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Oct 25, 2018
ba9facf
Fix tests
codesome Oct 25, 2018
0f86bb0
Add benchmark for compaction
codesome Oct 25, 2018
0aae01d
Perform vertical compaction only when blocks are overlapping.
codesome Nov 3, 2018
cb9bb62
Benchmark for vertical compaction
codesome Nov 3, 2018
7ae4941
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 3, 2018
94e5ec1
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 13, 2018
4103678
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 19, 2018
9df857d
Benchmark for query iterator and seek for non overlapping blocks
codesome Nov 30, 2018
ad4ef3f
Vertical query merge only for overlapping blocks
codesome Nov 30, 2018
5620350
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 30, 2018
e9b05eb
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Dec 7, 2018
a8f4c26
Simplify logging in Compact(...)
codesome Dec 27, 2018
5e707bf
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Dec 27, 2018
d655420
Updated CHANGELOG.md
codesome Dec 27, 2018
6cb6f2a
Calculate overlapping inside populateBlock
codesome Jan 4, 2019
f53e648
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 5, 2019
6254595
MinTime and MaxTime for BlockReader.
codesome Jan 10, 2019
275eeb0
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 16, 2019
f43086f
Sort blocks w.r.t. MinTime in reload()
codesome Jan 19, 2019
48eed7c
Log about overlapping in LeveledCompactor.write() instead of returnin…
codesome Jan 19, 2019
9f288dc
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 19, 2019
b92a960
Log about overlapping inside LeveledCompactor.populateBlock()
codesome Jan 21, 2019
0d98331
Fix review comments
codesome Jan 21, 2019
acfbdb3
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 21, 2019
4d448d6
Refactor createBlock to take optional []Series
codesome Jan 21, 2019
159cbe3
review1
Jan 23, 2019
f1253d2
Merge pull request #6 from krasi-georgiev/pull/370-review
codesome Jan 23, 2019
1072f0f
Updated CHANGELOG and minor nits
codesome Jan 23, 2019
4a0e2e6
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 28, 2019
8047a2a
nits
codesome Jan 28, 2019
47436d0
Updated CHANGELOG
codesome Jan 28, 2019
ad7993e
Refactor iterator and seek benchmarks for Querier.
codesome Jan 30, 2019
117cef8
Additional test case
codesome Feb 8, 2019
5f8d911
genSeries takes optional labels. Updated BenchmarkQueryIterator and B…
codesome Feb 10, 2019
a23f030
Split genSeries into genSeries and populateSeries
codesome Feb 12, 2019
260665c
Check error in benchmark
codesome Feb 12, 2019
6a1d3f4
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Feb 12, 2019
d5e8479
Fix review comments
codesome Feb 14, 2019
58e534c
Warn about overlapping blocks in reload()
codesome Feb 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion block.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ type StringTuples interface {
type ChunkWriter interface {
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
// must be populated.
// After a successful write, it returns the chunks that were actually written.
// These may differ from the input chunks if any of them overlapped and were merged.
// After returning successfully, the Ref fields in the ChunkMetas
// are set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...chunks.Meta) error
WriteChunks(chunks ...chunks.Meta) ([]chunks.Meta, error)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

// Close writes any required finalization and closes the resources
// associated with the underlying writer.
Expand Down
39 changes: 39 additions & 0 deletions chunkenc/xor.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,42 @@ func (it *xorIterator) readValue() bool {
it.numRead++
return true
}

// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b Chunk) (*XORChunk, error) {
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
newChunk := NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator()
bit := b.Iterator()
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
return newChunk, nil
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
}
53 changes: 45 additions & 8 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,44 @@ func (w *Writer) write(b []byte) error {
return err
}

func (w *Writer) WriteChunks(chks ...Meta) error {
// mergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The first appearing sample is retained in case there is overlapping.
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
func (w *Writer) mergeOverlappingChunks(chks []Meta) ([]Meta, error) {
if len(chks) < 2 {
return chks, nil
}
var newChks []Meta // Will contain the merged chunks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we allocate space for all of them?

newChks := make([]Meta, len(chks), len(chks)). It will reduce allocations. And it's fine if we will not use all of this. Statistically we will use all of those (when they not overlap).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this. I actually observed significant increase in allocs when I tested with #240. I will test again with this change.

newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := chunkenc.MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}

return newChks, nil
}

func (w *Writer) WriteChunks(chks ...Meta) ([]Meta, error) {
chks, err := w.mergeOverlappingChunks(chks)
if err != nil {
return nil, err
}

// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
Expand All @@ -210,7 +247,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error {

if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
if err := w.cut(); err != nil {
return err
return nil, err
}
}

Expand All @@ -226,26 +263,26 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))

if err := w.write(b[:n]); err != nil {
return err
return nil, err
}
b[0] = byte(chk.Chunk.Encoding())
if err := w.write(b[:1]); err != nil {
return err
return nil, err
}
if err := w.write(chk.Chunk.Bytes()); err != nil {
return err
return nil, err
}

w.crc32.Reset()
if err := chk.writeHash(w.crc32); err != nil {
return err
return nil, err
}
if err := w.write(w.crc32.Sum(b[:0])); err != nil {
return err
return nil, err
}
}

return nil
return chks, nil
}

func (w *Writer) seq() int {
Expand Down
47 changes: 44 additions & 3 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,20 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})

var res []string
// Checking for overlapping blocks.
for _, dm := range c.selectOverlappingDirs(dms) {
res = append(res, dm.dir)
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
}
if len(res) > 0 {
return res, nil
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}
// No overlapping blocks, do compaction the usual way.

krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
dms = dms[:len(dms)-1]

var res []string
for _, dm := range c.selectDirs(dms) {
res = append(res, dm.dir)
}
Expand Down Expand Up @@ -238,6 +247,33 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
return nil
}

// selectOverlappingDirs returns one set of dir metas of the blocks whose ranges are overlapping,
// that should be compacted into a single new block.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []dirMeta {
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
if len(ds) < 2 {
return nil
}
startBlock, endBlock := -1, -1
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
prevMaxTime := ds[0].meta.MaxTime
for i, d := range ds[1:] {
if d.meta.MinTime < prevMaxTime {
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
if startBlock < 0 {
startBlock = i
}
endBlock = i + 1
} else if startBlock >= 0 {
break
}
if d.meta.MaxTime > prevMaxTime {
prevMaxTime = d.meta.MaxTime
}
}
if startBlock < 0 {
return nil
}
return ds[startBlock : endBlock+1]
}

// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
Expand Down Expand Up @@ -409,7 +445,7 @@ type instrumentedChunkWriter struct {
trange prometheus.Histogram
}

func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) ([]chunks.Meta, error) {
for _, c := range chunks {
w.size.Observe(float64(len(c.Chunk.Bytes())))
w.samples.Observe(float64(c.Chunk.NumSamples()))
Expand Down Expand Up @@ -583,6 +619,10 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,

for set.Next() {
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
// If blocks are overlapped, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
return chks[i].MinTime < chks[j].MinTime
})

// Skip the series with all deleted chunks.
if len(chks) == 0 {
Expand Down Expand Up @@ -611,7 +651,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
chks[i].Chunk = newChunk
}
}
if err := chunkw.WriteChunks(chks...); err != nil {
var err error
if chks, err = chunkw.WriteChunks(chks...); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not use if _ = :...; err != nil idiom then if we don't want to scope down those return variables.

return errors.Wrap(err, "write chunks")
}

Expand Down
34 changes: 34 additions & 0 deletions compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,40 @@ func TestLeveledCompactor_plan(t *testing.T) {
},
expected: []string{"7", "8"},
},
// For overlapping blocks.
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 19, 40, nil),
metaRange("3", 40, 60, nil),
},
expected: []string{"1", "2"},
},
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil),
metaRange("3", 30, 50, nil),
},
expected: []string{"2", "3"},
},
{
metas: []dirMeta{
metaRange("1", 0, 20, nil),
metaRange("2", 10, 40, nil),
metaRange("3", 30, 50, nil),
},
expected: []string{"1", "2", "3"},
},
{
metas: []dirMeta{
metaRange("5", 0, 360, nil),
metaRange("6", 340, 560, nil),
metaRange("7", 360, 420, nil),
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
metaRange("8", 420, 540, nil),
},
expected: []string{"5", "6", "7", "8"},
},
}

for _, c := range cases {
Expand Down
22 changes: 0 additions & 22 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,6 @@ func (db *DB) reload() (err error) {
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}

// Swap in new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
Expand Down Expand Up @@ -543,25 +540,6 @@ func (db *DB) reload() (err error) {
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}

// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {
return nil
}

var metas []BlockMeta
for _, b := range bs {
metas = append(metas, b.meta)
}

overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", overlaps)
}

return nil
}

// TimeRange specifies minTime and maxTime range.
type TimeRange struct {
Min, Max int64
Expand Down
Loading