Skip to content

Commit

Permalink
br: make br compatible with infoschemaV2 (#52718)
Browse files Browse the repository at this point in the history
close #52717
  • Loading branch information
Leavrth authored Jul 25, 2024
1 parent 98b7858 commit 1acb8f7
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 293 deletions.
4 changes: 1 addition & 3 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,7 @@ func BuildBackupRangeAndInitSchema(
return nil, nil, nil, nil
}
return ranges, NewBackupSchemas(func(storage kv.Storage, fn func(*model.DBInfo, *model.TableInfo)) error {
return BuildBackupSchemas(storage, tableFilter, backupTS, isFullBackup, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
fn(dbInfo, tableInfo)
})
return BuildBackupSchemas(storage, tableFilter, backupTS, isFullBackup, fn)
}, schemasNum), policies, nil
}

Expand Down
6 changes: 5 additions & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ go_library(
deps = [
"//br/pkg/conn",
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/pdutil",
"//br/pkg/utils",
"//pkg/domain",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser/model",
"//pkg/util",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down Expand Up @@ -49,8 +52,9 @@ go_test(
"//br/pkg/mock",
"//br/pkg/pdutil",
"//br/pkg/utiltest",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/session",
"@com_github_coreos_go_semver//semver",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/restore/ingestrec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/parser/model",
"//pkg/types",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

Expand All @@ -20,7 +23,11 @@ go_test(
shard_count = 3,
deps = [
":ingestrec",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser/model",
"//pkg/session",
"//pkg/store/mockstore",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
94 changes: 55 additions & 39 deletions br/pkg/restore/ingestrec/ingest_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ package ingestrec
import (
"fmt"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/types"
"go.uber.org/zap"
)

// IngestIndexInfo records the information used to generate index drop/re-add SQL.
Expand Down Expand Up @@ -136,53 +140,65 @@ func (i *IngestRecorder) RewriteTableID(rewriteFunc func(tableID int64) (int64,
}

// UpdateIndexInfo uses the newest schemas to update the ingest index's information
func (i *IngestRecorder) UpdateIndexInfo(dbInfos []*model.DBInfo) {
for _, dbInfo := range dbInfos {
for _, tblInfo := range dbInfo.Tables {
tableindexes, tblexists := i.items[tblInfo.ID]
if !tblexists {
func (i *IngestRecorder) UpdateIndexInfo(infoSchema infoschema.InfoSchema) error {
log.Info("start to update index information for ingest index")
start := time.Now()
defer func() {
log.Info("finish updating index information for ingest index", zap.Duration("takes", time.Since(start)))
}()

for tableID, tableIndexes := range i.items {
tblInfo, tblexists := infoSchema.TableInfoByID(tableID)
if !tblexists || tblInfo == nil {
log.Info("skip repair ingest index, table is dropped", zap.Int64("table id", tableID))
continue
}
// TODO: here only need an interface function like `SchemaNameByID`
dbInfo, dbexists := infoSchema.SchemaByID(tblInfo.DBID)
if !dbexists || dbInfo == nil {
return errors.Errorf("failed to repair ingest index because table exists but cannot find database."+
"[table-id:%d][db-id:%d]", tableID, tblInfo.DBID)
}
for _, indexInfo := range tblInfo.Indices {
index, idxexists := tableIndexes[indexInfo.ID]
if !idxexists {
continue
}
for _, indexInfo := range tblInfo.Indices {
index, idxexists := tableindexes[indexInfo.ID]
if !idxexists {
continue
var columnListBuilder strings.Builder
var columnListArgs []any = make([]any, 0, len(indexInfo.Columns))
var isFirst bool = true
for _, column := range indexInfo.Columns {
if !isFirst {
columnListBuilder.WriteByte(',')
}
var columnListBuilder strings.Builder
var columnListArgs []any = make([]any, 0, len(indexInfo.Columns))
var isFirst bool = true
for _, column := range indexInfo.Columns {
if !isFirst {
columnListBuilder.WriteByte(',')
}
isFirst = false

// expression / column
col := tblInfo.Columns[column.Offset]
if col.Hidden {
// (expression)
// the generated expression string can be directly add into sql
columnListBuilder.WriteByte('(')
columnListBuilder.WriteString(col.GeneratedExprString)
columnListBuilder.WriteByte(')')
} else {
// columnName
columnListBuilder.WriteString("%n")
columnListArgs = append(columnListArgs, column.Name.O)
if column.Length != types.UnspecifiedLength {
columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length))
}
isFirst = false

// expression / column
col := tblInfo.Columns[column.Offset]
if col.Hidden {
// (expression)
// the generated expression string can be directly add into sql
columnListBuilder.WriteByte('(')
columnListBuilder.WriteString(col.GeneratedExprString)
columnListBuilder.WriteByte(')')
} else {
// columnName
columnListBuilder.WriteString("%n")
columnListArgs = append(columnListArgs, column.Name.O)
if column.Length != types.UnspecifiedLength {
columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length))
}
}
index.ColumnList = columnListBuilder.String()
index.ColumnArgs = columnListArgs
index.IndexInfo = indexInfo
index.SchemaName = dbInfo.Name
index.TableName = tblInfo.Name
index.Updated = true
}
index.ColumnList = columnListBuilder.String()
index.ColumnArgs = columnListArgs
index.IndexInfo = indexInfo
index.SchemaName = dbInfo.Name
index.TableName = tblInfo.Name
index.Updated = true
}
}
return nil
}

// Iterate iterates all the ingest index.
Expand Down
Loading

0 comments on commit 1acb8f7

Please sign in to comment.