backup: backup empty databases #34385

Merged · 10 commits · May 7, 2022
Changes from 9 commits
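In short: before this PR, `br backup full` skipped databases that contained no tables, so they were silently missing after a restore. The diff below makes the backup side record such databases in the backup metadata — a schema entry that carries database info but no table info — and teaches every consumer of that metadata (debug commands, checksum validation, restore) to tolerate the missing table payload. A rough sketch of the resulting shape, using hypothetical simplified types rather than the real backuppb definitions:

package main

import "fmt"

// schema mirrors the relevant fields of backuppb.Schema (simplified sketch).
type schema struct {
	Db    []byte // JSON-encoded model.DBInfo; always present
	Table []byte // JSON-encoded model.TableInfo; nil for an empty database
}

func main() {
	meta := []schema{
		{Db: []byte(`{"db_name":"test"}`), Table: []byte(`{"name":"t1"}`)},
		{Db: []byte(`{"db_name":"empty_db"}`)}, // Table left nil on purpose
	}
	for _, s := range meta {
		fmt.Printf("has table info: %v\n", s.Table != nil)
	}
}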
7 changes: 7 additions & 0 deletions br/cmd/br/debug.go
@@ -89,6 +89,9 @@ func newCheckSumCommand() *cobra.Command {
 			if err != nil {
 				return errors.Trace(err)
 			}
+			if schema.Table == nil {
+				continue
+			}
 			tblInfo := &model.TableInfo{}
 			err = json.Unmarshal(schema.Table, tblInfo)
 			if err != nil {
@@ -216,6 +219,10 @@ func newBackupMetaValidateCommand() *cobra.Command {
 		tableIDMap := make(map[int64]int64)
 		// Simulate to create table
 		for _, table := range tables {
+			if table.Info == nil {
+				// empty database.
+				continue
+			}
 			indexIDAllocator := mockid.NewIDAllocator()
 			newTable := new(model.TableInfo)
 			tableID, _ := tableIDAllocator.Alloc()
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
@@ -329,8 +329,8 @@ func BuildBackupRangeAndSchema(
 		}
 
 		if len(tables) == 0 {
-			log.Warn("It's not necessary for backing up empty database",
-				zap.Stringer("db", dbInfo.Name))
+			log.Info("backup empty database", zap.Stringer("db", dbInfo.Name))
+			backupSchemas.addSchema(dbInfo, nil)
 			continue
 		}

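This is the behavioral pivot of the PR: a matching database with zero tables used to be skipped with a warning; now it is logged at info level and registered in the schema set with a nil table info. A runnable toy version of that branch — all types are hypothetical stand-ins for the real backup.Schemas and model types:

package main

import "fmt"

type dbInfo struct{ name string }
type tableInfo struct{ name string }

// schemaSet stands in for backup.Schemas from the diff.
type schemaSet struct{ keys []string }

func (s *schemaSet) addSchema(db dbInfo, tbl *tableInfo) {
	if tbl == nil { // empty database: keyed by the database name alone
		s.keys = append(s.keys, "`"+db.name+"`")
		return
	}
	s.keys = append(s.keys, "`"+db.name+"`.`"+tbl.name+"`")
}

func main() {
	backupSchemas := &schemaSet{}
	dbs := []struct {
		db     dbInfo
		tables []tableInfo
	}{
		{dbInfo{"test"}, []tableInfo{{"t1"}}},
		{dbInfo{"empty_db"}, nil},
	}
	for _, d := range dbs {
		if len(d.tables) == 0 {
			fmt.Println("backup empty database:", d.db.name)
			backupSchemas.addSchema(d.db, nil)
			continue
		}
		for i := range d.tables {
			backupSchemas.addSchema(d.db, &d.tables[i])
		}
	}
	fmt.Println(backupSchemas.keys) // [`test`.`t1` `empty_db`]
}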
60 changes: 35 additions & 25 deletions br/pkg/backup/schema.go
@@ -55,6 +55,12 @@ func newBackupSchemas() *Schemas {
 func (ss *Schemas) addSchema(
 	dbInfo *model.DBInfo, tableInfo *model.TableInfo,
 ) {
+	if tableInfo == nil {
+		ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{
+			dbInfo: dbInfo,
+		}
+		return
+	}
 	name := fmt.Sprintf("%s.%s",
 		utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
 	ss.schemas[name] = &schemaInfo{
@@ -95,30 +101,31 @@ func (ss *Schemas) BackupSchemas(
 	}
 
 	workerPool.ApplyOnErrorGroup(errg, func() error {
-		logger := log.With(
-			zap.String("db", schema.dbInfo.Name.O),
-			zap.String("table", schema.tableInfo.Name.O),
-		)
-
-		if !skipChecksum {
-			logger.Info("table checksum start")
-			start := time.Now()
-			err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
-			if err != nil {
-				return errors.Trace(err)
+		if schema.tableInfo != nil {
+			logger := log.With(
+				zap.String("db", schema.dbInfo.Name.O),
+				zap.String("table", schema.tableInfo.Name.O),
+			)
+
+			if !skipChecksum {
+				logger.Info("table checksum start")
+				start := time.Now()
+				err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				logger.Info("table checksum finished",
+					zap.Uint64("Crc64Xor", schema.crc64xor),
+					zap.Uint64("TotalKvs", schema.totalKvs),
+					zap.Uint64("TotalBytes", schema.totalBytes),
+					zap.Duration("take", time.Since(start)))
 			}
-			logger.Info("table checksum finished",
-				zap.Uint64("Crc64Xor", schema.crc64xor),
-				zap.Uint64("TotalKvs", schema.totalKvs),
-				zap.Uint64("TotalBytes", schema.totalBytes),
-				zap.Duration("take", time.Since(start)))
-		}
-		if statsHandle != nil {
-			if err := schema.dumpStatsToJSON(statsHandle); err != nil {
-				logger.Error("dump table stats failed", logutil.ShortError(err))
+			if statsHandle != nil {
+				if err := schema.dumpStatsToJSON(statsHandle); err != nil {
+					logger.Error("dump table stats failed", logutil.ShortError(err))
+				}
 			}
 		}
 
 		// Send schema to metawriter
 		s, err := schema.encodeToSchema()
 		if err != nil {
@@ -187,11 +194,14 @@ func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
 		return nil, errors.Trace(err)
 	}
 
-	tableBytes, err := json.Marshal(s.tableInfo)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
+	var tableBytes []byte
+	if s.tableInfo != nil {
+		tableBytes, err = json.Marshal(s.tableInfo)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 
+	}
 	var statsBytes []byte
 	if s.stats != nil {
 		statsBytes, err = json.Marshal(s.stats)
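The guard in encodeToSchema is load-bearing: marshaling a nil *model.TableInfo unconditionally would not yield nil bytes but the four-byte JSON literal null, and the reader's s.Table != nil check would then attempt to decode it as a table. A self-contained demonstration of why the branch is needed:

package main

import (
	"encoding/json"
	"fmt"
)

type tableInfo struct{ Name string }

func main() {
	var tbl *tableInfo // nil: this schema entry describes an empty database

	// Without the guard: json.Marshal on a nil pointer returns "null", not nil.
	unguarded, err := json.Marshal(tbl)
	fmt.Printf("%q %v\n", unguarded, err) // "null" <nil>

	// With the guard (as in the diff): Table bytes stay nil for an empty database.
	var tableBytes []byte
	if tbl != nil {
		tableBytes, err = json.Marshal(tbl)
		if err != nil {
			panic(err)
		}
	}
	fmt.Println(tableBytes == nil) // true
}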
4 changes: 2 additions & 2 deletions br/pkg/backup/schema_test.go
@@ -100,7 +100,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(
 		m.Storage, testFilter, math.MaxUint64, false)
 	require.NoError(t, err)
-	require.Nil(t, backupSchemas)
+	require.NotNil(t, backupSchemas)
 
 	// Database is not exist.
 	fooFilter, err := filter.Parse([]string{"foo.t1"})
@@ -117,7 +117,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
 		m.Storage, noFilter, math.MaxUint64, false)
 	require.NoError(t, err)
-	require.Nil(t, backupSchemas)
+	require.NotNil(t, backupSchemas)
 
 	tk.MustExec("use test")
 	tk.MustExec("drop table if exists t1;")
4 changes: 4 additions & 0 deletions br/pkg/checksum/validate.go
@@ -54,6 +54,10 @@ func FastChecksum(
 		checksum := uint64(0)
 		totalKvs := uint64(0)
 		totalBytes := uint64(0)
+		if tbl.Info == nil {
+			// empty database
+			continue
+		}
 		for _, file := range tbl.Files {
 			checksum ^= file.Crc64Xor
 			totalKvs += file.TotalKvs
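For reference, FastChecksum compares per-table aggregates — the XOR of each file's Crc64Xor plus summed KV and byte counts — against what the backup recorded; an entry with nil Info has no data files, so it is skipped. A minimal sketch of the per-table aggregation with the same guard, using hypothetical types:

package main

import "fmt"

type file struct {
	Crc64Xor, TotalKvs, TotalBytes uint64
}

type table struct {
	Info  *struct{} // nil marks an empty-database entry, as in the diff
	Files []file
}

func main() {
	tables := []table{
		{Info: &struct{}{}, Files: []file{
			{Crc64Xor: 1, TotalKvs: 10, TotalBytes: 100},
			{Crc64Xor: 2, TotalKvs: 20, TotalBytes: 200},
		}},
		{Info: nil}, // empty database: nothing to verify
	}
	for _, tbl := range tables {
		checksum, totalKvs, totalBytes := uint64(0), uint64(0), uint64(0)
		if tbl.Info == nil {
			continue
		}
		for _, f := range tbl.Files {
			checksum ^= f.Crc64Xor
			totalKvs += f.TotalKvs
			totalBytes += f.TotalBytes
		}
		fmt.Println(checksum, totalKvs, totalBytes) // 3 30 300
	}
}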
36 changes: 23 additions & 13 deletions br/pkg/metautil/metafile.go
@@ -291,21 +291,26 @@ func (reader *MetaReader) ReadSchemasFiles(ctx context.Context, output chan<- *T
 		tableMap := make(map[int64]*Table, MaxBatchSize)
 		err := receiveBatch(ctx, errCh, ch, MaxBatchSize, func(item interface{}) error {
 			s := item.(*backuppb.Schema)
-			tableInfo := &model.TableInfo{}
-			if err := json.Unmarshal(s.Table, tableInfo); err != nil {
-				return errors.Trace(err)
-			}
 			dbInfo := &model.DBInfo{}
 			if err := json.Unmarshal(s.Db, dbInfo); err != nil {
 				return errors.Trace(err)
 			}
 
+			var tableInfo *model.TableInfo
+			if s.Table != nil {
+				tableInfo = &model.TableInfo{}
+				if err := json.Unmarshal(s.Table, tableInfo); err != nil {
+					return errors.Trace(err)
+				}
+			}
 			var stats *handle.JSONTable
 			if s.Stats != nil {
 				stats = &handle.JSONTable{}
 				if err := json.Unmarshal(s.Stats, stats); err != nil {
 					return errors.Trace(err)
 				}
 			}
 
 			table := &Table{
 				DB:   dbInfo,
 				Info: tableInfo,
@@ -315,18 +320,23 @@
 				TiFlashReplicas: int(s.TiflashReplicas),
 				Stats:           stats,
 			}
-			if files, ok := fileMap[tableInfo.ID]; ok {
-				table.Files = append(table.Files, files...)
-			}
-			if tableInfo.Partition != nil {
-				// Partition table can have many table IDs (partition IDs).
-				for _, p := range tableInfo.Partition.Definitions {
-					if files, ok := fileMap[p.ID]; ok {
-						table.Files = append(table.Files, files...)
-					}
-				}
-			}
-			tableMap[tableInfo.ID] = table
+			if tableInfo != nil {
+				if files, ok := fileMap[tableInfo.ID]; ok {
+					table.Files = append(table.Files, files...)
+				}
+				if tableInfo.Partition != nil {
+					// Partition table can have many table IDs (partition IDs).
+					for _, p := range tableInfo.Partition.Definitions {
+						if files, ok := fileMap[p.ID]; ok {
+							table.Files = append(table.Files, files...)
+						}
+					}
+				}
+				tableMap[tableInfo.ID] = table
+			} else {
+				// empty database
+				tableMap[0] = table
+			}
 			return nil
 		})
 		if err != nil {
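On the read side, tables are batched in a map keyed by table ID; the diff parks an empty database's entry under key 0 since there is no table ID to use — which appears to assume that ID 0 is never a real table ID and that a batch holds at most one such entry. A compact sketch of the reader's branching, with hypothetical types:

package main

import "fmt"

type tableInfo struct{ ID int64 }

type table struct {
	Info *tableInfo // nil for an empty database
}

func main() {
	batch := []table{
		{Info: &tableInfo{ID: 41}},
		{Info: nil}, // empty-database entry read back from backupmeta
	}
	tableMap := make(map[int64]*table)
	for i := range batch {
		t := &batch[i]
		if t.Info != nil {
			tableMap[t.Info.ID] = t
		} else {
			tableMap[0] = t // empty database parked under ID 0
		}
	}
	fmt.Println(len(tableMap)) // 2
}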
15 changes: 9 additions & 6 deletions br/pkg/task/restore.go
@@ -247,6 +247,10 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error {
 		}
 		schemasMap[utils.EncloseName(dbName)] = struct{}{}
 		for _, table := range db.Tables {
+			if table.Info == nil {
+				// we may back up empty database.
+				continue
+			}
 			tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.O)] = struct{}{}
 		}
 	}
@@ -583,19 +587,18 @@ func filterRestoreFiles(
 	cfg *RestoreConfig,
 ) (files []*backuppb.File, tables []*metautil.Table, dbs []*utils.Database) {
 	for _, db := range client.GetDatabases() {
-		createdDatabase := false
 		dbName := db.Info.Name.O
 		if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok {
 			dbName = name
 		}
 		if !cfg.TableFilter.MatchSchema(dbName) {
 			continue
 		}
+		dbs = append(dbs, db)
 		for _, table := range db.Tables {
-			if !cfg.TableFilter.MatchTable(dbName, table.Info.Name.O) {
+			if table.Info == nil || !cfg.TableFilter.MatchTable(dbName, table.Info.Name.O) {
 				continue
 			}
-			if !createdDatabase {
-				dbs = append(dbs, db)
-				createdDatabase = true
-			}
 			files = append(files, table.Files...)
 			tables = append(tables, table)
 		}
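The filterRestoreFiles rewrite changes when a database is scheduled for creation: previously dbs only gained an entry once the first matching table was seen (the createdDatabase flag), so a database with zero matching tables was never created; now any database whose name passes the schema filter is appended up front. A toy model of the before/after behavior (simplified matching, hypothetical types):

package main

import "fmt"

type db struct {
	name   string
	tables []string
}

// old behavior: a database is only restored once a matching table is found.
func filterOld(dbs []db, match func(string) bool) (out []string) {
	for _, d := range dbs {
		created := false
		for _, t := range d.tables {
			if !match(d.name + "." + t) {
				continue
			}
			if !created {
				out = append(out, d.name)
				created = true
			}
		}
	}
	return
}

// new behavior: every database that passes the schema filter is restored,
// tables or not (schema-level matching elided for brevity).
func filterNew(dbs []db, match func(string) bool) (out []string) {
	for _, d := range dbs {
		out = append(out, d.name)
	}
	return
}

func main() {
	dbs := []db{{name: "test", tables: []string{"t1"}}, {name: "empty_db"}}
	matchAll := func(string) bool { return true }
	fmt.Println(filterOld(dbs, matchAll)) // [test] — empty_db lost
	fmt.Println(filterNew(dbs, matchAll)) // [test empty_db]
}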
29 changes: 23 additions & 6 deletions br/tests/br_backup_empty/run.sh
@@ -18,34 +18,51 @@ set -eu
 DB="$TEST_NAME"
 
 # backup empty.
-echo "backup start..."
-run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_db"
+echo "backup empty cluster start..."
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_cluster"
 if [ $? -ne 0 ]; then
     echo "TEST: [$TEST_NAME] failed on backup empty cluster!"
     exit 1
 fi
 
 # restore empty.
-echo "restore start..."
-run_br restore full -s "local://$TEST_DIR/empty_db" --pd $PD_ADDR --ratelimit 1024
+echo "restore empty cluster start..."
+run_br restore full -s "local://$TEST_DIR/empty_cluster" --pd $PD_ADDR --ratelimit 1024
 if [ $? -ne 0 ]; then
     echo "TEST: [$TEST_NAME] failed on restore empty cluster!"
     exit 1
 fi
 
 # backup and restore empty tables.
 run_sql "CREATE DATABASE $DB;"
+echo "backup empty db start..."
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_db"
+if [ $? -ne 0 ]; then
+    echo "TEST: [$TEST_NAME] failed on backup empty cluster!"
+    exit 1
+fi
+
+run_sql "DROP DATABASE $DB"
+
+# restore empty.
+echo "restore empty db start..."
+run_br restore full -s "local://$TEST_DIR/empty_db" --pd $PD_ADDR --ratelimit 1024
+if [ $? -ne 0 ]; then
+    echo "TEST: [$TEST_NAME] failed on restore empty cluster!"
+    exit 1
+fi
+
 run_sql "CREATE TABLE $DB.usertable1 ( \
     YCSB_KEY varchar(64) NOT NULL, \
     FIELD0 varchar(1) DEFAULT NULL, \
     PRIMARY KEY (YCSB_KEY) \
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;"
 
-echo "backup start..."
+echo "backup empty table start..."
 run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/empty_table"
 
 run_sql "DROP DATABASE $DB;"
-echo "restore start..."
+echo "restore empty table start..."
 run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/empty_table"
 
 # insert one row to make sure table is restored.