*: support send flashback cluster RPC (#37659)
close #37651, close #37665
Defined2014 authored Sep 9, 2022
1 parent 3f0cea3 commit d86e795
Showing 11 changed files with 465 additions and 37 deletions.
219 changes: 196 additions & 23 deletions ddl/cluster.go
@@ -15,10 +15,15 @@
package ddl

import (
"bytes"
"context"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
@@ -27,10 +32,18 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/filter"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

@@ -42,6 +55,14 @@ var pdScheduleKey = []string{
"replica-schedule-limit",
}

const (
flashbackMaxBackoff = 300000 // 300s
flashbackTimeout = 30 * time.Second // 30s

readOnlyArgsOffset = 2
gcEnabledArgsOffset = 3
)

func closePDSchedule() error {
closeMap := make(map[string]interface{})
for _, key := range pdScheduleKey {
@@ -96,6 +117,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func setTiDBSuperReadOnly(sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBSuperReadOnly, value)
}

func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly)
if err != nil {
return "", errors.Trace(err)
}
return val, nil
}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
@@ -107,6 +140,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
if err = closePDSchedule(); err != nil {
return err
}
if err = setTiDBSuperReadOnly(sess, variable.On); err != nil {
return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
@@ -223,17 +259,123 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa
return keyRanges, nil
}

func sendFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
version uint64,
r tikvstore.KeyRange,
) (rangetask.TaskStat, error) {
startKey, rangeEndKey := r.StartKey, r.EndKey
var taskStat rangetask.TaskStat
for {
select {
case <-ctx.Done():
return taskStat, errors.WithStack(ctx.Err())
default:
}

if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 {
break
}

bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff)
loc, err := s.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return taskStat, err
}

endKey := loc.EndKey
isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0)
// If it is the last region
if isLast {
endKey = rangeEndKey
}

req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{
Version: version,
StartKey: startKey,
EndKey: endKey,
})

resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
if err != nil {
return taskStat, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return taskStat, err
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String()))
if err != nil {
return taskStat, err
}
continue
}
if resp.Resp == nil {
return taskStat, errors.WithStack(tikverr.ErrBodyMissing)
}
flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse)
if err := flashbackToVersionResp.GetError(); err != "" {
return taskStat, errors.Errorf("unexpected flashback to version err: %v", err)
}
taskStat.CompletedRegions++
if isLast {
break
}
startKey = endKey
}
return taskStat, nil
}

func flashbackToVersion(
ctx context.Context,
d *ddlCtx,
version uint64,
startKey []byte, endKey []byte,
) (err error) {
return rangetask.NewRangeTaskRunner(
"flashback-to-version-runner",
d.store.(tikv.Storage),
int(variable.GetDDLFlashbackConcurrency()),
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r)
},
).RunOnRange(ctx, startKey, endKey)
}

// A flashback job has 3 different stages:
// 1. before locking flashbackClusterJobID: check clusterJobID and lock it.
// 2. before the flashback starts: check the timestamp, disable GC and close PD schedule.
// 3. before the flashback finishes: get key ranges and send flashback RPCs.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) {
if val.(bool) {
inFlashbackTest = true
}
})
// TODO: Support flashback in unistore.
if d.store.Name() != "TiKV" && !inFlashbackTest {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Not support flashback cluster in non-TiKV env")
}

var flashbackTS uint64
var pdScheduleValue map[string]interface{}
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
var readOnlyValue string
var gcEnabledValue bool
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
defer w.sessPool.put(sess)

switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and save the PD schedule.
@@ -255,6 +397,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
readOnlyValue, err = getTiDBSuperReadOnly(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[readOnlyArgsOffset] = &readOnlyValue
gcEnableValue, err := gcutil.CheckGCEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[gcEnabledArgsOffset] = &gcEnableValue
} else {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
@@ -263,31 +417,39 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
case model.StateWriteOnly:
sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
defer w.sessPool.put(sess)
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// A hacky way to make sure the global variables are synchronized to all TiDB instances.
// TiKV blocks read/write requests during flashback cluster,
// so it is not very dangerous if the sync fails.
time.Sleep(1 * time.Second)
job.SchemaState = model.StateWriteReorganization
return ver, nil
// Stage 3, get key ranges.
case model.StateWriteReorganization:
sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
// TODO: Support flashback in unistore.
if inFlashbackTest {
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
}
defer w.sessPool.put(sess)
_, err = GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))

keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))
if err != nil {
return ver, errors.Trace(err)
}

for _, ranges := range keyRanges {
if err = flashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil {
logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err))
return ver, err
}
}

asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
@@ -298,27 +460,38 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
func finishFlashbackCluster(w *worker, job *model.Job) error {
var flashbackTS uint64
var pdScheduleValue map[string]interface{}
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
var readOnlyValue string
var gcEnabled bool
var jobID int64

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil {
return errors.Trace(err)
}
sess, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(sess)

err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
jobID, err = t.GetFlashbackClusterJobID()
if err != nil {
return err
}
if jobID == job.ID {
if pdScheduleValue != nil {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
if err = enableGC(w); err != nil {
if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
return err
}
err = t.SetFlashbackClusterJobID(0)
if err != nil {
if gcEnabled {
if err = gcutil.EnableGC(sess); err != nil {
return err
}
}
if err = t.SetFlashbackClusterJobID(0); err != nil {
return err
}
}

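As a reading aid (not part of this commit), here is a minimal, self-contained Go sketch of the region-by-region loop that sendFlashbackToVersionRPC implements: locate the region containing startKey, clamp the last region's end key to the range's end key, send one FlashbackToVersion request per region, and advance startKey until the whole range is covered. The regionEndFor, sendRPC and flashbackRange helpers below are hypothetical stand-ins for the TiKV region cache and tikvrpc calls, not TiDB or client-go APIs.

package main

import (
	"bytes"
	"fmt"
)

// regionEndFor is a hypothetical stand-in for s.GetRegionCache().LocateKey:
// it returns the end key of the region containing key; an empty result means
// the region is unbounded on the right. Here the key space is split at "m".
func regionEndFor(key []byte) []byte {
	if bytes.Compare(key, []byte("m")) < 0 {
		return []byte("m")
	}
	return nil
}

// sendRPC is a hypothetical stand-in for the per-region FlashbackToVersion request.
func sendRPC(version uint64, startKey, endKey []byte) {
	fmt.Printf("flashback to version %d for range [%q, %q)\n", version, startKey, endKey)
}

// flashbackRange mirrors the loop structure of sendFlashbackToVersionRPC:
// walk the range region by region, clamping the last region to rangeEndKey.
func flashbackRange(version uint64, startKey, rangeEndKey []byte) int {
	completedRegions := 0
	for {
		if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 {
			break
		}
		endKey := regionEndFor(startKey)
		isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0)
		if isLast {
			endKey = rangeEndKey
		}
		sendRPC(version, startKey, endKey)
		completedRegions++
		if isLast {
			break
		}
		startKey = endKey
	}
	return completedRegions
}

func main() {
	fmt.Println("completed regions:", flashbackRange(42, []byte("b"), []byte("z")))
}

In the real function the same loop additionally retries on region errors through a Backoffer and reports the number of completed regions via rangetask.TaskStat.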
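Likewise, a compact hypothetical sketch of how the job's SchemaState drives the three stages described in the comment above onFlashbackCluster; schemaState and step are simplified stand-ins rather than TiDB types.

package main

import "fmt"

// schemaState is a simplified stand-in for model.SchemaState.
type schemaState int

const (
	stateNone schemaState = iota
	stateWriteOnly
	stateWriteReorganization
	statePublic
)

// step mirrors the stage transitions of onFlashbackCluster: each call handles
// one stage and moves the job to the next SchemaState.
func step(state schemaState) (schemaState, string) {
	switch state {
	case stateNone:
		// Stage 1: lock flashbackClusterJobID, save the PD schedule and
		// remember tidb_super_read_only and GC status in the job args.
		return stateWriteOnly, "locked job ID, saved PD schedule / read-only / GC state"
	case stateWriteOnly:
		// Stage 2: validate flashbackTS, disable GC, close PD schedule,
		// turn on tidb_super_read_only.
		return stateWriteReorganization, "checked flashbackTS, disabled GC, closed PD schedule"
	case stateWriteReorganization:
		// Stage 3: compute key ranges and send FlashbackToVersion RPCs.
		return statePublic, "sent flashback RPCs for all key ranges"
	default:
		return state, "done"
	}
}

func main() {
	s := stateNone
	for s != statePublic {
		var msg string
		s, msg = step(s)
		fmt.Println(msg)
	}
}

Each DDL worker invocation handles one stage and persists the next SchemaState, which is why the saved PD schedule, read-only and GC settings are carried in job.Args between stages.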