Skip to content

Commit

Permalink
log-backup: restore meta kv with batch method (pingcap#37100) (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Aug 16, 2022
1 parent 9d6a61d commit be9be01
Show file tree
Hide file tree
Showing 2 changed files with 396 additions and 45 deletions.
227 changes: 182 additions & 45 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,14 +1771,6 @@ func (rc *Client) ReadStreamDataFiles(
}
}

// sort files firstly.
slices.SortFunc(mFiles, func(i, j *backuppb.DataFileInfo) bool {
if i.ResolvedTs > 0 && j.ResolvedTs > 0 {
return i.ResolvedTs < j.ResolvedTs
} else {
return i.MaxTs < j.MaxTs
}
})
return dFiles, mFiles, nil
}

Expand Down Expand Up @@ -2000,6 +1992,31 @@ func (rc *Client) InitSchemasReplaceForDDL(
return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
slices.SortFunc(files, func(i, j *backuppb.DataFileInfo) bool {
if i.GetMinTs() < j.GetMinTs() {
return true
} else if i.GetMinTs() > j.GetMinTs() {
return false
}

if i.GetMaxTs() < j.GetMaxTs() {
return true
} else if i.GetMaxTs() > j.GetMaxTs() {
return false
}

if i.GetResolvedTs() < j.GetResolvedTs() {
return true
} else if i.GetResolvedTs() > j.GetResolvedTs() {
return false
}

return true
})
return files
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
func (rc *Client) RestoreMetaKVFiles(
ctx context.Context,
Expand All @@ -2008,7 +2025,10 @@ func (rc *Client) RestoreMetaKVFiles(
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error {
// sort files firstly.
files = SortMetaKVFiles(files)
filesInWriteCF := make([]*backuppb.DataFileInfo, 0, len(files))
filesInDefaultCF := make([]*backuppb.DataFileInfo, 0, len(files))

// The k-v events in default CF should be restored firstly. The reason is that:
// The error of transactions of meta could happen if restore write CF events successfully,
Expand All @@ -2018,30 +2038,39 @@ func (rc *Client) RestoreMetaKVFiles(
filesInWriteCF = append(filesInWriteCF, f)
continue
}

if f.Type == backuppb.FileType_Delete {
// this should happen abnormally.
// only do some preventive checks here.
log.Warn("detected delete file of meta key, skip it", zap.Any("file", f))
continue
}

kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
if err != nil {
return errors.Trace(err)
if f.Cf == stream.DefaultCF {
filesInDefaultCF = append(filesInDefaultCF, f)
}
updateStats(kvCount, size)
progressInc()
}

// Restore files in default CF.
if err := rc.RestoreMetaKVFilesWithBatchMethod(
ctx,
filesInDefaultCF,
schemasReplace,
updateStats,
progressInc,
rc.RestoreBatchMetaKVFiles,
); err != nil {
return errors.Trace(err)
}

// Restore files in write CF.
for _, f := range filesInWriteCF {
kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
if err != nil {
return errors.Trace(err)
}
updateStats(kvCount, size)
progressInc()
if err := rc.RestoreMetaKVFilesWithBatchMethod(
ctx,
filesInWriteCF,
schemasReplace,
updateStats,
progressInc,
rc.RestoreBatchMetaKVFiles,
); err != nil {
return errors.Trace(err)
}

// Update global schema version and report all of TiDBs.
Expand All @@ -2051,41 +2080,128 @@ func (rc *Client) RestoreMetaKVFiles(
return nil
}

// RestoreMetaKVFile tries to restore a file about meta kv-event from stream-backup.
func (rc *Client) RestoreMetaKVFile(
func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
ctx context.Context,
file *backuppb.DataFileInfo,
sr *stream.SchemasReplace,
) (uint64, uint64, error) {
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
restoreBatch func(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error,
) error {
var (
kvCount uint64
size uint64
rangeMin uint64
rangeMax uint64
idx int
)
log.Info("restore meta kv events", zap.String("file", file.Path),
zap.String("cf", file.Cf), zap.Int64("kv-count", file.NumberOfEntries),
zap.Uint64("min-ts", file.MinTs), zap.Uint64("max-ts", file.MaxTs))
for i, f := range files {
if i == 0 {
idx = i
rangeMax = f.MaxTs
rangeMin = f.MinTs
} else {
if f.MinTs <= rangeMax {
rangeMin = mathutil.Min(rangeMin, f.MinTs)
rangeMax = mathutil.Max(rangeMax, f.MaxTs)
} else {
err := restoreBatch(ctx, files[idx:i], schemasReplace, updateStats, progressInc)
if err != nil {
return errors.Trace(err)
}
idx = i
rangeMin = f.MinTs
rangeMax = f.MaxTs
}
}

if i == len(files)-1 {
err := restoreBatch(ctx, files[idx:], schemasReplace, updateStats, progressInc)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
}

// the kv entry with ts, the ts is decoded from entry.
type kvEntryWithTS struct {
e kv.Entry
ts uint64
}

func (rc *Client) RestoreBatchMetaKVFiles(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error {
if len(files) == 0 {
return nil
}

// read all of entries from files.
kvEntries := make([]*kvEntryWithTS, 0)
for _, f := range files {
es, err := rc.readAllEntries(ctx, f)
if err != nil {
return errors.Trace(err)
}

kvEntries = append(kvEntries, es...)
}

// sort these entries.
slices.SortFunc(kvEntries, func(i, j *kvEntryWithTS) bool {
return i.ts < j.ts
})

// restore these entries with rawPut() method.
kvCount, size, err := rc.restoreMetaKvEntries(ctx, schemasReplace, kvEntries, files[0].GetCf())
if err != nil {
return errors.Trace(err)
}

updateStats(kvCount, size)
for i := 0; i < len(files); i++ {
progressInc()
}
return nil
}

func (rc *Client) readAllEntries(
ctx context.Context,
file *backuppb.DataFileInfo,
) ([]*kvEntryWithTS, error) {
kvEntries := make([]*kvEntryWithTS, 0)

rc.rawKVClient.SetColumnFamily(file.GetCf())
buff, err := rc.storage.ReadFile(ctx, file.Path)
if err != nil {
return 0, 0, errors.Trace(err)
return nil, errors.Trace(err)
}

if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) {
return 0, 0, errors.Annotatef(berrors.ErrInvalidMetaFile,
return nil, errors.Annotatef(berrors.ErrInvalidMetaFile,
"checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:])
}

iter := stream.NewEventIterator(buff)
for iter.Valid() {
iter.Next()
if iter.GetError() != nil {
return 0, 0, errors.Trace(iter.GetError())
return nil, errors.Trace(iter.GetError())
}

txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()}
ts, err := GetKeyTS(txnEntry.Key)
if err != nil {
return 0, 0, errors.Trace(err)
return nil, errors.Trace(err)
}

// The commitTs in write CF need be limited on [startTs, restoreTs].
Expand All @@ -2105,20 +2221,41 @@ func (rc *Client) RestoreMetaKVFile(
log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("tnxKey", txnEntry.Key))
continue
}
log.Debug("txn entry", zap.Uint64("key-ts", ts), zap.Int("txnKey-len", len(txnEntry.Key)),
zap.Int("txnValue-len", len(txnEntry.Value)), zap.ByteString("txnKey", txnEntry.Key))
newEntry, err := sr.RewriteKvEntry(&txnEntry, file.Cf)
kvEntries = append(kvEntries, &kvEntryWithTS{e: txnEntry, ts: ts})
}

return kvEntries, nil
}

func (rc *Client) restoreMetaKvEntries(
ctx context.Context,
sr *stream.SchemasReplace,
entries []*kvEntryWithTS,
columnFamily string,
) (uint64, uint64, error) {
var (
kvCount uint64
size uint64
)

rc.rawKVClient.SetColumnFamily(columnFamily)

for _, entry := range entries {
log.Debug("before rewrte entry", zap.Uint64("key-ts", entry.ts), zap.Int("key-len", len(entry.e.Key)),
zap.Int("value-len", len(entry.e.Value)), zap.ByteString("key", entry.e.Key))

newEntry, err := sr.RewriteKvEntry(&entry.e, columnFamily)
if err != nil {
log.Error("rewrite txn entry failed", zap.Int("klen", len(txnEntry.Key)),
logutil.Key("txn-key", txnEntry.Key))
log.Error("rewrite txn entry failed", zap.Int("klen", len(entry.e.Key)),
logutil.Key("txn-key", entry.e.Key))
return 0, 0, errors.Trace(err)
} else if newEntry == nil {
continue
}
log.Debug("rewrite txn entry", zap.Int("newKey-len", len(newEntry.Key)),
zap.Int("newValue-len", len(txnEntry.Value)), zap.ByteString("newkey", newEntry.Key))
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))

if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, ts); err != nil {
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}

Expand Down
Loading

0 comments on commit be9be01

Please sign in to comment.