diff --git a/br/pkg/glue/progressing.go b/br/pkg/glue/progressing.go
index 8d5808c00c39a..ed3e323727bf6 100644
--- a/br/pkg/glue/progressing.go
+++ b/br/pkg/glue/progressing.go
@@ -16,6 +16,10 @@ import (
 	"golang.org/x/term"
 )
 
+const OnlyOneTask int = -1
+
+var spinnerText []string = []string{".", "..", "..."}
+
 type pbProgress struct {
 	bar      *mpb.Bar
 	progress *mpb.Progress
@@ -113,8 +117,30 @@ func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,
 
 func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
 	pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
+	bar := adjustTotal(pb, title, total, extraFields...)
+
+	// If total is zero, finish right now.
+	if total == 0 {
+		bar.SetTotal(0, true)
+	}
+
+	return pbProgress{
+		bar:      bar,
+		ops:      ops,
+		progress: pb,
+	}
+}
+
+func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
+	if total == OnlyOneTask {
+		return buildOneTaskBar(pb, title, 1)
+	}
+	return buildProgressBar(pb, title, total, extraFields...)
+}
+
+func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
 	greenTitle := color.GreenString(title)
-	bar := pb.New(int64(total),
+	return pb.New(int64(total),
 		// Play as if the old BR style.
 		mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
 		mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
@@ -128,15 +154,16 @@ func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, ex
 		mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
 		mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
 	)
+}
 
-	// If total is zero, finish right now.
-	if total == 0 {
-		bar.SetTotal(0, true)
-	}
+var (
+	spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
+)
%s", color.GreenString("DONE")) +) - return pbProgress{ - bar: bar, - ops: ops, - progress: pb, - } +func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar { + return pb.New(int64(total), + mpb.NopStyle(), + mpb.PrependDecorators(decor.Name(title)), + mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))), + ) } diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 2de1eb9fbe308..af9e47f059335 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//br/pkg/metautil", "//br/pkg/pdutil", "//br/pkg/redact", + "//br/pkg/restore/ingestrec", "//br/pkg/restore/prealloc_table_id", "//br/pkg/restore/split", "//br/pkg/restore/tiflashrec", diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 422c4cce3f63e..d73cee249b26c 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/redact" + "github.com/pingcap/tidb/br/pkg/restore/ingestrec" tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" @@ -2609,6 +2610,78 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error { return nil } +const ( + alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n" + alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)" + alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)" + alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED" +) + +// RepairIngestIndex drops the indexes from IngestRecorder and re-add them. +func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error { + dom, err := g.GetDomain(storage) + if err != nil { + return errors.Trace(err) + } + info := dom.InfoSchema() + allSchema := info.AllSchemas() + ingestRecorder.UpdateIndexInfo(allSchema) + console := glue.GetConsole(g) + err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error { + var ( + addSQL strings.Builder + addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs)) + progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName) + ) + w := console.StartProgressBar(progressTitle, glue.OnlyOneTask) + if info.IsPrimary { + addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList)) + addArgs = append(addArgs, info.SchemaName, info.TableName) + addArgs = append(addArgs, info.ColumnArgs...) + } else if info.IndexInfo.Unique { + addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList)) + addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O) + addArgs = append(addArgs, info.ColumnArgs...) + } else { + addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList)) + addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O) + addArgs = append(addArgs, info.ColumnArgs...) + } + // USING BTREE/HASH/RTREE + indexTypeStr := info.IndexInfo.Tp.String() + if len(indexTypeStr) > 0 { + addSQL.WriteString(" USING ") + addSQL.WriteString(indexTypeStr) + } + + // COMMENT [...] 
+		if len(info.IndexInfo.Comment) > 0 {
+			addSQL.WriteString(" COMMENT %?")
+			addArgs = append(addArgs, info.IndexInfo.Comment)
+		}
+
+		if info.IndexInfo.Invisible {
+			addSQL.WriteString(" INVISIBLE")
+		} else {
+			addSQL.WriteString(" VISIBLE")
+		}
+
+		if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil {
+			return errors.Trace(err)
+		}
+		if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil {
+			return errors.Trace(err)
+		}
+		w.Inc()
+		if err := w.Wait(ctx); err != nil {
+			return errors.Trace(err)
+		}
+		w.Close()
+		return nil
+	})
+	return errors.Trace(err)
+}
+
 const (
 	insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
 	insertDeleteRangeSQLValue  = "(%d, %d, '%s', '%s', %%[1]d)"
@@ -2828,6 +2901,39 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage
 	})
 }
 
+// RangeFilterFromIngestRecorder rewrites the table IDs of the items in the ingestRecorder.
+// TODO: implement the range filter out feature
+func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
+	err := recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {
+		rewriteRule, exists := rewriteRules[tableID]
+		if !exists {
+			// the table's files will be skipped during restore, so skip it here as well.
+			return 0, true, nil
+		}
+		newTableID := GetRewriteTableID(tableID, rewriteRule)
+		if newTableID == 0 {
+			return 0, false, errors.Errorf("newTableID is 0, tableID: %d", tableID)
+		}
+		return newTableID, false, nil
+	})
+	return errors.Trace(err)
+	/* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature
+	filter := rtree.NewRangeTree()
+	err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
+		// range after table ID rewritten
+		startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
+		endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
+		rg := rtree.Range{
+			StartKey: codec.EncodeBytes([]byte{}, startKey),
+			EndKey:   codec.EncodeBytes([]byte{}, endKey),
+		}
+		filter.InsertRange(rg)
+		return nil
+	})
+	return errors.Trace(err)
+	*/
+}
+
 // MockClient create a fake client used to test.
 func MockClient(dbs map[string]*utils.Database) *Client {
 	return &Client{databases: dbs}
diff --git a/br/pkg/restore/ingestrec/BUILD.bazel b/br/pkg/restore/ingestrec/BUILD.bazel
new file mode 100644
index 0000000000000..45904583ca9b9
--- /dev/null
+++ b/br/pkg/restore/ingestrec/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "ingestrec",
+    srcs = ["ingest_recorder.go"],
+    importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//parser/model",
+        "//types",
+        "@com_github_pingcap_errors//:errors",
+    ],
+)
+
+go_test(
+    name = "ingestrec_test",
+    srcs = ["ingest_recorder_test.go"],
+    deps = [
+        ":ingestrec",
+        "//parser/model",
+        "@com_github_pkg_errors//:errors",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/br/pkg/restore/ingestrec/ingest_recorder.go b/br/pkg/restore/ingestrec/ingest_recorder.go
new file mode 100644
index 0000000000000..278b878f80e36
--- /dev/null
+++ b/br/pkg/restore/ingestrec/ingest_recorder.go
@@ -0,0 +1,183 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ingestrec
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/types"
+)
+
+// IngestIndexInfo records the information used to generate index drop/re-add SQL.
+type IngestIndexInfo struct {
+	SchemaName string
+	TableName  string
+	ColumnList string
+	ColumnArgs []interface{}
+	IsPrimary  bool
+	IndexInfo  *model.IndexInfo
+	Updated    bool
+}
+
+// IngestRecorder records the information of the indexes that use ingest mode to construct their KVs.
+// Currently log backup cannot back up these ingest KVs, so they need to be reconstructed.
+type IngestRecorder struct {
+	// Table ID -> Index ID -> Index info
+	items map[int64]map[int64]*IngestIndexInfo
+}
+
+// New returns an empty IngestRecorder.
+func New() *IngestRecorder {
+	return &IngestRecorder{
+		items: make(map[int64]map[int64]*IngestIndexInfo),
+	}
+}
+
+func notIngestJob(job *model.Job) bool {
+	return job.ReorgMeta == nil ||
+		job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge
+}
+
+func notAddIndexJob(job *model.Job) bool {
+	/* support new index using accelerated indexing in future:
+	 * // 1. skip if the new index didn't generate new kvs
+	 * // 2. shall the ReorgTp of ModifyColumnJob be ReorgTypeLitMerge if use accelerated indexing?
+	 * if job.RowCount > 0 && notIngestJob(job) {
+	 *   // ASSERT: select new indexes, which have the highest IDs in this job's BinlogInfo
+	 *   newIndexesInfo := getIndexesWithTheHighestIDs(len(indexIDs))
+	 *   for _, newIndexInfo := range newIndexesInfo {
+	 *     tableindexes[newIndexInfo.ID] = ...
+	 *   }
+	 * }
+	 */
+	return job.Type != model.ActionAddIndex &&
+		job.Type != model.ActionAddPrimaryKey
+}
+
+func notSynced(job *model.Job) bool {
+	return job.State != model.JobStateSynced
+}
+
+// AddJob firstly filters out the jobs that did not add an index in ingest mode, then records the rest into the IngestRecorder.
+func (i *IngestRecorder) AddJob(job *model.Job) error {
+	if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job) {
+		return nil
+	}
+
+	var indexID int64 = 0
+	if err := job.DecodeArgs(&indexID); err != nil {
+		return errors.Trace(err)
+	}
+
+	tableindexes, exists := i.items[job.TableID]
+	if !exists {
+		tableindexes = make(map[int64]*IngestIndexInfo)
+		i.items[job.TableID] = tableindexes
+	}
+
+	// the current information of table/index might be modified by other ddl jobs,
+	// therefore update the index information at last
+	tableindexes[indexID] = &IngestIndexInfo{
+		IsPrimary: job.Type == model.ActionAddPrimaryKey,
+		Updated:   false,
+	}
+
+	return nil
+}
+
+// RewriteTableID rewrites the table IDs of the items in the IngestRecorder.
+func (i *IngestRecorder) RewriteTableID(rewriteFunc func(tableID int64) (int64, bool, error)) error {
+	newItems := make(map[int64]map[int64]*IngestIndexInfo)
+	for tableID, item := range i.items {
+		newTableID, skip, err := rewriteFunc(tableID)
+		if err != nil {
+			return errors.Annotatef(err, "failed to rewrite table id: %d", tableID)
+		}
+		if skip {
+			continue
+		}
+		newItems[newTableID] = item
+	}
+	i.items = newItems
+	return nil
+}
+
+// UpdateIndexInfo uses the newest schemas to update the ingest index's information.
+func (i *IngestRecorder) UpdateIndexInfo(dbInfos []*model.DBInfo) {
+	for _, dbInfo := range dbInfos {
+		for _, tblInfo := range dbInfo.Tables {
+			tableindexes, tblexists := i.items[tblInfo.ID]
+			if !tblexists {
+				continue
+			}
+			for _, indexInfo := range tblInfo.Indices {
+				index, idxexists := tableindexes[indexInfo.ID]
+				if !idxexists {
+					continue
+				}
+				var columnListBuilder strings.Builder
+				var columnListArgs []interface{} = make([]interface{}, 0, len(indexInfo.Columns))
+				var isFirst bool = true
+				for _, column := range indexInfo.Columns {
+					if !isFirst {
+						columnListBuilder.WriteByte(',')
+					}
+					isFirst = false
+
+					// expression / column
+					col := tblInfo.Columns[column.Offset]
+					if col.Hidden {
+						// (expression)
+						// the generated expression string can be added into the SQL directly
+						columnListBuilder.WriteByte('(')
+						columnListBuilder.WriteString(col.GeneratedExprString)
+						columnListBuilder.WriteByte(')')
+					} else {
+						// columnName
+						columnListBuilder.WriteString("%n")
+						columnListArgs = append(columnListArgs, column.Name.O)
+						if column.Length != types.UnspecifiedLength {
+							columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length))
+						}
+					}
+				}
+				index.ColumnList = columnListBuilder.String()
+				index.ColumnArgs = columnListArgs
+				index.IndexInfo = indexInfo
+				index.SchemaName = dbInfo.Name.O
+				index.TableName = tblInfo.Name.O
+				index.Updated = true
+			}
+		}
+	}
+}
+
+// Iterate iterates over all the recorded ingest indexes.
+func (i *IngestRecorder) Iterate(f func(tableID int64, indexID int64, info *IngestIndexInfo) error) error {
+	for tableID, is := range i.items {
+		for indexID, info := range is {
+			if !info.Updated {
+				continue
+			}
+			if err := f(tableID, indexID, info); err != nil {
+				return errors.Trace(err)
+			}
+		}
+	}
+	return nil
+}
diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go
new file mode 100644
index 0000000000000..50aefb72a9077
--- /dev/null
+++ b/br/pkg/restore/ingestrec/ingest_recorder_test.go
@@ -0,0 +1,389 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ingestrec_test
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	SchemaName string = "test_db"
+	TableName  string = "test_tbl"
+	TableID    int64  = 80
+)
+
+func fakeJob(reorgTp model.ReorgType, jobTp model.ActionType, state model.JobState,
+	rowCnt int64, indices []*model.IndexInfo, rawArgs json.RawMessage) *model.Job {
+	return &model.Job{
+		SchemaName: SchemaName,
+		TableName:  TableName,
+		TableID:    TableID,
+		Type:       jobTp,
+		State:      state,
+		RowCount:   rowCnt,
+		RawArgs:    rawArgs,
+		ReorgMeta: &model.DDLReorgMeta{
+			ReorgTp: reorgTp,
+		},
+		BinlogInfo: &model.HistoryInfo{
+			TableInfo: &model.TableInfo{
+				Indices: indices,
+			},
+		},
+	}
+}
+
+func getIndex(id int64, columnsName []string) *model.IndexInfo {
+	columns := make([]*model.IndexColumn, 0, len(columnsName))
+	for _, columnName := range columnsName {
+		columns = append(columns, &model.IndexColumn{
+			Name: model.CIStr{
+				O: columnName,
+				L: columnName,
+			},
+		})
+	}
+	return &model.IndexInfo{
+		ID: id,
+		Name: model.CIStr{
+			O: columnsName[0],
+			L: columnsName[0], // not used
+		},
+		Columns: columns,
+	}
+}
+
+type iterateFunc func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error
+
+func noItem(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error {
+	return errors.Errorf("should have no items, but got one: [%d, %d, %v]", tableID, indexID, info)
+}
+
+func hasOneItem(idxID int64, columnList string, columnArgs []interface{}) (iterateFunc, *int) {
+	count := 0
+	return func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error {
+		count += 1
+		if indexID != idxID || info.ColumnList != columnList {
+			return errors.Errorf("should have one item, but got another one: [%d, %d, %v]", tableID, indexID, info)
+		}
+		for i, arg := range info.ColumnArgs {
+			if columnArgs[i] != arg {
+				return errors.Errorf("should have one item, but got another one: [%d, %d, %v]", tableID, indexID, info)
+			}
+		}
+		return nil
+	}, &count
+}
+
+func TestAddIngestRecorder(t *testing.T) {
+	allSchemas := []*model.DBInfo{
+		{
+			Name: model.NewCIStr(SchemaName),
+			Tables: []*model.TableInfo{
+				{
+					ID:   TableID,
+					Name: model.NewCIStr(TableName),
+					Columns: []*model.ColumnInfo{
+						{
+							Name:   model.NewCIStr("x"),
+							Hidden: false,
+						},
+						{
+							Name:   model.NewCIStr("y"),
+							Hidden: false,
+						},
+					},
+					Indices: []*model.IndexInfo{
+						{
+							ID:    1,
+							Name:  model.NewCIStr("x"),
+							Table: model.NewCIStr(TableName),
+							Columns: []*model.IndexColumn{
+								{
+									Name:   model.NewCIStr("x"),
+									Offset: 0,
+									Length: -1,
+								},
+								{
+									Name:   model.NewCIStr("y"),
+									Offset: 1,
+									Length: -1,
+								},
+							},
+							Comment: "123",
+							Tp:      model.IndexTypeBtree,
+						},
+					},
+				},
+			},
+		},
+	}
+	recorder := ingestrec.New()
+	// no ingest job, should ignore it
+	err := recorder.AddJob(fakeJob(
+		model.ReorgTypeTxn,
+		model.ActionAddIndex,
+		model.JobStateSynced,
+		100,
+		[]*model.IndexInfo{
+			getIndex(1, []string{"x", "y"}),
+		},
+		nil,
+	))
+	require.NoError(t, err)
+	recorder.UpdateIndexInfo(allSchemas)
+	err = recorder.Iterate(noItem)
+	require.NoError(t, err)
+
+	// no add-index job, should ignore it
+	err = recorder.AddJob(fakeJob(
+		model.ReorgTypeLitMerge,
+		model.ActionDropIndex,
+		model.JobStateSynced,
+		100,
+		[]*model.IndexInfo{
+			getIndex(1, []string{"x", "y"}),
+		},
+		nil,
+	))
+	require.NoError(t, err)
+	recorder.UpdateIndexInfo(allSchemas)
+	err = recorder.Iterate(noItem)
+	require.NoError(t, err)
+
+	// no synced job, should ignore it
+	err = recorder.AddJob(fakeJob(
+		model.ReorgTypeLitMerge,
+		model.ActionAddIndex,
+		model.JobStateRollbackDone,
+		100,
+		[]*model.IndexInfo{
+			getIndex(1, []string{"x", "y"}),
+		},
+		nil,
+	))
+	require.NoError(t, err)
+	recorder.UpdateIndexInfo(allSchemas)
+	err = recorder.Iterate(noItem)
+	require.NoError(t, err)
+
+	{
+		recorder := ingestrec.New()
+		// a normal ingest add index job
+		err = recorder.AddJob(fakeJob(
+			model.ReorgTypeLitMerge,
+			model.ActionAddIndex,
+			model.JobStateSynced,
+			1000,
+			[]*model.IndexInfo{
+				getIndex(1, []string{"x", "y"}),
+			},
+			json.RawMessage(`[1, "a"]`),
+		))
+		require.NoError(t, err)
+		f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"})
+		recorder.UpdateIndexInfo(allSchemas)
+		err = recorder.Iterate(f)
+		require.NoError(t, err)
+		require.Equal(t, *cnt, 1)
+	}
+
+	{
+		recorder := ingestrec.New()
+		// a normal ingest add primary index job
+		err = recorder.AddJob(fakeJob(
+			model.ReorgTypeLitMerge,
+			model.ActionAddPrimaryKey,
+			model.JobStateSynced,
+			1000,
+			[]*model.IndexInfo{
+				getIndex(1, []string{"x", "y"}),
+			},
+			json.RawMessage(`[1, "a"]`),
+		))
+		require.NoError(t, err)
+		f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"})
+		recorder.UpdateIndexInfo(allSchemas)
+		err = recorder.Iterate(f)
+		require.NoError(t, err)
+		require.Equal(t, *cnt, 1)
+	}
+}
+
+func TestIndexesKind(t *testing.T) {
+	allSchemas := []*model.DBInfo{
+		{
+			Name: model.NewCIStr(SchemaName),
+			Tables: []*model.TableInfo{
+				{
+					ID:   TableID,
+					Name: model.NewCIStr(TableName),
+					Columns: []*model.ColumnInfo{
+						{
+							Name:   model.NewCIStr("x"),
+							Hidden: false,
+						},
+						{
+							Name:                model.NewCIStr("_V$_x_0"),
+							Hidden:              true,
+							GeneratedExprString: "`x` * 2",
+						},
+						{
+							Name:   model.NewCIStr("z"),
+							Hidden: false,
+						},
+					},
+					Indices: []*model.IndexInfo{
+						{
+							ID:    1,
+							Name:  model.NewCIStr("x"),
+							Table: model.NewCIStr(TableName),
+							Columns: []*model.IndexColumn{
+								{
+									Name:   model.NewCIStr("x"),
+									Offset: 0,
+									Length: -1,
+								},
+								{
+									Name:   model.NewCIStr("_V$_x_0"),
+									Offset: 1,
+									Length: -1,
+								},
+								{
+									Name:   model.NewCIStr("z"),
+									Offset: 2,
+									Length: 4,
+								},
+							},
+							Comment:   "123",
+							Tp:        model.IndexTypeHash,
+							Invisible: true,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	recorder := ingestrec.New()
+	err := recorder.AddJob(fakeJob(
+		model.ReorgTypeLitMerge,
+		model.ActionAddIndex,
+		model.JobStateSynced,
+		1000,
+		[]*model.IndexInfo{
+			getIndex(1, []string{"x"}),
+		},
+		json.RawMessage(`[1, "a"]`),
+	))
+	require.NoError(t, err)
+	recorder.UpdateIndexInfo(allSchemas)
+	var (
+		tableID int64
+		indexID int64
+		info    *ingestrec.IngestIndexInfo
+		count   int = 0
+	)
+	recorder.Iterate(func(tblID, idxID int64, i *ingestrec.IngestIndexInfo) error {
+		tableID = tblID
+		indexID = idxID
+		info = i
+		count++
+		return nil
+	})
+	require.Equal(t, 1, count)
+	require.Equal(t, TableID, tableID)
+	require.Equal(t, int64(1), indexID)
+	require.Equal(t, SchemaName, info.SchemaName)
+	require.Equal(t, "%n,(`x` * 2),%n(4)", info.ColumnList)
+	require.Equal(t, []interface{}{"x", "z"}, info.ColumnArgs)
+	require.Equal(t, TableName, info.IndexInfo.Table.O)
+}
+
+func TestRewriteTableID(t *testing.T) {
+	allSchemas := []*model.DBInfo{
+		{
+			Name: model.NewCIStr(SchemaName),
+			Tables: []*model.TableInfo{
+				{
+					ID:   TableID,
+					Name: model.NewCIStr(TableName),
+					Columns: []*model.ColumnInfo{
+						{
+							Name:   model.NewCIStr("x"),
+							Hidden: false,
+						},
+						{
+							Name:   model.NewCIStr("y"),
+							Hidden: false,
+						},
+					},
+					Indices: []*model.IndexInfo{
+						{
+							ID:    1,
+							Name:  model.NewCIStr("x"),
+							Table: model.NewCIStr(TableName),
+							Columns: []*model.IndexColumn{
+								{
+									Name:   model.NewCIStr("x"),
+									Offset: 0,
+									Length: -1,
+								},
+								{
+									Name:   model.NewCIStr("y"),
+									Offset: 1,
+									Length: -1,
+								},
+							},
+							Comment: "123",
+							Tp:      model.IndexTypeBtree,
+						},
+					},
+				},
+			},
+		},
+	}
+	recorder := ingestrec.New()
+	err := recorder.AddJob(fakeJob(
+		model.ReorgTypeLitMerge,
+		model.ActionAddIndex,
+		model.JobStateSynced,
+		1000,
+		[]*model.IndexInfo{
+			getIndex(1, []string{"x", "y"}),
+		},
+		json.RawMessage(`[1, "a"]`),
+	))
+	require.NoError(t, err)
+	recorder.UpdateIndexInfo(allSchemas)
+	recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {
+		return tableID + 1, false, nil
+	})
+	err = recorder.Iterate(func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error {
+		require.Equal(t, TableID+1, tableID)
+		return nil
+	})
+	require.NoError(t, err)
+	recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {
+		return tableID + 1, true, nil
+	})
+	err = recorder.Iterate(noItem)
+	require.NoError(t, err)
+}
diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go
index a707d0f086ce9..8b77ee12b1e0f 100644
--- a/br/pkg/restore/split.go
+++ b/br/pkg/restore/split.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/restore/split"
 	"github.com/pingcap/tidb/br/pkg/rtree"
 	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/codec"
 	"go.uber.org/multierr"
 	"go.uber.org/zap"
@@ -428,3 +429,14 @@ func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRu
 
 	return s, nil
 }
+
+// GetRewriteTableID gets the rewritten table ID by applying the rewrite rule to the original table ID.
+func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 {
+	tableKey := tablecodec.GenTableRecordPrefix(tableID)
+	rule := matchOldPrefix(tableKey, rewriteRules)
+	if rule == nil {
+		return 0
+	}
+
+	return tablecodec.DecodeTableID(rule.GetNewKeyPrefix())
+}
diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel
index f75f9f37d81ea..95f2cd5dbed56 100644
--- a/br/pkg/stream/BUILD.bazel
+++ b/br/pkg/stream/BUILD.bazel
@@ -17,6 +17,7 @@ go_library(
         "//br/pkg/glue",
         "//br/pkg/httputil",
         "//br/pkg/logutil",
+        "//br/pkg/restore/ingestrec",
         "//br/pkg/storage",
         "//br/pkg/streamhelper",
         "//br/pkg/utils",
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index 7398abdbb2cb9..fcf55a5008340 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/parser/model"
@@ -55,6 +56,7 @@ type DBReplace struct {
 type SchemasReplace struct {
 	DbMap            map[OldID]*DBReplace
 	globalTableIdMap map[OldID]NewID
+	ingestRecorder   *ingestrec.IngestRecorder
 	RewriteTS        uint64        // used to rewrite commit ts in meta kv.
 	TableFilter      filter.Filter // used to filter schema/table
 	genGenGlobalID   func(ctx context.Context) (int64, error)
@@ -107,6 +109,7 @@ func NewSchemasReplace(
 	return &SchemasReplace{
 		DbMap:            dbMap,
 		globalTableIdMap: globalTableIdMap,
+		ingestRecorder:   ingestrec.New(),
 		RewriteTS:        restoreTS,
 		TableFilter:      tableFilter,
 		genGenGlobalID:   genID,
@@ -504,6 +507,10 @@ func (sr *SchemasReplace) rewriteValue(
 	return r.NewValue, r.NeedRewrite, nil
 }
 
+func (sr *SchemasReplace) GetIngestRecorder() *ingestrec.IngestRecorder {
+	return sr.ingestRecorder
+}
+
 // RewriteKvEntry uses to rewrite tableID/dbID in entry.key and entry.value
 func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, error) {
 	// skip mDDLJob
@@ -518,7 +525,7 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
 			return nil, nil
 		}
 
-		return nil, sr.tryToGCJob(job)
+		return nil, sr.restoreFromHistory(job)
 	}
 	return nil, nil
 }
@@ -547,21 +554,22 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
 	}
 }
 
-func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
+func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error {
 	if !job.IsCancelled() {
 		switch job.Type {
 		case model.ActionAddIndex, model.ActionAddPrimaryKey:
 			if job.State == model.JobStateRollbackDone {
 				return sr.deleteRange(job)
 			}
-			return nil
+			err := sr.ingestRecorder.AddJob(job)
+			return errors.Trace(err)
 		case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
 			model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn,
 			model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
 			return sr.deleteRange(job)
 		case model.ActionMultiSchemaChange:
 			for _, sub := range job.MultiSchemaInfo.SubJobs {
 				proxyJob := sub.ToProxyJob(job)
-				if err := sr.tryToGCJob(&proxyJob); err != nil {
+				if err := sr.restoreFromHistory(&proxyJob); err != nil {
 					return err
 				}
diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go
index d2cbe24e8295d..e87e03895e15c 100644
--- a/br/pkg/stream/rewrite_meta_rawkv_test.go
+++ b/br/pkg/stream/rewrite_meta_rawkv_test.go
@@ -645,7 +645,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) {
 	}
 
 	// drop indexes(multi-schema-change) for table0
-	err = schemaReplace.tryToGCJob(multiSchemaChangeJob0)
+	err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0)
 	require.NoError(t, err)
 	for l := 0; l < 2; l++ {
 		for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ {
@@ -658,7 +658,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) {
 	}
 
 	// drop indexes(multi-schema-change) for table1
-	err = schemaReplace.tryToGCJob(multiSchemaChangeJob1)
+	err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1)
 	require.NoError(t, err)
 	for l := 0; l < 2; l++ {
 		iargs = <-midr.indexCh
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 68b7fdad61f6d..af04b4d392e97 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1264,6 +1264,11 @@ func restoreStream(
 	rewriteRules := initRewriteRules(schemasReplace)
 
+	ingestRecorder := schemasReplace.GetIngestRecorder()
+	if err := client.RangeFilterFromIngestRecorder(ingestRecorder, rewriteRules); err != nil {
+		return errors.Trace(err)
+	}
+
 	logFilesIter, err := client.LoadDMLFiles(ctx)
 	if err != nil {
 		return errors.Annotate(err, "failed to initialize the dml iterator")
@@ -1288,6 +1293,10 @@ func restoreStream(
 		return errors.Annotate(err, "failed to insert rows into gc_delete_range")
 	}
 
+	if err = client.RepairIngestIndex(ctx, ingestRecorder, g, mgr.GetStorage()); err != nil {
+		return errors.Annotate(err, "failed to repair ingest index")
+	}
+
 	if cfg.tiflashRecorder != nil {
 		sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema())
 		log.Info("Generating SQLs for restoring TiFlash Replica",
diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel
index 1b71dcaea24c8..afdb18cf0536d 100644
--- a/ddl/BUILD.bazel
+++ b/ddl/BUILD.bazel
@@ -53,7 +53,6 @@ go_library(
     ],
     deps = [
         "//br/pkg/lightning/common",
-        "//br/pkg/utils",
         "//config",
         "//ddl/ingest",
         "//ddl/label",
diff --git a/ddl/index.go b/ddl/index.go
index ec09a3c802379..3c004edb666b1 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -27,7 +27,6 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
-	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/ingest"
 	"github.com/pingcap/tidb/infoschema"
@@ -608,7 +607,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
 	switch indexInfo.State {
 	case model.StateNone:
 		// none -> delete only
-		reorgTp := pickBackfillType(w, job)
+		reorgTp := pickBackfillType(job)
 		if reorgTp.NeedMergeProcess() {
 			// Increase telemetryAddIndexIngestUsage
 			telemetryAddIndexIngestUsage.Inc()
@@ -693,14 +692,14 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
 }
 
 // pickBackfillType determines which backfill process will be used.
-func pickBackfillType(w *worker, job *model.Job) model.ReorgType {
+func pickBackfillType(job *model.Job) model.ReorgType {
 	if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
 		// The backfill task has been started.
 		// Don't switch the backfill process.
 		return job.ReorgMeta.ReorgTp
 	}
 	if IsEnableFastReorg() {
-		canUseIngest := canUseIngest(w)
+		canUseIngest := canUseIngest()
 		if ingest.LitInitialized && canUseIngest {
 			job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
 			return model.ReorgTypeLitMerge
@@ -717,22 +716,16 @@ func pickBackfillType(w *worker, job *model.Job) model.ReorgType {
 }
 
 // canUseIngest indicates whether it can use ingest way to backfill index.
-func canUseIngest(w *worker) bool {
+func canUseIngest() bool {
 	// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
-	if len(ingest.LitBackCtxMgr.Keys()) > 0 {
+	activeJobIDs := ingest.LitBackCtxMgr.Keys()
+	if len(activeJobIDs) > 0 {
+		logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job",
+			zap.Int64("job ID", activeJobIDs[0]))
 		return false
 	}
-	ctx, err := w.sessPool.get()
-	if err != nil {
-		return false
-	}
-	defer w.sessPool.put(ctx)
-	failpoint.Inject("EnablePiTR", func() {
-		logutil.BgLogger().Info("lightning: mock enable PiTR")
-		failpoint.Return(true)
-	})
-	// Ingest way is not compatible with PiTR.
-	return !utils.IsLogBackupInUse(ctx)
+
+	return true
 }
 
 // IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
@@ -791,7 +784,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo
 
 func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
 	tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
-	bfProcess := pickBackfillType(w, job)
+	bfProcess := pickBackfillType(job)
 	if !bfProcess.NeedMergeProcess() {
 		return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
 	}
diff --git a/tests/realtikvtest/addindextest/common.go b/tests/realtikvtest/addindextest/common.go
index d62edc11be5cd..ab4b6677105c4 100644
--- a/tests/realtikvtest/addindextest/common.go
+++ b/tests/realtikvtest/addindextest/common.go
@@ -505,7 +505,6 @@ type failpointsPath struct {
 }
 
 var failpoints = []failpointsPath{
-	{"github.com/pingcap/tidb/ddl/EnablePiTR", "return"},
 	{"github.com/pingcap/tidb/ddl/mockHighLoadForAddIndex", "return"},
 	{"github.com/pingcap/tidb/ddl/mockBackfillRunErr", "1*return"},
 	{"github.com/pingcap/tidb/ddl/mockBackfillSlow", "return"},
@@ -518,7 +517,7 @@ var failpoints = []failpointsPath{
 func useFailpoints(ctx *suiteContext, failpos int) {
 	defer ctx.failSync.Done()
 	logutil.BgLogger().Info("stack", zap.Stack("cur stack"), zap.Int("id:", failpos))
-	failpos %= 8
+	failpos %= 7
 	require.NoError(ctx.t, failpoint.Enable(failpoints[failpos].failpath, failpoints[failpos].inTerm))
 	logutil.BgLogger().Info("stack", zap.Stack("cur stack"), zap.Int("id:", failpos), zap.Bool("enable failpoints:", true))
 	time.Sleep(10 * time.Second)