Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

backend/local: serial import engines with range overlap #451

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 90 additions & 5 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ type Range struct {
length int
}

// Contains check if the range contains the given key, [start, end).
func (rg *Range) Contains(key []byte) bool {
return bytes.Compare(key, rg.start) >= 0 &&
(len(rg.end) == 0 || bytes.Compare(key, rg.end) < 0)
}

// localFileMeta contains some field that is necessary to continue the engine restore/import process.
// These field should be written to disk when we update chunk checkpoint
type localFileMeta struct {
Expand Down Expand Up @@ -170,15 +176,16 @@ type local struct {
pdAddr string
g glue.Glue

localStoreDir string
regionSplitSize int64
localStoreDir string
regionSplitSize int64
tcpConcurrency int
batchWriteKVPairs int
checkpointEnabled bool

rangeConcurrency *worker.Pool
ingestConcurrency *worker.Pool
batchWriteKVPairs int
checkpointEnabled bool

tcpConcurrency int
runningRanges *rangeLockTree
}

// connPool is a lazy pool of gRPC channels.
Expand Down Expand Up @@ -291,6 +298,7 @@ func NewLocalBackend(
tcpConcurrency: rangeConcurrency,
batchWriteKVPairs: sendKVPairs,
checkpointEnabled: enableCheckpoint,
runningRanges: newRangeLockTree(),
}
local.conns.conns = make(map[uint64]*connPool)
return MakeBackend(local), nil
Expand Down Expand Up @@ -1087,6 +1095,9 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
remains := &syncdRanges{}

for {
// local the range to avoid race condition
rg := Range{start: ranges[0].start, end: ranges[len(ranges)-1].end}
local.runningRanges.put(rg)
log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)))

Expand All @@ -1101,12 +1112,14 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
log.ShortError(err), zap.Int("retry", i))
}
if err != nil {
local.runningRanges.remove(rg)
log.L().Error("split & scatter ranges failed", zap.Stringer("uuid", engineUUID), log.ShortError(err))
return err
}

// start to write to kv and ingest
err = local.writeAndIngestByRanges(ctx, engineFile.(*LocalFile), ranges, remains)
local.runningRanges.remove(rg)
if err != nil {
log.L().Error("write and ingest engine failed", log.ShortError(err))
return err
Expand Down Expand Up @@ -1469,3 +1482,75 @@ func (s *sizeProperties) iter(f func(p *rangeProperty) bool) {
return f(prop)
})
}

type LockedRange struct {
sync.Mutex
Range
}

func (l *LockedRange) Less(than btree.Item) bool {
other := than.(*LockedRange)
return bytes.Compare(l.start, other.start) < 0
}

var _ btree.Item = &LockedRange{}

type rangeLockTree struct {
sync.Mutex
*btree.BTree
}

func newRangeLockTree() *rangeLockTree {
return &rangeLockTree{BTree: btree.New(2)}
}

func (t *rangeLockTree) firstOverlap(rg *LockedRange) *LockedRange {
var ret *LockedRange
t.DescendLessOrEqual(rg, func(i btree.Item) bool {
ret = i.(*LockedRange)
return false
})

if ret != nil && ret.Contains(rg.start) {
return ret
}

t.AscendGreaterOrEqual(rg, func(i btree.Item) bool {
ret = i.(*LockedRange)
return false
})

if ret == nil || !rg.Contains(ret.start) {
return nil
}
return ret
}

func (t *rangeLockTree) put(rg Range) {
lr := &LockedRange{Range: rg}
lr.Lock()
for {
t.Lock()
node := t.firstOverlap(lr)
if node != nil {
t.Unlock()
// wait the first overlapped node to be finish and try again
node.Lock()
node.Unlock()
continue
}
// here there should be no overlap nodes in the tree, put self there
t.ReplaceOrInsert(lr)
t.Unlock()
return
}
}

// remove the range from tree and unlock it
func (t *rangeLockTree) remove(rg Range) {
lr := &LockedRange{Range: rg}
t.Lock()
node := t.Delete(lr).(*LockedRange)
node.Unlock()
t.Unlock()
}
77 changes: 77 additions & 0 deletions lightning/backend/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"math"
"math/rand"
"path/filepath"
"time"

"github.com/cockroachdb/pebble"
"github.com/google/btree"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/hack"
)
Expand Down Expand Up @@ -51,6 +53,81 @@ func (s *localSuite) TestNextKey(c *C) {
c.Assert(bytes.Compare(next, []byte{1, 255, 0, 1, 2}), Equals, -1)
}

func checkTreeNodes(c *C, rt *rangeLockTree, ranges []Range) {
idx := 0
rt.BTree.Ascend(func(i btree.Item) bool {
n := i.(*LockedRange)
c.Assert(n.Range, DeepEquals, ranges[idx])
idx++
return true
})
c.Assert(idx, Equals, len(ranges))
}

func (s *localSuite) TestRangeTree(c *C) {
rt := &rangeLockTree{BTree: btree.New(2)}

finishChan := make(chan struct{})
asyncPut := func(r Range) {
go func(r Range) {
rt.put(r)
finishChan <- struct{}{}
}(r)
}

checkNonFinish := func() {
select {
case <-finishChan:
c.Fatal("")
case <-time.After(10 * time.Millisecond):
}
}

r1 := Range{start: []byte{0}, end: []byte{5}}
rt.put(r1)
checkTreeNodes(c, rt, []Range{r1})

r2 := Range{start: []byte{10}, end: []byte{15}}
rt.put(r2)
checkTreeNodes(c, rt, []Range{r1, r2})

// insert a range with overlap
r3 := Range{start: []byte{3}, end: []byte{12}}
asyncPut(r3)
checkNonFinish()
checkTreeNodes(c, rt, []Range{r1, r2})

// add another no-overlap range, shouldn't be blocked
r4 := Range{start: []byte{20}, end: []byte{25}}
rt.put(r4)
checkTreeNodes(c, rt, []Range{r1, r2, r4})

r5 := Range{start: []byte{11}, end: []byte{20}}
asyncPut(r5)
checkNonFinish()
checkTreeNodes(c, rt, []Range{r1, r2, r4})

// remove r2, r5 should be inserted now
rt.remove(r2)
<-finishChan

checkTreeNodes(c, rt, []Range{r1, r5, r4})
rt.remove(r1)
checkNonFinish()
checkTreeNodes(c, rt, []Range{r5, r4})

rt.remove(r4)
checkNonFinish()
checkTreeNodes(c, rt, []Range{r5})

rt.remove(r5)
<-finishChan
checkTreeNodes(c, rt, []Range{r3})

rt.remove(r3)
checkTreeNodes(c, rt, []Range{})
}

// The first half of this test is same as the test in tikv:
// https://github.com/tikv/tikv/blob/dbfe7730dd0fddb34cb8c3a7f8a079a1349d2d41/components/engine_rocks/src/properties.rs#L572
func (s *localSuite) TestRangeProperties(c *C) {
Expand Down