diff --git a/br/pkg/lightning/checkpoints/checkpoints.go b/br/pkg/lightning/checkpoints/checkpoints.go
index 13817e28eb668..d20134660de9c 100644
--- a/br/pkg/lightning/checkpoints/checkpoints.go
+++ b/br/pkg/lightning/checkpoints/checkpoints.go
@@ -262,6 +262,29 @@ func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint {
 	}
 }
 
+func (ccp *ChunkCheckpoint) UnfinishedSize() int64 {
+	if ccp.FileMeta.Compression == mydump.CompressionNone {
+		return ccp.Chunk.EndOffset - ccp.Chunk.Offset
+	}
+	return ccp.FileMeta.FileSize - ccp.Chunk.RealOffset
+}
+
+func (ccp *ChunkCheckpoint) TotalSize() int64 {
+	if ccp.FileMeta.Compression == mydump.CompressionNone {
+		return ccp.Chunk.EndOffset - ccp.Key.Offset
+	}
+	// TODO: a compressed file won't be split into chunks, so using FileSize as TotalSize is OK;
+	// change this when we support splitting compressed files into chunks
+	return ccp.FileMeta.FileSize
+}
+
+func (ccp *ChunkCheckpoint) FinishedSize() int64 {
+	if ccp.FileMeta.Compression == mydump.CompressionNone {
+		return ccp.Chunk.Offset - ccp.Key.Offset
+	}
+	return ccp.Chunk.RealOffset - ccp.Key.Offset
+}
+
 type EngineCheckpoint struct {
 	Status CheckpointStatus
 	Chunks []*ChunkCheckpoint // a sorted array
diff --git a/br/pkg/lightning/mydump/parquet_parser.go b/br/pkg/lightning/mydump/parquet_parser.go
index e7ac2baa6d80f..a1b612903c5e8 100644
--- a/br/pkg/lightning/mydump/parquet_parser.go
+++ b/br/pkg/lightning/mydump/parquet_parser.go
@@ -351,6 +351,12 @@ func (pp *ParquetParser) SetPos(pos int64, rowID int64) error {
 	return nil
 }
 
+// RealPos implements the Parser interface.
+// For parquet files it is equal to Pos().
+func (pp *ParquetParser) RealPos() (int64, error) {
+	return pp.curStart + int64(pp.curIndex), nil
+}
+
 // Close closes the parquet file of the parser.
 // It implements the Parser interface.
 func (pp *ParquetParser) Close() error {
diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go
index 73f84424bf5e3..512c3789cfa7f 100644
--- a/br/pkg/lightning/mydump/parser.go
+++ b/br/pkg/lightning/mydump/parser.go
@@ -94,6 +94,7 @@ type ChunkParser struct {
 type Chunk struct {
 	Offset       int64
 	EndOffset    int64
+	RealOffset   int64
 	PrevRowIDMax int64
 	RowIDMax     int64
 	Columns      []string
@@ -126,6 +127,7 @@ const (
 type Parser interface {
 	Pos() (pos int64, rowID int64)
 	SetPos(pos int64, rowID int64) error
+	RealPos() (int64, error)
 	Close() error
 	ReadRow() error
 	LastRow() Row
@@ -175,6 +177,11 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
 	return nil
 }
 
+// RealPos gets the read position of the current reader.
+func (parser *blockParser) RealPos() (int64, error) {
+	return parser.reader.Seek(0, io.SeekCurrent)
+}
+
 // Pos returns the current file offset.
 // Attention: for compressed sql/csv files, pos is the position in uncompressed files
 func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
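Note (not part of the patch): the three checkpoint helpers above encode one idea — for plain files progress is measured in logical offsets, while for compressed files the logical offsets count uncompressed bytes, so progress is instead measured by the reader's on-disk position (RealOffset) against FileSize. A minimal, self-contained sketch of that accounting, with simplified stand-ins for the real checkpoint structs (field names follow the diff; everything else is illustrative):

```go
package main

import "fmt"

// chunk is a flattened stand-in for ChunkCheckpoint.
type chunk struct {
	keyOffset  int64 // Key.Offset: where this chunk starts in the file
	offset     int64 // Chunk.Offset: logical (uncompressed) read position
	endOffset  int64 // Chunk.EndOffset: logical end of the chunk
	realOffset int64 // Chunk.RealOffset: on-disk position of the compressed reader
	fileSize   int64 // FileMeta.FileSize: on-disk (compressed) size
	compressed bool
}

func (c chunk) finishedSize() int64 {
	if !c.compressed {
		return c.offset - c.keyOffset
	}
	return c.realOffset - c.keyOffset
}

func (c chunk) totalSize() int64 {
	if !c.compressed {
		return c.endOffset - c.keyOffset
	}
	// compressed files are never split, so the whole file is one chunk
	return c.fileSize
}

func main() {
	// A 16 MiB gzip file, 3 MiB of it consumed on disk (6 MiB decompressed so far).
	c := chunk{offset: 6 << 20, endOffset: 64 << 20, realOffset: 3 << 20, fileSize: 16 << 20, compressed: true}
	fmt.Printf("finished %d of %d bytes (%.1f%%)\n",
		c.finishedSize(), c.totalSize(), 100*float64(c.finishedSize())/float64(c.totalSize()))
}
```

Measured on logical offsets this chunk would report about 9.4% against an estimated logical end, whereas the on-disk measure reports the true 18.8% of compressed bytes consumed.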
diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index ffd9173483896..f1eb7934c55e8 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -38,6 +38,9 @@ const (
 	// TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency
 	// It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files.
 	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
+	// compressDataRatio is a conservative upper bound on the compression ratio of normal compressed data.
+	// It's used to estimate rowIDMax; we use a large value to try to avoid overlapping row-ID ranges.
+	compressDataRatio = 500
 )
 
 // TableRegion contains information for a table region during import.
@@ -302,7 +305,9 @@ func MakeSourceFileRegion(
 	// set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files.
 	// TODO: update progress bar calculation for compressed files.
 	if fi.FileMeta.Compression != CompressionNone {
-		rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution.
+		// FIXME: this is not accurate. In the future we should sample the file and use the sampled
+		// compression ratio to compute rowIDMax; currently we use 500, which is large enough for most data.
+		rowIDMax = fileSize * compressDataRatio / divisor
 		fileSize = TableFileSizeINF
 	}
 	tableRegion := &TableRegion{
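Note (not part of the patch): since the decompressed size is unknown when regions are built, the row-ID budget is deliberately over-estimated. A sketch of the estimate under the assumption that divisor approximates the average bytes per row (the helper name is hypothetical; the real logic lives inline in MakeSourceFileRegion):

```go
// compressDataRatio mirrors the constant added above.
const compressDataRatio = 500

// estimateRowIDMax reserves a row-ID range for a compressed file whose
// decompressed size is unknown: assume a generous 500x compression ratio.
// Over-allocating IDs is harmless; overlapping ranges between files are not.
func estimateRowIDMax(compressedFileSize, divisor int64) int64 {
	return compressedFileSize * compressDataRatio / divisor
}
```

For a 10 MiB compressed file and ~50 bytes per row this reserves 10485760 * 500 / 50 ≈ 105 million row IDs, far more than the file can plausibly contain.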
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 3c37342ec71cf..329ef29c98667 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -938,7 +938,7 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
 			if _, ok := fileChunks[c.Key.Path]; !ok {
 				fileChunks[c.Key.Path] = 0.0
 			}
-			remainChunkCnt := float64(c.Chunk.EndOffset-c.Chunk.Offset) / float64(c.Chunk.EndOffset-c.Key.Offset)
+			remainChunkCnt := float64(c.UnfinishedSize()) / float64(c.TotalSize())
 			fileChunks[c.Key.Path] += remainChunkCnt
 		}
 	}
@@ -1619,7 +1619,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 			} else {
 				for _, eng := range cp.Engines {
 					for _, chunk := range eng.Chunks {
-						totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset
+						totalDataSizeToRestore += chunk.UnfinishedSize()
 					}
 				}
 			}
@@ -2299,6 +2299,8 @@ type deliveredKVs struct {
 	columns []string
 	offset  int64
 	rowID   int64
+
+	realOffset int64 // indicates the file reader's current position, only used for compressed files
 }
 
 type deliverResult struct {
@@ -2327,6 +2329,8 @@ func (cr *chunkRestore) deliverLoop(
 	dataSynced := true
 	hasMoreKVs := true
+	var startRealOffset, currRealOffset int64 // both start at 0
+
 	for hasMoreKVs {
 		var dataChecksum, indexChecksum verify.KVChecksum
 		var columns []string
@@ -2335,6 +2339,8 @@
 		// chunk checkpoint should stay the same
 		startOffset := cr.chunk.Chunk.Offset
 		currOffset := startOffset
+		startRealOffset = cr.chunk.Chunk.RealOffset
+		currRealOffset = startRealOffset
 		rowID := cr.chunk.Chunk.PrevRowIDMax
 	populate:
@@ -2349,12 +2355,14 @@
 				if p.kvs == nil {
 					// This is the last message.
 					currOffset = p.offset
+					currRealOffset = p.realOffset
 					hasMoreKVs = false
 					break populate
 				}
 				p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
 				columns = p.columns
 				currOffset = p.offset
+				currRealOffset = p.realOffset
 				rowID = p.rowID
 			}
 		case <-ctx.Done():
@@ -2421,6 +2429,7 @@
 		cr.chunk.Checksum.Add(&dataChecksum)
 		cr.chunk.Checksum.Add(&indexChecksum)
 		cr.chunk.Chunk.Offset = currOffset
+		cr.chunk.Chunk.RealOffset = currRealOffset
 		cr.chunk.Chunk.PrevRowIDMax = rowID
 
 		if m, ok := metric.FromContext(ctx); ok {
@@ -2428,16 +2437,21 @@
 			// if we are splitting a chunk, deliverLoop started from the mid of the chunk, so the start offset
 			// comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0.
 			// but we met it one time, but cannot reproduce it now, we add this check to make code more robust
 			// TODO: reproduce and find the root cause and fix it completely
-
-			delta := currOffset - startOffset
+			var lowOffset, highOffset int64
+			if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
+				lowOffset, highOffset = startRealOffset, currRealOffset
+			} else {
+				lowOffset, highOffset = startOffset, currOffset
+			}
+			delta := highOffset - lowOffset
 			if delta >= 0 {
 				m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
 				if rc.status != nil && rc.status.backend == config.BackendTiDB {
 					rc.status.FinishedFileSize.Add(delta)
 				}
 			} else {
-				deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
-					zap.Int64("start", startOffset))
+				deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
+					zap.Int64("start", lowOffset))
 			}
 		}
@@ -2618,14 +2632,22 @@ func (cr *chunkRestore) encodeLoop(
 	canDeliver := false
 	kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
 	curOffset := offset
-	var newOffset, rowID int64
+	var newOffset, rowID, realOffset int64
 	var kvSize uint64
+	var realOffsetErr error
 outLoop:
 	for !canDeliver {
 		readDurStart := time.Now()
 		err = cr.parser.ReadRow()
 		columnNames := cr.parser.Columns()
 		newOffset, rowID = cr.parser.Pos()
+		if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
+			realOffset, realOffsetErr = cr.parser.RealPos()
+			if realOffsetErr != nil {
+				logger.Warn("fail to get data engine RealPos, progress may not be accurate",
+					log.ShortError(realOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
+			}
+		}
 
 		switch errors.Cause(err) {
 		case nil:
@@ -2687,7 +2709,8 @@
 				continue
 			}
 
-			kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
+			kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset,
+				rowID: rowID, realOffset: realOffset})
 			kvSize += kvs.Size()
 			failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
 				kvSize += uint64(val.(int))
@@ -2719,7 +2742,7 @@
 		}
 	}
 
-	err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset}})
+	err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset, realOffset: cr.chunk.FileMeta.FileSize}})
 	return
 }
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 33bf45e4d344c..c1556e36b0824 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -331,7 +331,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
 				err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
 				if rc.status != nil && rc.status.backend == config.BackendLocal {
 					for _, chunk := range ecp.Chunks {
-						rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
+						rc.status.FinishedFileSize.Add(chunk.TotalSize())
 					}
 				}
 			}
@@ -341,7 +341,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
 			}(restoreWorker, engineID, engine)
 		} else {
 			for _, chunk := range engine.Chunks {
-				rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
+				rc.status.FinishedFileSize.Add(chunk.TotalSize())
 			}
 		}
 	}
@@ -541,7 +541,7 @@ func (tr *TableRestore) restoreEngine(
 		}
 		var remainChunkCnt float64
 		if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
-			remainChunkCnt = float64(chunk.Chunk.EndOffset-chunk.Chunk.Offset) / float64(chunk.Chunk.EndOffset-chunk.Key.Offset)
+			remainChunkCnt = float64(chunk.UnfinishedSize()) / float64(chunk.TotalSize())
 			if metrics != nil {
 				metrics.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt)
 			}
@@ -616,7 +616,7 @@ func (tr *TableRestore) restoreEngine(
 	totalSQLSize := int64(0)
 	for _, chunk := range cp.Chunks {
 		totalKVSize += chunk.Checksum.SumSize()
-		totalSQLSize += chunk.Chunk.EndOffset - chunk.Chunk.Offset
+		totalSQLSize += chunk.UnfinishedSize()
 	}
 
 	err = chunkErr.Get()
diff --git a/br/pkg/lightning/web/progress.go b/br/pkg/lightning/web/progress.go
index 8a3412087b94f..d5f3494a14040 100644
--- a/br/pkg/lightning/web/progress.go
+++ b/br/pkg/lightning/web/progress.go
@@ -64,7 +64,7 @@ func (cpm *checkpointsMap) update(diffs map[string]*checkpoints.TableCheckpointD
 	for _, engine := range cp.Engines {
 		for _, chunk := range engine.Chunks {
 			if engine.Status >= checkpoints.CheckpointStatusAllWritten {
-				tw += chunk.Chunk.EndOffset - chunk.Key.Offset
+				tw += chunk.TotalSize()
 			} else {
 				tw += chunk.Chunk.Offset - chunk.Key.Offset
 			}
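Note (not part of the patch): the deliverLoop change above boils down to picking which pair of offsets to difference when reporting restored bytes. A distilled sketch (name and signature illustrative only):

```go
// progressDelta returns the number of bytes to add to the progress counters:
// compressed chunks measure progress in on-disk bytes (RealOffset), all other
// chunks in logical offsets.
func progressDelta(compressed bool, startOffset, currOffset, startRealOffset, currRealOffset int64) int64 {
	low, high := startOffset, currOffset
	if compressed {
		low, high = startRealOffset, currRealOffset
	}
	return high - low // deliverLoop skips negative deltas and logs "offset go back"
}
```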
diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go
index 1d5300cfa8d55..5794c813c9d5f 100644
--- a/br/pkg/storage/compress.go
+++ b/br/pkg/storage/compress.go
@@ -80,8 +80,10 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
 	return io.ReadAll(compressBf)
 }
 
+// compressReader is a wrapper for compress.Reader
 type compressReader struct {
 	io.Reader
+	io.Seeker
 	io.Closer
 }
 
@@ -97,11 +99,16 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
 	return &compressReader{
 		Reader: r,
 		Closer: fileReader,
+		Seeker: fileReader,
 	}, nil
 }
 
-func (*compressReader) Seek(_ int64, _ int) (int64, error) {
-	return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
+func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
+	// we only support querying the underlying reader's current offset
+	if offset == 0 && whence == io.SeekCurrent {
+		return c.Seeker.Seek(offset, whence)
+	}
+	return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now, offset %d, whence %d", offset, whence)
 }
 
 func (c *compressReader) Close() error {
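Note (not part of the patch): the relaxed Seek enables exactly one query — "where is the underlying compressed reader now?" — which is what blockParser.RealPos needs. The reported position is approximate because the decompressor reads ahead into an internal buffer, which is fine for progress estimation. An illustrative, runnable sketch of the same pattern using only the standard library (names here are hypothetical, not the br/pkg/storage API):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// posReader exposes the decompressed stream for reading, but answers
// Seek(0, io.SeekCurrent) with the position of the compressed source.
type posReader struct {
	io.Reader           // decompressed stream
	raw       io.Seeker // underlying compressed source
}

func (p *posReader) Seek(offset int64, whence int) (int64, error) {
	if offset == 0 && whence == io.SeekCurrent {
		return p.raw.Seek(0, io.SeekCurrent)
	}
	return 0, fmt.Errorf("seek not supported: offset %d, whence %d", offset, whence)
}

func main() {
	// Build a small gzip payload in memory.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write(bytes.Repeat([]byte("id,name,value\n"), 4096))
	_ = zw.Close()

	src := bytes.NewReader(buf.Bytes()) // stands in for the storage file reader
	zr, err := gzip.NewReader(src)
	if err != nil {
		panic(err)
	}
	pr := &posReader{Reader: zr, raw: src}

	_, _ = io.CopyN(io.Discard, pr, 16<<10) // consume 16 KiB of decompressed data
	pos, _ := pr.Seek(0, io.SeekCurrent)    // compressed bytes consumed so far (approximate)
	fmt.Printf("compressed position: %d of %d bytes\n", pos, buf.Len())
}
```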