lightning: refine progress for compress files import #39219

Merged
12 commits merged on Dec 2, 2022
23 changes: 23 additions & 0 deletions br/pkg/lightning/checkpoints/checkpoints.go
@@ -262,6 +262,29 @@ func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint {
}
}

func (ccp *ChunkCheckpoint) UnfinishedSize() int64 {
if ccp.FileMeta.Compression == mydump.CompressionNone {
return ccp.Chunk.EndOffset - ccp.Chunk.Offset
}
return ccp.FileMeta.FileSize - ccp.Chunk.RealOffset
}

func (ccp *ChunkCheckpoint) TotalSize() int64 {
if ccp.FileMeta.Compression == mydump.CompressionNone {
return ccp.Chunk.EndOffset - ccp.Key.Offset
}
// TODO: a compressed file won't be split into chunks, so using FileSize as TotalSize is OK for now.
// Change this when we support splitting compressed files into chunks.
return ccp.FileMeta.FileSize
}

func (ccp *ChunkCheckpoint) FinishedSize() int64 {
if ccp.FileMeta.Compression == mydump.CompressionNone {
return ccp.Chunk.Offset - ccp.Key.Offset
}
return ccp.Chunk.RealOffset - ccp.Key.Offset
}

type EngineCheckpoint struct {
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
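The three accessors above centralize the compressed/uncompressed distinction so progress math elsewhere no longer touches offsets directly. As an illustration only (not code from this PR), a minimal sketch of a per-chunk progress ratio built from them; chunkProgress is a hypothetical helper and the import path is assumed from the file paths in this diff:

```go
package progressutil

import "github.com/pingcap/tidb/br/pkg/lightning/checkpoints"

// chunkProgress is a hypothetical helper (not part of this PR). It reports how
// far a single chunk has progressed, in [0, 1]. FinishedSize and TotalSize use
// logical offsets for plain files and RealOffset/FileSize for compressed files,
// so the ratio stays meaningful in both cases.
func chunkProgress(ccp *checkpoints.ChunkCheckpoint) float64 {
	total := ccp.TotalSize()
	if total <= 0 {
		return 1.0 // nothing to import
	}
	return float64(ccp.FinishedSize()) / float64(total)
}
```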
6 changes: 6 additions & 0 deletions br/pkg/lightning/mydump/parquet_parser.go
@@ -351,6 +351,12 @@ func (pp *ParquetParser) SetPos(pos int64, rowID int64) error {
return nil
}

// RealPos implements the Parser interface.
// For parquet it's equal to Pos().
func (pp *ParquetParser) RealPos() (int64, error) {
return pp.curStart + int64(pp.curIndex), nil
}

// Close closes the parquet file of the parser.
// It implements the Parser interface.
func (pp *ParquetParser) Close() error {
7 changes: 7 additions & 0 deletions br/pkg/lightning/mydump/parser.go
@@ -94,6 +94,7 @@ type ChunkParser struct {
type Chunk struct {
Offset int64
EndOffset int64
RealOffset int64
PrevRowIDMax int64
RowIDMax int64
Columns []string
@@ -126,6 +127,7 @@ const (
type Parser interface {
Pos() (pos int64, rowID int64)
SetPos(pos int64, rowID int64) error
RealPos() (int64, error)
Close() error
ReadRow() error
LastRow() Row
@@ -175,6 +177,11 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
return nil
}

// RealPos gets the current read position of the underlying reader.
func (parser *blockParser) RealPos() (int64, error) {
return parser.reader.Seek(0, io.SeekCurrent)
}

// Pos returns the current file offset.
// Attention: for compressed sql/csv files, pos is the position in the uncompressed file
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
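For compressed sources, Pos() keeps counting uncompressed bytes (as the comment above notes), while the new RealPos() reports how far the underlying compressed reader has advanced, which is what the refined progress accounting consumes. A hedged sketch of the distinction; printPositions is a hypothetical caller, not code from this PR:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
)

// printPositions is an illustrative helper (not part of this PR). For plain
// files both values track the same reader; for compressed files Pos() counts
// uncompressed bytes while RealPos() counts bytes consumed from the compressed
// file, which is the quantity a byte-based progress bar needs.
func printPositions(p mydump.Parser) {
	pos, rowID := p.Pos()       // logical offset in the uncompressed stream
	realPos, err := p.RealPos() // offset of the underlying (possibly compressed) reader
	if err != nil {
		fmt.Println("RealPos unavailable:", err)
		return
	}
	fmt.Printf("row %d: logical offset %d, real offset %d\n", rowID, pos, realPos)
}
```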
7 changes: 6 additions & 1 deletion br/pkg/lightning/mydump/region.go
@@ -38,6 +38,9 @@ const (
// TableFileSizeINF is the assumed size for compressed files; for lightning, 10TB is a relatively large value that would strongly affect efficiency.
// It's used to make sure compressed files can be read until EOF, because we can't get the exact decompressed size of a compressed file.
TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
// compressDataRatio is a rough upper bound on the compression ratio of typical compressed data.
// It's used to estimate rowIDMax; we use a large value to avoid overlapping row ID ranges.
compressDataRatio = 500
)

// TableRegion contains information for a table region during import.
@@ -302,7 +305,9 @@ func MakeSourceFileRegion(
// set fileSize to INF to make sure compressed files can be read until EOF, because we can't get the exact size of the compressed files.
// TODO: update progress bar calculation for compressed files.
if fi.FileMeta.Compression != CompressionNone {
rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution.
// FIXME: this is not accurate. In the future we should sample the compression ratio and use the sampled ratio to compute rowIDMax.
// Currently we use 500, which is a relatively large value for most data.
rowIDMax = fileSize * compressDataRatio / divisor
fileSize = TableFileSizeINF
}
tableRegion := &TableRegion{
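To make the estimate concrete, an illustrative reconstruction of the arithmetic above (not code from this PR); bytesPerRow stands in for the per-row size estimate MakeSourceFileRegion already computes as the divisor, and the sample numbers are made up:

```go
package main

import "fmt"

// estimateRowIDMax mirrors the formula above: assume a compressed file expands
// to at most compressDataRatio (500) times its on-disk size, then divide by the
// estimated bytes per row to reserve enough row IDs to avoid overlap.
func estimateRowIDMax(compressedFileSize, bytesPerRow int64) int64 {
	const compressDataRatio = 500
	return compressedFileSize * compressDataRatio / bytesPerRow
}

func main() {
	// Example: a 1 MiB gzip file with an estimated 100 bytes per row reserves
	// 1048576 * 500 / 100 = 5242880 row IDs.
	fmt.Println(estimateRowIDMax(1<<20, 100))
}
```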
41 changes: 32 additions & 9 deletions br/pkg/lightning/restore/restore.go
@@ -938,7 +938,7 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
if _, ok := fileChunks[c.Key.Path]; !ok {
fileChunks[c.Key.Path] = 0.0
}
remainChunkCnt := float64(c.Chunk.EndOffset-c.Chunk.Offset) / float64(c.Chunk.EndOffset-c.Key.Offset)
remainChunkCnt := float64(c.UnfinishedSize()) / float64(c.TotalSize())
fileChunks[c.Key.Path] += remainChunkCnt
}
}
Expand Down Expand Up @@ -1619,7 +1619,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
} else {
for _, eng := range cp.Engines {
for _, chunk := range eng.Chunks {
totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset
totalDataSizeToRestore += chunk.UnfinishedSize()
}
}
}
@@ -2299,6 +2299,8 @@ type deliveredKVs struct {
columns []string
offset int64
rowID int64

realOffset int64 // indicates file reader's current position, only used for compressed files
}

type deliverResult struct {
@@ -2327,6 +2329,8 @@ func (cr *chunkRestore) deliverLoop(

dataSynced := true
hasMoreKVs := true
var startRealOffset, currRealOffset int64 // zero-initialized; set from the chunk checkpoint below

for hasMoreKVs {
var dataChecksum, indexChecksum verify.KVChecksum
var columns []string
@@ -2335,6 +2339,8 @@
// chunk checkpoint should stay the same
startOffset := cr.chunk.Chunk.Offset
currOffset := startOffset
startRealOffset = cr.chunk.Chunk.RealOffset
currRealOffset = startRealOffset
rowID := cr.chunk.Chunk.PrevRowIDMax

populate:
@@ -2349,12 +2355,14 @@
if p.kvs == nil {
// This is the last message.
currOffset = p.offset
currRealOffset = p.realOffset
hasMoreKVs = false
break populate
}
p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
columns = p.columns
currOffset = p.offset
currRealOffset = p.realOffset
rowID = p.rowID
}
case <-ctx.Done():
@@ -2421,23 +2429,29 @@
cr.chunk.Checksum.Add(&dataChecksum)
cr.chunk.Checksum.Add(&indexChecksum)
cr.chunk.Chunk.Offset = currOffset
cr.chunk.Chunk.RealOffset = currRealOffset
cr.chunk.Chunk.PrevRowIDMax = rowID

if m, ok := metric.FromContext(ctx); ok {
// The value of currOffset comes from parser.pos, which increases monotonically, and the initial value of parser.pos
// comes from chunk.Chunk.Offset, so currOffset - startOffset < 0 shouldn't happen.
// But we hit it once and cannot reproduce it, so we add this check to make the code more robust.
// TODO: reproduce it, find the root cause, and fix it completely

delta := currOffset - startOffset
var lowOffset, highOffset int64
if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
lowOffset, highOffset = startRealOffset, currRealOffset
} else {
lowOffset, highOffset = startOffset, currOffset
}
delta := highOffset - lowOffset
if delta >= 0 {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(delta)
}
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
zap.Int64("start", startOffset))
deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
zap.Int64("start", lowOffset))
}
}
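The branch above decides which offset pair feeds BytesCounter and FinishedFileSize. A hypothetical distillation of that choice (illustrative only, not code from this PR):

```go
// finishedDelta is an illustrative helper, not part of the diff. Compressed
// chunks measure progress in compressed-file bytes (real offsets); plain chunks
// keep using logical offsets, exactly as the branch above does.
func finishedDelta(compressed bool, startOffset, currOffset, startRealOffset, currRealOffset int64) int64 {
	if compressed {
		return currRealOffset - startRealOffset
	}
	return currOffset - startOffset
}
```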

@@ -2618,14 +2632,22 @@
canDeliver := false
kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
curOffset := offset
var newOffset, rowID int64
var newOffset, rowID, realOffset int64
var kvSize uint64
var realOffsetErr error
outLoop:
for !canDeliver {
readDurStart := time.Now()
err = cr.parser.ReadRow()
columnNames := cr.parser.Columns()
newOffset, rowID = cr.parser.Pos()
if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
Review comment (Contributor): If not compressed, the realOffset is not assigned and 0 is used. Is this the expected behavior?

Reply (Author): Yes. Uncompressed files won't use realOffset.

realOffset, realOffsetErr = cr.parser.RealPos()
if realOffsetErr != nil {
logger.Warn("fail to get data engine RealPos, progress may not be accurate",
log.ShortError(realOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
}
}

switch errors.Cause(err) {
case nil:
@@ -2687,7 +2709,8 @@
continue
}

kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset,
rowID: rowID, realOffset: realOffset})
kvSize += kvs.Size()
failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
kvSize += uint64(val.(int))
@@ -2719,7 +2742,7 @@
}
}

err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset}})
err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset, realOffset: cr.chunk.FileMeta.FileSize}})
return
}

8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/table_restore.go
@@ -331,7 +331,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
if rc.status != nil && rc.status.backend == config.BackendLocal {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
rc.status.FinishedFileSize.Add(chunk.TotalSize())
}
}
}
@@ -341,7 +341,7 @@
}(restoreWorker, engineID, engine)
} else {
for _, chunk := range engine.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
rc.status.FinishedFileSize.Add(chunk.TotalSize())
}
}
}
@@ -541,7 +541,7 @@ func (tr *TableRestore) restoreEngine(
}
var remainChunkCnt float64
if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
remainChunkCnt = float64(chunk.Chunk.EndOffset-chunk.Chunk.Offset) / float64(chunk.Chunk.EndOffset-chunk.Key.Offset)
remainChunkCnt = float64(chunk.UnfinishedSize()) / float64(chunk.TotalSize())
if metrics != nil {
metrics.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt)
}
@@ -616,7 +616,7 @@
totalSQLSize := int64(0)
for _, chunk := range cp.Chunks {
totalKVSize += chunk.Checksum.SumSize()
totalSQLSize += chunk.Chunk.EndOffset - chunk.Chunk.Offset
totalSQLSize += chunk.UnfinishedSize()
}

err = chunkErr.Get()
2 changes: 1 addition & 1 deletion br/pkg/lightning/web/progress.go
@@ -64,7 +64,7 @@ func (cpm *checkpointsMap) update(diffs map[string]*checkpoints.TableCheckpointD
for _, engine := range cp.Engines {
for _, chunk := range engine.Chunks {
if engine.Status >= checkpoints.CheckpointStatusAllWritten {
tw += chunk.Chunk.EndOffset - chunk.Key.Offset
tw += chunk.TotalSize()
} else {
tw += chunk.Chunk.Offset - chunk.Key.Offset
}
11 changes: 9 additions & 2 deletions br/pkg/storage/compress.go
@@ -80,8 +80,10 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
return io.ReadAll(compressBf)
}

// compressReader is a wrapper for compress.Reader
type compressReader struct {
io.Reader
io.Seeker
io.Closer
}

@@ -97,11 +99,16 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
return &compressReader{
Reader: r,
Closer: fileReader,
Seeker: fileReader,
}, nil
}

func (*compressReader) Seek(_ int64, _ int) (int64, error) {
return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
// only supports getting the underlying reader's current offset
if offset == 0 && whence == io.SeekCurrent {
return c.Seeker.Seek(offset, whence)
}
return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now, offset %d, whence %d", offset, whence)
}

func (c *compressReader) Close() error {
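A usage sketch for the relaxed Seek (illustrative, not code from this PR): the only supported call is the "tell me the current position" form, which is what blockParser.RealPos issues through the storage layer; any other offset/whence combination still returns ErrStorageInvalidConfig.

```go
package storageexample

import "io"

// currentOffset is a hypothetical helper: given any io.ReadSeeker (including
// the compressReader wrapper after this change), it asks for the underlying
// reader's current position without moving it. Any other Seek form on a
// compressReader, e.g. Seek(10, io.SeekStart), still fails with
// ErrStorageInvalidConfig.
func currentOffset(r io.ReadSeeker) (int64, error) {
	return r.Seek(0, io.SeekCurrent)
}
```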