Skip to content

Commit

Permalink
Merge branch 'release-5.4' into release-5.4-2dd0074e4ece
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Apr 26, 2022
2 parents ce80ee7 + 11dbb91 commit 7bfee1a
Show file tree
Hide file tree
Showing 58 changed files with 1,197 additions and 429 deletions.
50 changes: 41 additions & 9 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"context"
"path/filepath"
"sort"
"strings"

"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
Expand All @@ -30,12 +32,30 @@ import (

type MDDatabaseMeta struct {
Name string
SchemaFile string
SchemaFile FileInfo
Tables []*MDTableMeta
Views []*MDTableMeta
charSet string
}

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error) {
schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
if err != nil {
log.L().Warn("failed to extract table schema",
zap.String("Path", m.SchemaFile.FileMeta.Path),
log.ShortError(err),
)
schema = nil
}
schemaStr := strings.TrimSpace(string(schema))
// set default if schema sql is empty
if len(schemaStr) == 0 {
schemaStr = "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
}

return schemaStr, nil
}

type MDTableMeta struct {
DB string
Name string
Expand Down Expand Up @@ -219,7 +239,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
// setup database schema
if len(s.dbSchemas) != 0 {
for _, fileInfo := range s.dbSchemas {
if _, dbExists := s.insertDB(fileInfo.TableName.Schema, fileInfo.FileMeta.Path); dbExists && s.loader.router == nil {
if _, dbExists := s.insertDB(fileInfo); dbExists && s.loader.router == nil {
return errors.Errorf("invalid database schema file, duplicated item - %s", fileInfo.FileMeta.Path)
}
}
Expand Down Expand Up @@ -406,23 +426,29 @@ func (s *mdLoaderSetup) route() error {
return nil
}

func (s *mdLoaderSetup) insertDB(dbName string, path string) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[dbName]
func (s *mdLoaderSetup) insertDB(f FileInfo) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[f.TableName.Schema]
if ok {
return s.loader.dbs[dbIndex], true
}
s.dbIndexMap[dbName] = len(s.loader.dbs)
s.dbIndexMap[f.TableName.Schema] = len(s.loader.dbs)
ptr := &MDDatabaseMeta{
Name: dbName,
SchemaFile: path,
Name: f.TableName.Schema,
SchemaFile: f,
charSet: s.loader.charSet,
}
s.loader.dbs = append(s.loader.dbs, ptr)
return ptr, false
}

func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
tableIndex, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
return dbMeta.Tables[tableIndex], dbExists, true
Expand All @@ -442,7 +468,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool
}

func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
_, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
meta := &MDTableMeta{
Expand Down
29 changes: 19 additions & 10 deletions br/pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (s *testMydumpLoaderSuite) TestTableInfoNotFound(c *C) {
loader, err := md.NewMyDumpLoader(ctx, s.cfg)
c.Assert(err, IsNil)
for _, dbMeta := range loader.GetDatabases() {
dbSQL, err := dbMeta.GetSchema(ctx, store)
c.Assert(err, IsNil)
c.Assert(dbSQL, Equals, "CREATE DATABASE IF NOT EXISTS `db`")
for _, tblMeta := range dbMeta.Tables {
sql, err := tblMeta.GetSchema(ctx, store)
c.Assert(sql, Equals, "")
Expand Down Expand Up @@ -272,8 +275,14 @@ func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) {
mdl, err := md.NewMyDumpLoader(context.Background(), s.cfg)
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "",
Name: "db",
SchemaFile: md.FileInfo{
TableName: filter.Table{
Schema: "db",
Name: "",
},
FileMeta: md.SourceFileMeta{Type: md.SourceTypeSchemaSchema},
},
Tables: []*md.MDTableMeta{{
DB: "db",
Name: "tbl",
Expand Down Expand Up @@ -302,7 +311,7 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) {
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "db-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: ""}, FileMeta: md.SourceFileMeta{Path: "db-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "db",
Expand Down Expand Up @@ -396,7 +405,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "a1",
SchemaFile: "a1-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a1-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "a1",
Expand Down Expand Up @@ -427,11 +436,11 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "d0",
SchemaFile: "d0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d0", Name: ""}, FileMeta: md.SourceFileMeta{Path: "d0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
},
{
Name: "b",
SchemaFile: "a0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "b", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "b",
Expand All @@ -449,7 +458,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "c",
SchemaFile: "c0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: ""}, FileMeta: md.SourceFileMeta{Path: "c0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "c",
Expand All @@ -463,7 +472,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "v",
SchemaFile: "e0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: ""}, FileMeta: md.SourceFileMeta{Path: "e0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "v",
Expand Down Expand Up @@ -552,7 +561,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "d1",
SchemaFile: filepath.FromSlash("d1/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d1",
Expand Down Expand Up @@ -605,7 +614,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
},
{
Name: "d2",
SchemaFile: filepath.FromSlash("d2/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d2", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d2/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d2",
Expand Down
24 changes: 19 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)

for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
Expand Down Expand Up @@ -1125,17 +1126,23 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
})
}
loop:
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
select {
case ch <- common.UniqueTable(tbl.DB, tbl.Name):
case <-gCtx.Done():
break loop
}

}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
return errors.Annotate(err, "check table contains data failed")
}

if len(tableNames) > 0 {
Expand All @@ -1147,13 +1154,20 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
func tableContainsData(ctx context.Context, db utils.DBExecutor, tableName string) (bool, error) {
failpoint.Inject("CheckTableEmptyFailed", func() {
failpoint.Return(false, errors.New("mock error"))
})
query := "select 1 from " + tableName + " limit 1"
exec := common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)
err := exec.QueryRow(ctx, "check table empty", query, &dump)

switch {
case err == sql.ErrNoRows:
case errors.ErrorEqual(err, sql.ErrNoRows):
return false, nil
case err != nil:
return false, errors.Trace(err)
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -476,6 +477,9 @@ func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnError(mysql.NewErr(errno.ErrPDServerTimeout))
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
Expand Down Expand Up @@ -541,6 +545,18 @@ func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed", `return`)
c.Assert(err, IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed")
}()

// restrict the concurrency to ensure there are more tables than workers
rc.cfg.App.RegionConcurrency = 1
// test check tables not stuck but return the right error
err = rc.checkTableEmpty(ctx)
c.Assert(err, ErrorMatches, ".*check table contains data failed: mock error.*")
}

func (s *checkInfoSuite) TestLocalResource(c *C) {
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -327,12 +323,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0))
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
return nil, errors.Trace(err)
}

return e.checksumDB(ctx, tableInfo)
return e.checksumDB(ctx, tableInfo, ts)
}

type tableChecksumTS struct {
Expand Down
Loading

0 comments on commit 7bfee1a

Please sign in to comment.