diff --git a/lightning/backend/local.go b/lightning/backend/local.go index cc65f96f9..b5d19f522 100644 --- a/lightning/backend/local.go +++ b/lightning/backend/local.go @@ -92,6 +92,12 @@ type Range struct { length int } +// Contains check if the range contains the given key, [start, end). +func (rg *Range) Contains(key []byte) bool { + return bytes.Compare(key, rg.start) >= 0 && + (len(rg.end) == 0 || bytes.Compare(key, rg.end) < 0) +} + // localFileMeta contains some field that is necessary to continue the engine restore/import process. // These field should be written to disk when we update chunk checkpoint type localFileMeta struct { @@ -170,15 +176,16 @@ type local struct { pdAddr string g glue.Glue - localStoreDir string - regionSplitSize int64 + localStoreDir string + regionSplitSize int64 + tcpConcurrency int + batchWriteKVPairs int + checkpointEnabled bool rangeConcurrency *worker.Pool ingestConcurrency *worker.Pool - batchWriteKVPairs int - checkpointEnabled bool - tcpConcurrency int + runningRanges *rangeLockTree } // connPool is a lazy pool of gRPC channels. @@ -291,6 +298,7 @@ func NewLocalBackend( tcpConcurrency: rangeConcurrency, batchWriteKVPairs: sendKVPairs, checkpointEnabled: enableCheckpoint, + runningRanges: newRangeLockTree(), } local.conns.conns = make(map[uint64]*connPool) return MakeBackend(local), nil @@ -1087,6 +1095,9 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro remains := &syncdRanges{} for { + // local the range to avoid race condition + rg := Range{start: ranges[0].start, end: ranges[len(ranges)-1].end} + local.runningRanges.put(rg) log.L().Info("start import engine", zap.Stringer("uuid", engineUUID), zap.Int("ranges", len(ranges))) @@ -1101,12 +1112,14 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro log.ShortError(err), zap.Int("retry", i)) } if err != nil { + local.runningRanges.remove(rg) log.L().Error("split & scatter ranges failed", zap.Stringer("uuid", engineUUID), log.ShortError(err)) return err } // start to write to kv and ingest err = local.writeAndIngestByRanges(ctx, engineFile.(*LocalFile), ranges, remains) + local.runningRanges.remove(rg) if err != nil { log.L().Error("write and ingest engine failed", log.ShortError(err)) return err @@ -1469,3 +1482,75 @@ func (s *sizeProperties) iter(f func(p *rangeProperty) bool) { return f(prop) }) } + +type LockedRange struct { + sync.Mutex + Range +} + +func (l *LockedRange) Less(than btree.Item) bool { + other := than.(*LockedRange) + return bytes.Compare(l.start, other.start) < 0 +} + +var _ btree.Item = &LockedRange{} + +type rangeLockTree struct { + sync.Mutex + *btree.BTree +} + +func newRangeLockTree() *rangeLockTree { + return &rangeLockTree{BTree: btree.New(2)} +} + +func (t *rangeLockTree) firstOverlap(rg *LockedRange) *LockedRange { + var ret *LockedRange + t.DescendLessOrEqual(rg, func(i btree.Item) bool { + ret = i.(*LockedRange) + return false + }) + + if ret != nil && ret.Contains(rg.start) { + return ret + } + + t.AscendGreaterOrEqual(rg, func(i btree.Item) bool { + ret = i.(*LockedRange) + return false + }) + + if ret == nil || !rg.Contains(ret.start) { + return nil + } + return ret +} + +func (t *rangeLockTree) put(rg Range) { + lr := &LockedRange{Range: rg} + lr.Lock() + for { + t.Lock() + node := t.firstOverlap(lr) + if node != nil { + t.Unlock() + // wait the first overlapped node to be finish and try again + node.Lock() + node.Unlock() + continue + } + // here there should be no overlap nodes in the tree, put self there + t.ReplaceOrInsert(lr) + t.Unlock() + return + } +} + +// remove the range from tree and unlock it +func (t *rangeLockTree) remove(rg Range) { + lr := &LockedRange{Range: rg} + t.Lock() + node := t.Delete(lr).(*LockedRange) + node.Unlock() + t.Unlock() +} diff --git a/lightning/backend/local_test.go b/lightning/backend/local_test.go index 25c7b28fa..5a1e34cae 100644 --- a/lightning/backend/local_test.go +++ b/lightning/backend/local_test.go @@ -19,8 +19,10 @@ import ( "math" "math/rand" "path/filepath" + "time" "github.com/cockroachdb/pebble" + "github.com/google/btree" . "github.com/pingcap/check" "github.com/pingcap/tidb/util/hack" ) @@ -51,6 +53,81 @@ func (s *localSuite) TestNextKey(c *C) { c.Assert(bytes.Compare(next, []byte{1, 255, 0, 1, 2}), Equals, -1) } +func checkTreeNodes(c *C, rt *rangeLockTree, ranges []Range) { + idx := 0 + rt.BTree.Ascend(func(i btree.Item) bool { + n := i.(*LockedRange) + c.Assert(n.Range, DeepEquals, ranges[idx]) + idx++ + return true + }) + c.Assert(idx, Equals, len(ranges)) +} + +func (s *localSuite) TestRangeTree(c *C) { + rt := &rangeLockTree{BTree: btree.New(2)} + + finishChan := make(chan struct{}) + asyncPut := func(r Range) { + go func(r Range) { + rt.put(r) + finishChan <- struct{}{} + }(r) + } + + checkNonFinish := func() { + select { + case <-finishChan: + c.Fatal("") + case <-time.After(10 * time.Millisecond): + } + } + + r1 := Range{start: []byte{0}, end: []byte{5}} + rt.put(r1) + checkTreeNodes(c, rt, []Range{r1}) + + r2 := Range{start: []byte{10}, end: []byte{15}} + rt.put(r2) + checkTreeNodes(c, rt, []Range{r1, r2}) + + // insert a range with overlap + r3 := Range{start: []byte{3}, end: []byte{12}} + asyncPut(r3) + checkNonFinish() + checkTreeNodes(c, rt, []Range{r1, r2}) + + // add another no-overlap range, shouldn't be blocked + r4 := Range{start: []byte{20}, end: []byte{25}} + rt.put(r4) + checkTreeNodes(c, rt, []Range{r1, r2, r4}) + + r5 := Range{start: []byte{11}, end: []byte{20}} + asyncPut(r5) + checkNonFinish() + checkTreeNodes(c, rt, []Range{r1, r2, r4}) + + // remove r2, r5 should be inserted now + rt.remove(r2) + <-finishChan + + checkTreeNodes(c, rt, []Range{r1, r5, r4}) + rt.remove(r1) + checkNonFinish() + checkTreeNodes(c, rt, []Range{r5, r4}) + + rt.remove(r4) + checkNonFinish() + checkTreeNodes(c, rt, []Range{r5}) + + rt.remove(r5) + <-finishChan + checkTreeNodes(c, rt, []Range{r3}) + + rt.remove(r3) + checkTreeNodes(c, rt, []Range{}) +} + // The first half of this test is same as the test in tikv: // https://github.com/tikv/tikv/blob/dbfe7730dd0fddb34cb8c3a7f8a079a1349d2d41/components/engine_rocks/src/properties.rs#L572 func (s *localSuite) TestRangeProperties(c *C) {