diff --git a/ddl/cluster.go b/ddl/cluster.go index e3d7363304ab5..55208c4f4c74f 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -15,10 +15,15 @@ package ddl import ( + "bytes" "context" "strings" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -27,10 +32,18 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tidb/util/gcutil" + "github.com/pingcap/tidb/util/logutil" + tikverr "github.com/tikv/client-go/v2/error" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/txnkv/rangetask" + "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -42,6 +55,14 @@ var pdScheduleKey = []string{ "replica-schedule-limit", } +const ( + flashbackMaxBackoff = 300000 // 300s + flashbackTimeout = 30 * time.Second // 30s + + readOnlyArgsOffset = 2 + gcEnabledArgsOffset = 3 +) + func closePDSchedule() error { closeMap := make(map[string]interface{}) for _, key := range pdScheduleKey { @@ -96,6 +117,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint) } +func setTiDBSuperReadOnly(sess sessionctx.Context, value string) error { + return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBSuperReadOnly, value) +} + +func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) { + val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly) + if err != nil { + return "", errors.Trace(err) + } + return val, nil +} + func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil { return err @@ -107,6 +140,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = closePDSchedule(); err != nil { return err } + if err = setTiDBSuperReadOnly(sess, variable.On); err != nil { + return err + } nowSchemaVersion, err := t.GetSchemaVersion() if err != nil { @@ -223,17 +259,123 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa return keyRanges, nil } +func sendFlashbackToVersionRPC( + ctx context.Context, + s tikv.Storage, + version uint64, + r tikvstore.KeyRange, +) (rangetask.TaskStat, error) { + startKey, rangeEndKey := r.StartKey, r.EndKey + var taskStat rangetask.TaskStat + for { + select { + case <-ctx.Done(): + return taskStat, errors.WithStack(ctx.Err()) + default: + } + + if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 { + break + } + + bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) + loc, err := s.GetRegionCache().LocateKey(bo, startKey) + if err != nil { + return taskStat, err + } + + endKey := loc.EndKey + isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) + // If it is the last region + if isLast { + endKey = rangeEndKey + } + + req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, 
&kvrpcpb.FlashbackToVersionRequest{ + Version: version, + StartKey: startKey, + EndKey: endKey, + }) + + resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) + if err != nil { + return taskStat, err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return taskStat, err + } + if regionErr != nil { + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + if err != nil { + return taskStat, err + } + continue + } + if resp.Resp == nil { + return taskStat, errors.WithStack(tikverr.ErrBodyMissing) + } + flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse) + if err := flashbackToVersionResp.GetError(); err != "" { + return taskStat, errors.Errorf("unexpected flashback to version err: %v", err) + } + taskStat.CompletedRegions++ + if isLast { + break + } + startKey = endKey + } + return taskStat, nil +} + +func flashbackToVersion( + ctx context.Context, + d *ddlCtx, + version uint64, + startKey []byte, endKey []byte, +) (err error) { + return rangetask.NewRangeTaskRunner( + "flashback-to-version-runner", + d.store.(tikv.Storage), + int(variable.GetDDLFlashbackConcurrency()), + func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) + }, + ).RunOnRange(ctx, startKey, endKey) +} + // A Flashback has 3 different stages. // 1. before lock flashbackClusterJobID, check clusterJobID and lock it. // 2. before flashback start, check timestamp, disable GC and close PD schedule. // 3. before flashback done, get key ranges, send flashback RPC. func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + inFlashbackTest := false + failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) { + if val.(bool) { + inFlashbackTest = true + } + }) + // TODO: Support flashback in unistore. + if d.store.Name() != "TiKV" && !inFlashbackTest { + job.State = model.JobStateCancelled + return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") + } + var flashbackTS uint64 var pdScheduleValue map[string]interface{} - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil { + var readOnlyValue string + var gcEnabledValue bool + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + sess, err := w.sessPool.get() + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + defer w.sessPool.put(sess) switch job.SchemaState { // Stage 1, check and set FlashbackClusterJobID, and save the PD schedule. @@ -255,6 +397,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } + readOnlyValue, err = getTiDBSuperReadOnly(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[readOnlyArgsOffset] = &readOnlyValue + gcEnableValue, err := gcutil.CheckGCEnable(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[gcEnabledArgsOffset] = &gcEnableValue } else { job.State = model.JobStateCancelled return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID) @@ -263,31 +417,39 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, nil // Stage 2, check flashbackTS, close GC and PD schedule. 
 	case model.StateWriteOnly:
-		sess, err := w.sessPool.get()
-		if err != nil {
-			job.State = model.JobStateCancelled
-			return ver, errors.Trace(err)
-		}
-		defer w.sessPool.put(sess)
 		if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Trace(err)
 		}
+		// A hack: sleep to give all TiDB instances time to load the new global variable values.
+		// TiKV will block read/write requests during flashback cluster,
+		// so it is not very dangerous if the synchronization has not finished yet.
+		time.Sleep(1 * time.Second)
 		job.SchemaState = model.StateWriteReorganization
 		return ver, nil
 	// Stage 3, get key ranges.
 	case model.StateWriteReorganization:
-		sess, err := w.sessPool.get()
-		if err != nil {
-			job.State = model.JobStateCancelled
-			return ver, errors.Trace(err)
+		// TODO: Support flashback in unistore.
+		if inFlashbackTest {
+			asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
+			job.State = model.JobStateDone
+			job.SchemaState = model.StatePublic
+			return ver, nil
 		}
-		defer w.sessPool.put(sess)
-		_, err = GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))
+
+		keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0))
 		if err != nil {
 			return ver, errors.Trace(err)
 		}
+		for _, ranges := range keyRanges {
+			if err = flashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil {
+				logutil.BgLogger().Warn("[ddl] flashback cluster failed", zap.Error(err))
+				return ver, err
+			}
+		}
+
+		asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
 		job.State = model.JobStateDone
 		job.SchemaState = model.StatePublic
 		return ver, nil
@@ -298,27 +460,38 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
 func finishFlashbackCluster(w *worker, job *model.Job) error {
 	var flashbackTS uint64
 	var pdScheduleValue map[string]interface{}
-	if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
+	var readOnlyValue string
+	var gcEnabled bool
+	var jobID int64
+
+	if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil {
 		return errors.Trace(err)
 	}
+	sess, err := w.sessPool.get()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer w.sessPool.put(sess)
-	err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
+	err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
-		jobID, err := t.GetFlashbackClusterJobID()
+		jobID, err = t.GetFlashbackClusterJobID()
 		if err != nil {
 			return err
 		}
 		if jobID == job.ID {
-			if pdScheduleValue != nil {
-				if err = recoverPDSchedule(pdScheduleValue); err != nil {
-					return err
-				}
+			if err = recoverPDSchedule(pdScheduleValue); err != nil {
+				return err
 			}
-			if err = enableGC(w); err != nil {
+			if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil {
 				return err
 			}
-			err = t.SetFlashbackClusterJobID(0)
-			if err != nil {
+			if gcEnabled {
+				if err = gcutil.EnableGC(sess); err != nil {
+					return err
+				}
+			}
+			if err = t.SetFlashbackClusterJobID(0); err != nil {
 				return err
 			}
 		}
diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go
index 0621ae64dcfb7..6fa7ace0c0b8c 100644
--- a/ddl/cluster_test.go
+++ b/ddl/cluster_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/dbterror" @@ -116,6 +117,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { tk := testkit.NewTestKit(t, store) injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", @@ -124,7 +126,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { oldValue := map[string]interface{}{ "hot-region-schedule-limit": 1, } - infosync.SetPDScheduleConfig(context.Background(), oldValue) + require.NoError(t, infosync.SetPDScheduleConfig(context.Background(), oldValue)) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -154,6 +156,71 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { require.NoError(t, err) require.EqualValues(t, finishValue["hot-region-schedule-limit"], 1) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) +} + +func TestGlobalVariablesOnFlashback(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + originHook := dom.DDL().GetHook() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobRunBeforeExported = func(job *model.Job) { + assert.Equal(t, model.ActionFlashbackCluster, job.Type) + if job.SchemaState == model.StateWriteReorganization { + rs, err := tk.Exec("show variables like 'tidb_super_read_only'") + assert.NoError(t, err) + assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) + rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + assert.NoError(t, err) + assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + } + } + dom.DDL().SetHook(hook) + // first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off + tk.MustExec("set global tidb_gc_enable = on") + tk.MustExec("set global tidb_super_read_only = off") + + tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) + rs, err := tk.Exec("show variables like 'tidb_super_read_only'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) + + // second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on + tk.MustExec("set global tidb_gc_enable = off") + 
tk.MustExec("set global tidb_super_read_only = on") + + tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) + rs, err = tk.Exec("show variables like 'tidb_super_read_only'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) + rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + + dom.DDL().SetHook(originHook) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } @@ -166,6 +233,7 @@ func TestCancelFlashbackCluster(t *testing.T) { require.NoError(t, err) injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", @@ -193,6 +261,7 @@ func TestCancelFlashbackCluster(t *testing.T) { dom.DDL().SetHook(originHook) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 715db0b32598b..2554dcfa7ccad 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2646,7 +2646,12 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error job := &model.Job{ Type: model.ActionFlashbackCluster, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{flashbackTS, map[string]interface{}{}}, + // The value for global variables is meaningless, it will cover during flashback cluster. 
+ Args: []interface{}{ + flashbackTS, + map[string]interface{}{}, + variable.On, /* tidb_super_read_only */ + true /* tidb_gc_enable */}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) diff --git a/executor/recover_test.go b/executor/recover_test.go index 8d9f3d169e6be..a1a131771aa36 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -297,10 +297,14 @@ func TestRecoverTableMeetError(t *testing.T) { func TestRecoverClusterMeetError(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + + tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "Not support flashback cluster in non-TiKV env") + ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) flashbackTs := oracle.GetTimeFromTS(ts) injectSafeTS := oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", @@ -333,6 +337,7 @@ func TestRecoverClusterMeetError(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } func TestRecoverClusterWithTiFlash(t *testing.T) { @@ -362,6 +367,8 @@ func TestFlashbackWithSafeTs(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -411,6 +418,7 @@ func TestFlashbackWithSafeTs(t *testing.T) { } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } // MockGC is used to make GC work in the test environment. 
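Note on the `Args` placeholders added in `ddl/ddl_api.go` above: stage 1 of `onFlashbackCluster` overwrites the `readOnlyArgsOffset`/`gcEnabledArgsOffset` slots with the values of `tidb_super_read_only` and `tidb_gc_enable` that were actually in effect, and `finishFlashbackCluster` decodes them back to restore the cluster once the job ends. The following is a minimal, self-contained sketch of that round trip; the `job` struct and `decodeArgs` helper are hypothetical stand-ins for `model.Job` and `Job.DecodeArgs`, not TiDB code, and the timestamp value is arbitrary.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a toy stand-in for model.Job: arguments travel as a JSON array.
type job struct {
	Args    []interface{}
	RawArgs json.RawMessage
}

func (j *job) encodeArgs() error {
	raw, err := json.Marshal(j.Args)
	if err != nil {
		return err
	}
	j.RawArgs = raw
	return nil
}

// decodeArgs mirrors the idea of Job.DecodeArgs: unmarshal each raw argument
// into the caller-provided pointer at the same position.
func (j *job) decodeArgs(args ...interface{}) error {
	var raw []json.RawMessage
	if err := json.Unmarshal(j.RawArgs, &raw); err != nil {
		return err
	}
	for i := 0; i < len(raw) && i < len(args); i++ {
		if err := json.Unmarshal(raw[i], args[i]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// FlashbackCluster submits the job with placeholder values for
	// tidb_super_read_only (offset 2) and tidb_gc_enable (offset 3).
	j := &job{Args: []interface{}{uint64(425), map[string]interface{}{}, "ON", true}}

	// Stage 1 replaces the placeholders with the values that were in effect
	// before the flashback, so they can be restored when the job finishes.
	j.Args[2] = "OFF" // readOnlyArgsOffset
	j.Args[3] = false // gcEnabledArgsOffset
	if err := j.encodeArgs(); err != nil {
		panic(err)
	}

	// finishFlashbackCluster decodes all four arguments back.
	var (
		flashbackTS uint64
		pdSchedule  map[string]interface{}
		readOnly    string
		gcEnabled   bool
	)
	if err := j.decodeArgs(&flashbackTS, &pdSchedule, &readOnly, &gcEnabled); err != nil {
		panic(err)
	}
	fmt.Println(flashbackTS, readOnly, gcEnabled) // 425 OFF false
}
```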
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index d80d0e0da86a3..3749592152353 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -507,6 +507,10 @@ var defaultSysVars = []*SysVar{ MemQuotaBindingCache.Store(TidbOptInt64(val, DefTiDBMemQuotaBindingCache)) return nil }}, + {Scope: ScopeGlobal, Name: TiDBDDLFlashbackConcurrency, Value: strconv.Itoa(DefTiDBDDLFlashbackConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { + SetDDLFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBDDLFlashbackConcurrency))) + return nil + }}, {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) return nil diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 93908d12ed34c..1ca4eca74e029 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -620,6 +620,32 @@ func TestTiDBCommitterConcurrency(t *testing.T) { require.NoError(t, err) } +func TestTiDBDDLFlashbackConcurrency(t *testing.T) { + sv := GetSysVar(TiDBDDLFlashbackConcurrency) + vars := NewSessionVars() + + newVal := 128 + val, err := sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal) + require.Equal(t, val, "128") + require.NoError(t, err) + + // out of range + newVal = MaxConfigurableConcurrency + 1 + expected := MaxConfigurableConcurrency + val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal) + // expected to truncate + require.Equal(t, val, fmt.Sprintf("%d", expected)) + require.NoError(t, err) + + // min value out of range + newVal = 0 + expected = 1 + val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal) + // expected to set to min value + require.Equal(t, val, fmt.Sprintf("%d", expected)) + require.NoError(t, err) +} + func TestDefaultMemoryDebugModeValue(t *testing.T) { vars := NewSessionVars() val, err := vars.GetSessionOrGlobalSystemVar(TiDBMemoryDebugModeMinHeapInUse) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 46c3df00a15a8..5d0af4cff50a4 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -447,6 +447,9 @@ const ( // TiDBDDLReorgWorkerCount defines the count of ddl reorg workers. TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" + // TiDBDDLFlashbackConcurrency defines the count of ddl flashback workers. + TiDBDDLFlashbackConcurrency = "tidb_ddl_flashback_concurrency" + // TiDBDDLReorgBatchSize defines the transaction batch size of ddl reorg workers. TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size" @@ -914,6 +917,7 @@ const ( DefTiDBRowFormatV2 = 2 DefTiDBDDLReorgWorkerCount = 4 DefTiDBDDLReorgBatchSize = 256 + DefTiDBDDLFlashbackConcurrency = 64 DefTiDBDDLErrorCountLimit = 512 DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBPlacementMode = PlacementModeStrict @@ -1050,18 +1054,19 @@ const ( // Process global variables. 
var ( - ProcessGeneralLog = atomic.NewBool(false) - RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) - GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) - QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) - EnablePProfSQLCPU = atomic.NewBool(false) - EnableBatchDML = atomic.NewBool(false) - EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) - ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount - ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize - ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit - ddlReorgRowFormat int64 = DefTiDBRowFormatV2 - maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount + ProcessGeneralLog = atomic.NewBool(false) + RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) + GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) + QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) + EnablePProfSQLCPU = atomic.NewBool(false) + EnableBatchDML = atomic.NewBool(false) + EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency + ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit + ddlReorgRowFormat int64 = DefTiDBRowFormatV2 + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold ForcePriority = int32(DefTiDBForcePriority) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 72b6a8eec2ff3..e0c31623a235c 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -46,6 +46,17 @@ func GetDDLReorgWorkerCounter() int32 { return atomic.LoadInt32(&ddlReorgWorkerCounter) } +// SetDDLFlashbackConcurrency sets ddlFlashbackConcurrency count. +// Sysvar validation enforces the range to already be correct. +func SetDDLFlashbackConcurrency(cnt int32) { + atomic.StoreInt32(&ddlFlashbackConcurrency, cnt) +} + +// GetDDLFlashbackConcurrency gets ddlFlashbackConcurrency count. +func GetDDLFlashbackConcurrency() int32 { + return atomic.LoadInt32(&ddlFlashbackConcurrency) +} + // SetDDLReorgBatchSize sets ddlReorgBatchSize size. // Sysvar validation enforces the range to already be correct. func SetDDLReorgBatchSize(cnt int32) { diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go index 7e628a34c674a..0bb02d647f86b 100644 --- a/statistics/handle/ddl.go +++ b/statistics/handle/ddl.go @@ -61,6 +61,8 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error { return err } } + case model.ActionFlashbackCluster: + return h.updateStatsVersion() } return nil } @@ -73,6 +75,38 @@ var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ ast.AnalyzeOptNumTopN: 20, } +// updateStatsVersion will set statistics version to the newest TS, +// then tidb-server will reload automatic. 
+func (h *Handle) updateStatsVersion() error { + h.mu.Lock() + defer h.mu.Unlock() + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err := exec.ExecuteInternal(ctx, "begin") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(ctx, exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + startTS := txn.StartTS() + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?", startTS); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?", startTS); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set version = %?", startTS); err != nil { + return err + } + + return nil +} + // updateGlobalStats will trigger the merge of global-stats when we drop table partition func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error { // We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode. diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index 969420c9b9c38..d1a250d4397a8 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -6,6 +6,7 @@ go_test( srcs = [ "backup_restore_test.go", "binlog_test.go", + "flashback_test.go", "main_test.go", ], flaky = True, diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go new file mode 100644 index 0000000000000..410c46c36080e --- /dev/null +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -0,0 +1,92 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package brietest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/tests/realtikvtest" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" +) + +// MockGC is used to make GC work in the test environment. +func MockGC(tk *testkit.TestKit) (string, string, string, func()) { + originGC := ddlutil.IsEmulatorGCEnable() + resetGC := func() { + if originGC { + ddlutil.EmulatorGCEnable() + } else { + ddlutil.EmulatorGCDisable() + } + } + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddlutil.EmulatorGCDisable() + gcTimeFormat := "20060102-15:04:05 -0700 MST" + timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(gcTimeFormat) + timeAfterDrop := time.Now().Add(48 * 60 * 60 * time.Second).Format(gcTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + // clear GC variables first. 
+ tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + return timeBeforeDrop, timeAfterDrop, safePointSQL, resetGC +} + +func TestFlashback(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + } +}