This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

*: skip creating Domain for backup without stats
Signed-off-by: Neil Shen <[email protected]>
overvenus committed Mar 15, 2021
1 parent 1a00477 commit a39df36
Showing 11 changed files with 189 additions and 167 deletions.
2 changes: 1 addition & 1 deletion cmd/br/debug.go
@@ -371,7 +371,7 @@ func setPDConfigCommand() *cobra.Command {
return errors.Trace(err)
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false)
if err != nil {
return errors.Trace(err)
}
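The only change in this file is a new trailing boolean argument to task.NewMgr. Its name is not visible in this hunk; the sketch below assumes it tells the manager whether a TiDB Domain must be built (a Domain is only needed when table stats are dumped), and needDomain is a placeholder name used for illustration only.

// Hedged sketch: needDomain is an assumed name for the new trailing argument.
// Debug subcommands never dump stats, so they pass false and skip the Domain.
const needDomain = false
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, needDomain)
if err != nil {
	return errors.Trace(err)
}
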
57 changes: 16 additions & 41 deletions pkg/backup/client.go
@@ -266,38 +266,42 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
// KV ranges are separated by Table IDs.
// Also, KV ranges are separated by Index IDs in the same table.
func BuildBackupRangeAndSchema(
dom *domain.Domain,
storage kv.Storage,
tableFilter filter.Filter,
backupTS uint64,
ignoreStats bool,
) ([]rtree.Range, *Schemas, error) {
info, err := dom.GetSnapshotInfoSchema(backupTS)
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)

ranges := make([]rtree.Range, 0)
backupSchemas := newBackupSchemas()
dbs, err := m.ListDatabases()
if err != nil {
return nil, nil, errors.Trace(err)
}

h := dom.StatsHandle()

ranges := make([]rtree.Range, 0)
backupSchemas := newBackupSchemas()
for _, dbInfo := range info.AllSchemas() {
for _, dbInfo := range dbs {
// skip system databases
if util.IsMemOrSysDB(dbInfo.Name.L) {
continue
}

var dbData []byte
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.RowIDAllocType)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.SequenceType)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.AutoRandomType)

if len(dbInfo.Tables) == 0 {
tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, errors.Trace(err)
}

if len(tables) == 0 {
log.Warn("It's not necessary for backing up empty database",
zap.Stringer("db", dbInfo.Name))
continue
}
for _, tableInfo := range dbInfo.Tables {

for _, tableInfo := range tables {
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
continue
@@ -346,36 +350,7 @@ func BuildBackupRangeAndSchema(
}
tableInfo.Indices = tableInfo.Indices[:n]

if dbData == nil {
dbData, err = json.Marshal(dbInfo)
if err != nil {
return nil, nil, errors.Trace(err)
}
}
tableData, err := json.Marshal(tableInfo)
if err != nil {
return nil, nil, errors.Trace(err)
}

var stats []byte
if !ignoreStats {
jsonTable, err := h.DumpStatsToJSON(dbInfo.Name.String(), tableInfo, nil)
if err != nil {
logger.Error("dump table stats failed", logutil.ShortError(err))
} else {
stats, err = json.Marshal(jsonTable)
if err != nil {
logger.Error("dump table stats failed (cannot serialize)", logutil.ShortError(err))
}
}
}

schema := backuppb.Schema{
Db: dbData,
Table: tableData,
Stats: stats,
}
backupSchemas.pushPending(schema, dbInfo.Name.L, tableInfo.Name.L)
backupSchemas.addSchema(dbInfo, tableInfo)

tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
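After this hunk BuildBackupRangeAndSchema no longer takes a *domain.Domain or an ignoreStats flag: schema information is read straight from kv.Storage via meta.NewSnapshotMeta, and stats handling moves into Schemas.BackupSchemas (next file). A hedged before/after sketch of the call site follows; the real caller is in one of the changed files not loaded on this page, so the surrounding variable names are assumptions.

// Before this commit the caller also had to pass a live Domain and an
// ignoreStats flag (sketch):
//   ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(dom, storage, tableFilter, backupTS, ignoreStats)
// After this commit only kv.Storage is needed; the snapshot meta supplies
// the schemas, and stats are dumped later by Schemas.BackupSchemas.
ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(storage, tableFilter, backupTS)
if err != nil {
	return errors.Trace(err)
}
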
158 changes: 89 additions & 69 deletions pkg/backup/schema.go
@@ -14,12 +14,14 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)
@@ -30,121 +32,139 @@ const (
DefaultSchemaConcurrency = 64
)

type schemaInfo struct {
tableInfo *model.TableInfo
dbInfo *model.DBInfo
crc64xor uint64
totalKvs uint64
totalBytes uint64
stats *handle.JSONTable
}

// Schemas is the task for backing up schemas.
type Schemas struct {
// name -> schema
schemas map[string]backuppb.Schema
backupSchemaCh chan backuppb.Schema
errCh chan error
schemas map[string]*schemaInfo
}

func newBackupSchemas() *Schemas {
return &Schemas{
schemas: make(map[string]backuppb.Schema),
backupSchemaCh: make(chan backuppb.Schema),
errCh: make(chan error),
schemas: make(map[string]*schemaInfo),
}
}

func (pending *Schemas) pushPending(
schema backuppb.Schema,
dbName, tableName string,
func (ss *Schemas) addSchema(
dbInfo *model.DBInfo, tableInfo *model.TableInfo,
) {
name := fmt.Sprintf("%s.%s",
utils.EncloseName(dbName), utils.EncloseName(tableName))
pending.schemas[name] = schema
utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
ss.schemas[name] = &schemaInfo{
tableInfo: tableInfo,
dbInfo: dbInfo,
}
}

// Start backups schemas.
func (pending *Schemas) Start(
// BackupSchemas backs up table info, including checksums and stats.
func (ss *Schemas) BackupSchemas(
ctx context.Context,
store kv.Storage,
statsHandle *handle.Handle,
backupTS uint64,
concurrency uint,
copConcurrency uint,
skipChecksum bool,
updateCh glue.Progress,
) {
) ([]*backuppb.Schema, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Schemas.Start", opentracing.ChildOf(span.Context()))
span1 := span.Tracer().StartSpan("Schemas.BackupSchemas", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

workerPool := utils.NewWorkerPool(concurrency, "Schemas")
errg, ectx := errgroup.WithContext(ctx)
go func() {
startAll := time.Now()
for n, s := range pending.schemas {
log.Info("table checksum start", zap.String("table", n))
name := n
schema := s
workerPool.ApplyOnErrorGroup(errg, func() error {
startAll := time.Now()
for n := range ss.schemas {
name := n
schema := ss.schemas[name]
workerPool.ApplyOnErrorGroup(errg, func() error {
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
)

if !skipChecksum {
logger.Info("table checksum start")
start := time.Now()
table := model.TableInfo{}
err := json.Unmarshal(schema.Table, &table)
if err != nil {
return errors.Trace(err)
}
checksumResp, err := calculateChecksum(
ectx, &table, store.GetClient(), backupTS, copConcurrency)
ectx, schema.tableInfo, store.GetClient(), backupTS, copConcurrency)
if err != nil {
return errors.Trace(err)
}
schema.Crc64Xor = checksumResp.Checksum
schema.TotalKvs = checksumResp.TotalKvs
schema.TotalBytes = checksumResp.TotalBytes
log.Info("table checksum finished",
zap.String("table", name),
schema.crc64xor = checksumResp.Checksum
schema.totalKvs = checksumResp.TotalKvs
schema.totalBytes = checksumResp.TotalBytes
logger.Info("table checksum finished",
zap.Uint64("Crc64Xor", checksumResp.Checksum),
zap.Uint64("TotalKvs", checksumResp.TotalKvs),
zap.Uint64("TotalBytes", checksumResp.TotalBytes),
zap.Duration("take", time.Since(start)))
pending.backupSchemaCh <- schema
}
if statsHandle != nil {
jsonTable, err := statsHandle.DumpStatsToJSONBySnapshot(
schema.dbInfo.Name.String(), schema.tableInfo, backupTS)
if err != nil {
logger.Error("dump table stats failed", logutil.ShortError(err))
}
schema.stats = jsonTable
}

updateCh.Inc()
return nil
})
updateCh.Inc()
return nil
})
}
if err := errg.Wait(); err != nil {
return nil, errors.Trace(err)
}
log.Info("backup checksum", zap.Duration("take", time.Since(startAll)))
summary.CollectDuration("backup checksum", time.Since(startAll))

schemas := make([]*backuppb.Schema, 0, len(ss.schemas))
for name, schema := range ss.schemas {
dbBytes, err := json.Marshal(schema.dbInfo)
if err != nil {
return nil, errors.Trace(err)
}
if err := errg.Wait(); err != nil {
pending.errCh <- err
tableBytes, err := json.Marshal(schema.tableInfo)
if err != nil {
return nil, errors.Trace(err)
}
close(pending.backupSchemaCh)
log.Info("backup checksum",
zap.Duration("take", time.Since(startAll)))
summary.CollectDuration("backup checksum", time.Since(startAll))
}()
}

// FinishTableChecksum waits until all schemas' checksums are verified.
func (pending *Schemas) FinishTableChecksum() ([]*backuppb.Schema, error) {
schemas := make([]*backuppb.Schema, 0, len(pending.schemas))
for {
select {
case s, ok := <-pending.backupSchemaCh:
if !ok {
return schemas, nil
var statsBytes []byte
if schema.stats != nil {
statsBytes, err = json.Marshal(schema.stats)
if err != nil {
return nil, errors.Trace(err)
}
schemas = append(schemas, &s)
case err := <-pending.errCh:
return nil, errors.Trace(err)
}
}
}
s := &backuppb.Schema{
Db: dbBytes,
Table: tableBytes,
Crc64Xor: schema.crc64xor,
TotalKvs: schema.totalKvs,
TotalBytes: schema.totalBytes,
Stats: statsBytes,
}
// Delete schema ASAP to help GC.
delete(ss.schemas, name)

// CopyMeta copies schema metadata directly from pending backupSchemas, without calculating checksum.
// Use this when the user skips checksum calculation.
func (pending *Schemas) CopyMeta() []*backuppb.Schema {
schemas := make([]*backuppb.Schema, 0, len(pending.schemas))
for _, v := range pending.schemas {
schema := v
schemas = append(schemas, &schema)
schemas = append(schemas, s)
}
return schemas
return schemas, nil
}

// Len returns the number of schemas.
func (pending *Schemas) Len() int {
return len(pending.schemas)
func (ss *Schemas) Len() int {
return len(ss.schemas)
}

func calculateChecksum(
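BackupSchemas replaces the old Start / FinishTableChecksum / CopyMeta trio with a single synchronous call, and takes the stats handle as an explicit parameter; passing nil is what lets a backup without stats avoid creating a TiDB Domain at all. A hedged caller sketch follows; the real call site is in a changed file not loaded on this page, so everything besides the parameter order of the new signature (dom, store, copConcurrency, skipChecksum, updateCh) is an assumed surrounding variable.

// Hedged sketch of the consolidated call; parameter order and types come from
// the new BackupSchemas signature above.
var statsHandle *handle.Handle // leave nil to skip stats; no TiDB Domain is needed then
if dom != nil {
	statsHandle = dom.StatsHandle()
}
schemas, err := backupSchemas.BackupSchemas(
	ctx,
	store,                           // kv.Storage the backup reads from
	statsHandle,                     // *handle.Handle, may be nil
	backupTS,
	backup.DefaultSchemaConcurrency, // schema workers
	copConcurrency,                  // checksum coprocessor concurrency
	skipChecksum,
	updateCh,
)
if err != nil {
	return errors.Trace(err)
}
// schemas is a []*backuppb.Schema with Db/Table/Stats JSON and checksum fields filled in.
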
