cherry pick pingcap#34385 to release-5.4
Signed-off-by: ti-srebot <[email protected]>
3pointer authored and ti-srebot committed Jun 2, 2022
1 parent 4d888ce commit 5da1655
Showing 8 changed files with 119 additions and 52 deletions.
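In short: with this change BR keeps a schema entry for a database that has no tables. The backup side records the database with a nil table payload, and every consumer of the backup metadata (the br debug subcommands, checksum validation, schema encoding, the metadata reader, and restore filtering) now guards against that nil before touching table fields.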
7 changes: 7 additions & 0 deletions br/cmd/br/debug.go
@@ -89,6 +89,9 @@ func newCheckSumCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
if schema.Table == nil {
continue
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
@@ -216,6 +219,10 @@ func newBackupMetaValidateCommand() *cobra.Command {
tableIDMap := make(map[int64]int64)
// Simulate to create table
for _, table := range tables {
if table.Info == nil {
// empty database.
continue
}
indexIDAllocator := mockid.NewIDAllocator()
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
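Both hunks in this file add the same defensive check: a schema entry written for an empty database carries no table payload, so the loop skips it before unmarshalling a table definition. A minimal standalone sketch of that guard, assuming a hypothetical schemaEntry type in place of the real backuppb.Schema:

package main

import (
	"encoding/json"
	"fmt"
)

// schemaEntry is a stand-in for a backup schema record: Db is always set,
// Table is nil when the entry describes an empty database.
type schemaEntry struct {
	Db    []byte
	Table []byte
}

type tableInfo struct {
	Name string `json:"name"`
}

func main() {
	entries := []schemaEntry{
		{Db: []byte(`{"db":"test"}`), Table: []byte(`{"name":"t1"}`)},
		{Db: []byte(`{"db":"empty_db"}`), Table: nil}, // empty database
	}
	for _, e := range entries {
		if e.Table == nil {
			// Empty database: nothing to checksum or validate, skip it.
			continue
		}
		tbl := &tableInfo{}
		if err := json.Unmarshal(e.Table, tbl); err != nil {
			panic(err)
		}
		fmt.Println("table:", tbl.Name)
	}
}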
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
@@ -293,8 +293,8 @@ func BuildBackupRangeAndSchema(
}

if len(tables) == 0 {
log.Warn("It's not necessary for backing up empty database",
zap.Stringer("db", dbInfo.Name))
log.Info("backup empty database", zap.Stringer("db", dbInfo.Name))
backupSchemas.addSchema(dbInfo, nil)
continue
}

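Previously a database with no tables was only logged with a warning and skipped; the new code keeps it by calling addSchema with a nil table info, so the database survives the round trip to restore. A rough sketch of the changed control flow, with hypothetical dbInfo/tblInfo types and a schemas collector standing in for backupSchemas:

package main

import "fmt"

type dbInfo struct{ Name string }
type tblInfo struct{ Name string }

// schemas is a stand-in for the backupSchemas collector.
type schemas struct{ entries []string }

// addSchema records a database, and optionally one of its tables.
func (s *schemas) addSchema(db dbInfo, tbl *tblInfo) {
	if tbl == nil {
		s.entries = append(s.entries, db.Name) // database-only entry
		return
	}
	s.entries = append(s.entries, db.Name+"."+tbl.Name)
}

func main() {
	dbTables := map[dbInfo][]tblInfo{
		{Name: "test"}:     {{Name: "t1"}},
		{Name: "empty_db"}: {}, // no tables
	}
	backup := &schemas{}
	for db, tables := range dbTables {
		if len(tables) == 0 {
			// Old behaviour: warn and continue, losing the database.
			// New behaviour: keep a database-only schema entry.
			backup.addSchema(db, nil)
			continue
		}
		for i := range tables {
			backup.addSchema(db, &tables[i])
		}
	}
	fmt.Println(backup.entries)
}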
60 changes: 35 additions & 25 deletions br/pkg/backup/schema.go
@@ -55,6 +55,12 @@ func newBackupSchemas() *Schemas {
func (ss *Schemas) addSchema(
dbInfo *model.DBInfo, tableInfo *model.TableInfo,
) {
if tableInfo == nil {
ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{
dbInfo: dbInfo,
}
return
}
name := fmt.Sprintf("%s.%s",
utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
ss.schemas[name] = &schemaInfo{
@@ -95,30 +101,31 @@ func (ss *Schemas) BackupSchemas(
}

workerPool.ApplyOnErrorGroup(errg, func() error {
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
)

if !skipChecksum {
logger.Info("table checksum start")
start := time.Now()
err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
if err != nil {
return errors.Trace(err)
if schema.tableInfo != nil {
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
)

if !skipChecksum {
logger.Info("table checksum start")
start := time.Now()
err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
if err != nil {
return errors.Trace(err)
}
logger.Info("table checksum finished",
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("take", time.Since(start)))
}
logger.Info("table checksum finished",
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("take", time.Since(start)))
}
if statsHandle != nil {
if err := schema.dumpStatsToJSON(statsHandle); err != nil {
logger.Error("dump table stats failed", logutil.ShortError(err))
if statsHandle != nil {
if err := schema.dumpStatsToJSON(statsHandle); err != nil {
logger.Error("dump table stats failed", logutil.ShortError(err))
}
}
}

// Send schema to metawriter
s, err := schema.encodeToSchema()
if err != nil {
@@ -187,11 +194,14 @@ func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
return nil, errors.Trace(err)
}

tableBytes, err := json.Marshal(s.tableInfo)
if err != nil {
return nil, errors.Trace(err)
}
var tableBytes []byte
if s.tableInfo != nil {
tableBytes, err = json.Marshal(s.tableInfo)
if err != nil {
return nil, errors.Trace(err)
}

}
var statsBytes []byte
if s.stats != nil {
statsBytes, err = json.Marshal(s.stats)
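Inside schema.go the database-only entry is keyed by the database name alone, BackupSchemas skips checksum and stats collection when tableInfo is nil, and encodeToSchema marshals the table payload only when one exists. A condensed sketch of that optional-marshal asymmetry, using hypothetical schemaInfo and encodedSchema types rather than the real backuppb.Schema:

package main

import (
	"encoding/json"
	"fmt"
)

type dbInfo struct{ Name string }
type tableInfo struct{ Name string }

// schemaInfo mirrors the patched struct: table may be nil for an empty database.
type schemaInfo struct {
	db    *dbInfo
	table *tableInfo
}

// encodedSchema is a stand-in for the serialized schema record.
type encodedSchema struct {
	Db    []byte
	Table []byte // stays nil for an empty database
}

func (s *schemaInfo) encode() (*encodedSchema, error) {
	dbBytes, err := json.Marshal(s.db)
	if err != nil {
		return nil, err
	}
	var tableBytes []byte
	if s.table != nil { // marshal the table only when there is one
		tableBytes, err = json.Marshal(s.table)
		if err != nil {
			return nil, err
		}
	}
	return &encodedSchema{Db: dbBytes, Table: tableBytes}, nil
}

func main() {
	withTable := &schemaInfo{db: &dbInfo{"test"}, table: &tableInfo{"t1"}}
	emptyDB := &schemaInfo{db: &dbInfo{"empty_db"}}
	for _, s := range []*schemaInfo{withTable, emptyDB} {
		enc, err := s.encode()
		if err != nil {
			panic(err)
		}
		fmt.Printf("db=%s table=%s\n", enc.Db, enc.Table)
	}
}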
16 changes: 16 additions & 0 deletions br/pkg/backup/schema_test.go
@@ -100,11 +100,19 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {

// Table t1 is not exist.
testFilter, err := filter.Parse([]string{"test.t1"})
<<<<<<< HEAD
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(
s.mock.Storage, testFilter, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas, IsNil)
=======
require.NoError(t, err)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64, false)
require.NoError(t, err)
require.NotNil(t, backupSchemas)
>>>>>>> 9339955f0... backup: backup empty databases (#34385)

// Database is not exist.
fooFilter, err := filter.Parse([]string{"foo.t1"})
@@ -117,11 +125,19 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
// Empty database.
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
<<<<<<< HEAD
c.Assert(err, IsNil)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
s.mock.Storage, noFilter, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas, IsNil)
=======
require.NoError(t, err)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64, false)
require.NoError(t, err)
require.NotNil(t, backupSchemas)
>>>>>>> 9339955f0... backup: backup empty databases (#34385)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1;")
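Note that both test hunks still carry unresolved merge-conflict markers from the cherry-pick: the HEAD (release-5.4) side keeps the gocheck-style c.Assert calls and the three-value BuildBackupRangeAndSchema signature, while the incoming master-side hunk uses testify's require helpers, the four-value signature, and asserts that backupSchemas is no longer nil for an empty database.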
4 changes: 4 additions & 0 deletions br/pkg/checksum/validate.go
@@ -54,6 +54,10 @@ func FastChecksum(
checksum := uint64(0)
totalKvs := uint64(0)
totalBytes := uint64(0)
if tbl.Info == nil {
// empty database
continue
}
for _, file := range tbl.Files {
checksum ^= file.Crc64Xor
totalKvs += file.TotalKvs
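This is the same guard as in debug.go: FastChecksum now skips database-only entries (Info == nil) instead of trying to sum per-file checksums that do not exist.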
36 changes: 23 additions & 13 deletions br/pkg/metautil/metafile.go
@@ -291,21 +291,26 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
tableMap := make(map[int64]*Table, MaxBatchSize)
err := receiveBatch(ctx, errCh, ch, MaxBatchSize, func(item interface{}) error {
s := item.(*backuppb.Schema)
tableInfo := &model.TableInfo{}
if err := json.Unmarshal(s.Table, tableInfo); err != nil {
return errors.Trace(err)
}
dbInfo := &model.DBInfo{}
if err := json.Unmarshal(s.Db, dbInfo); err != nil {
return errors.Trace(err)
}

var tableInfo *model.TableInfo
if s.Table != nil {
tableInfo = &model.TableInfo{}
if err := json.Unmarshal(s.Table, tableInfo); err != nil {
return errors.Trace(err)
}
}
var stats *handle.JSONTable
if s.Stats != nil {
stats = &handle.JSONTable{}
if err := json.Unmarshal(s.Stats, stats); err != nil {
return errors.Trace(err)
}
}

table := &Table{
DB: dbInfo,
Info: tableInfo,
@@ -315,18 +320,23 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
TiFlashReplicas: int(s.TiflashReplicas),
Stats: stats,
}
if files, ok := fileMap[tableInfo.ID]; ok {
table.Files = append(table.Files, files...)
}
if tableInfo.Partition != nil {
// Partition table can have many table IDs (partition IDs).
for _, p := range tableInfo.Partition.Definitions {
if files, ok := fileMap[p.ID]; ok {
table.Files = append(table.Files, files...)
if tableInfo != nil {
if files, ok := fileMap[tableInfo.ID]; ok {
table.Files = append(table.Files, files...)
}
if tableInfo.Partition != nil {
// Partition table can have many table IDs (partition IDs).
for _, p := range tableInfo.Partition.Definitions {
if files, ok := fileMap[p.ID]; ok {
table.Files = append(table.Files, files...)
}
}
}
tableMap[tableInfo.ID] = table
} else {
// empty database
tableMap[0] = table
}
tableMap[tableInfo.ID] = table
return nil
})
if err != nil {
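When the metadata is read back, the table payload is unmarshalled only if it is present; a database-only record still needs a slot in the per-batch table map, and with no table ID to key on, the patch files it under ID 0. A small sketch of that lookup-map construction, with hypothetical tableInfo and table types:

package main

import (
	"encoding/json"
	"fmt"
)

type tableInfo struct {
	ID   int64  `json:"id"`
	Name string `json:"name"`
}

type table struct {
	Info *tableInfo // nil for an empty database
}

func main() {
	rawTables := [][]byte{
		[]byte(`{"id":42,"name":"t1"}`),
		nil, // schema record for an empty database carries no table bytes
	}

	tableMap := make(map[int64]*table)
	for _, raw := range rawTables {
		var info *tableInfo
		if raw != nil {
			info = &tableInfo{}
			if err := json.Unmarshal(raw, info); err != nil {
				panic(err)
			}
		}
		t := &table{Info: info}
		if info != nil {
			tableMap[info.ID] = t
		} else {
			// Empty database: no table ID exists, so use 0 as a placeholder key.
			tableMap[0] = t
		}
	}
	fmt.Println("entries:", len(tableMap))
}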
15 changes: 9 additions & 6 deletions br/pkg/task/restore.go
@@ -194,6 +194,10 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error {
}
schemasMap[utils.EncloseName(dbName)] = struct{}{}
for _, table := range db.Tables {
if table.Info == nil {
// we may back up empty database.
continue
}
tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.O)] = struct{}{}
}
}
@@ -538,19 +542,18 @@ func filterRestoreFiles(
cfg *RestoreConfig,
) (files []*backuppb.File, tables []*metautil.Table, dbs []*utils.Database) {
for _, db := range client.GetDatabases() {
createdDatabase := false
dbName := db.Info.Name.O
if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok {
dbName = name
}
if !cfg.TableFilter.MatchSchema(dbName) {
continue
}
dbs = append(dbs, db)
for _, table := range db.Tables {
if !cfg.TableFilter.MatchTable(dbName, table.Info.Name.O) {
if table.Info == nil || !cfg.TableFilter.MatchTable(dbName, table.Info.Name.O) {
continue
}
if !createdDatabase {
dbs = append(dbs, db)
createdDatabase = true
}
files = append(files, table.Files...)
tables = append(tables, table)
}
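filterRestoreFiles used to append a database only after its first matching table (the createdDatabase flag), so an empty database could never be selected for restore; now the database is kept as soon as its schema matches the filter, and tables whose Info is nil are simply skipped. A compact sketch of the reworked loop, with hypothetical matchSchema/matchTable stand-ins for cfg.TableFilter:

package main

import "fmt"

type tableMeta struct{ Name string }

type database struct {
	Name   string
	Tables []*tableMeta // a nil entry stands in for a table with Info == nil
}

// matchSchema and matchTable are hypothetical stand-ins for the table filter.
func matchSchema(db string) bool     { return db != "mysql" }
func matchTable(db, tbl string) bool { return true }

func main() {
	input := []database{
		{Name: "test", Tables: []*tableMeta{{Name: "t1"}, nil}},
		{Name: "empty_db"}, // no tables at all
		{Name: "mysql", Tables: []*tableMeta{{Name: "user"}}},
	}

	var dbs []string
	var tables []string
	for _, db := range input {
		if !matchSchema(db.Name) {
			continue
		}
		// Keep the database even if it contributes no tables.
		dbs = append(dbs, db.Name)
		for _, tbl := range db.Tables {
			if tbl == nil || !matchTable(db.Name, tbl.Name) {
				continue
			}
			tables = append(tables, db.Name+"."+tbl.Name)
		}
	}
	fmt.Println("dbs:", dbs)
	fmt.Println("tables:", tables)
}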
29 changes: 23 additions & 6 deletions br/tests/br_backup_empty/run.sh
@@ -18,34 +18,51 @@ set -eu
DB="$TEST_NAME"

# backup empty.
echo "backup start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_db"
echo "backup empty cluster start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_cluster"
if [ $? -ne 0 ]; then
echo "TEST: [$TEST_NAME] failed on backup empty cluster!"
exit 1
fi

# restore empty.
echo "restore start..."
run_br restore full -s "local://$TEST_DIR/empty_db" --pd $PD_ADDR --ratelimit 1024
echo "restore empty cluster start..."
run_br restore full -s "local://$TEST_DIR/empty_cluster" --pd $PD_ADDR --ratelimit 1024
if [ $? -ne 0 ]; then
echo "TEST: [$TEST_NAME] failed on restore empty cluster!"
exit 1
fi

# backup and restore empty tables.
run_sql "CREATE DATABASE $DB;"
echo "backup empty db start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_db"
if [ $? -ne 0 ]; then
echo "TEST: [$TEST_NAME] failed on backup empty cluster!"
exit 1
fi

run_sql "DROP DATABASE $DB"

# restore empty.
echo "restore empty db start..."
run_br restore full -s "local://$TEST_DIR/empty_db" --pd $PD_ADDR --ratelimit 1024
if [ $? -ne 0 ]; then
echo "TEST: [$TEST_NAME] failed on restore empty cluster!"
exit 1
fi

run_sql "CREATE TABLE $DB.usertable1 ( \
YCSB_KEY varchar(64) NOT NULL, \
FIELD0 varchar(1) DEFAULT NULL, \
PRIMARY KEY (YCSB_KEY) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"

echo "backup start..."
echo "backup empty table start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_table"

run_sql "DROP DATABASE $DB;"
echo "restore start..."
echo "restore empty table start..."
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/empty_table"

# insert one row to make sure table is restored.
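The integration test now covers three cases in sequence: backing up and restoring a completely empty cluster, a database with no tables (created, backed up, dropped, then restored), and a database containing only an empty table.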
