Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#40504
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
fengou1 authored and ti-chi-bot committed Feb 15, 2023
1 parent 1c0e1e5 commit 9884380
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
59 changes: 34 additions & 25 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package restore
import (
"context"
"io"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -80,29 +79,27 @@ func NewStoreMeta(storeId uint64) StoreMeta {

// for test
type Recovery struct {
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
totalFlashbackRegions uint64
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
}

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
totalStores := len(allStores)
var StoreMetas = make([]StoreMeta, totalStores)
var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
return Recovery{
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency,
totalFlashbackRegions: 0}
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
Expand Down Expand Up @@ -305,6 +302,7 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {

// prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
<<<<<<< HEAD
retryErr := utils.WithRetry(
ctx,
func() error {
Expand All @@ -328,18 +326,30 @@ func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolve

recovery.progress.Inc()
return retryErr
=======
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
return stats, err
}

runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Error("region flashback prepare get error")
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))

return nil
>>>>>>> 321d313b060 (br: remove totalFlashbackRegions since task completion region already contains all regions (#40504))
}

// flashback the region data to version resolveTS
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) {
var completedRegions atomic.Uint64

// only know the total progress of tikv, progress is total state of the whole restore flow.
ratio := int(recovery.totalFlashbackRegions) / len(recovery.allStores)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

Expand All @@ -354,13 +364,12 @@ func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint
return errors.Trace(err)
}

recovery.progress.IncBy(int64(completedRegions.Load()) / int64(ratio))

log.Info("region flashback complete",
zap.Uint64("resolveTS", resolveTS),
zap.Uint64("commitTS", commitTS),
zap.Int("regions", runner.CompletedRegions()))

recovery.progress.Inc()
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}

log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile))
// progress = read meta + send recovery + iterate tikv + flashback.
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*4), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*4), cfg.ProgressFile)
// progress = read meta + send recovery + iterate tikv + (1 * prepareflashback + 1 * flashback)
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*3+2), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*3+2), cfg.ProgressFile)

// restore tikv data from a snapshot volume
var totalRegions int
Expand Down

0 comments on commit 9884380

Please sign in to comment.