Skip to content

Commit

Permalink
Merge branch 'master' into pitr-date-format
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Jul 21, 2022
2 parents 64c7634 + 5f3252c commit 80d6788
Show file tree
Hide file tree
Showing 138 changed files with 10,838 additions and 2,107 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test
bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test

bazel_build: bazel_ci_prepare
mkdir -p bin
Expand Down
20 changes: 20 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,26 @@ using
require.True(t, tk.NotHasKeywordInOperatorInfo("select * from t1 where exists(select 1 from t2 where t1.id=t2.id)", "semi join"))
}

func TestBindCTEMerge(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id int)")
require.True(t, tk.HasPlan("with cte as (select * from t1) select * from cte", "CTEFullScan"))
require.False(t, tk.HasPlan("with cte as (select /*+ MERGE() */ * from t1) select * from cte", "CTEFullScan"))
tk.MustExec(`
create global binding for
with cte as (select * from t1) select * from cte
using
with cte as (select /*+ MERGE() */ * from t1) select * from cte
`)

require.False(t, tk.HasPlan("with cte as (select * from t1) select * from cte", "CTEFullScan"))
}

// TestBindingSymbolList tests sql with "?, ?, ?, ?", fixes #13871
func TestBindingSymbolList(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//meta",
"//meta/autoid",
"//parser/model",
"//sessionctx",
"//statistics/handle",
"//util",
"//util/codec",
Expand Down
21 changes: 16 additions & 5 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
Expand Down Expand Up @@ -472,21 +473,31 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
}

// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastBackupTS, backupTS uint64) error {
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
snapMeta := meta.NewSnapshotMeta(snapshot)
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
lastSnapMeta := meta.NewSnapshotMeta(lastSnapshot)
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return errors.Trace(err)
}
allJobs, err := ddl.GetAllDDLJobs(snapMeta)
backupSchemaVersion, err := snapMeta.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return errors.Trace(err)
}

version, err := store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
}
newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs, err := ddl.GetAllDDLJobs(se, newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
historyJobs, err := ddl.GetAllHistoryDDLJobs(snapMeta)
historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -500,7 +511,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestSkipUnsupportedDDLJob(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.cluster.Storage, lastTS, ts)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestFilterDDLJobs(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestFilterDDLJobsV2(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
Expand Down
50 changes: 26 additions & 24 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type TaskStatus struct {
Info backuppb.StreamBackupTaskInfo
// paused checks whether the task is paused.
paused bool
// global checkpoint from storage
globalCheckpoint uint64
// Checkpoints collects the checkpoints.
Checkpoints []Checkpoint
// Total QPS of the task in recent seconds.
Expand Down Expand Up @@ -130,14 +132,13 @@ func (p *printByTable) AddTask(task TaskStatus) {
pTime := oracle.GetTimeFromTS(ts)
gap := now.Sub(pTime).Round(time.Second)
gapColor := color.New(color.FgGreen)
if gap > 5*time.Minute {
if gap > 10*time.Minute {
gapColor = color.New(color.FgRed)
}
info := fmt.Sprintf("%s; gap=%s", FormatDate(pTime), gapColor.Sprint(gap))
return info
}
cp := task.GetMinStoreCheckpoint()
table.Add("checkpoint[global]", formatTS(cp.TS))
table.Add("checkpoint[global]", formatTS(task.globalCheckpoint))
p.addCheckpoints(&task, table, formatTS)
for store, e := range task.LastErrors {
table.Add(fmt.Sprintf("error[store=%d]", store), e.ErrorCode)
Expand Down Expand Up @@ -191,16 +192,15 @@ func (p *printByJSON) PrintTasks() {
LastError backuppb.StreamBackupError `json:"last_error"`
}
type jsonTask struct {
Name string `json:"name"`
StartTS uint64 `json:"start_ts,omitempty"`
EndTS uint64 `json:"end_ts,omitempty"`
TableFilter []string `json:"table_filter"`
Progress []storeProgress `json:"progress"`
Storage string `json:"storage"`
CheckpointTS uint64 `json:"checkpoint"`
EstQPS float64 `json:"estimate_qps"`
LastErrors []storeLastError `json:"last_errors"`
AllCheckpoints []Checkpoint `json:"all_checkpoints"`
Name string `json:"name"`
StartTS uint64 `json:"start_ts,omitempty"`
EndTS uint64 `json:"end_ts,omitempty"`
TableFilter []string `json:"table_filter"`
Progress []storeProgress `json:"progress"`
Storage string `json:"storage"`
CheckpointTS uint64 `json:"checkpoint"`
EstQPS float64 `json:"estimate_qps"`
LastErrors []storeLastError `json:"last_errors"`
}
taskToJSON := func(t TaskStatus) jsonTask {
s := storage.FormatBackendURL(t.Info.GetStorage())
Expand All @@ -220,18 +220,16 @@ func (p *printByJSON) PrintTasks() {
LastError: lastError,
})
}
cp := t.GetMinStoreCheckpoint()
return jsonTask{
Name: t.Info.GetName(),
StartTS: t.Info.GetStartTs(),
EndTS: t.Info.GetEndTs(),
TableFilter: t.Info.GetTableFilter(),
Progress: sp,
Storage: s.String(),
CheckpointTS: cp.TS,
EstQPS: t.QPS,
LastErrors: se,
AllCheckpoints: t.Checkpoints,
Name: t.Info.GetName(),
StartTS: t.Info.GetStartTs(),
EndTS: t.Info.GetEndTs(),
TableFilter: t.Info.GetTableFilter(),
Progress: sp,
Storage: s.String(),
CheckpointTS: t.globalCheckpoint,
EstQPS: t.QPS,
LastErrors: se,
}
}
mustMarshal := func(i interface{}) string {
Expand Down Expand Up @@ -359,6 +357,10 @@ func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatu
return s, errors.Annotatef(err, "failed to get progress of task %s", s.Info.Name)
}

if s.globalCheckpoint, err = task.GetStorageCheckpoint(ctx); err != nil {
return s, errors.Annotatef(err, "failed to get storage checkpoint of task %s", s.Info.Name)
}

s.LastErrors, err = task.LastError(ctx)
if err != nil {
return s, err
Expand Down
4 changes: 1 addition & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,11 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpoint(ctx context.Context) (uin
nextRun []kv.KeyRange
)
defer c.recordTimeCost("record all")
cx, cancel := context.WithTimeout(ctx, c.cfg.MaxBackoffTime)
defer cancel()
for {
coll := NewClusterCollector(ctx, c.env)
coll.setOnSuccessHook(c.cache.InsertRange)
for _, u := range thisRun {
err := c.GetCheckpointInRange(cx, u.StartKey, u.EndKey, coll)
err := c.GetCheckpointInRange(ctx, u.StartKey, u.EndKey, coll)
if err != nil {
return 0, err
}
Expand Down
31 changes: 31 additions & 0 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -299,6 +300,13 @@ type Task struct {
Info backuppb.StreamBackupTaskInfo
}

func NewTask(client *MetaDataClient, info backuppb.StreamBackupTaskInfo) *Task {
return &Task{
cli: client,
Info: info,
}
}

// Pause is a shorthand for `metaCli.PauseTask`.
func (t *Task) Pause(ctx context.Context) error {
return t.cli.PauseTask(ctx, t.Info.Name)
Expand Down Expand Up @@ -352,6 +360,29 @@ func (t *Task) NextBackupTSList(ctx context.Context) ([]Checkpoint, error) {
return cps, nil
}

func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error) {
prefix := StorageCheckpointOf(t.Info.Name)
scanner := scanEtcdPrefix(t.cli.Client, prefix)
kvs, err := scanner.AllPages(ctx, 1024)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}

var storageCheckpoint = t.Info.StartTs
for _, kv := range kvs {
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the value isn't 64bits (it is %d bytes, value = %s)",
len(kv.Value),
redact.Key(kv.Value))
}
ts := binary.BigEndian.Uint64(kv.Value)
storageCheckpoint = mathutil.Max(storageCheckpoint, ts)
}

return storageCheckpoint, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
key := CheckPointOf(t.Info.Name, store)
Expand Down
29 changes: 12 additions & 17 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (

const (
flagBackoffTime = "backoff-time"
flagMaxBackoffTime = "max-backoff-time"
flagTickInterval = "tick-interval"
flagFullScanDiffTick = "full-scan-tick"
flagAdvancingByCache = "advancing-by-cache"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 3 * time.Minute
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
DefaultAdvanceByCache = true
)

var (
Expand All @@ -26,8 +29,6 @@ var (
type Config struct {
// The gap between two retries.
BackoffTime time.Duration `toml:"backoff-time" json:"backoff-time"`
// When after this time we cannot collect the safe resolved ts, give up.
MaxBackoffTime time.Duration `toml:"max-backoff-time" json:"max-backoff-time"`
// The gap between calculating checkpoints.
TickDuration time.Duration `toml:"tick-interval" json:"tick-interval"`
// The backoff time of full scan.
Expand All @@ -39,20 +40,18 @@ type Config struct {
}

func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
f.Duration(flagBackoffTime, 5*time.Second, "The gap between two retries.")
f.Duration(flagMaxBackoffTime, 20*time.Minute, "After how long we should advance the checkpoint.")
f.Duration(flagTickInterval, 12*time.Second, "From how log we trigger the tick (advancing the checkpoint).")
f.Bool(flagAdvancingByCache, true, "Whether enable the optimization -- use a cached heap to advancing the global checkpoint.")
f.Int(flagFullScanDiffTick, 4, "The backoff of full scan.")
f.Duration(flagBackoffTime, DefaultBackOffTime, "The gap between two retries.")
f.Duration(flagTickInterval, DefaultTickInterval, "From how long we trigger the tick (advancing the checkpoint).")
f.Bool(flagAdvancingByCache, DefaultAdvanceByCache, "Whether enable the optimization -- use a cached heap to advancing the global checkpoint.")
f.Int(flagFullScanDiffTick, DefaultFullScanTick, "The backoff of full scan.")
}

func Default() Config {
return Config{
BackoffTime: 5 * time.Second,
MaxBackoffTime: 20 * time.Minute,
TickDuration: 12 * time.Second,
FullScanTick: 4,
AdvancingByCache: true,
BackoffTime: DefaultBackOffTime,
TickDuration: DefaultTickInterval,
FullScanTick: DefaultFullScanTick,
AdvancingByCache: DefaultAdvanceByCache,
}
}

Expand All @@ -62,10 +61,6 @@ func (conf *Config) GetFromFlags(f *pflag.FlagSet) error {
if err != nil {
return err
}
conf.MaxBackoffTime, err = f.GetDuration(flagMaxBackoffTime)
if err != nil {
return err
}
conf.TickDuration, err = f.GetDuration(flagTickInterval)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 80d6788

Please sign in to comment.