Skip to content

Commit

Permalink
backend/local: do not retry epochNotMatch error when ingest sst (ping…
Browse files Browse the repository at this point in the history
…cap#419)

* do not retry epochNotMatch error when ingest sst

* add retry ingest for 'Raft raft: proposal dropped' error in ingest

* change some retryable error log level from Error to Warn

* fix nextKey

* add a comment for nextKey

* fix comment and add a unit test

* wrap time.Sleep in select

Co-authored-by: kennytm <[email protected]>
  • Loading branch information
glorv and kennytm authored Oct 19, 2020
1 parent 60c1c27 commit 2660ffe
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 53 deletions.
100 changes: 47 additions & 53 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -288,7 +289,7 @@ func (local *local) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientC
)
cancel()
if err != nil {
return nil, errors.WithStack(err)
return nil, errors.Trace(err)
}
return conn, nil
}
Expand Down Expand Up @@ -500,7 +501,7 @@ func (local *local) WriteToTiKV(

wstream, err := cli.Write(ctx)
if err != nil {
return nil, nil, err
return nil, nil, errors.Trace(err)
}

// Bind uuid for this write request
Expand All @@ -510,7 +511,7 @@ func (local *local) WriteToTiKV(
},
}
if err = wstream.Send(req); err != nil {
return nil, nil, err
return nil, nil, errors.Trace(err)
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
Expand Down Expand Up @@ -589,7 +590,7 @@ func (local *local) WriteToTiKV(

// if there is no leader currently, we should return an error directly
if leaderPeerMetas == nil {
log.L().Error("write to tikv no leader", zap.Reflect("region", region),
log.L().Warn("write to tikv no leader", zap.Reflect("region", region),
zap.Uint64("leader_id", leaderID), zap.Reflect("meta", meta),
zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size))
return nil, nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d",
Expand Down Expand Up @@ -1085,11 +1086,17 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro

for {
// split region by given ranges
err = local.SplitAndScatterRegionByRanges(ctx, ranges)
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionByRanges(ctx, ranges)
if err == nil {
break
}
}
if err != nil {
log.L().Error("split & scatter ranges failed", zap.Error(err))
return err
}

// start to write to kv and ingest
err = local.WriteAndIngestByRanges(ctx, engineFile.(*LocalFile), ranges, remains)
if err != nil {
Expand Down Expand Up @@ -1124,7 +1131,7 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err
}
local.engines.Delete(engineUUID)
} else {
log.L().Error("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID))
log.L().Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID))
}
return nil
}
Expand Down Expand Up @@ -1202,7 +1209,27 @@ func (local *local) isIngestRetryable(
return false, nil, nil
}

getRegion := func() (*split.RegionInfo, error) {
for i := 0; ; i++ {
newRegion, err := local.splitCli.GetRegion(ctx, region.Region.GetStartKey())
if err != nil {
return nil, errors.Trace(err)
}
if newRegion != nil {
return newRegion, nil
}
log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region),
zap.Int("retry", i))
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(time.Second):
}
}
}

var newRegion *split.RegionInfo
var err error
switch errPb := resp.GetError(); {
case errPb.NotLeader != nil:
if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil {
Expand All @@ -1211,65 +1238,32 @@ func (local *local) isIngestRetryable(
Region: region.Region,
}
} else {
var err error
for i := 0; ; i++ {
newRegion, err = local.splitCli.GetRegion(ctx, region.Region.GetStartKey())
if err != nil {
return false, nil, errors.Trace(err)
}
if newRegion != nil {
break
}
log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region),
zap.Int("retry", i))
time.Sleep(time.Second)
newRegion, err = getRegion()
if err != nil {
return false, nil, errors.Trace(err)
}
}
return true, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage())
case errPb.EpochNotMatch != nil:
if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil {
var currentRegion *metapb.Region
for _, r := range currentRegions {
if insideRegion(r, meta) {
currentRegion = r
break
}
}
if currentRegion != nil {
var newLeader *metapb.Peer
for _, p := range currentRegion.Peers {
if p.GetStoreId() == region.Leader.GetStoreId() {
newLeader = p
break
}
}
if newLeader != nil {
newRegion = &split.RegionInfo{
Leader: newLeader,
Region: currentRegion,
}
}
}
case strings.Contains(errPb.Message, "raft: proposal dropped"):
// TODO: we should change 'Raft raft: proposal dropped' to an error type like 'NotLeader'
newRegion, err = getRegion()
if err != nil {
return false, nil, errors.Trace(err)
}
return true, newRegion, errors.Errorf("epoch not match: %s", errPb.GetMessage())
return true, newRegion, errors.New(errPb.GetMessage())
}
return false, nil, errors.Errorf("non retryable error: %s", resp.GetError().GetMessage())
return false, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
}

// nextKey returns the smallest byte slice that sorts strictly after key in
// lexicographic (bytes.Compare) order, i.e. key followed by a single 0x00 byte.
//
// Special case: an empty key means infinity in our context, so an empty slice
// is returned as-is.
//
// The result is always a freshly allocated slice; key itself is never modified.
func nextKey(key []byte) []byte {
	if len(key) == 0 {
		return []byte{}
	}
	// Appending 0x00 gives the immediate successor. Incrementing the last
	// non-0xff byte instead would skip every key that has `key` as a prefix
	// (for example {1, 255, 0, 1, 2} lies between {1, 255} and {2}).
	res := make([]byte, 0, len(key)+1)
	res = append(res, key...)
	res = append(res, 0)
	return res
}

Expand Down
33 changes: 33 additions & 0 deletions lightning/backend/local_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package backend

import (
"bytes"

. "github.com/pingcap/check"
)

// localSuite is the check-framework test suite whose methods (such as
// TestNextKey) exercise helpers in this package.
type localSuite struct{}

// Register localSuite through the dot-imported check package's Suite function.
var _ = Suite(&localSuite{})

// TestNextKey checks that nextKey yields the immediate lexicographic successor
// of its input.
func (s *localSuite) TestNextKey(c *C) {
	// An empty key is passed through as an empty slice.
	empty := nextKey([]byte{})
	c.Assert(empty, DeepEquals, []byte{})

	// For every non-empty key the successor is the key followed by one zero byte.
	for _, key := range [][]byte{{0}, {255}, {1, 255}} {
		successor := nextKey(key)
		expected := append(key, 0)
		c.Assert(successor, DeepEquals, expected)
	}

	probe := []byte{1, 255}

	// Regression check: the successor of {1, 255} must sort before {2}.
	upper := []byte{2}
	c.Assert(bytes.Compare(nextKey(probe), upper), Equals, -1)

	// The successor must also sort before any longer key that extends the
	// original key with a non-minimal suffix.
	longer := []byte{1, 255, 0, 1, 2}
	c.Assert(bytes.Compare(nextKey(probe), longer), Equals, -1)
}

0 comments on commit 2660ffe

Please sign in to comment.