*: merge feature/flashback-cluster to master #37529

Merged
merged 4 commits into master from feature/flashback-cluster on Sep 1, 2022
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
srcs = [
"backfilling.go",
"callback.go",
"cluster.go",
"column.go",
"constant.go",
"ddl.go",
227 changes: 227 additions & 0 deletions ddl/cluster.go
@@ -0,0 +1,227 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/oracle"
)

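// pdScheduleKey lists the PD scheduling limits that closePDSchedule zeroes while a
// flashback job runs; savePDSchedule captures their original values into job.Args so
// that finishFlashbackCluster can restore them once the job finishes.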
var pdScheduleKey = []string{
"hot-region-schedule-limit",
"leader-schedule-limit",
"merge-schedule-limit",
"region-schedule-limit",
"replica-schedule-limit",
}

func closePDSchedule(job *model.Job) error {
if err := savePDSchedule(job); err != nil {
return err
}
saveValue := make(map[string]interface{})
for _, key := range pdScheduleKey {
saveValue[key] = 0
}
return infosync.SetPDScheduleConfig(context.Background(), saveValue)
}

func savePDSchedule(job *model.Job) error {
retValue, err := infosync.GetPDScheduleConfig(context.Background())
if err != nil {
return err
}
saveValue := make(map[string]interface{})
for _, key := range pdScheduleKey {
saveValue[key] = retValue[key]
}
job.Args = append(job.Args, saveValue)
return nil
}

func recoverPDSchedule(pdScheduleParam map[string]interface{}) error {
if pdScheduleParam == nil {
return nil
}
return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
}

// ValidateFlashbackTS validates that flashBackTS is within the range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
// If we fail to calculate currentTS from the local time, fall back to getting a timestamp from PD.
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
}
gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
if err != nil {
return err
}

return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

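// checkFlashbackCluster runs before the reorg starts: it disables GC, pauses PD
// scheduling, validates the flashback timestamp, and verifies that no DDL has been
// executed since flashbackTS and that no other DDL jobs are waiting in the queue.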
func checkFlashbackCluster(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
sess, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(sess)

if err = gcutil.DisableGC(sess); err != nil {
return err
}
if err = closePDSchedule(job); err != nil {
return err
}
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

// If flashbackSchemaVersion is not the same as nowSchemaVersion, DDL has been executed during [flashbackTS, now).
if flashbackSchemaVersion != nowSchemaVersion {
return errors.Errorf("schema version not same, have done ddl during [flashbackTS, now)")
}

jobs, err := GetAllDDLJobs(sess, t)
if err != nil {
return errors.Trace(err)
}
// The job list always contains this flashback job itself; any additional entry means other DDL jobs are queued, so return an error.
if len(jobs) != 1 {
var otherJob *model.Job
for _, j := range jobs {
if j.ID != job.ID {
otherJob = j
break
}
}
return errors.Errorf("have other ddl jobs(jobID: %d) in queue, can't do flashback", otherJob.ID)
}
return nil
}

// A flashback job runs in three stages:
// 1. before locking flashbackClusterJobID: check the cluster job ID and lock it.
// 2. before the reorg starts: validate the timestamp, disable GC, and pause the PD schedule.
// 3. before the reorg finishes.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
var flashbackTS uint64
if err := job.DecodeArgs(&flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

flashbackJobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return ver, err
}

// stage 1, check and set FlashbackClusterJobID.
if flashbackJobID == 0 {
err = kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
return meta.NewMeta(txn).SetFlashbackClusterJobID(job.ID)
})
if err != nil {
job.State = model.JobStateCancelled
}
return ver, errors.Trace(err)
} else if flashbackJobID != job.ID {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Other flashback job(ID: %d) is running", job.ID)
}

// stage 2, before the reorg starts. SnapshotVer == 0 means the job has not started the reorg yet.
if job.SnapshotVer == 0 {
if err = checkFlashbackCluster(w, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// get the current version for reorganization.
snapVer, err := getValidCurrentVersion(d.store)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.SnapshotVer = snapVer.Ver
return ver, nil
}

job.State = model.JobStateDone
return ver, errors.Trace(err)
}

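// finishFlashbackCluster runs when the job finishes: if this job still owns the
// flashback job ID, it restores the saved PD schedule limits, re-enables GC, and
// resets the flashback job ID to 0.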
func finishFlashbackCluster(w *worker, job *model.Job) error {
var flashbackTS uint64
var pdScheduleValue map[string]interface{}
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue); err != nil {
return errors.Trace(err)
}

err := kv.RunInNewTxn(w.ctx, w.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return err
}
if jobID == job.ID {
if pdScheduleValue != nil {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
return err
}
}
if err = enableGC(w); err != nil {
return err
}
err = t.SetFlashbackClusterJobID(0)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions ddl/ddl.go
@@ -135,6 +135,7 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
11 changes: 11 additions & 0 deletions ddl/ddl_api.go
@@ -2593,6 +2593,17 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo
}
}

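// FlashbackCluster implements the DDL interface: it submits an ActionFlashbackCluster
// job carrying the target flashbackTS; the actual flashback work is done by the DDL
// worker (see ddl/cluster.go).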
func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error {
job := &model.Job{
Type: model.ActionFlashbackCluster,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{flashbackTS, map[string]interface{}{}},
}
err := d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
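For context, the new DDL interface method takes a TSO (uint64) rather than a wall-clock time, so the caller is expected to convert first. A minimal sketch of such a caller follows; executeFlashbackCluster and its placement in an executor package are assumptions for illustration, not part of this PR.

package executor // hypothetical location, not part of this PR

import (
	"time"

	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/tikv/client-go/v2/oracle"
)

// executeFlashbackCluster converts a wall-clock time into the TSO expected by
// DDL.FlashbackCluster and submits the job. Once the job runs, ValidateFlashbackTS
// (ddl/cluster.go) rejects timestamps that are in the future or older than the GC
// safe point.
func executeFlashbackCluster(sctx sessionctx.Context, d ddl.DDL, at time.Time) error {
	flashbackTS := oracle.GoTimeToTS(at)
	return d.FlashbackCluster(sctx, flashbackTS)
}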
31 changes: 31 additions & 0 deletions ddl/ddl_worker.go
@@ -319,6 +319,13 @@ func (d *ddl) addBatchDDLJobs2Queue(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
return kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err := t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
@@ -383,6 +390,13 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
jobID, err := t.GetFlashbackClusterJobID()
if err != nil {
return errors.Trace(err)
}
if jobID != 0 {
return errors.Errorf("Can't add to ddl table, cluster is flashing back now")
}
ids, err = t.GenGlobalIDs(len(tasks))
if err != nil {
return errors.Trace(err)
@@ -541,6 +555,8 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
switch job.Type {
case model.ActionRecoverTable:
err = finishRecoverTable(w, job)
case model.ActionFlashbackCluster:
err = finishFlashbackCluster(w, job)
case model.ActionCreateTables:
if job.IsCancelled() {
// it may be too large that it can not be added to the history queue, too
@@ -1094,6 +1110,19 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if job.Type != model.ActionMultiSchemaChange {
logutil.Logger(w.logCtx).Info("[ddl] run DDL job", zap.String("job", job.String()))
}

// Check flashbackClusterJobID here as well:
// some DDL jobs may be enqueued between the flashback job's check and its insert into the DDL job table.
flashbackJobID, err := t.GetFlashbackClusterJobID()
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if flashbackJobID != 0 && flashbackJobID != job.ID {
job.State = model.JobStateCancelled
return ver, errors.Errorf("Can't do ddl job, cluster is flashing back now")
}

timeStart := time.Now()
if job.RealStartTS == 0 {
job.RealStartTS = t.StartTS
@@ -1219,6 +1248,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onAlterCacheTable(d, t, job)
case model.ActionAlterNoCacheTable:
ver, err = onAlterNoCacheTable(d, t, job)
case model.ActionFlashbackCluster:
ver, err = w.onFlashbackCluster(d, t, job)
case model.ActionMultiSchemaChange:
ver, err = onMultiSchemaChange(w, d, t, job)
default:
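The same flashback guard now appears in three places: both enqueue paths and runDDLJob (the latter catches jobs that slipped in between the flashback job's own check and its write to the job table). A minimal sketch of how the guard could read as one shared helper, assuming the same imports as ddl_worker.go (meta, errors); the name checkFlashbackJobInProgress is hypothetical and not part of this diff.

// Hypothetical helper, not part of this PR: rejects DDL work while a different
// flashback-cluster job holds the flashback job ID in meta. The enqueue paths
// would pass curJobID = 0, while runDDLJob would pass job.ID.
func checkFlashbackJobInProgress(t *meta.Meta, curJobID int64) error {
	flashbackJobID, err := t.GetFlashbackClusterJobID()
	if err != nil {
		return errors.Trace(err)
	}
	if flashbackJobID != 0 && flashbackJobID != curJobID {
		return errors.Errorf("Can't do ddl job, cluster is flashing back now")
	}
	return nil
}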
6 changes: 6 additions & 0 deletions ddl/schematracker/checker.go
@@ -268,6 +268,12 @@ func (d Checker) RecoverTable(ctx sessionctx.Context, recoverInfo *ddl.RecoverIn
panic("implement me")
}

// FlashbackCluster implements the DDL interface.
func (d Checker) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) (err error) {
// TODO: implement me
panic("implement me")
}

// DropView implements the DDL interface.
func (d Checker) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
err = d.realDDL.DropView(ctx, stmt)
5 changes: 5 additions & 0 deletions ddl/schematracker/dm_tracker.go
@@ -306,6 +306,11 @@ func (d SchemaTracker) RecoverTable(ctx sessionctx.Context, recoverInfo *ddl.Rec
return nil
}

// FlashbackCluster implements the DDL interface; it is a no-op in DM's case.
func (d SchemaTracker) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) (err error) {
return nil
}

// DropView implements the DDL interface.
func (d SchemaTracker) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
notExistTables := make([]string, 0, len(stmt.Tables))
1 change: 1 addition & 0 deletions domain/infosync/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"info.go",
"label_manager.go",
"placement_manager.go",
"schedule_manager.go",
"region.go",
"tiflash_manager.go",
],