br: ebs volume snapshot backup and restore with flashback solution #38700

Merged: 23 commits, Nov 3, 2022
Changes from 10 commits
Commits:
e15ed35 feat: move ebs resolve data solution to flashback (fengou1, Oct 27, 2022)
b5a140e refactor: add concurrency and progress for flashback solution (fengou1, Oct 28, 2022)
e44e112 Merge branch 'master' into ebs_with_flashback (fengou1, Oct 28, 2022)
4e64ba0 Merge branch 'master' into ebs_with_flashback (fengou1, Oct 28, 2022)
ebb79bf feat: adapts flashback solution with the entire cluster range (fengou1, Nov 1, 2022)
97f4b7c refactor: add ddl flashback concurrency (fengou1, Nov 1, 2022)
7ca4452 Merge branch 'master' into ebs_with_flashback (fengou1, Nov 1, 2022)
6db3708 refactor: remove dependency from flashbackToVersion function (fengou1, Nov 2, 2022)
f0047ca fix: make check_dev happy (fengou1, Nov 2, 2022)
3df6aeb refactor: add progress for flashback, rename resolvedTs to resolveTS … (fengou1, Nov 2, 2022)
8a4747b fix: remove empty lines to get build ci pass (fengou1, Nov 2, 2022)
e501d50 Merge branch 'master' into ebs_with_flashback (fengou1, Nov 2, 2022)
90adf23 Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
0f045e5 Merge branch 'master' into ebs_with_flashback (3pointer, Nov 3, 2022)
900d1c8 Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
5c0b444 Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
7b686e1 Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
30d904d Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
007a5f4 Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
fb74ee1 Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
e531b69 fix: update build bazel to get check_dev up (fengou1, Nov 3, 2022)
00347be Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
f5d1fde Merge branch 'master' into ebs_with_flashback (ti-chi-bot, Nov 3, 2022)
153 changes: 89 additions & 64 deletions br/pkg/restore/data.go
@@ -4,6 +4,7 @@ package restore
import (
"context"
"io"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -13,7 +14,11 @@ import (
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/util/mathutil"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -25,9 +30,10 @@ import (
// 2. make the recovery plan and recover the max allocate ID first
// 3. send the recovery plan and wait for tikv to apply it; in waitapply, every assigned region leader checks that its apply log reaches the last log
// 4. ensure all regions have applied to the last log
// 5. send the resolvedTs to tikv for deleting data.
func RecoverData(ctx context.Context, resolvedTs uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress) (int, error) {
var recovery = NewRecovery(allStores, mgr, progress)
// 5. prepare the flashback
// 6. flashback to resolveTS
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
return 0, errors.Trace(err)
}
@@ -51,7 +57,11 @@ func RecoverData(ctx context.Context, resolvedTs uint64, allStores []*metapb.Sto
return totalRegions, errors.Trace(err)
}

if err := recovery.ResolveData(ctx, resolvedTs); err != nil {
if err := recovery.PrepareFlashbackToVersion(ctx); err != nil {
return totalRegions, errors.Trace(err)
}

if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
return totalRegions, errors.Trace(err)
}
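To make the reworked flow easier to follow, here is a compact sketch of the sequence the new RecoverData runs, matching steps 1-6 above. It is written as if it sat next to data.go in package restore; the helper name recoverDataSketch is made up, the plan-building and plan-sending steps appear only as a comment because their hunks are collapsed in this diff, and the rest reuses only names that appear in the diff.

package restore

import (
	"context"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/br/pkg/conn"
	"github.com/pingcap/tidb/br/pkg/glue"
)

// recoverDataSketch mirrors the order of operations in the reworked RecoverData.
func recoverDataSketch(ctx context.Context, resolveTS, restoreTS uint64,
	allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) (int, error) {
	recovery := NewRecovery(allStores, mgr, progress, concurrency) // concurrency is new in this PR
	if err := recovery.ReadRegionMeta(ctx); err != nil {           // step 1: collect region meta from every TiKV
		return 0, err
	}
	totalRegions := recovery.GetTotalRegions()

	// Steps 2-4 (hunks collapsed in this diff): build the recovery plan, send it
	// to the TiKV stores, and WaitApply until every region has applied its last log.

	if err := recovery.PrepareFlashbackToVersion(ctx); err != nil { // step 5: regions stop serving writes
		return totalRegions, err
	}
	if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil { // step 6: rewind MVCC to resolveTS
		return totalRegions, err
	}
	return totalRegions, nil
}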

@@ -70,33 +80,36 @@ func NewStoreMeta(storeId uint64) StoreMeta {

// for test
type Recovery struct {
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
totalFlashbackRegions uint64
}

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress) Recovery {
func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
totalStores := len(allStores)
var StoreMetas = make([]StoreMeta, totalStores)
var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
return Recovery{
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress}
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency,
totalFlashbackRegions: 0}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
// Connect to the Recovery service on the given TiKV node.
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
//TODO: connection may need some adjust
//keepaliveConf keepalive.ClientParameters

conn, err := utils.GRPCConn(ctx, storeAddr, recovery.mgr.GetTLSConfig(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(recovery.mgr.GetKeepalive()),
@@ -190,8 +203,6 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
return eg.Wait()
}

// TODO: map may be more suitable for this function

func (recovery *Recovery) GetTotalRegions() int {
// Group region peer info by region id.
var regions = make(map[uint64]struct{}, 0)
@@ -292,51 +303,65 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
return eg.Wait()
}

// ResolveData a worker pool to all tikv for execute delete all data whose has ts > resolvedTs
func (recovery *Recovery) ResolveData(ctx context.Context, resolvedTs uint64) (err error) {
eg, ectx := errgroup.WithContext(ctx)
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, common.MaxStoreConcurrency)), "resolve data from tikv")
// PrepareFlashbackToVersion prepares the regions for flashback: it stops region service and puts the regions into the flashback state.
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context) (err error) {

// TODO: what if the resolved data take long time take long time?, it look we need some handling here, at least some retry may necessary
// TODO: what if the network disturbing, a retry machanism may need here
for _, store := range recovery.allStores {
if err := ectx.Err(); err != nil {
break
}
storeAddr := getStoreAddress(recovery.allStores, store.Id)
storeId := store.Id
workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
log.Info("resolve data to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
req := &recovpb.ResolveKvDataRequest{ResolvedTs: resolvedTs}
stream, err := recoveryClient.ResolveKvData(ectx, req)
if err != nil {
log.Error("send the resolve kv data failed", zap.Uint64("store id", storeId))
return errors.Trace(err)
}
// for a TiKV, received the stream
for {
var resp *recovpb.ResolveKvDataResponse
if resp, err = stream.Recv(); err == nil {
log.Info("current delete key", zap.Uint64("resolved key num", resp.ResolvedKeyCount), zap.Uint64("store id", resp.StoreId))
} else if err == io.EOF {
break
} else {
return errors.Trace(err)
}
}
recovery.progress.Inc()
log.Info("resolve kv data done", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
return nil
})
var totalRegions atomic.Uint64
totalRegions.Store(0)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}
// Wait for all TiKV instances force leader and wait apply to last log.
return eg.Wait()

runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run flashback preparation on the entire TiKV cluster. Empty keys mean the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Error("region flashback prepare get error")
return errors.Trace(err)
}

recovery.totalFlashbackRegions = totalRegions.Load()
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))

return nil
}

// FlashbackToVersion flashes the region data back to version resolveTS.
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) {

var completedRegions atomic.Uint64

// The progress bar is budgeted per TiKV store, while flashback completes per region, so convert completed regions into per-store units using a ratio.
ratio := int(recovery.totalFlashbackRegions) / len(recovery.allStores)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

runner := rangetask.NewRangeTaskRunner("br-flashback-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run flashback on the entire TiKV cluster. Empty keys mean the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Error("region flashback get error",
zap.Uint64("resolveTS", resolveTS),
zap.Uint64("commitTS", commitTS),
zap.Int("regions", runner.CompletedRegions()))
return errors.Trace(err)
}

recovery.progress.IncBy(int64(completedRegions.Load()) / int64(ratio))

log.Info("region flashback complete",
zap.Uint64("resolveTS", resolveTS),
zap.Uint64("commitTS", commitTS),
zap.Int("regions", runner.CompletedRegions()))

return nil
}

type RecoverRegion struct {
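Both PrepareFlashbackToVersion and FlashbackToVersion above follow the same range-task pattern from client-go: a handler runs per key range, its TaskStat.CompletedRegions are accumulated, and the runner fans the work out with the configured concurrency over the unbounded range. Below is a minimal standalone sketch of that pattern that reuses only the calls appearing in this diff; the package and function names here are illustrative, not part of the PR.

package flashbacksketch

import (
	"context"
	"sync/atomic"

	"github.com/pingcap/tidb/ddl"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

// flashbackWholeCluster rewinds every region to resolveTS. commitTS-1 is used
// as the start TS so that it is always smaller than commitTS, as in ddl/cluster.go.
func flashbackWholeCluster(ctx context.Context, store tikv.Storage,
	resolveTS, commitTS uint64, concurrency int) (uint64, error) {
	var completed atomic.Uint64
	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
		stats, err := ddl.SendFlashbackToVersionRPC(ctx, store, resolveTS, commitTS-1, commitTS, r)
		completed.Add(uint64(stats.CompletedRegions))
		return stats, err
	}
	runner := rangetask.NewRangeTaskRunner("flashback-sketch-runner", store, concurrency, handler)
	// Empty start/end keys make the range unbounded, i.e. the whole cluster.
	if err := runner.RunOnRange(ctx, []byte(""), []byte("")); err != nil {
		return completed.Load(), err
	}
	return completed.Load(), nil
}

PrepareFlashbackToVersion follows the same shape, just with SendPrepareFlashbackToVersionRPC and no timestamps.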
2 changes: 1 addition & 1 deletion br/pkg/restore/data_test.go
@@ -97,7 +97,7 @@ func createDataSuite(t *testing.T) *testData {

fakeProgress := mockGlue.StartProgress(ctx, "Restore Data", int64(numOnlineStore*3), false)

var recovery = restore.NewRecovery(createStores(), mockMgr, fakeProgress)
var recovery = restore.NewRecovery(createStores(), mockMgr, fakeProgress, 64)
tikvClient.Close()
return &testData{
ctx: ctx,
2 changes: 1 addition & 1 deletion br/pkg/task/backup_ebs.go
@@ -186,7 +186,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
// Step.2 starts call ebs snapshot api to back up volume data.
// NOTE: we should start snapshot in specify order.

progress := g.StartProgress(ctx, "backup", int64(storeCount), !cfg.LogProgress)
progress := g.StartProgress(ctx, "backup", int64(storeCount)*100, !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(storeCount)*100, cfg.ProgressFile)

ec2Session, err := aws.NewEC2Session(cfg.CloudAPIConcurrency)
12 changes: 7 additions & 5 deletions br/pkg/task/restore_data.go
@@ -51,17 +51,17 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}

// read the backup meta resolved ts and total tikvs from backup storage
var resolveTs uint64
var resolveTS uint64
_, externStorage, err := GetStorage(ctx, cfg.Config.Storage, &cfg.Config)
if err != nil {
return errors.Trace(err)
}

resolveTs, numBackupStore, err := ReadBackupMetaData(ctx, externStorage)
resolveTS, numBackupStore, err := ReadBackupMetaData(ctx, externStorage)
if err != nil {
return errors.Trace(err)
}
summary.CollectUint("resolve-ts", resolveTs)
summary.CollectUint("resolve-ts", resolveTS)

keepaliveCfg := GetKeepalive(&cfg.Config)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, false, conn.NormalVersionChecker)
@@ -85,6 +85,8 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
ID: utils.MakeSafePointID(),
}

// TODO: since data restore does not have TiDB up, it looks like we can remove this keeper.
// More testing is required before removing this part of the code.
err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp)
if err != nil {
return errors.Trace(err)
@@ -131,14 +133,14 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}

log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile))
// progress = read meta + send recovery + iterate tikv + resolve kv data.
// progress = read meta + send recovery + iterate tikv + flashback.
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*4), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*4), cfg.ProgressFile)

// restore tikv data from a snapshot volume
var totalRegions int

totalRegions, err = restore.RecoverData(ctx, resolveTs, allStores, mgr, progress)
totalRegions, err = restore.RecoverData(ctx, resolveTS, allStores, mgr, progress, restoreTS, cfg.Concurrency)
if err != nil {
return errors.Trace(err)
}
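The flashback phase has to fit into one of the four equal shares of the numBackupStore*4 progress budget above, even though it completes work per region rather than per store. FlashbackToVersion therefore scales completed regions by ratio = totalFlashbackRegions / number of stores. A small sketch of that arithmetic with made-up numbers; the package and function names are illustrative, and the sketch assumes at least one region per store so that ratio is never zero.

package progresssketch

// flashbackProgressUnits mirrors the scaling in FlashbackToVersion:
// with 3 stores and 300 regions, ratio is 100, so 300 completed regions
// advance the progress bar by 3 units, one per store.
func flashbackProgressUnits(totalFlashbackRegions uint64, numStores int, completedRegions int64) int64 {
	ratio := int(totalFlashbackRegions) / numStores // e.g. 300 / 3 = 100
	return completedRegions / int64(ratio)          // e.g. 300 / 100 = 3
}

With 3 stores and 300 regions this yields 3 progress units, which is exactly the per-store share that restore_data.go budgets for the flashback step.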
12 changes: 8 additions & 4 deletions ddl/cluster.go
@@ -257,7 +257,9 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
return keyRanges, nil
}

func sendPrepareFlashbackToVersionRPC(
// SendPrepareFlashbackToVersionRPC prepares regions for flashback; it puts each region into the flashback state, in which the region stops serving writes.
// It is also called by BR for volume snapshot backup and restore.
func SendPrepareFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
r tikvstore.KeyRange,
@@ -324,7 +326,9 @@ func sendPrepareFlashbackToVersionRPC(
return taskStat, nil
}

func sendFlashbackToVersionRPC(
// SendFlashbackToVersionRPC flashes the MVCC data back to the given version.
// It is also called by BR for volume snapshot backup and restore.
func SendFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
version uint64,
@@ -524,7 +528,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r)
stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}, r.StartKey, r.EndKey); err != nil {
@@ -560,7 +564,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
if err = flashbackToVersion(d.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use commitTS - 1 as startTS, make sure it less than commitTS.
stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r)
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
logutil.BgLogger().Info("[ddl] flashback cluster stats",
zap.Uint64("complete regions", completedRegions.Load()),
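Exporting SendPrepareFlashbackToVersionRPC and SendFlashbackToVersionRPC is what lets br/pkg/restore/data.go drive flashback directly instead of going through the FLASHBACK CLUSTER DDL job. A minimal sketch of an external caller applying both halves to a single key range, using the same timestamp convention (commitTS-1 as the start TS so it stays below commitTS); the package and function names are illustrative.

package flashbacksketch

import (
	"context"

	"github.com/pingcap/tidb/ddl"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
)

// flashbackRange first puts the regions covering r into the flashback state
// (writes are rejected from then on) and then rewinds their MVCC data to resolveTS.
func flashbackRange(ctx context.Context, store tikv.Storage, r tikvstore.KeyRange,
	resolveTS, commitTS uint64) error {
	if _, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, store, r); err != nil {
		return err
	}
	_, err := ddl.SendFlashbackToVersionRPC(ctx, store, resolveTS, commitTS-1, commitTS, r)
	return err
}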