diff --git a/br/pkg/glue/progressing.go b/br/pkg/glue/progressing.go index ed3e323727bf6..8d5808c00c39a 100644 --- a/br/pkg/glue/progressing.go +++ b/br/pkg/glue/progressing.go @@ -16,10 +16,6 @@ import ( "golang.org/x/term" ) -const OnlyOneTask int = -1 - -var spinnerText []string = []string{".", "..", "..."} - type pbProgress struct { bar *mpb.Bar progress *mpb.Progress @@ -117,30 +113,8 @@ func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int, func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter { pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond)) - bar := adjustTotal(pb, title, total, extraFields...) - - // If total is zero, finish right now. - if total == 0 { - bar.SetTotal(0, true) - } - - return pbProgress{ - bar: bar, - ops: ops, - progress: pb, - } -} - -func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar { - if total == OnlyOneTask { - return buildOneTaskBar(pb, title, 1) - } - return buildProgressBar(pb, title, total, extraFields...) -} - -func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar { greenTitle := color.GreenString(title) - return pb.New(int64(total), + bar := pb.New(int64(total), // Play as if the old BR style. mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"), mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller { @@ -154,16 +128,15 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ... mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))), mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))), ) -} -var ( - spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE")) -) + // If total is zero, finish right now. + if total == 0 { + bar.SetTotal(0, true) + } -func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar { - return pb.New(int64(total), - mpb.NopStyle(), - mpb.PrependDecorators(decor.Name(title)), - mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))), - ) + return pbProgress{ + bar: bar, + ops: ops, + progress: pb, + } } diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index af9e47f059335..2de1eb9fbe308 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -34,7 +34,6 @@ go_library( "//br/pkg/metautil", "//br/pkg/pdutil", "//br/pkg/redact", - "//br/pkg/restore/ingestrec", "//br/pkg/restore/prealloc_table_id", "//br/pkg/restore/split", "//br/pkg/restore/tiflashrec", diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 753a04a3c0ff4..7e4953bad8306 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/redact" - "github.com/pingcap/tidb/br/pkg/restore/ingestrec" tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" @@ -2618,78 +2617,6 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error { return nil } -const ( - alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n" - alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)" - alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)" - alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED" -) - -// RepairIngestIndex drops the indexes from IngestRecorder and re-add them. -func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error { - dom, err := g.GetDomain(storage) - if err != nil { - return errors.Trace(err) - } - info := dom.InfoSchema() - allSchema := info.AllSchemas() - ingestRecorder.UpdateIndexInfo(allSchema) - console := glue.GetConsole(g) - err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error { - var ( - addSQL strings.Builder - addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs)) - progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName) - ) - w := console.StartProgressBar(progressTitle, glue.OnlyOneTask) - if info.IsPrimary { - addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList)) - addArgs = append(addArgs, info.SchemaName, info.TableName) - addArgs = append(addArgs, info.ColumnArgs...) - } else if info.IndexInfo.Unique { - addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList)) - addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O) - addArgs = append(addArgs, info.ColumnArgs...) - } else { - addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList)) - addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O) - addArgs = append(addArgs, info.ColumnArgs...) - } - // USING BTREE/HASH/RTREE - indexTypeStr := info.IndexInfo.Tp.String() - if len(indexTypeStr) > 0 { - addSQL.WriteString(" USING ") - addSQL.WriteString(indexTypeStr) - } - - // COMMENT [...] - if len(info.IndexInfo.Comment) > 0 { - addSQL.WriteString(" COMMENT %?") - addArgs = append(addArgs, info.IndexInfo.Comment) - } - - if info.IndexInfo.Invisible { - addSQL.WriteString(" INVISIBLE") - } else { - addSQL.WriteString(" VISIBLE") - } - - if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil { - return errors.Trace(err) - } - if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil { - return errors.Trace(err) - } - w.Inc() - if err := w.Wait(ctx); err != nil { - return errors.Trace(err) - } - w.Close() - return nil - }) - return errors.Trace(err) -} - const ( insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES ` insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)" @@ -2909,39 +2836,6 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage }) } -// RangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder -// TODO: need to implement the range filter out feature -func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error { - err := recorder.RewriteTableID(func(tableID int64) (int64, bool, error) { - rewriteRule, exists := rewriteRules[tableID] - if !exists { - // since the table's files will be skipped restoring, here also skips. - return 0, true, nil - } - newTableID := GetRewriteTableID(tableID, rewriteRule) - if newTableID == 0 { - return 0, false, errors.Errorf("newTableID is 0, tableID: %d", tableID) - } - return newTableID, false, nil - }) - return errors.Trace(err) - /* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature - filter := rtree.NewRangeTree() - err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error { - // range after table ID rewritten - startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - rg := rtree.Range{ - StartKey: codec.EncodeBytes([]byte{}, startKey), - EndKey: codec.EncodeBytes([]byte{}, endKey), - } - filter.InsertRange(rg) - return nil - }) - return errors.Trace(err) - */ -} - // MockClient create a fake client used to test. func MockClient(dbs map[string]*utils.Database) *Client { return &Client{databases: dbs} diff --git a/br/pkg/restore/ingestrec/BUILD.bazel b/br/pkg/restore/ingestrec/BUILD.bazel deleted file mode 100644 index 45904583ca9b9..0000000000000 --- a/br/pkg/restore/ingestrec/BUILD.bazel +++ /dev/null @@ -1,24 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "ingestrec", - srcs = ["ingest_recorder.go"], - importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec", - visibility = ["//visibility:public"], - deps = [ - "//parser/model", - "//types", - "@com_github_pingcap_errors//:errors", - ], -) - -go_test( - name = "ingestrec_test", - srcs = ["ingest_recorder_test.go"], - deps = [ - ":ingestrec", - "//parser/model", - "@com_github_pkg_errors//:errors", - "@com_github_stretchr_testify//require", - ], -) diff --git a/br/pkg/restore/ingestrec/ingest_recorder.go b/br/pkg/restore/ingestrec/ingest_recorder.go deleted file mode 100644 index 278b878f80e36..0000000000000 --- a/br/pkg/restore/ingestrec/ingest_recorder.go +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ingestrec - -import ( - "fmt" - "strings" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/types" -) - -// IngestIndexInfo records the information used to generate index drop/re-add SQL. -type IngestIndexInfo struct { - SchemaName string - TableName string - ColumnList string - ColumnArgs []interface{} - IsPrimary bool - IndexInfo *model.IndexInfo - Updated bool -} - -// IngestRecorder records the indexes information that use ingest mode to construct kvs. -// Currently log backup cannot backed up these ingest kvs. So need to re-construct them. -type IngestRecorder struct { - // Table ID -> Index ID -> Index info - items map[int64]map[int64]*IngestIndexInfo -} - -// Return an empty IngestRecorder -func New() *IngestRecorder { - return &IngestRecorder{ - items: make(map[int64]map[int64]*IngestIndexInfo), - } -} - -func notIngestJob(job *model.Job) bool { - return job.ReorgMeta == nil || - job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge -} - -func notAddIndexJob(job *model.Job) bool { - /* support new index using accelerated indexing in future: - * // 1. skip if the new index didn't generate new kvs - * // 2. shall the ReorgTp of ModifyColumnJob be ReorgTypeLitMerge if use accelerated indexing? - * if job.RowCount > 0 && notIngestJob(job) { - * // ASSERT: select new indexes, which have the highest IDs in this job's BinlogInfo - * newIndexesInfo := getIndexesWithTheHighestIDs(len(indexIDs)) - * for _, newIndexInfo := range newIndexesInfo { - * tableindexes[newIndexInfo.ID] = ... - * } - * } - */ - return job.Type != model.ActionAddIndex && - job.Type != model.ActionAddPrimaryKey -} - -func notSynced(job *model.Job) bool { - return job.State != model.JobStateSynced -} - -// AddJob firstly filters the ingest index add operation job, and records it into IngestRecorder. -func (i *IngestRecorder) AddJob(job *model.Job) error { - if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job) { - return nil - } - - var indexID int64 = 0 - if err := job.DecodeArgs(&indexID); err != nil { - return errors.Trace(err) - } - - tableindexes, exists := i.items[job.TableID] - if !exists { - tableindexes = make(map[int64]*IngestIndexInfo) - i.items[job.TableID] = tableindexes - } - - // the current information of table/index might be modified by other ddl jobs, - // therefore update the index information at last - tableindexes[indexID] = &IngestIndexInfo{ - IsPrimary: job.Type == model.ActionAddPrimaryKey, - Updated: false, - } - - return nil -} - -// RerwiteTableID rewrites the table id of the items in the IngestRecorder -func (i *IngestRecorder) RewriteTableID(rewriteFunc func(tableID int64) (int64, bool, error)) error { - newItems := make(map[int64]map[int64]*IngestIndexInfo) - for tableID, item := range i.items { - newTableID, skip, err := rewriteFunc(tableID) - if err != nil { - return errors.Annotatef(err, "failed to rewrite table id: %d", tableID) - } - if skip { - continue - } - newItems[newTableID] = item - } - i.items = newItems - return nil -} - -// UpdateIndexInfo uses the newest schemas to update the ingest index's information -func (i *IngestRecorder) UpdateIndexInfo(dbInfos []*model.DBInfo) { - for _, dbInfo := range dbInfos { - for _, tblInfo := range dbInfo.Tables { - tableindexes, tblexists := i.items[tblInfo.ID] - if !tblexists { - continue - } - for _, indexInfo := range tblInfo.Indices { - index, idxexists := tableindexes[indexInfo.ID] - if !idxexists { - continue - } - var columnListBuilder strings.Builder - var columnListArgs []interface{} = make([]interface{}, 0, len(indexInfo.Columns)) - var isFirst bool = true - for _, column := range indexInfo.Columns { - if !isFirst { - columnListBuilder.WriteByte(',') - } - isFirst = false - - // expression / column - col := tblInfo.Columns[column.Offset] - if col.Hidden { - // (expression) - // the generated expression string can be directly add into sql - columnListBuilder.WriteByte('(') - columnListBuilder.WriteString(col.GeneratedExprString) - columnListBuilder.WriteByte(')') - } else { - // columnName - columnListBuilder.WriteString("%n") - columnListArgs = append(columnListArgs, column.Name.O) - if column.Length != types.UnspecifiedLength { - columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length)) - } - } - } - index.ColumnList = columnListBuilder.String() - index.ColumnArgs = columnListArgs - index.IndexInfo = indexInfo - index.SchemaName = dbInfo.Name.O - index.TableName = tblInfo.Name.O - index.Updated = true - } - } - } -} - -// Iterate iterates all the ingest index. -func (i *IngestRecorder) Iterate(f func(tableID int64, indexID int64, info *IngestIndexInfo) error) error { - for tableID, is := range i.items { - for indexID, info := range is { - if !info.Updated { - continue - } - if err := f(tableID, indexID, info); err != nil { - return errors.Trace(err) - } - } - } - return nil -} diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go deleted file mode 100644 index 50aefb72a9077..0000000000000 --- a/br/pkg/restore/ingestrec/ingest_recorder_test.go +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ingestrec_test - -import ( - "encoding/json" - "testing" - - "github.com/pingcap/tidb/br/pkg/restore/ingestrec" - "github.com/pingcap/tidb/parser/model" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" -) - -const ( - SchemaName string = "test_db" - TableName string = "test_tbl" - TableID int64 = 80 -) - -func fakeJob(reorgTp model.ReorgType, jobTp model.ActionType, state model.JobState, rowCnt int64, indices []*model.IndexInfo, rawArgs json.RawMessage) *model.Job { - return &model.Job{ - SchemaName: SchemaName, - TableName: TableName, - TableID: TableID, - Type: jobTp, - State: state, - RowCount: rowCnt, - RawArgs: rawArgs, - ReorgMeta: &model.DDLReorgMeta{ - ReorgTp: reorgTp, - }, - BinlogInfo: &model.HistoryInfo{ - TableInfo: &model.TableInfo{ - Indices: indices, - }, - }, - } -} - -func getIndex(id int64, columnsName []string) *model.IndexInfo { - columns := make([]*model.IndexColumn, 0, len(columnsName)) - for _, columnName := range columnsName { - columns = append(columns, &model.IndexColumn{ - Name: model.CIStr{ - O: columnName, - L: columnName, - }, - }) - } - return &model.IndexInfo{ - ID: id, - Name: model.CIStr{ - O: columnsName[0], - L: columnsName[0], // noused - }, - Columns: columns, - } -} - -type iterateFunc func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error - -func noItem(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error { - return errors.Errorf("should no items, but have one: [%d, %d, %v]", tableID, indexID, info) -} - -func hasOneItem(idxID int64, columnList string, columnArgs []interface{}) (iterateFunc, *int) { - count := 0 - return func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error { - count += 1 - if indexID != idxID || info.ColumnList != columnList { - return errors.Errorf("should has one items, but have another one: [%d, %d, %v]", tableID, indexID, info) - } - for i, arg := range info.ColumnArgs { - if columnArgs[i] != arg { - return errors.Errorf("should has one items, but have another one: [%d, %d, %v]", tableID, indexID, info) - } - } - return nil - }, &count -} - -func TestAddIngestRecorder(t *testing.T) { - allSchemas := []*model.DBInfo{ - { - Name: model.NewCIStr(SchemaName), - Tables: []*model.TableInfo{ - { - ID: TableID, - Name: model.NewCIStr(TableName), - Columns: []*model.ColumnInfo{ - { - Name: model.NewCIStr("x"), - Hidden: false, - }, - { - Name: model.NewCIStr("y"), - Hidden: false, - }, - }, - Indices: []*model.IndexInfo{ - { - ID: 1, - Name: model.NewCIStr("x"), - Table: model.NewCIStr(TableName), - Columns: []*model.IndexColumn{ - { - Name: model.NewCIStr("x"), - Offset: 0, - Length: -1, - }, - { - Name: model.NewCIStr("y"), - Offset: 1, - Length: -1, - }, - }, - Comment: "123", - Tp: model.IndexTypeBtree, - }, - }, - }, - }, - }, - } - recorder := ingestrec.New() - // no ingest job, should ignore it - err := recorder.AddJob(fakeJob( - model.ReorgTypeTxn, - model.ActionAddIndex, - model.JobStateSynced, - 100, - []*model.IndexInfo{ - getIndex(1, []string{"x", "y"}), - }, - nil, - )) - require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) - err = recorder.Iterate(noItem) - require.NoError(t, err) - - // no add-index job, should ignore it - err = recorder.AddJob(fakeJob( - model.ReorgTypeLitMerge, - model.ActionDropIndex, - model.JobStateSynced, - 100, - []*model.IndexInfo{ - getIndex(1, []string{"x", "y"}), - }, - nil, - )) - require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) - err = recorder.Iterate(noItem) - require.NoError(t, err) - - // no synced job, should ignore it - err = recorder.AddJob(fakeJob( - model.ReorgTypeLitMerge, - model.ActionAddIndex, - model.JobStateRollbackDone, - 100, - []*model.IndexInfo{ - getIndex(1, []string{"x", "y"}), - }, - nil, - )) - require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) - err = recorder.Iterate(noItem) - require.NoError(t, err) - - { - recorder := ingestrec.New() - // a normal ingest add index job - err = recorder.AddJob(fakeJob( - model.ReorgTypeLitMerge, - model.ActionAddIndex, - model.JobStateSynced, - 1000, - []*model.IndexInfo{ - getIndex(1, []string{"x", "y"}), - }, - json.RawMessage(`[1, "a"]`), - )) - require.NoError(t, err) - f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"}) - recorder.UpdateIndexInfo(allSchemas) - err = recorder.Iterate(f) - require.NoError(t, err) - require.Equal(t, *cnt, 1) - } - - { - recorder := ingestrec.New() - // a normal ingest add primary index job - err = recorder.AddJob(fakeJob( - model.ReorgTypeLitMerge, - model.ActionAddPrimaryKey, - model.JobStateSynced, - 1000, - []*model.IndexInfo{ - getIndex(1, []string{"x", "y"}), - }, - json.RawMessage(`[1, "a"]`), - )) - require.NoError(t, err) - f, cnt := hasOneItem(1, "%n,%n", []interface{}{"x", "y"}) - recorder.UpdateIndexInfo(allSchemas) - err = recorder.Iterate(f) - require.NoError(t, err) - require.Equal(t, *cnt, 1) - } -} - -func TestIndexesKind(t *testing.T) { - allSchemas := []*model.DBInfo{ - { - Name: model.NewCIStr(SchemaName), - Tables: []*model.TableInfo{ - { - ID: TableID, - Name: model.NewCIStr(TableName), - Columns: []*model.ColumnInfo{ - { - Name: model.NewCIStr("x"), - Hidden: false, - }, - { - Name: model.NewCIStr("_V$_x_0"), - Hidden: true, - GeneratedExprString: "`x` * 2", - }, - { - Name: model.NewCIStr("z"), - Hidden: false, - }, - }, - Indices: []*model.IndexInfo{ - { - ID: 1, - Name: model.NewCIStr("x"), - Table: model.NewCIStr(TableName), - Columns: []*model.IndexColumn{ - { - Name: model.NewCIStr("x"), - Offset: 0, - Length: -1, - }, - { - Name: model.NewCIStr("_V$_x_0"), - Offset: 1, - Length: -1, - }, - { - Name: model.NewCIStr("z"), - Offset: 2, - Length: 4, - }, - }, - Comment: "123", - Tp: model.IndexTypeHash, - Invisible: true, - }, - }, - }, - }, - }, - } - - recorder := ingestrec.New() - err := recorder.AddJob(fakeJob( - model.ReorgTypeLitMerge, - model.ActionAddIndex, - model.JobStateSynced, - 1000, - []*model.IndexInfo{ - getIndex(1, []string{"x"}), - }, - json.RawMessage(`[1, "a"]`), - )) - require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) - var ( - tableID int64 - indexID int64 - info *ingestrec.IngestIndexInfo - count int = 0 - ) - recorder.Iterate(func(tblID, idxID int64, i *ingestrec.IngestIndexInfo) error { - tableID = tblID - indexID = idxID - info = i - count++ - return nil - }) - require.Equal(t, 1, count) - require.Equal(t, TableID, tableID) - require.Equal(t, int64(1), indexID) - require.Equal(t, SchemaName, info.SchemaName) - require.Equal(t, "%n,(`x` * 2),%n(4)", info.ColumnList) - require.Equal(t, []interface{}{"x", "z"}, info.ColumnArgs) - require.Equal(t, TableName, info.IndexInfo.Table.O) -} - -func TestRewriteTableID(t *testing.T) { - allSchemas := []*model.DBInfo{ - { - Name: model.NewCIStr(SchemaName), - Tables: []*model.TableInfo{ - { - ID: TableID, - Name: model.NewCIStr(TableName), - Columns: []*model.ColumnInfo{ - { - Name: model.NewCIStr("x"), - Hidden: false, - }, - { - Name: model.NewCIStr("y"), - Hidden: false, - }, - }, - Indices: []*model.IndexInfo{ - { - ID: 1, - Name: model.NewCIStr("x"), - Table: model.NewCIStr(TableName), - Columns: []*model.IndexColumn{ - { - Name: model.NewCIStr("x"), - Offset: 0, - Length: -1, - }, - { - Name: model.NewCIStr("y"), - Offset: 1, - Length: -1, - }, - }, - Comment: "123", - Tp: model.IndexTypeBtree, - }, - }, - }, - }, - }, - } - recorder := ingestrec.New() - err := recorder.AddJob(fakeJob( - model.ReorgTypeLitMerge, - model.ActionAddIndex, - model.JobStateSynced, - 1000, - []*model.IndexInfo{ - getIndex(1, []string{"x", "y"}), - }, - json.RawMessage(`[1, "a"]`), - )) - require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) - recorder.RewriteTableID(func(tableID int64) (int64, bool, error) { - return tableID + 1, false, nil - }) - err = recorder.Iterate(func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error { - require.Equal(t, TableID+1, tableID) - return nil - }) - require.NoError(t, err) - recorder.RewriteTableID(func(tableID int64) (int64, bool, error) { - return tableID + 1, true, nil - }) - err = recorder.Iterate(noItem) - require.NoError(t, err) -} diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 8b77ee12b1e0f..a707d0f086ce9 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/multierr" "go.uber.org/zap" @@ -429,14 +428,3 @@ func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRu return s, nil } - -// GetRewriteTableID gets rewrite table id by the rewrite rule and original table id -func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { - tableKey := tablecodec.GenTableRecordPrefix(tableID) - rule := matchOldPrefix(tableKey, rewriteRules) - if rule == nil { - return 0 - } - - return tablecodec.DecodeTableID(rule.GetNewKeyPrefix()) -} diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index 95f2cd5dbed56..f75f9f37d81ea 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "//br/pkg/glue", "//br/pkg/httputil", "//br/pkg/logutil", - "//br/pkg/restore/ingestrec", "//br/pkg/storage", "//br/pkg/streamhelper", "//br/pkg/utils", diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 9d771f6197e62..348fb8a7ea440 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/restore/ingestrec" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" @@ -56,7 +55,6 @@ type DBReplace struct { type SchemasReplace struct { DbMap map[OldID]*DBReplace globalTableIdMap map[OldID]NewID - ingestRecorder *ingestrec.IngestRecorder RewriteTS uint64 // used to rewrite commit ts in meta kv. TableFilter filter.Filter // used to filter schema/table genGenGlobalID func(ctx context.Context) (int64, error) @@ -109,7 +107,6 @@ func NewSchemasReplace( return &SchemasReplace{ DbMap: dbMap, globalTableIdMap: globalTableIdMap, - ingestRecorder: ingestrec.New(), RewriteTS: restoreTS, TableFilter: tableFilter, genGenGlobalID: genID, @@ -521,10 +518,6 @@ func (sr *SchemasReplace) rewriteValue( return r.NewValue, r.NeedRewrite, nil } -func (sr *SchemasReplace) GetIngestRecorder() *ingestrec.IngestRecorder { - return sr.ingestRecorder -} - // RewriteKvEntry uses to rewrite tableID/dbID in entry.key and entry.value func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, error) { // skip mDDLJob @@ -539,7 +532,7 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err return nil, nil } - return nil, sr.restoreFromHistory(job) + return nil, sr.tryToGCJob(job) } return nil, nil } @@ -570,22 +563,21 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err } } -func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error { +func (sr *SchemasReplace) tryToGCJob(job *model.Job) error { if !job.IsCancelled() { switch job.Type { case model.ActionAddIndex, model.ActionAddPrimaryKey: if job.State == model.JobStateRollbackDone { return sr.deleteRange(job) } - err := sr.ingestRecorder.AddJob(job) - return errors.Trace(err) + return nil case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey, model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes: return sr.deleteRange(job) case model.ActionMultiSchemaChange: for _, sub := range job.MultiSchemaInfo.SubJobs { proxyJob := sub.ToProxyJob(job) - if err := sr.restoreFromHistory(&proxyJob); err != nil { + if err := sr.tryToGCJob(&proxyJob); err != nil { return err } } diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index 7aae69cac2ba8..00b966ae061b4 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -719,7 +719,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { } // drop indexes(multi-schema-change) for table0 - err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0) + err = schemaReplace.tryToGCJob(multiSchemaChangeJob0) require.NoError(t, err) for l := 0; l < 2; l++ { for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { @@ -732,7 +732,7 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { } // drop indexes(multi-schema-change) for table1 - err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1) + err = schemaReplace.tryToGCJob(multiSchemaChangeJob1) require.NoError(t, err) for l := 0; l < 2; l++ { iargs = <-midr.indexCh diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index af04b4d392e97..68b7fdad61f6d 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1264,11 +1264,6 @@ func restoreStream( rewriteRules := initRewriteRules(schemasReplace) - ingestRecorder := schemasReplace.GetIngestRecorder() - if err := client.RangeFilterFromIngestRecorder(ingestRecorder, rewriteRules); err != nil { - return errors.Trace(err) - } - logFilesIter, err := client.LoadDMLFiles(ctx) if err != nil { return errors.Annotate(err, "failed to initialize the dml iterator") @@ -1293,10 +1288,6 @@ func restoreStream( return errors.Annotate(err, "failed to insert rows into gc_delete_range") } - if err = client.RepairIngestIndex(ctx, ingestRecorder, g, mgr.GetStorage()); err != nil { - return errors.Annotate(err, "failed to repair ingest index") - } - if cfg.tiflashRecorder != nil { sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema()) log.Info("Generating SQLs for restoring TiFlash Replica", diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index afdb18cf0536d..1b71dcaea24c8 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -53,6 +53,7 @@ go_library( ], deps = [ "//br/pkg/lightning/common", + "//br/pkg/utils", "//config", "//ddl/ingest", "//ddl/label", diff --git a/ddl/index.go b/ddl/index.go index 3c004edb666b1..ec09a3c802379 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/infoschema" @@ -607,7 +608,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo switch indexInfo.State { case model.StateNone: // none -> delete only - reorgTp := pickBackfillType(job) + reorgTp := pickBackfillType(w, job) if reorgTp.NeedMergeProcess() { // Increase telemetryAddIndexIngestUsage telemetryAddIndexIngestUsage.Inc() @@ -692,14 +693,14 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } // pickBackfillType determines which backfill process will be used. -func pickBackfillType(job *model.Job) model.ReorgType { +func pickBackfillType(w *worker, job *model.Job) model.ReorgType { if job.ReorgMeta.ReorgTp != model.ReorgTypeNone { // The backfill task has been started. // Don't switch the backfill process. return job.ReorgMeta.ReorgTp } if IsEnableFastReorg() { - canUseIngest := canUseIngest() + canUseIngest := canUseIngest(w) if ingest.LitInitialized && canUseIngest { job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge return model.ReorgTypeLitMerge @@ -716,16 +717,22 @@ func pickBackfillType(job *model.Job) model.ReorgType { } // canUseIngest indicates whether it can use ingest way to backfill index. -func canUseIngest() bool { +func canUseIngest(w *worker) bool { // We only allow one task to use ingest at the same time, in order to limit the CPU usage. - activeJobIDs := ingest.LitBackCtxMgr.Keys() - if len(activeJobIDs) > 0 { - logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job", - zap.Int64("job ID", activeJobIDs[0])) + if len(ingest.LitBackCtxMgr.Keys()) > 0 { return false } - - return true + ctx, err := w.sessPool.get() + if err != nil { + return false + } + defer w.sessPool.put(ctx) + failpoint.Inject("EnablePiTR", func() { + logutil.BgLogger().Info("lightning: mock enable PiTR") + failpoint.Return(true) + }) + // Ingest way is not compatible with PiTR. + return !utils.IsLogBackupInUse(ctx) } // IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed. @@ -784,7 +791,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { - bfProcess := pickBackfillType(job) + bfProcess := pickBackfillType(w, job) if !bfProcess.NeedMergeProcess() { return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) } diff --git a/tests/realtikvtest/addindextest/common.go b/tests/realtikvtest/addindextest/common.go index ab4b6677105c4..d62edc11be5cd 100644 --- a/tests/realtikvtest/addindextest/common.go +++ b/tests/realtikvtest/addindextest/common.go @@ -505,6 +505,7 @@ type failpointsPath struct { } var failpoints = []failpointsPath{ + {"github.com/pingcap/tidb/ddl/EnablePiTR", "return"}, {"github.com/pingcap/tidb/ddl/mockHighLoadForAddIndex", "return"}, {"github.com/pingcap/tidb/ddl/mockBackfillRunErr", "1*return"}, {"github.com/pingcap/tidb/ddl/mockBackfillSlow", "return"}, @@ -517,7 +518,7 @@ var failpoints = []failpointsPath{ func useFailpoints(ctx *suiteContext, failpos int) { defer ctx.failSync.Done() logutil.BgLogger().Info("stack", zap.Stack("cur stack"), zap.Int("id:", failpos)) - failpos %= 7 + failpos %= 8 require.NoError(ctx.t, failpoint.Enable(failpoints[failpos].failpath, failpoints[failpos].inTerm)) logutil.BgLogger().Info("stack", zap.Stack("cur stack"), zap.Int("id:", failpos), zap.Bool("enable failpoints:", true)) time.Sleep(10 * time.Second)