Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pitr: add ingest recorder to repair indexes (#41670) #46418

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"golang.org/x/term"
)

const OnlyOneTask int = -1

var spinnerText []string = []string{".", "..", "..."}

type pbProgress struct {
bar *mpb.Bar
progress *mpb.Progress
Expand Down Expand Up @@ -113,8 +117,30 @@ func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
}

func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
if total == OnlyOneTask {
return buildOneTaskBar(pb, title, 1)
}
return buildProgressBar(pb, title, total, extraFields...)
}

func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
greenTitle := color.GreenString(title)
bar := pb.New(int64(total),
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
Expand All @@ -128,15 +154,16 @@ func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, ex
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}
var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
)
}
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/prealloc_table_id",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
Expand Down
106 changes: 106 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
Expand Down Expand Up @@ -2609,6 +2610,78 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
return nil
}

const (
alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n"
alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)"
alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)"
alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED"
)

// RepairIngestIndex drops the indexes from IngestRecorder and re-add them.
func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
ingestRecorder.UpdateIndexInfo(allSchema)
console := glue.GetConsole(g)
err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error {
var (
addSQL strings.Builder
addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs))
progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName)
)
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
if info.IsPrimary {
addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName)
addArgs = append(addArgs, info.ColumnArgs...)
} else if info.IndexInfo.Unique {
addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
} else {
addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
}
// USING BTREE/HASH/RTREE
indexTypeStr := info.IndexInfo.Tp.String()
if len(indexTypeStr) > 0 {
addSQL.WriteString(" USING ")
addSQL.WriteString(indexTypeStr)
}

// COMMENT [...]
if len(info.IndexInfo.Comment) > 0 {
addSQL.WriteString(" COMMENT %?")
addArgs = append(addArgs, info.IndexInfo.Comment)
}

if info.IndexInfo.Invisible {
addSQL.WriteString(" INVISIBLE")
} else {
addSQL.WriteString(" VISIBLE")
}

if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil {
return errors.Trace(err)
}
if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
return errors.Trace(err)
}
w.Close()
return nil
})
return errors.Trace(err)
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"
Expand Down Expand Up @@ -2828,6 +2901,39 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage
})
}

// RangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder
// TODO: need to implement the range filter out feature
func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
err := recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {
rewriteRule, exists := rewriteRules[tableID]
if !exists {
// since the table's files will be skipped restoring, here also skips.
return 0, true, nil
}
newTableID := GetRewriteTableID(tableID, rewriteRule)
if newTableID == 0 {
return 0, false, errors.Errorf("newTableID is 0, tableID: %d", tableID)
}
return newTableID, false, nil
})
return errors.Trace(err)
/* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature
filter := rtree.NewRangeTree()
err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
// range after table ID rewritten
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
rg := rtree.Range{
StartKey: codec.EncodeBytes([]byte{}, startKey),
EndKey: codec.EncodeBytes([]byte{}, endKey),
}
filter.InsertRange(rg)
return nil
})
return errors.Trace(err)
*/
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/restore/ingestrec/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ingestrec",
srcs = ["ingest_recorder.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec",
visibility = ["//visibility:public"],
deps = [
"//parser/model",
"//types",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "ingestrec_test",
srcs = ["ingest_recorder_test.go"],
deps = [
":ingestrec",
"//parser/model",
"@com_github_pkg_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Loading