*: support send flashback cluster RPC #37659

Merged
merged 12 commits on Sep 9, 2022
219 changes: 196 additions & 23 deletions ddl/cluster.go
@@ -15,10 +15,15 @@
package ddl

import (
"bytes"
"context"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
@@ -27,10 +32,18 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/filter"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

@@ -42,6 +55,14 @@ var pdScheduleKey = []string{
"replica-schedule-limit",
}

const (
flashbackMaxBackoff = 300000 // 300s
flashbackTimeout = 30 * time.Second // 30s

readOnlyArgsOffset = 2
gcEnabledArgsOffset = 3
)

func closePDSchedule() error {
closeMap := make(map[string]interface{})
for _, key := range pdScheduleKey {
@@ -96,6 +117,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func setTiDBSuperReadOnly(sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBSuperReadOnly, value)
}

func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly)
if err != nil {
return "", errors.Trace(err)
}
return val, nil
}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
@@ -107,6 +140,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
if err = closePDSchedule(); err != nil {
return err
}
if err = setTiDBSuperReadOnly(sess, variable.On); err != nil {
return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
@@ -223,17 +259,123 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa
return keyRanges, nil
}

func sendFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
version uint64,
r tikvstore.KeyRange,
) (rangetask.TaskStat, error) {
startKey, rangeEndKey := r.StartKey, r.EndKey
var taskStat rangetask.TaskStat
for {
select {
case <-ctx.Done():
return taskStat, errors.WithStack(ctx.Err())
default:
}

if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 {
break
}

bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff)
loc, err := s.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return taskStat, err
}

endKey := loc.EndKey
isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0)
// If it is the last region
if isLast {
endKey = rangeEndKey
}

req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{
Version: version,
StartKey: startKey,
EndKey: endKey,
})

Review comment (Contributor): Do we need to set MaxExecutionDurationMs?
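For reference, a minimal sketch of what the reviewer's suggestion could look like. It assumes tikvrpc.Request embeds kvrpcpb.Context (as in client-go v2), so the promoted MaxExecutionDurationMs field can be set on the request after it is built; this is illustrative only and not part of the PR:

// Illustrative sketch (not in this PR): bound server-side execution time for the
// flashback RPC. Assumes the request struct embeds kvrpcpb.Context, so the
// promoted MaxExecutionDurationMs field is settable after NewRequest.
req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{
Version: version,
StartKey: startKey,
EndKey: endKey,
})
req.MaxExecutionDurationMs = uint64(flashbackTimeout.Milliseconds())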

resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
if err != nil {
return taskStat, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return taskStat, err
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String()))
if err != nil {
return taskStat, err
}
continue
}
if resp.Resp == nil {
return taskStat, errors.WithStack(tikverr.ErrBodyMissing)
}
flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse)
if err := flashbackToVersionResp.GetError(); err != "" {
return taskStat, errors.Errorf("unexpected flashback to version err: %v", err)
}
taskStat.CompletedRegions++
if isLast {
break
}
startKey = endKey
}
return taskStat, nil
}

func flashbackToVersion(
ctx context.Context,
d *ddlCtx,
version uint64,
startKey []byte, endKey []byte,
) (err error) {
return rangetask.NewRangeTaskRunner(
"flashback-to-version-runner",
d.store.(tikv.Storage),
int(variable.GetDDLFlashbackConcurrency()),
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r)
},
).RunOnRange(ctx, startKey, endKey)
}

// A Flashback has 3 different stages.
// 1. before lock flashbackClusterJobID, check clusterJobID and lock it.
// 2. before flashback start, check timestamp, disable GC and close PD schedule.
// 3. before flashback done, get key ranges, send flashback RPC.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) {
if val.(bool) {
inFlashbackTest = true
}
})
// TODO: Support flashback in unistore.
if d.store.Name() != "TiKV" && !inFlashbackTest {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Not support flashback cluster in non-TiKV env")
}

var flashbackTS uint64
var pdScheduleValue map[string]interface{}
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
var readOnlyValue string
var gcEnabledValue bool
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
defer w.sessPool.put(sess)

switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and save the PD schedule.
@@ -255,6 +397,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
readOnlyValue, err = getTiDBSuperReadOnly(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[readOnlyArgsOffset] = &readOnlyValue
gcEnableValue, err := gcutil.CheckGCEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[gcEnabledArgsOffset] = &gcEnableValue
Review comment (Contributor): Why do we need to do it here? Or maybe we can do it in StateWriteOnly?

Reply (Contributor Author, @Defined2014, Sep 8, 2022): We should store it in the first stage, like the onRecoverTable func does.

tidb/ddl/table.go, lines 430 to 453 in e75a079:

// 1. Check GC enable status, to decided whether enable GC after recover table.
// a. Why not disable GC before put the job to DDL job queue?
// Think about concurrency problem. If a recover job-1 is doing and already disabled GC,
// then, another recover table job-2 check GC enable will get disable before into the job queue.
// then, after recover table job-2 finished, the GC will be disabled.
// b. Why split into 2 steps? 1 step also can finish this job: check GC -> disable GC -> recover table -> finish job.
// What if the transaction commit failed? then, the job will retry, but the GC already disabled when first running.
// So, after this job retry succeed, the GC will be disabled.
// 2. Do recover table job.
// a. Check whether GC enabled, if enabled, disable GC first.
// b. Check GC safe point. If drop table time if after safe point time, then can do recover.
// otherwise, can't recover table, because the records of the table may already delete by gc.
// c. Remove GC task of the table from gc_delete_range table.
// d. Create table and rebase table auto ID.
// e. Finish.
switch tblInfo.State {
case model.StateNone:
// none -> write only
// check GC enable and update flag.
if gcEnable {
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagEnableGC
} else {
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

Reply (Contributor): OK, but I think putting it in model.StateWriteOnly is also OK; it is just a little easier to understand here.
} else {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
@@ -263,31 +417,39 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
case model.StateWriteOnly:
sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
defer w.sessPool.put(sess)
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// A hack way to make global variables are synchronized to all TiDB.
// TiKV will block read/write requests during flashback cluster.
// So it's not very dangerous when sync failed.
time.Sleep(1 * time.Second)
Review comment (Contributor): Why is it 1s here?

Reply (Contributor Author): See the comments; it just lets most TiDB instances sync this global variable.

Reply (Contributor Author): 1s is just a magic number. I found the global variable is synced through etcd, so the time cost will be very small.

Reply (Contributor): Got it.

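As an aside on the magic number, a hedged alternative sketch (not in this PR) would keep the same best-effort one-second propagation window but make it cancellable through the worker context, which this file already uses elsewhere (w.ctx):

// Illustrative only: same best-effort 1s wait for global-variable propagation,
// but interruptible if the DDL worker is shutting down.
select {
case <-time.After(1 * time.Second):
case <-w.ctx.Done():
return ver, errors.Trace(w.ctx.Err())
}
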
job.SchemaState = model.StateWriteReorganization
return ver, nil
// Stage 3, get key ranges.
case model.StateWriteReorganization:
sess, err := w.sessPool.get()
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
// TODO: Support flashback in unistore.
if inFlashbackTest {
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
}
defer w.sessPool.put(sess)
_, err = GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))

keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))
if err != nil {
return ver, errors.Trace(err)
}

for _, ranges := range keyRanges {
if err = flashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil {
logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err))
return ver, err
}
}

asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
Expand All @@ -298,27 +460,38 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
func finishFlashbackCluster(w *worker, job *model.Job) error {
var flashbackTS uint64
var pdScheduleValue map[string]interface{}
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
var readOnlyValue string
var gcEnabled bool
var jobID int64

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil {
return errors.Trace(err)
}
sess, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(sess)

err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
jobID, err = t.GetFlashbackClusterJobID()
if err != nil {
return err
}
if jobID == job.ID {
if pdScheduleValue != nil {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
if err = enableGC(w); err != nil {
if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
return err
}
err = t.SetFlashbackClusterJobID(0)
if err != nil {
if gcEnabled {
if err = gcutil.EnableGC(sess); err != nil {
return err
}
}
if err = t.SetFlashbackClusterJobID(0); err != nil {
return err
}
}