-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: support send flashback cluster RPC #37659
Changes from 10 commits
8ed1f0f
7465000
0d11414
c653c51
e924564
b2409e3
54b8f81
29d46d4
411885b
2ded02f
8f4bc12
250258f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -15,10 +15,15 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
package ddl | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||
"bytes" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"strings" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"time" | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/failpoint" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/kvproto/pkg/kvrpcpb" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/ddl/util" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/domain/infosync" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/expression" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/infoschema" | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -27,10 +32,18 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/metrics" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/parser/model" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/sessionctx" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/sessionctx/variable" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/tablecodec" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/util/filter" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/util/gcutil" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/pingcap/tidb/util/logutil" | ||||||||||||||||||||||||||||||||||||||||||||||||||
tikverr "github.com/tikv/client-go/v2/error" | ||||||||||||||||||||||||||||||||||||||||||||||||||
tikvstore "github.com/tikv/client-go/v2/kv" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/tikv/client-go/v2/oracle" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/tikv/client-go/v2/tikv" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/tikv/client-go/v2/tikvrpc" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"github.com/tikv/client-go/v2/txnkv/rangetask" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"go.uber.org/zap" | ||||||||||||||||||||||||||||||||||||||||||||||||||
"golang.org/x/exp/slices" | ||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -42,6 +55,14 @@ var pdScheduleKey = []string{ | |||||||||||||||||||||||||||||||||||||||||||||||||
"replica-schedule-limit", | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
const ( | ||||||||||||||||||||||||||||||||||||||||||||||||||
flashbackMaxBackoff = 300000 // 300s | ||||||||||||||||||||||||||||||||||||||||||||||||||
flashbackTimeout = 30 * time.Second // 30s | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
readOnlyArgsOffset = 2 | ||||||||||||||||||||||||||||||||||||||||||||||||||
gcEnabledArgsOffset = 3 | ||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
func closePDSchedule() error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
closeMap := make(map[string]interface{}) | ||||||||||||||||||||||||||||||||||||||||||||||||||
for _, key := range pdScheduleKey { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -96,6 +117,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack | |||||||||||||||||||||||||||||||||||||||||||||||||
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
func setTiDBSuperReadOnly(sess sessionctx.Context, value string) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBSuperReadOnly, value) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return "", errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
return val, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -107,6 +140,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta | |||||||||||||||||||||||||||||||||||||||||||||||||
if err = closePDSchedule(); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = setTiDBSuperReadOnly(sess, variable.On); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
nowSchemaVersion, err := t.GetSchemaVersion() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -223,17 +259,123 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa | |||||||||||||||||||||||||||||||||||||||||||||||||
return keyRanges, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
func sendFlashbackToVersionRPC( | ||||||||||||||||||||||||||||||||||||||||||||||||||
ctx context.Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||
s tikv.Storage, | ||||||||||||||||||||||||||||||||||||||||||||||||||
version uint64, | ||||||||||||||||||||||||||||||||||||||||||||||||||
r tikvstore.KeyRange, | ||||||||||||||||||||||||||||||||||||||||||||||||||
) (rangetask.TaskStat, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
startKey, rangeEndKey := r.StartKey, r.EndKey | ||||||||||||||||||||||||||||||||||||||||||||||||||
var taskStat rangetask.TaskStat | ||||||||||||||||||||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||||||||||||||||||||||
case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, errors.WithStack(ctx.Err()) | ||||||||||||||||||||||||||||||||||||||||||||||||||
default: | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) | ||||||||||||||||||||||||||||||||||||||||||||||||||
loc, err := s.GetRegionCache().LocateKey(bo, startKey) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
endKey := loc.EndKey | ||||||||||||||||||||||||||||||||||||||||||||||||||
isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
// If it is the last region | ||||||||||||||||||||||||||||||||||||||||||||||||||
if isLast { | ||||||||||||||||||||||||||||||||||||||||||||||||||
endKey = rangeEndKey | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ | ||||||||||||||||||||||||||||||||||||||||||||||||||
Version: version, | ||||||||||||||||||||||||||||||||||||||||||||||||||
StartKey: startKey, | ||||||||||||||||||||||||||||||||||||||||||||||||||
EndKey: endKey, | ||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
regionErr, err := resp.GetRegionError() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if regionErr != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
continue | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if resp.Resp == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, errors.WithStack(tikverr.ErrBodyMissing) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err := flashbackToVersionResp.GetError(); err != "" { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, errors.Errorf("unexpected flashback to version err: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
taskStat.CompletedRegions++ | ||||||||||||||||||||||||||||||||||||||||||||||||||
if isLast { | ||||||||||||||||||||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
startKey = endKey | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
return taskStat, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
func flashbackToVersion( | ||||||||||||||||||||||||||||||||||||||||||||||||||
ctx context.Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||
d *ddlCtx, | ||||||||||||||||||||||||||||||||||||||||||||||||||
version uint64, | ||||||||||||||||||||||||||||||||||||||||||||||||||
startKey []byte, endKey []byte, | ||||||||||||||||||||||||||||||||||||||||||||||||||
) (err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return rangetask.NewRangeTaskRunner( | ||||||||||||||||||||||||||||||||||||||||||||||||||
"flashback-to-version-runner", | ||||||||||||||||||||||||||||||||||||||||||||||||||
d.store.(tikv.Storage), | ||||||||||||||||||||||||||||||||||||||||||||||||||
int(variable.GetDDLFlashbackConcurrency()), | ||||||||||||||||||||||||||||||||||||||||||||||||||
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) | ||||||||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||||||||
).RunOnRange(ctx, startKey, endKey) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// A Flashback has 3 different stages. | ||||||||||||||||||||||||||||||||||||||||||||||||||
// 1. before lock flashbackClusterJobID, check clusterJobID and lock it. | ||||||||||||||||||||||||||||||||||||||||||||||||||
// 2. before flashback start, check timestamp, disable GC and close PD schedule. | ||||||||||||||||||||||||||||||||||||||||||||||||||
// 3. before flashback done, get key ranges, send flashback RPC. | ||||||||||||||||||||||||||||||||||||||||||||||||||
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
inFlashbackTest := false | ||||||||||||||||||||||||||||||||||||||||||||||||||
failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if val.(bool) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
inFlashbackTest = true | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO support flashback in unistore | ||||||||||||||||||||||||||||||||||||||||||||||||||
if d.store.Name() != "TiKV" && !inFlashbackTest { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
var flashbackTS uint64 | ||||||||||||||||||||||||||||||||||||||||||||||||||
var pdScheduleValue map[string]interface{} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
var readOnlyValue string | ||||||||||||||||||||||||||||||||||||||||||||||||||
var gcEnabledValue bool | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
sess, err := w.sessPool.get() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
defer w.sessPool.put(sess) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
switch job.SchemaState { | ||||||||||||||||||||||||||||||||||||||||||||||||||
// Stage 1, check and set FlashbackClusterJobID, and save the PD schedule. | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -255,6 +397,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve | |||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
readOnlyValue, err = getTiDBSuperReadOnly(sess) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.Args[readOnlyArgsOffset] = &readOnlyValue | ||||||||||||||||||||||||||||||||||||||||||||||||||
gcEnableValue, err := gcutil.CheckGCEnable(sess) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.Args[gcEnabledArgsOffset] = &gcEnableValue | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to do it here? Or maybe we can do it in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should store it in the first stage, like Lines 430 to 453 in e75a079
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, but I think putting it to |
||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -263,31 +417,39 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve | |||||||||||||||||||||||||||||||||||||||||||||||||
return ver, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
// Stage 2, check flashbackTS, close GC and PD schedule. | ||||||||||||||||||||||||||||||||||||||||||||||||||
case model.StateWriteOnly: | ||||||||||||||||||||||||||||||||||||||||||||||||||
sess, err := w.sessPool.get() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
defer w.sessPool.put(sess) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
// A hack way to make global variables are synchronized to all tidb. | ||||||||||||||||||||||||||||||||||||||||||||||||||
Defined2014 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// TiKV will block read/write request during flashback cluster, | ||||||||||||||||||||||||||||||||||||||||||||||||||
Defined2014 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||
// So it's not very dangerous when sync failed. | ||||||||||||||||||||||||||||||||||||||||||||||||||
time.Sleep(1 * time.Second) | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why here is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can see the comments, just let most TiDB synced this global variable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. |
||||||||||||||||||||||||||||||||||||||||||||||||||
job.SchemaState = model.StateWriteReorganization | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
// Stage 3, get key ranges. | ||||||||||||||||||||||||||||||||||||||||||||||||||
case model.StateWriteReorganization: | ||||||||||||||||||||||||||||||||||||||||||||||||||
sess, err := w.sessPool.get() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateCancelled | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO support flashback in unistore | ||||||||||||||||||||||||||||||||||||||||||||||||||
Defined2014 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if inFlashbackTest { | ||||||||||||||||||||||||||||||||||||||||||||||||||
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster}) | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateDone | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.SchemaState = model.StatePublic | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
defer w.sessPool.put(sess) | ||||||||||||||||||||||||||||||||||||||||||||||||||
_, err = GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
for _, ranges := range keyRanges { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = flashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster}) | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.State = model.JobStateDone | ||||||||||||||||||||||||||||||||||||||||||||||||||
job.SchemaState = model.StatePublic | ||||||||||||||||||||||||||||||||||||||||||||||||||
return ver, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -298,27 +460,38 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve | |||||||||||||||||||||||||||||||||||||||||||||||||
func finishFlashbackCluster(w *worker, job *model.Job) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
var flashbackTS uint64 | ||||||||||||||||||||||||||||||||||||||||||||||||||
var pdScheduleValue map[string]interface{} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
var readOnlyValue string | ||||||||||||||||||||||||||||||||||||||||||||||||||
var gcEnabled bool | ||||||||||||||||||||||||||||||||||||||||||||||||||
var jobID int64 | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
sess, err := w.sessPool.get() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return errors.Trace(err) | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
defer w.sessPool.put(sess) | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||
t := meta.NewMeta(txn) | ||||||||||||||||||||||||||||||||||||||||||||||||||
jobID, err := t.GetFlashbackClusterJobID() | ||||||||||||||||||||||||||||||||||||||||||||||||||
jobID, err = t.GetFlashbackClusterJobID() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if jobID == job.ID { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if pdScheduleValue != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = recoverPDSchedule(pdScheduleValue); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = recoverPDSchedule(pdScheduleValue); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = enableGC(w); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
err = t.SetFlashbackClusterJobID(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if gcEnabled { | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = gcutil.EnableGC(sess); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
if err = t.SetFlashbackClusterJobID(0); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to set
MaxExecutionDurationMs
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be set to
timeout
in client-go.https://github.com/tikv/client-go/blob/426055b5755b4a64df7b92b278aee6e65479d53c/internal/locate/region_request.go#L962-L965