*: prepare and recover flashback env
Defined2014 committed Aug 29, 2022
1 parent 59840f6 commit c2cb5d2
Showing 9 changed files with 372 additions and 74 deletions.
198 changes: 174 additions & 24 deletions ddl/cluster.go
@@ -15,63 +15,213 @@
package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/oracle"
)

var pdScheduleKey = []string{
	"hot-region-schedule-limit",
	"leader-schedule-limit",
	"merge-schedule-limit",
	"region-schedule-limit",
	"replica-schedule-limit",
}

func closePDSchedule(job *model.Job) error {
	if err := savePDSchedule(job); err != nil {
		return err
	}
	saveValue := make(map[string]interface{})
	for _, key := range pdScheduleKey {
		saveValue[key] = 0
	}
	return infosync.SetPDScheduleConfig(context.Background(), saveValue)
}

func savePDSchedule(job *model.Job) error {
	retValue, err := infosync.GetPDScheduleConfig(context.Background())
	if err != nil {
		return err
	}
	saveValue := make(map[string]interface{})
	for _, key := range pdScheduleKey {
		saveValue[key] = retValue[key]
	}
	job.Args = append(job.Args, saveValue)
	return nil
}

func recoverPDSchedule(pdScheduleParam map[string]interface{}) error {
	if pdScheduleParam == nil {
		return nil
	}
	return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
}

// ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
	currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
	// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD.
	if err != nil {
		metrics.ValidateReadTSFromPDCount.Inc()
		currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
		if err != nil {
			return errors.Errorf("fail to validate flashback timestamp: %v", err)
		}
		currentTS = currentVer.Ver
	}
	if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
		return errors.Errorf("cannot set flashback timestamp to future time")
	}
	gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
	if err != nil {
		return err
	}

	return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func checkFlashbackCluster(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
	sess, err := w.sessPool.get()
	if err != nil {
		return errors.Trace(err)
	}
	defer w.sessPool.put(sess)

	if err = gcutil.DisableGC(sess); err != nil {
		return err
	}
	if err = closePDSchedule(job); err != nil {
		return err
	}
	if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
		return err
	}

	nowSchemaVersion, err := t.GetSchemaVersion()
	if err != nil {
		return errors.Trace(err)
	}

	flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
	if err != nil {
		return errors.Trace(err)
	}

	// If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now).
	if flashbackSchemaVersion != nowSchemaVersion {
		return errors.Errorf("schema version not same, have done ddl during [flashbackTS, now)")
	}

	jobs, err := GetAllDDLJobs(sess, t)
	if err != nil {
		return errors.Trace(err)
	}
	// Other ddl jobs in queue, return error.
	if len(jobs) != 1 {
		var otherJob *model.Job
		for _, j := range jobs {
			if j.ID != job.ID {
				otherJob = j
				break
			}
		}
		return errors.Errorf("have other ddl jobs(jobID: %d) in queue, can't do flashback", otherJob.ID)
	}
	return nil
}
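ValidateFlashbackTS above only accepts timestamps inside [gcSafePoint, currentTS). The following standalone program is an illustration of that window, not part of the commit: it reuses the same oracle helpers but with made-up boundary values, whereas the real check fetches gcSafePoint via gcutil and may fall back to PD for currentTS.

package main // standalone illustration, not part of ddl/cluster.go

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	// Hypothetical boundaries: a GC safe point from one hour ago and a
	// current timestamp derived from local time.
	gcSafePoint := oracle.GoTimeToTS(time.Now().Add(-time.Hour))
	currentTS := oracle.GoTimeToTS(time.Now())

	cases := []struct {
		name string
		ts   uint64
	}{
		{"30 minutes ago", oracle.GoTimeToTS(time.Now().Add(-30 * time.Minute))}, // inside the window
		{"1 minute ahead", oracle.GoTimeToTS(time.Now().Add(time.Minute))},       // future: rejected
		{"2 hours ago", oracle.GoTimeToTS(time.Now().Add(-2 * time.Hour))},       // before the GC safe point: rejected
	}
	for _, c := range cases {
		notFuture := !oracle.GetTimeFromTS(c.ts).After(oracle.GetTimeFromTS(currentTS))
		afterGCSafePoint := c.ts >= gcSafePoint
		fmt.Printf("%s: allowed=%v\n", c.name, notFuture && afterGCSafePoint)
	}
}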

// A Flashback has 3 different stages.
// 1. before lock flashbackClusterJobID, check clusterJobID and lock it.
// 2. before reorg start, check timestamp, disable GC and close PD schedule.
// 3. before reorg done.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
	var flashbackTS uint64
	if err := job.DecodeArgs(&flashbackTS); err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	flashbackJobID, err := t.GetFlashbackClusterJobID()
	if err != nil {
		return ver, err
	}

	// stage 1, check and set FlashbackClusterJobID.
	if flashbackJobID == 0 {
		err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
			return meta.NewMeta(txn).SetFlashbackClusterJobID(job.ID)
		})
		if err != nil {
			job.State = model.JobStateCancelled
		}
		return ver, errors.Trace(err)
	} else if flashbackJobID != job.ID {
		job.State = model.JobStateCancelled
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
	}

	// stage 2, before reorg start, SnapshotVer == 0 means, job has not started reorg
	if job.SnapshotVer == 0 {
		if err = checkFlashbackCluster(w, d, t, job, flashbackTS); err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
		// get the current version for reorganization.
		snapVer, err := getValidCurrentVersion(d.store)
		if err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
		job.SnapshotVer = snapVer.Ver
		return ver, nil
	}

	job.State = model.JobStateDone
	return ver, errors.Trace(err)
}

func finishFlashbackCluster(w *worker, job *model.Job) error {
	var flashbackTS uint64
	var pdScheduleValue map[string]interface{}
	if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
		return errors.Trace(err)
	}

	err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
		t := meta.NewMeta(txn)
		jobID, err := t.GetFlashbackClusterJobID()
		if err != nil {
			return err
		}
		if jobID == job.ID {
			if pdScheduleValue != nil {
				if err = recoverPDSchedule(pdScheduleValue); err != nil {
					return err
				}
			}
			if err = enableGC(w); err != nil {
				return err
			}
			err = t.SetFlashbackClusterJobID(0)
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return err
	}

	return nil
}
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
@@ -2597,7 +2597,7 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
job := &model.Job{
Type: model.ActionFlashbackCluster,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{flashbackTS},
Args: []interface{}{flashbackTS, map[string]interface{}{}},
}
err := d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
29 changes: 29 additions & 0 deletions ddl/ddl_worker.go
@@ -319,6 +319,13 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err := t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
@@ -383,6 +390,13 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err = t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
@@ -541,6 +555,8 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
switch job.Type {
case model.ActionRecoverTable:
err = finishRecoverTable(w, job)
case model.ActionFlashbackCluster:
err = finishFlashbackCluster(w, job)
case model.ActionCreateTables:
if job.IsCancelled() {
// it may be too large that it can not be added to the history queue, too
@@ -1094,6 +1110,19 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if job.Type != model.ActionMultiSchemaChange {
logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
}

// Should check flashbackClusterJobID.
// Some ddl jobs maybe added between check and insert into ddl job table.
flashbackJobID, err := t.GetFlashbackClusterJobID()
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if flashbackJobID != 0 && flashbackJobID != job.ID {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Can't do ddl job, cluster is flashing back now")
}

timeStart := time.Now()
if job.RealStartTS == 0 {
job.RealStartTS = t.StartTS
1 change: 1 addition & 0 deletions domain/infosync/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"info.go",
"label_manager.go",
"placement_manager.go",
"schedule_manager.go",
"region.go",
"tiflash_manager.go",
],
27 changes: 27 additions & 0 deletions domain/infosync/info.go
@@ -112,6 +112,7 @@ type InfoSyncer struct {
modifyTime time.Time
labelRuleManager LabelRuleManager
placementManager PlacementManager
scheduleManager ScheduleManager
tiflashPlacementManager TiFlashPlacementManager
}

@@ -193,6 +194,7 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
}
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.tiflashPlacementManager = initTiFlashPlacementManager(etcdCli)
setGlobalInfoSyncer(is)
return is, nil
@@ -247,6 +249,13 @@ func initTiFlashPlacementManager(etcdCli *clientv3.Client) TiFlashPlacementManag
return &TiFlashPDPlacementManager{etcdCli: etcdCli}
}

func initScheduleManager(etcdCli *clientv3.Client) ScheduleManager {
if etcdCli == nil {
return &mockScheduleManager{}
}
return &PDScheduleManager{etcdCli: etcdCli}
}

// GetMockTiFlash can only be used in tests to get MockTiFlash
func GetMockTiFlash() *MockTiFlash {
is, err := getGlobalInfoSyncer()
@@ -1167,3 +1176,21 @@ func DeleteInternalSession(se interface{}) {
}
sm.DeleteInternalSession(se)
}

// GetPDScheduleConfig gets the schedule information from pd
func GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, errors.Trace(err)
}
return is.scheduleManager.GetPDScheduleConfig(ctx)
}

// SetPDScheduleConfig sets the schedule information for pd
func SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return errors.Trace(err)
}
return is.scheduleManager.SetPDScheduleConfig(ctx, config)
}
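The new domain/infosync/schedule_manager.go is listed in the BUILD.bazel change above but its contents are not included in this excerpt. Below is a plausible minimal sketch of the interface these wrappers rely on, inferred only from the calls and constructors visible in info.go; the actual file may differ.

// Sketch inferred from the info.go wrappers above; not the actual
// schedule_manager.go from the commit.
package infosync

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// ScheduleManager manages the PD schedule configuration used by flashback.
type ScheduleManager interface {
	GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error)
	SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) error
}

// PDScheduleManager would implement the interface against a real PD
// (initScheduleManager above constructs it with an etcd client), while
// mockScheduleManager would keep the values in memory for tests without PD.
type PDScheduleManager struct {
	etcdCli *clientv3.Client
}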