This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

pkg/utils: speed up LoadBackupMeta (#744) #745

Merged (2 commits) on Mar 1, 2021
44 changes: 25 additions & 19 deletions pkg/utils/schema.go
@@ -3,15 +3,17 @@
package utils

import (
"bytes"
"encoding/json"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/tablecodec"

"github.com/pingcap/br/pkg/logutil"
)

const (
@@ -32,7 +34,7 @@ type Table struct {
Crc64Xor uint64
TotalKvs uint64
TotalBytes uint64
- Files []*backup.File
+ Files []*kvproto.File
TiFlashReplicas int
Stats *handle.JSONTable
}
@@ -66,7 +68,15 @@ func (db *Database) GetTable(name string) *Table {
}

// LoadBackupTables loads schemas from BackupMeta.
- func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) {
+ func LoadBackupTables(meta *kvproto.BackupMeta) (map[string]*Database, error) {
+ filesMap := make(map[int64][]*kvproto.File, len(meta.Schemas))
+ for _, file := range meta.Files {
+ tableID := tablecodec.DecodeTableID(file.GetStartKey())
+ if tableID == 0 {
+ log.Panic("tableID must not equal to 0", logutil.File(file))
+ }
+ filesMap[tableID] = append(filesMap[tableID], file)
+ }
databases := make(map[string]*Database)
for _, schema := range meta.Schemas {
// Parse the database schema.
@@ -99,26 +109,22 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) {
return nil, errors.Trace(err)
}
}
- partitions := make(map[int64]bool)
+ partitions := make(map[int64]struct{})
if tableInfo.Partition != nil {
for _, p := range tableInfo.Partition.Definitions {
- partitions[p.ID] = true
+ partitions[p.ID] = struct{}{}
}
}
// Find the files belong to the table
- tableFiles := make([]*backup.File, 0)
- for _, file := range meta.Files {
- // If the file do not contains any table data, skip it.
- if !bytes.HasPrefix(file.GetStartKey(), tablecodec.TablePrefix()) &&
- !bytes.HasPrefix(file.GetEndKey(), tablecodec.TablePrefix()) {
- continue
- }
- startTableID := tablecodec.DecodeTableID(file.GetStartKey())
- // If the file contains a part of the data of the table, append it to the slice.
- if ok := partitions[startTableID]; ok || startTableID == tableInfo.ID {
- tableFiles = append(tableFiles, file)
- }
+ tableFiles := make([]*kvproto.File, 0)
+ if files, exists := filesMap[tableInfo.ID]; exists {
+ tableFiles = append(tableFiles, files...)
+ }
+ // If the file contains a part of the data of the table, append it to the slice.
+ for partitionID := range partitions {
+ tableFiles = append(tableFiles, filesMap[partitionID]...)
}

table := &Table{
DB: dbInfo,
Info: tableInfo,
@@ -136,7 +142,7 @@ func LoadBackupTables(meta *backup.BackupMeta) (map[string]*Database, error) {
}

// ArchiveSize returns the total size of the backup archive.
- func ArchiveSize(meta *backup.BackupMeta) uint64 {
+ func ArchiveSize(meta *kvproto.BackupMeta) uint64 {
total := uint64(meta.Size())
for _, file := range meta.Files {
total += file.Size_
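
Why the new version is faster: the old LoadBackupTables rescanned every entry of meta.Files once per schema, O(schemas × files) overall, with prefix checks on both keys of every file. The patched version decodes each file's table ID once, groups the files into a map in a single pass, and then serves each table (and each of its partitions) with O(1) lookups. Below is a minimal standalone sketch of that pattern; the type and function names are illustrative only, not from the BR codebase.

package main

import "fmt"

// file stands in for kvproto.File; only the owning table ID matters here.
type file struct {
	name    string
	tableID int64
}

// groupByTable is the one-pass index the patch builds: after it, each
// per-table lookup is O(1) instead of a scan over every file.
func groupByTable(files []*file) map[int64][]*file {
	idx := make(map[int64][]*file, len(files))
	for _, f := range files {
		idx[f.tableID] = append(idx[f.tableID], f)
	}
	return idx
}

// collect mirrors the patched lookup: the table's own files plus the
// files of each of its partitions.
func collect(idx map[int64][]*file, tableID int64, partitionIDs []int64) []*file {
	out := append([]*file{}, idx[tableID]...)
	for _, pid := range partitionIDs {
		out = append(out, idx[pid]...)
	}
	return out
}

func main() {
	idx := groupByTable([]*file{
		{"1.sst", 124}, {"2.sst", 124}, {"3.sst", 125}, {"4.sst", 122},
	})
	// A table with ID 123 and partitions 124 and 125 picks up three
	// files; 4.sst belongs to table 122 and is skipped.
	fmt.Println(len(collect(idx, 123, []int64{124, 125}))) // 3
}
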
168 changes: 168 additions & 0 deletions pkg/utils/schema_test.go
@@ -4,6 +4,7 @@ package utils

import (
"encoding/json"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/backup"
@@ -79,3 +80,170 @@ func (r *testSchemaSuite) TestLoadBackupMeta(c *C) {
c.Assert(tbl.Files, HasLen, 1)
c.Assert(tbl.Files[0].Name, Equals, "1.sst")
}

func (r *testSchemaSuite) TestLoadBackupMetaPartionTable(c *C) {
tblName := model.NewCIStr("t1")
dbName := model.NewCIStr("test")
tblID := int64(123)
partID1 := int64(124)
partID2 := int64(125)
mockTbl := &model.TableInfo{
ID: tblID,
Name: tblName,
Partition: &model.PartitionInfo{
Definitions: []model.PartitionDefinition{
{ID: partID1},
{ID: partID2},
},
},
}
mockStats := handle.JSONTable{
DatabaseName: dbName.String(),
TableName: tblName.String(),
}
mockDB := model.DBInfo{
ID: 1,
Name: dbName,
Tables: []*model.TableInfo{
mockTbl,
},
}
dbBytes, err := json.Marshal(mockDB)
c.Assert(err, IsNil)
tblBytes, err := json.Marshal(mockTbl)
c.Assert(err, IsNil)
statsBytes, err := json.Marshal(mockStats)
c.Assert(err, IsNil)

mockSchemas := []*backup.Schema{
{
Db: dbBytes,
Table: tblBytes,
Stats: statsBytes,
},
}

mockFiles := []*backup.File{
// should include 1.sst - 3.sst
{
Name: "1.sst",
StartKey: tablecodec.EncodeRowKey(partID1, []byte("a")),
EndKey: tablecodec.EncodeRowKey(partID1, []byte("b")),
},
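// 2.sst starts in partID1 and ends in partID2; LoadBackupTables groups a
// file under the table ID decoded from its start key, so it is indexed
// under partID1 and still included.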
{
Name: "2.sst",
StartKey: tablecodec.EncodeRowKey(partID1, []byte("b")),
EndKey: tablecodec.EncodeRowKey(partID2, []byte("a")),
},
{
Name: "3.sst",
StartKey: tablecodec.EncodeRowKey(partID2, []byte("a")),
EndKey: tablecodec.EncodeRowKey(partID2+1, []byte("b")),
},
// shouldn't include 4.sst
{
Name: "4.sst",
StartKey: tablecodec.EncodeRowKey(tblID-1, []byte("a")),
EndKey: tablecodec.EncodeRowKey(tblID, []byte("a")),
},
}

meta := mockBackupMeta(mockSchemas, mockFiles)
dbs, err := LoadBackupTables(meta)
tbl := dbs[dbName.String()].GetTable(tblName.String())
c.Assert(err, IsNil)
c.Assert(tbl.Files, HasLen, 3)
contains := func(name string) bool {
for i := range tbl.Files {
if tbl.Files[i].Name == name {
return true
}
}
return false
}
c.Assert(contains("1.sst"), IsTrue)
c.Assert(contains("2.sst"), IsTrue)
c.Assert(contains("3.sst"), IsTrue)
}

func buildTableAndFiles(name string, tableID, fileCount int) (*model.TableInfo, []*backup.File) {
tblName := model.NewCIStr(name)
tblID := int64(tableID)
mockTbl := &model.TableInfo{
ID: tblID,
Name: tblName,
}

mockFiles := make([]*backup.File, 0, fileCount)
for i := 0; i < fileCount; i++ {
mockFiles = append(mockFiles, &backup.File{
Name: fmt.Sprintf("%d-%d.sst", tableID, i),
StartKey: tablecodec.EncodeRowKey(tblID, []byte(fmt.Sprintf("%09d", i))),
EndKey: tablecodec.EncodeRowKey(tblID, []byte(fmt.Sprintf("%09d", i+1))),
})
}
return mockTbl, mockFiles
}

func buildBenchmarkBackupmeta(c *C, dbName string, tableCount, fileCountPerTable int) *backup.BackupMeta {
mockFiles := make([]*backup.File, 0, tableCount*fileCountPerTable)
mockSchemas := make([]*backup.Schema, 0, tableCount)
for i := 1; i <= tableCount; i++ {
mockTbl, files := buildTableAndFiles(fmt.Sprintf("mock%d", i), i, fileCountPerTable)
mockFiles = append(mockFiles, files...)

mockDB := model.DBInfo{
ID: 1,
Name: model.NewCIStr(dbName),
Tables: []*model.TableInfo{
mockTbl,
},
}
dbBytes, err := json.Marshal(mockDB)
c.Assert(err, IsNil)
tblBytes, err := json.Marshal(mockTbl)
c.Assert(err, IsNil)
mockSchemas = append(mockSchemas, &backup.Schema{
Db: dbBytes,
Table: tblBytes,
})
}
return mockBackupMeta(mockSchemas, mockFiles)
}

// Run `go test github.com/pingcap/br/pkg/utils -check.b -test.v` to get benchmark result.
func (r *testSchemaSuite) BenchmarkLoadBackupMeta64(c *C) {
meta := buildBenchmarkBackupmeta(c, "bench", 64, 64)
c.ResetTimer()
for i := 0; i < c.N; i++ {
dbs, err := LoadBackupTables(meta)
c.Assert(err, IsNil)
c.Assert(dbs, HasLen, 1)
c.Assert(dbs, HasKey, "bench")
c.Assert(dbs["bench"].Tables, HasLen, 64)
}
}

func (r *testSchemaSuite) BenchmarkLoadBackupMeta1024(c *C) {
meta := buildBenchmarkBackupmeta(c, "bench", 1024, 64)
c.ResetTimer()
for i := 0; i < c.N; i++ {
dbs, err := LoadBackupTables(meta)
c.Assert(err, IsNil)
c.Assert(dbs, HasLen, 1)
c.Assert(dbs, HasKey, "bench")
c.Assert(dbs["bench"].Tables, HasLen, 1024)
}
}

func (r *testSchemaSuite) BenchmarkLoadBackupMeta10240(c *C) {
meta := buildBenchmarkBackupmeta(c, "bench", 10240, 64)
c.ResetTimer()
for i := 0; i < c.N; i++ {
dbs, err := LoadBackupTables(meta)
c.Assert(err, IsNil)
c.Assert(dbs, HasLen, 1)
c.Assert(dbs, HasKey, "bench")
c.Assert(dbs["bench"].Tables, HasLen, 10240)
}
}
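
To quantify the speed-up, run the command from the comment above (go test github.com/pingcap/br/pkg/utils -check.b -test.v) on the parent commit and on this branch, then compare the reported ns/op. A single size can be selected with gocheck's -check.f filter (for example -check.f BenchmarkLoadBackupMeta1024), and -check.bmem adds allocation figures; this assumes the standard gopkg.in/check.v1 flag set.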