From 8ed1f0f99ce37a1442e29de10db7d9a74068dff6 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 5 Sep 2022 10:25:50 +0800 Subject: [PATCH 01/10] *: flashback support send RPC --- ddl/cluster.go | 112 ++++++++++++++++++++++++++++++- go.mod | 4 +- go.sum | 4 ++ sessionctx/variable/sysvar.go | 4 ++ sessionctx/variable/tidb_vars.go | 29 ++++---- sessionctx/variable/varsutil.go | 11 +++ statistics/handle/ddl.go | 32 +++++++++ 7 files changed, 181 insertions(+), 15 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index e3d7363304ab5..32e85f350bd62 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -15,10 +15,14 @@ package ddl import ( + "bytes" "context" "strings" + "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -27,10 +31,18 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tidb/util/gcutil" + "github.com/pingcap/tidb/util/logutil" + tikverr "github.com/tikv/client-go/v2/error" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/txnkv/rangetask" + "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -42,6 +54,11 @@ var pdScheduleKey = []string{ "replica-schedule-limit", } +const ( + flashbackMaxBackoff = 300000 // 300s + flashbackTimeout = 30 * time.Second // 30s +) + func closePDSchedule() error { closeMap := make(map[string]interface{}) for _, key := range pdScheduleKey { @@ -223,6 +240,91 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa return keyRanges, nil } +func flashbackToVersion( + ctx context.Context, + s tikv.Storage, + version uint64, + r tikvstore.KeyRange, +) (rangetask.TaskStat, error) { + startKey, rangeEndKey := r.StartKey, r.EndKey + var taskStat rangetask.TaskStat + for { + select { + case <-ctx.Done(): + return taskStat, errors.WithStack(ctx.Err()) + default: + } + + if len(rangeEndKey) > 0 && bytes.Compare(startKey, rangeEndKey) >= 0 { + break + } + + bo := tikv.NewBackoffer(ctx, flashbackMaxBackoff) + loc, err := s.GetRegionCache().LocateKey(bo, startKey) + if err != nil { + return taskStat, err + } + + endKey := loc.EndKey + isLast := len(endKey) == 0 || (len(rangeEndKey) > 0 && bytes.Compare(endKey, rangeEndKey) >= 0) + // If it is the last region + if isLast { + endKey = rangeEndKey + } + + req := tikvrpc.NewRequest(tikvrpc.CmdFlashbackToVersion, &kvrpcpb.FlashbackToVersionRequest{ + Version: version, + StartKey: startKey, + EndKey: endKey, + }) + + resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout) + if err != nil { + return taskStat, err + } + regionErr, err := resp.GetRegionError() + if err != nil { + return taskStat, err + } + if regionErr != nil { + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) + if err != nil { + return taskStat, err + } + continue + } + if resp.Resp == nil { + return taskStat, errors.WithStack(tikverr.ErrBodyMissing) + } + flashbackToVersionResp := resp.Resp.(*kvrpcpb.FlashbackToVersionResponse) + if err := flashbackToVersionResp.GetError(); err != "" { + return taskStat, errors.Errorf("unexpected flashback to version err: %v", err) + } + taskStat.CompletedRegions++ + if isLast { + break + } + startKey = endKey + } + return taskStat, nil +} + +func FlashbackToVersion( + ctx context.Context, + d *ddlCtx, + version uint64, + startKey []byte, endKey []byte, +) (err error) { + return rangetask.NewRangeTaskRunner( + "flashback-to-version-runner", + d.store.(tikv.Storage), + int(variable.GetFlashbackConcurrency()), + func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + return flashbackToVersion(ctx, d.store.(tikv.Storage), version, r) + }, + ).RunOnRange(ctx, startKey, endKey) +} + // A Flashback has 3 different stages. // 1. before lock flashbackClusterJobID, check clusterJobID and lock it. // 2. before flashback start, check timestamp, disable GC and close PD schedule. @@ -283,11 +385,19 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, errors.Trace(err) } defer w.sessPool.put(sess) - _, err = GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) + keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) } + for _, ranges := range keyRanges { + if err = FlashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { + logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) + return ver, err + } + } + + asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster}) job.State = model.JobStateDone job.SchemaState = model.StatePublic return ver, nil diff --git a/go.mod b/go.mod index 1929e002808c1..57839093aaf65 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee + github.com/pingcap/kvproto v0.0.0-20220906053631-2e37953b2b43 github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -80,7 +80,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.1-0.20220830073839-0130f767386c + github.com/tikv/client-go/v2 v2.0.1-0.20220906062821-e425355a106d github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 5c04abdcd04b2..2527950191ad2 100644 --- a/go.sum +++ b/go.sum @@ -761,6 +761,8 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee h1:s1al2ci3MEj5VnNuUCGAfeqpbCxcMeZibOyxw8ClHLE= github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220906053631-2e37953b2b43 h1:5q7Ns0R7q6Uj+fpa3lDTijrcqgId4lNdGa2AG7izB5c= +github.com/pingcap/kvproto v0.0.0-20220906053631-2e37953b2b43/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -915,6 +917,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tikv/client-go/v2 v2.0.1-0.20220830073839-0130f767386c h1:pZoPlKWCecxJKL8oRq/se71RTljYDrQlZQ2NzKkMYi0= github.com/tikv/client-go/v2 v2.0.1-0.20220830073839-0130f767386c/go.mod h1:DqgQZKxPtMRazyELFyj7Ic2iOo+5XGTetxDp2KYnODs= +github.com/tikv/client-go/v2 v2.0.1-0.20220906062821-e425355a106d h1:2MJLD/S7r0zNw2kWbZiqINp0rgsb3ScURE+O4TDnlr0= +github.com/tikv/client-go/v2 v2.0.1-0.20220906062821-e425355a106d/go.mod h1:tkKDJ88lryb16v7FfCh8pvvfwwCkh4aGeSOqHviPaaE= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww= github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 5554e61d746f4..bfdb01b75e378 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -507,6 +507,10 @@ var defaultSysVars = []*SysVar{ MemQuotaBindingCache.Store(TidbOptInt64(val, DefTiDBMemQuotaBindingCache)) return nil }}, + {Scope: ScopeGlobal, Name: TiDBFlashbackConcurrency, Value: strconv.Itoa(DefTiDBFlashbackConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { + SetFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBFlashbackConcurrency))) + return nil + }}, {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 8ace83b279fa4..b2b2cadb1daff 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -444,6 +444,9 @@ const ( // TiDBDDLReorgWorkerCount defines the count of ddl reorg workers. TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" + // TiDBFlashbackConcurrency defines the count of ddl flashback workers. + TiDBFlashbackConcurrency = "tidb_flashback_concurrency" + // TiDBDDLReorgBatchSize defines the transaction batch size of ddl reorg workers. TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size" @@ -902,6 +905,7 @@ const ( DefTiDBRowFormatV2 = 2 DefTiDBDDLReorgWorkerCount = 4 DefTiDBDDLReorgBatchSize = 256 + DefTiDBFlashbackConcurrency = 64 DefTiDBDDLErrorCountLimit = 512 DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBPlacementMode = PlacementModeStrict @@ -1036,18 +1040,19 @@ const ( // Process global variables. var ( - ProcessGeneralLog = atomic.NewBool(false) - RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) - GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) - QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) - EnablePProfSQLCPU = atomic.NewBool(false) - EnableBatchDML = atomic.NewBool(false) - EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) - ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount - ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize - ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit - ddlReorgRowFormat int64 = DefTiDBRowFormatV2 - maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount + ProcessGeneralLog = atomic.NewBool(false) + RunAutoAnalyze = atomic.NewBool(DefTiDBEnableAutoAnalyze) + GlobalLogMaxDays = atomic.NewInt32(int32(config.GetGlobalConfig().Log.File.MaxDays)) + QueryLogMaxLen = atomic.NewInt32(DefTiDBQueryLogMaxLen) + EnablePProfSQLCPU = atomic.NewBool(false) + EnableBatchDML = atomic.NewBool(false) + EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + ddlFlashbackConcurrency int32 = DefTiDBFlashbackConcurrency + ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit + ddlReorgRowFormat int64 = DefTiDBRowFormatV2 + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond. DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold ForcePriority = int32(DefTiDBForcePriority) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 72b6a8eec2ff3..c9a1d8e08a97a 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -46,6 +46,17 @@ func GetDDLReorgWorkerCounter() int32 { return atomic.LoadInt32(&ddlReorgWorkerCounter) } +// SetFlashbackConcurrency sets ddlFlashbackConcurrency count. +// Sysvar validation enforces the range to already be correct. +func SetFlashbackConcurrency(cnt int32) { + atomic.StoreInt32(&ddlFlashbackConcurrency, cnt) +} + +// GetFlashbackConcurrency gets ddlFlashbackConcurrency count. +func GetFlashbackConcurrency() int32 { + return atomic.LoadInt32(&ddlFlashbackConcurrency) +} + // SetDDLReorgBatchSize sets ddlReorgBatchSize size. // Sysvar validation enforces the range to already be correct. func SetDDLReorgBatchSize(cnt int32) { diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go index 7e628a34c674a..6cff4233a70d9 100644 --- a/statistics/handle/ddl.go +++ b/statistics/handle/ddl.go @@ -61,6 +61,8 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error { return err } } + case model.ActionFlashbackCluster: + return h.updateStatsVersion() } return nil } @@ -73,6 +75,36 @@ var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ ast.AnalyzeOptNumTopN: 20, } +func (h *Handle) updateStatsVersion() error { + h.mu.Lock() + defer h.mu.Unlock() + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err := exec.ExecuteInternal(ctx, "begin") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(ctx, exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + startTS := txn.StartTS() + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?", startTS); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?", startTS); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set version = %?", startTS); err != nil { + return err + } + + return nil +} + // updateGlobalStats will trigger the merge of global-stats when we drop table partition func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error { // We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode. From 7465000b8658c4fe13dfe47479fd5bfd5b3c241d Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 7 Sep 2022 13:38:39 +0800 Subject: [PATCH 02/10] add some tests --- ddl/cluster.go | 111 ++++++++++++++---- ddl/cluster_test.go | 69 +++++++++++ ddl/ddl_api.go | 7 +- executor/recover_test.go | 5 + tests/realtikvtest/brietest/BUILD.bazel | 1 + tests/realtikvtest/brietest/flashback_test.go | 61 ++++++++++ 6 files changed, 231 insertions(+), 23 deletions(-) create mode 100644 tests/realtikvtest/brietest/flashback_test.go diff --git a/ddl/cluster.go b/ddl/cluster.go index 32e85f350bd62..60be6ad520042 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" @@ -57,6 +58,9 @@ var pdScheduleKey = []string{ const ( flashbackMaxBackoff = 300000 // 300s flashbackTimeout = 30 * time.Second // 30s + + readOnlyArgsOffset = 2 + gcEnabledArgsOffset = 3 ) func closePDSchedule() error { @@ -113,6 +117,24 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint) } +func setTiDBRestrictedReadOnly(sess sessionctx.Context, value bool) error { + var setValue string + if value == true { + setValue = variable.On + } else { + setValue = variable.Off + } + return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBRestrictedReadOnly, setValue) +} + +func getTiDBRestrictedReadOnly(sess sessionctx.Context) (bool, error) { + val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBRestrictedReadOnly) + if err != nil { + return false, errors.Trace(err) + } + return variable.TiDBOptOn(val), nil +} + func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil { return err @@ -124,6 +146,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = closePDSchedule(); err != nil { return err } + if err = setTiDBRestrictedReadOnly(sess, variable.TiDBOptOn(variable.On)); err != nil { + return err + } nowSchemaVersion, err := t.GetSchemaVersion() if err != nil { @@ -330,12 +355,32 @@ func FlashbackToVersion( // 2. before flashback start, check timestamp, disable GC and close PD schedule. // 3. before flashback done, get key ranges, send flashback RPC. func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + inFlashbackTest := false + failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) { + if val.(bool) { + inFlashbackTest = true + } + }) + // TODO support flashback in unistore + if d.store.Name() != "TiKV" && !inFlashbackTest { + job.State = model.JobStateCancelled + return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") + } + var flashbackTS uint64 var pdScheduleValue map[string]interface{} - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil { + var readOnlyValue, gcEnabledValue bool + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + sess, err := w.sessPool.get() + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + defer w.sessPool.put(sess) switch job.SchemaState { // Stage 1, check and set FlashbackClusterJobID, and save the PD schedule. @@ -357,6 +402,18 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } + readOnlyValue, err = getTiDBRestrictedReadOnly(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[readOnlyArgsOffset] = &readOnlyValue + gcEnableValue, err := gcutil.CheckGCEnable(sess) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + job.Args[gcEnabledArgsOffset] = &gcEnableValue } else { job.State = model.JobStateCancelled return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID) @@ -365,26 +422,26 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, nil // Stage 2, check flashbackTS, close GC and PD schedule. case model.StateWriteOnly: - sess, err := w.sessPool.get() - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - defer w.sessPool.put(sess) if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + // A hack way to make global variables are synchronized to all tidb. + // TiKV will block read/write request during flashback cluster, + // So it's not very dangerous when sync failed. + time.Sleep(1 * time.Second) job.SchemaState = model.StateWriteReorganization return ver, nil // Stage 3, get key ranges. case model.StateWriteReorganization: - sess, err := w.sessPool.get() - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) + // TODO support flashback in unistore + if inFlashbackTest { + asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster}) + job.State = model.JobStateDone + job.SchemaState = model.StatePublic + return ver, nil } - defer w.sessPool.put(sess) + keyRanges, err := GetFlashbackKeyRanges(sess, tablecodec.EncodeTablePrefix(0)) if err != nil { return ver, errors.Trace(err) @@ -408,27 +465,37 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve func finishFlashbackCluster(w *worker, job *model.Job) error { var flashbackTS uint64 var pdScheduleValue map[string]interface{} - if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil { + var readOnlyValue, gcEnabled bool + var jobID int64 + + if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil { + return errors.Trace(err) + } + sess, err := w.sessPool.get() + if err != nil { return errors.Trace(err) } + defer w.sessPool.put(sess) - err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error { + err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) - jobID, err := t.GetFlashbackClusterJobID() + jobID, err = t.GetFlashbackClusterJobID() if err != nil { return err } if jobID == job.ID { - if pdScheduleValue != nil { - if err = recoverPDSchedule(pdScheduleValue); err != nil { - return err - } + if err = recoverPDSchedule(pdScheduleValue); err != nil { + return err } - if err = enableGC(w); err != nil { + if err = setTiDBRestrictedReadOnly(sess, readOnlyValue); err != nil { return err } - err = t.SetFlashbackClusterJobID(0) - if err != nil { + if gcEnabled == true { + if err = gcutil.EnableGC(sess); err != nil { + return err + } + } + if err = t.SetFlashbackClusterJobID(0); err != nil { return err } } diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 0621ae64dcfb7..daeed766f75f2 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/dbterror" @@ -116,6 +117,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { tk := testkit.NewTestKit(t, store) injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", @@ -154,6 +156,71 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { require.NoError(t, err) require.EqualValues(t, finishValue["hot-region-schedule-limit"], 1) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) +} + +func TestGlobalVariablesOnFlashback(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + originHook := dom.DDL().GetHook() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobRunBeforeExported = func(job *model.Job) { + assert.Equal(t, model.ActionFlashbackCluster, job.Type) + if job.SchemaState == model.StateWriteReorganization { + rs, err := tk.Exec("show variables like 'tidb_restricted_read_only'") + assert.NoError(t, err) + assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) + rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + assert.NoError(t, err) + assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + } + } + dom.DDL().SetHook(hook) + // first try with `tidb_gc_enable` = on and `tidb_restricted_read_only` = off + tk.MustExec("set global tidb_gc_enable = on") + tk.MustExec("set global tidb_restricted_read_only = off") + + tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) + rs, err := tk.Exec("show variables like 'tidb_restricted_read_only'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) + + // second try with `tidb_gc_enable` = off and `tidb_restricted_read_only` = on + tk.MustExec("set global tidb_gc_enable = off") + tk.MustExec("set global tidb_restricted_read_only = on") + + tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) + rs, err = tk.Exec("show variables like 'tidb_restricted_read_only'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) + rs, err = tk.Exec("show variables like 'tidb_gc_enable'") + require.NoError(t, err) + require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) + + dom.DDL().SetHook(originHook) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } @@ -166,6 +233,7 @@ func TestCancelFlashbackCluster(t *testing.T) { require.NoError(t, err) injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", @@ -193,6 +261,7 @@ func TestCancelFlashbackCluster(t *testing.T) { dom.DDL().SetHook(originHook) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index bbbce9ac1437a..086f34b986d5c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2599,7 +2599,12 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error job := &model.Job{ Type: model.ActionFlashbackCluster, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{flashbackTS, map[string]interface{}{}}, + // The value for global variables is meaningless, it will cover during flashback cluster. + Args: []interface{}{ + flashbackTS, + map[string]interface{}{}, + true, /* tidb_restricted_read_only */ + true /* tidb_gc_enable */}, } err := d.DoDDLJob(ctx, job) err = d.callHookOnChanged(job, err) diff --git a/executor/recover_test.go b/executor/recover_test.go index 8d9f3d169e6be..6b5287c4382f8 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -297,10 +297,14 @@ func TestRecoverTableMeetError(t *testing.T) { func TestRecoverClusterMeetError(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + + tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "Not support flashback cluster in non-TiKV env") + ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) flashbackTs := oracle.GetTimeFromTS(ts) injectSafeTS := oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", @@ -333,6 +337,7 @@ func TestRecoverClusterMeetError(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) } func TestRecoverClusterWithTiFlash(t *testing.T) { diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index 969420c9b9c38..d1a250d4397a8 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -6,6 +6,7 @@ go_test( srcs = [ "backup_restore_test.go", "binlog_test.go", + "flashback_test.go", "main_test.go", ], flaky = True, diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go new file mode 100644 index 0000000000000..055637ca7d110 --- /dev/null +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -0,0 +1,61 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package brietest + +import ( + "context" + "fmt" + "github.com/pingcap/failpoint" + "testing" + "time" + + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/tests/realtikvtest" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" +) + +func TestFlashback(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + } +} From c653c5154dca75851621aadc05029467c0a6bd59 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 7 Sep 2022 13:59:08 +0800 Subject: [PATCH 03/10] fix fmt --- ddl/cluster.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 60be6ad520042..cbcdb8d1650ce 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -119,7 +119,7 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack func setTiDBRestrictedReadOnly(sess sessionctx.Context, value bool) error { var setValue string - if value == true { + if value { setValue = variable.On } else { setValue = variable.Off @@ -490,7 +490,7 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err = setTiDBRestrictedReadOnly(sess, readOnlyValue); err != nil { return err } - if gcEnabled == true { + if gcEnabled { if err = gcutil.EnableGC(sess); err != nil { return err } From e92456448ed4e85cf100b27cd38891bdf6edd80b Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 7 Sep 2022 14:07:36 +0800 Subject: [PATCH 04/10] fix fmt --- ddl/cluster.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index cbcdb8d1650ce..438ca4cdebdb5 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -265,7 +265,7 @@ func GetFlashbackKeyRanges(sess sessionctx.Context, startKey kv.Key) ([]kv.KeyRa return keyRanges, nil } -func flashbackToVersion( +func sendFlashbackToVersionRPC( ctx context.Context, s tikv.Storage, version uint64, @@ -334,7 +334,7 @@ func flashbackToVersion( return taskStat, nil } -func FlashbackToVersion( +func flashbackToVersion( ctx context.Context, d *ddlCtx, version uint64, @@ -345,7 +345,7 @@ func FlashbackToVersion( d.store.(tikv.Storage), int(variable.GetFlashbackConcurrency()), func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - return flashbackToVersion(ctx, d.store.(tikv.Storage), version, r) + return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) }, ).RunOnRange(ctx, startKey, endKey) } @@ -448,7 +448,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } for _, ranges := range keyRanges { - if err = FlashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { + if err = flashbackToVersion(context.Background(), d, flashbackTS, ranges.StartKey, ranges.EndKey); err != nil { logutil.BgLogger().Warn("[ddl] Get error when do flashback", zap.Error(err)) return ver, err } From b2409e388aec2d886e6d64fd5f074eec4c6593bf Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 7 Sep 2022 14:19:14 +0800 Subject: [PATCH 05/10] fix fmt --- tests/realtikvtest/brietest/flashback_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go index 055637ca7d110..dad596ad6d3cb 100644 --- a/tests/realtikvtest/brietest/flashback_test.go +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -17,10 +17,10 @@ package brietest import ( "context" "fmt" - "github.com/pingcap/failpoint" "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" From 54b8f817e79cb123373ab90ff0528e39809e4c94 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Wed, 7 Sep 2022 14:56:30 +0800 Subject: [PATCH 06/10] fix test --- executor/recover_test.go | 5 ++- tests/realtikvtest/brietest/flashback_test.go | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/executor/recover_test.go b/executor/recover_test.go index 6b5287c4382f8..a1a131771aa36 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -337,7 +337,7 @@ func TestRecoverClusterMeetError(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } func TestRecoverClusterWithTiFlash(t *testing.T) { @@ -367,6 +367,8 @@ func TestFlashbackWithSafeTs(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -416,6 +418,7 @@ func TestFlashbackWithSafeTs(t *testing.T) { } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } // MockGC is used to make GC work in the test environment. diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go index dad596ad6d3cb..410c46c36080e 100644 --- a/tests/realtikvtest/brietest/flashback_test.go +++ b/tests/realtikvtest/brietest/flashback_test.go @@ -21,17 +21,48 @@ import ( "time" "github.com/pingcap/failpoint" + ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) +// MockGC is used to make GC work in the test environment. +func MockGC(tk *testkit.TestKit) (string, string, string, func()) { + originGC := ddlutil.IsEmulatorGCEnable() + resetGC := func() { + if originGC { + ddlutil.EmulatorGCEnable() + } else { + ddlutil.EmulatorGCDisable() + } + } + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddlutil.EmulatorGCDisable() + gcTimeFormat := "20060102-15:04:05 -0700 MST" + timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(gcTimeFormat) + timeAfterDrop := time.Now().Add(48 * 60 * 60 * time.Second).Format(gcTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + // clear GC variables first. + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + return timeBeforeDrop, timeAfterDrop, safePointSQL, resetGC +} + func TestFlashback(t *testing.T) { if *realtikvtest.WithRealTiKV { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, index i(a))") From 29d46d466d2380a41d6f3c49f3da8cd2d8af6565 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 8 Sep 2022 11:24:12 +0800 Subject: [PATCH 07/10] follow comments --- ddl/cluster.go | 30 +++++++++++++----------------- ddl/cluster_test.go | 16 ++++++++-------- ddl/ddl_api.go | 2 +- sessionctx/variable/sysvar.go | 2 +- sessionctx/variable/tidb_vars.go | 4 ++-- statistics/handle/ddl.go | 2 ++ 6 files changed, 27 insertions(+), 29 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 438ca4cdebdb5..0ed1ad9d6dee0 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -117,22 +117,16 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint) } -func setTiDBRestrictedReadOnly(sess sessionctx.Context, value bool) error { - var setValue string - if value { - setValue = variable.On - } else { - setValue = variable.Off - } - return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBRestrictedReadOnly, setValue) +func setTiDBSuperReadOnly(sess sessionctx.Context, value string) error { + return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBSuperReadOnly, value) } -func getTiDBRestrictedReadOnly(sess sessionctx.Context) (bool, error) { - val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBRestrictedReadOnly) +func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) { + val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSuperReadOnly) if err != nil { - return false, errors.Trace(err) + return "", errors.Trace(err) } - return variable.TiDBOptOn(val), nil + return val, nil } func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { @@ -146,7 +140,7 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta if err = closePDSchedule(); err != nil { return err } - if err = setTiDBRestrictedReadOnly(sess, variable.TiDBOptOn(variable.On)); err != nil { + if err = setTiDBSuperReadOnly(sess, variable.On); err != nil { return err } @@ -369,7 +363,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve var flashbackTS uint64 var pdScheduleValue map[string]interface{} - var readOnlyValue, gcEnabledValue bool + var readOnlyValue string + var gcEnabledValue bool if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabledValue); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -402,7 +397,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } - readOnlyValue, err = getTiDBRestrictedReadOnly(sess) + readOnlyValue, err = getTiDBSuperReadOnly(sess) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -465,7 +460,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve func finishFlashbackCluster(w *worker, job *model.Job) error { var flashbackTS uint64 var pdScheduleValue map[string]interface{} - var readOnlyValue, gcEnabled bool + var readOnlyValue string + var gcEnabled bool var jobID int64 if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &readOnlyValue, &gcEnabled); err != nil { @@ -487,7 +483,7 @@ func finishFlashbackCluster(w *worker, job *model.Job) error { if err = recoverPDSchedule(pdScheduleValue); err != nil { return err } - if err = setTiDBRestrictedReadOnly(sess, readOnlyValue); err != nil { + if err = setTiDBSuperReadOnly(sess, readOnlyValue); err != nil { return err } if gcEnabled { diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index daeed766f75f2..6fa7ace0c0b8c 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -126,7 +126,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { oldValue := map[string]interface{}{ "hot-region-schedule-limit": 1, } - infosync.SetPDScheduleConfig(context.Background(), oldValue) + require.NoError(t, infosync.SetPDScheduleConfig(context.Background(), oldValue)) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -186,7 +186,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { hook.OnJobRunBeforeExported = func(job *model.Job) { assert.Equal(t, model.ActionFlashbackCluster, job.Type) if job.SchemaState == model.StateWriteReorganization { - rs, err := tk.Exec("show variables like 'tidb_restricted_read_only'") + rs, err := tk.Exec("show variables like 'tidb_super_read_only'") assert.NoError(t, err) assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) rs, err = tk.Exec("show variables like 'tidb_gc_enable'") @@ -195,24 +195,24 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { } } dom.DDL().SetHook(hook) - // first try with `tidb_gc_enable` = on and `tidb_restricted_read_only` = off + // first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off tk.MustExec("set global tidb_gc_enable = on") - tk.MustExec("set global tidb_restricted_read_only = off") + tk.MustExec("set global tidb_super_read_only = off") tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) - rs, err := tk.Exec("show variables like 'tidb_restricted_read_only'") + rs, err := tk.Exec("show variables like 'tidb_super_read_only'") require.NoError(t, err) require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off) rs, err = tk.Exec("show variables like 'tidb_gc_enable'") require.NoError(t, err) require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) - // second try with `tidb_gc_enable` = off and `tidb_restricted_read_only` = on + // second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on tk.MustExec("set global tidb_gc_enable = off") - tk.MustExec("set global tidb_restricted_read_only = on") + tk.MustExec("set global tidb_super_read_only = on") tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts))) - rs, err = tk.Exec("show variables like 'tidb_restricted_read_only'") + rs, err = tk.Exec("show variables like 'tidb_super_read_only'") require.NoError(t, err) require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On) rs, err = tk.Exec("show variables like 'tidb_gc_enable'") diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index d0b6bc14de5c9..2554dcfa7ccad 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2650,7 +2650,7 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error Args: []interface{}{ flashbackTS, map[string]interface{}{}, - true, /* tidb_restricted_read_only */ + variable.On, /* tidb_super_read_only */ true /* tidb_gc_enable */}, } err := d.DoDDLJob(ctx, job) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index b60f2764f9c1f..4adc63dcca62d 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -507,7 +507,7 @@ var defaultSysVars = []*SysVar{ MemQuotaBindingCache.Store(TidbOptInt64(val, DefTiDBMemQuotaBindingCache)) return nil }}, - {Scope: ScopeGlobal, Name: TiDBFlashbackConcurrency, Value: strconv.Itoa(DefTiDBFlashbackConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal, Name: TiDBDDLFlashbackConcurrency, Value: strconv.Itoa(DefTiDBFlashbackConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { SetFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBFlashbackConcurrency))) return nil }}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 505739a2cba11..315f93ffcaf84 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -444,8 +444,8 @@ const ( // TiDBDDLReorgWorkerCount defines the count of ddl reorg workers. TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt" - // TiDBFlashbackConcurrency defines the count of ddl flashback workers. - TiDBFlashbackConcurrency = "tidb_flashback_concurrency" + // TiDBDDLFlashbackConcurrency defines the count of ddl flashback workers. + TiDBDDLFlashbackConcurrency = "tidb_ddl_flashback_concurrency" // TiDBDDLReorgBatchSize defines the transaction batch size of ddl reorg workers. TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size" diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go index 6cff4233a70d9..0bb02d647f86b 100644 --- a/statistics/handle/ddl.go +++ b/statistics/handle/ddl.go @@ -75,6 +75,8 @@ var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ ast.AnalyzeOptNumTopN: 20, } +// updateStatsVersion will set statistics version to the newest TS, +// then tidb-server will reload automatic. func (h *Handle) updateStatsVersion() error { h.mu.Lock() defer h.mu.Unlock() From 411885ba55305cbef2e615c96b56f19b013e1493 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 8 Sep 2022 11:31:51 +0800 Subject: [PATCH 08/10] update --- ddl/cluster.go | 2 +- sessionctx/variable/sysvar.go | 4 ++-- sessionctx/variable/tidb_vars.go | 4 ++-- sessionctx/variable/varsutil.go | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index 0ed1ad9d6dee0..db81887263f89 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -337,7 +337,7 @@ func flashbackToVersion( return rangetask.NewRangeTaskRunner( "flashback-to-version-runner", d.store.(tikv.Storage), - int(variable.GetFlashbackConcurrency()), + int(variable.GetDDLFlashbackConcurrency()), func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { return sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), version, r) }, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 4adc63dcca62d..da1180dfea470 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -507,8 +507,8 @@ var defaultSysVars = []*SysVar{ MemQuotaBindingCache.Store(TidbOptInt64(val, DefTiDBMemQuotaBindingCache)) return nil }}, - {Scope: ScopeGlobal, Name: TiDBDDLFlashbackConcurrency, Value: strconv.Itoa(DefTiDBFlashbackConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { - SetFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBFlashbackConcurrency))) + {Scope: ScopeGlobal, Name: TiDBDDLFlashbackConcurrency, Value: strconv.Itoa(DefTiDBDDLFlashbackConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { + SetDDLFlashbackConcurrency(int32(tidbOptPositiveInt32(val, DefTiDBDDLFlashbackConcurrency))) return nil }}, {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetGlobal: func(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 315f93ffcaf84..c7f67f4c24d2f 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -909,7 +909,7 @@ const ( DefTiDBRowFormatV2 = 2 DefTiDBDDLReorgWorkerCount = 4 DefTiDBDDLReorgBatchSize = 256 - DefTiDBFlashbackConcurrency = 64 + DefTiDBDDLFlashbackConcurrency = 64 DefTiDBDDLErrorCountLimit = 512 DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBPlacementMode = PlacementModeStrict @@ -1054,7 +1054,7 @@ var ( EnableTmpStorageOnOOM = atomic.NewBool(DefTiDBEnableTmpStorageOnOOM) ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize - ddlFlashbackConcurrency int32 = DefTiDBFlashbackConcurrency + ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit ddlReorgRowFormat int64 = DefTiDBRowFormatV2 maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index c9a1d8e08a97a..e0c31623a235c 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -46,14 +46,14 @@ func GetDDLReorgWorkerCounter() int32 { return atomic.LoadInt32(&ddlReorgWorkerCounter) } -// SetFlashbackConcurrency sets ddlFlashbackConcurrency count. +// SetDDLFlashbackConcurrency sets ddlFlashbackConcurrency count. // Sysvar validation enforces the range to already be correct. -func SetFlashbackConcurrency(cnt int32) { +func SetDDLFlashbackConcurrency(cnt int32) { atomic.StoreInt32(&ddlFlashbackConcurrency, cnt) } -// GetFlashbackConcurrency gets ddlFlashbackConcurrency count. -func GetFlashbackConcurrency() int32 { +// GetDDLFlashbackConcurrency gets ddlFlashbackConcurrency count. +func GetDDLFlashbackConcurrency() int32 { return atomic.LoadInt32(&ddlFlashbackConcurrency) } From 2ded02f3aa899cee630424527d2e0f7904fc5bfa Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Thu, 8 Sep 2022 16:10:26 +0800 Subject: [PATCH 09/10] update --- sessionctx/variable/sysvar_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 93908d12ed34c..1ca4eca74e029 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -620,6 +620,32 @@ func TestTiDBCommitterConcurrency(t *testing.T) { require.NoError(t, err) } +func TestTiDBDDLFlashbackConcurrency(t *testing.T) { + sv := GetSysVar(TiDBDDLFlashbackConcurrency) + vars := NewSessionVars() + + newVal := 128 + val, err := sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal) + require.Equal(t, val, "128") + require.NoError(t, err) + + // out of range + newVal = MaxConfigurableConcurrency + 1 + expected := MaxConfigurableConcurrency + val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal) + // expected to truncate + require.Equal(t, val, fmt.Sprintf("%d", expected)) + require.NoError(t, err) + + // min value out of range + newVal = 0 + expected = 1 + val, err = sv.Validate(vars, fmt.Sprintf("%d", newVal), ScopeGlobal) + // expected to set to min value + require.Equal(t, val, fmt.Sprintf("%d", expected)) + require.NoError(t, err) +} + func TestDefaultMemoryDebugModeValue(t *testing.T) { vars := NewSessionVars() val, err := vars.GetSessionOrGlobalSystemVar(TiDBMemoryDebugModeMinHeapInUse) From 8f4bc1246646a5a4bd4437e4f477f447386f9aaa Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Thu, 8 Sep 2022 21:12:58 +0800 Subject: [PATCH 10/10] update comments --- ddl/cluster.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddl/cluster.go b/ddl/cluster.go index db81887263f89..55208c4f4c74f 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -355,7 +355,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve inFlashbackTest = true } }) - // TODO support flashback in unistore + // TODO: Support flashback in unistore. if d.store.Name() != "TiKV" && !inFlashbackTest { job.State = model.JobStateCancelled return ver, errors.Errorf("Not support flashback cluster in non-TiKV env") @@ -421,15 +421,15 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.State = model.JobStateCancelled return ver, errors.Trace(err) } - // A hack way to make global variables are synchronized to all tidb. - // TiKV will block read/write request during flashback cluster, + // A hack way to make global variables are synchronized to all TiDB. + // TiKV will block read/write requests during flashback cluster. // So it's not very dangerous when sync failed. time.Sleep(1 * time.Second) job.SchemaState = model.StateWriteReorganization return ver, nil // Stage 3, get key ranges. case model.StateWriteReorganization: - // TODO support flashback in unistore + // TODO: Support flashback in unistore. if inFlashbackTest { asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster}) job.State = model.JobStateDone