Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: remove totalFlashbackRegions since task completion region already contains all regions #40504

Merged
merged 8 commits into from
Feb 2, 2023
Merged
47 changes: 16 additions & 31 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package restore
import (
"context"
"io"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -80,29 +79,27 @@ func NewStoreMeta(storeId uint64) StoreMeta {

// for test
type Recovery struct {
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
totalFlashbackRegions uint64
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
}

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
totalStores := len(allStores)
var StoreMetas = make([]StoreMeta, totalStores)
var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
return Recovery{
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency,
totalFlashbackRegions: 0}
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
Expand Down Expand Up @@ -305,12 +302,8 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {

// prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
var totalRegions atomic.Uint64
totalRegions.Store(0)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

Expand All @@ -321,23 +314,16 @@ func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolve
log.Error("region flashback prepare get error")
return errors.Trace(err)
}

recovery.totalFlashbackRegions = totalRegions.Load()
recovery.progress.Inc()
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))

return nil
}

// flashback the region data to version resolveTS
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) {
var completedRegions atomic.Uint64

// only know the total progress of tikv, progress is total state of the whole restore flow.
ratio := int(recovery.totalFlashbackRegions) / len(recovery.allStores)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

Expand All @@ -352,13 +338,12 @@ func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint
return errors.Trace(err)
}

recovery.progress.IncBy(int64(completedRegions.Load()) / int64(ratio))

log.Info("region flashback complete",
zap.Uint64("resolveTS", resolveTS),
zap.Uint64("commitTS", commitTS),
zap.Int("regions", runner.CompletedRegions()))

recovery.progress.Inc()
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}

log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile))
// progress = read meta + send recovery + iterate tikv + flashback.
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*4), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*4), cfg.ProgressFile)
// progress = read meta + send recovery + iterate tikv + (1 * prepareflashback + 1 * flashback)
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*3+2), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*3+2), cfg.ProgressFile)

// restore tikv data from a snapshot volume
var totalRegions int
Expand Down