Skip to content

Commit

Permalink
pitr: add ingest recorder to repair indexes (#41670) (#46418)
Browse files Browse the repository at this point in the history
ref #38045, close #41668
  • Loading branch information
ti-chi-bot committed Sep 8, 2023
1 parent cbfa816 commit a1af97e
Show file tree
Hide file tree
Showing 14 changed files with 788 additions and 37 deletions.
47 changes: 37 additions & 10 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"golang.org/x/term"
)

const OnlyOneTask int = -1

var spinnerText []string = []string{".", "..", "..."}

type pbProgress struct {
bar *mpb.Bar
progress *mpb.Progress
Expand Down Expand Up @@ -113,8 +117,30 @@ func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
}

func adjustTotal(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
if total == OnlyOneTask {
return buildOneTaskBar(pb, title, 1)
}
return buildProgressBar(pb, title, total, extraFields...)
}

func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...ExtraField) *mpb.Bar {
greenTitle := color.GreenString(title)
bar := pb.New(int64(total),
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
Expand All @@ -128,15 +154,16 @@ func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, ex
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

// If total is zero, finish right now.
if total == 0 {
bar.SetTotal(0, true)
}
var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

return pbProgress{
bar: bar,
ops: ops,
progress: pb,
}
func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
)
}
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/restore/ingestrec",
"//br/pkg/restore/prealloc_table_id",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
Expand Down
106 changes: 106 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
Expand Down Expand Up @@ -2617,6 +2618,78 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
return nil
}

const (
alterTableDropIndexSQL = "ALTER TABLE %n.%n DROP INDEX %n"
alterTableAddIndexFormat = "ALTER TABLE %%n.%%n ADD INDEX %%n(%s)"
alterTableAddUniqueIndexFormat = "ALTER TABLE %%n.%%n ADD UNIQUE KEY %%n(%s)"
alterTableAddPrimaryFormat = "ALTER TABLE %%n.%%n ADD PRIMARY KEY (%s) NONCLUSTERED"
)

// RepairIngestIndex drops the indexes from IngestRecorder and re-add them.
func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
ingestRecorder.UpdateIndexInfo(allSchema)
console := glue.GetConsole(g)
err = ingestRecorder.Iterate(func(_, _ int64, info *ingestrec.IngestIndexInfo) error {
var (
addSQL strings.Builder
addArgs []interface{} = make([]interface{}, 0, 5+len(info.ColumnArgs))
progressTitle string = fmt.Sprintf("repair ingest index %s for table %s.%s", info.IndexInfo.Name.O, info.SchemaName, info.TableName)
)
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
if info.IsPrimary {
addSQL.WriteString(fmt.Sprintf(alterTableAddPrimaryFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName)
addArgs = append(addArgs, info.ColumnArgs...)
} else if info.IndexInfo.Unique {
addSQL.WriteString(fmt.Sprintf(alterTableAddUniqueIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
} else {
addSQL.WriteString(fmt.Sprintf(alterTableAddIndexFormat, info.ColumnList))
addArgs = append(addArgs, info.SchemaName, info.TableName, info.IndexInfo.Name.O)
addArgs = append(addArgs, info.ColumnArgs...)
}
// USING BTREE/HASH/RTREE
indexTypeStr := info.IndexInfo.Tp.String()
if len(indexTypeStr) > 0 {
addSQL.WriteString(" USING ")
addSQL.WriteString(indexTypeStr)
}

// COMMENT [...]
if len(info.IndexInfo.Comment) > 0 {
addSQL.WriteString(" COMMENT %?")
addArgs = append(addArgs, info.IndexInfo.Comment)
}

if info.IndexInfo.Invisible {
addSQL.WriteString(" INVISIBLE")
} else {
addSQL.WriteString(" VISIBLE")
}

if err := rc.db.se.ExecuteInternal(ctx, alterTableDropIndexSQL, info.SchemaName, info.TableName, info.IndexInfo.Name.O); err != nil {
return errors.Trace(err)
}
if err := rc.db.se.ExecuteInternal(ctx, addSQL.String(), addArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
return errors.Trace(err)
}
w.Close()
return nil
})
return errors.Trace(err)
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"
Expand Down Expand Up @@ -2836,6 +2909,39 @@ func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage
})
}

// RangeFilterFromIngestRecorder rewrites the table id of items in the ingestRecorder
// TODO: need to implement the range filter out feature
func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error {
err := recorder.RewriteTableID(func(tableID int64) (int64, bool, error) {
rewriteRule, exists := rewriteRules[tableID]
if !exists {
// since the table's files will be skipped restoring, here also skips.
return 0, true, nil
}
newTableID := GetRewriteTableID(tableID, rewriteRule)
if newTableID == 0 {
return 0, false, errors.Errorf("newTableID is 0, tableID: %d", tableID)
}
return newTableID, false, nil
})
return errors.Trace(err)
/* TODO: we can use range filter to skip restoring the index kv using accelerated indexing feature
filter := rtree.NewRangeTree()
err = recorder.Iterate(func(tableID int64, indexID int64, info *ingestrec.IngestIndexInfo) error {
// range after table ID rewritten
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
rg := rtree.Range{
StartKey: codec.EncodeBytes([]byte{}, startKey),
EndKey: codec.EncodeBytes([]byte{}, endKey),
}
filter.InsertRange(rg)
return nil
})
return errors.Trace(err)
*/
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/restore/ingestrec/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ingestrec",
srcs = ["ingest_recorder.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec",
visibility = ["//visibility:public"],
deps = [
"//parser/model",
"//types",
"@com_github_pingcap_errors//:errors",
],
)

go_test(
name = "ingestrec_test",
srcs = ["ingest_recorder_test.go"],
deps = [
":ingestrec",
"//parser/model",
"@com_github_pkg_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit a1af97e

Please sign in to comment.