
*: support sample for compressed files for adjustment #39680

Merged: 13 commits, Dec 19, 2022
Binary file not shown.
93 changes: 90 additions & 3 deletions br/pkg/lightning/mydump/loader.go
@@ -16,6 +16,7 @@ package mydump

import (
"context"
"io"
"path/filepath"
"sort"
"strings"
@@ -30,6 +31,9 @@ import (
"go.uber.org/zap"
)

// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
const sampleCompressedFileSize = 4 * 1024

// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
type MDDatabaseMeta struct {
Name string
@@ -82,7 +86,9 @@ type SourceFileMeta struct {
Compression Compression
SortKey string
FileSize int64
ExtendData ExtendColumnData
// WARNING: variables below are not persistent
ExtendData ExtendColumnData
RealSize int64
}

// NewMDTableMeta creates an Mydumper table meta with specified character set.
@@ -386,7 +392,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context) error {
// set a dummy `FileInfo` here without file meta because we needn't restore the table schema
tableMeta, _, _ := s.insertTable(FileInfo{TableName: fileInfo.TableName})
tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo)
tableMeta.TotalSize += fileInfo.FileMeta.FileSize
tableMeta.TotalSize += fileInfo.FileMeta.RealSize
}

for _, dbMeta := range s.loader.dbs {
@@ -453,7 +459,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size

info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size, RealSize: size},
}

if s.loader.shouldSkip(&info.TableName) {
@@ -470,6 +476,15 @@
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
if info.FileMeta.Compression != CompressionNone {
compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
logger.Error("[loader] fail to calculate data file compress compress ratio",
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type))
} else {
info.FileMeta.RealSize = int64(compressRatio * float64(info.FileMeta.FileSize))
}
}
s.tableDatas = append(s.tableDatas, info)
}

@@ -648,3 +663,75 @@ func (l *MDLoader) GetDatabases() []*MDDatabaseMeta {
func (l *MDLoader) GetStore() storage.ExternalStorage {
return l.store
}

func calculateFileBytes(ctx context.Context,
dataFile string,
compressType storage.CompressType,
store storage.ExternalStorage,
offset int64) (tot int, pos int64, err error) {
bytes := make([]byte, sampleCompressedFileSize)
reader, err := store.Open(ctx, dataFile)
if err != nil {
return 0, 0, errors.Trace(err)
}
defer reader.Close()

compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
if err != nil {
return 0, 0, errors.Trace(err)
}

readBytes := func() error {
n, err2 := compressReader.Read(bytes)
if err2 != nil && !strings.Contains(err2.Error(), "EOF") {
Contributor: Can we use errors.Cause(err2) != io.EOF?

Contributor Author (@lichunzhu, Dec 7, 2022): I'm not sure whether S3Reader or GCSReader could correctly return io.EOF before. I checked their code and it seems they can. I will change this.
return err2
}
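// For reference, the reviewer's suggested form of the EOF check above would
// look roughly like this (a sketch only, not necessarily the exact code that
// was finally committed):
//
//	if err2 != nil && errors.Cause(err2) != io.EOF {
//		return err2
//	}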
tot += n
Contributor: Is n equal to sampleCompressedFileSize?

Contributor Author: I don't understand your question. n is the number of bytes read in one Read call.

return err2
}

if offset == 0 {
err = readBytes()
if err != nil && !strings.Contains(err.Error(), "EOF") {
Contributor: ditto (same io.EOF comment as above)
return 0, 0, err
}
pos, err = compressReader.Seek(0, io.SeekCurrent)
if err != nil {
return 0, 0, errors.Trace(err)
}
return tot, pos, nil
}

for {
err = readBytes()
if err != nil {
break
}
}
if err != nil && !strings.Contains(err.Error(), "EOF") {
return 0, 0, errors.Trace(err)
}
return tot, offset, nil
}

// SampleFileCompressRatio samples the compress ratio of the compressed file.
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
if fileMeta.Compression == CompressionNone {
return 1, nil
}
compressType, err := ToStorageCompressType(fileMeta.Compression)
if err != nil {
return 0, err
}
// read first time, aims to find a valid end pos in compressed file
_, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, 0)
if err != nil {
return 0, err
}
// read second time, original reader ends at first time's valid pos, compute sample data compress ratio
tot, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, pos)
Contributor: I don't really understand why we need to read twice here. Could you elaborate on it in the PR description or in a comment?

Contributor Author: As the comment above says, the first read is only there to find a valid offset in the compressed file. If we kept reading past that point, the underlying file reader might fetch and buffer more data in memory than we actually decompress. So the second read limits the underlying file reader to exactly n bytes (the valid position found in the first read) and decompresses all of it to compute the compression ratio.

Contributor: Okay, I finally understand after reading the compression reader code 😢

  1. find a valid offset n
  2. read n bytes of data from the compressed file, then measure the uncompressed output (m bytes)
  3. compression ratio: m/n

Am I right?

Contributor Author: Yes. That's what I mean.

if err != nil {
return 0, err
}
return float64(tot) / float64(pos), nil
}
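
For readers outside the Lightning codebase, here is a rough, standard-library-only sketch of the sampling idea discussed above. The names sampleGzipRatio and sampleSize are illustrative and not part of the PR; the sketch simply caps the compressed input at a fixed sample size and tolerates the resulting truncation, whereas the PR performs two passes with storage.NewLimitedInterceptReader so it can pin the exact compressed offset.

package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

// sampleSize mirrors sampleCompressedFileSize in the PR (4 KiB).
const sampleSize = 4 * 1024

// sampleGzipRatio estimates uncompressed bytes / compressed bytes by
// decompressing at most sampleSize bytes of the compressed input.
func sampleGzipRatio(compressed []byte) (float64, error) {
	n := int64(len(compressed))
	if n > sampleSize {
		n = sampleSize
	}
	zr, err := gzip.NewReader(bytes.NewReader(compressed[:n]))
	if err != nil {
		return 0, err
	}
	defer zr.Close()

	// A truncated gzip stream surfaces io.ErrUnexpectedEOF; treat it as
	// "sample finished" rather than a failure.
	uncompressed, err := io.Copy(io.Discard, zr)
	if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
		return 0, err
	}
	return float64(uncompressed) / float64(n), nil
}

func main() {
	// Build a small, highly compressible gzip payload, similar to the test
	// added in loader_test.go below.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	for i := 0; i < 1000; i++ {
		zw.Write([]byte("aaaa\n"))
	}
	zw.Close()

	ratio, err := sampleGzipRatio(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Printf("estimated compression ratio: %.2f\n", ratio)
}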
33 changes: 33 additions & 0 deletions br/pkg/lightning/mydump/loader_test.go
@@ -15,6 +15,8 @@
package mydump_test

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"os"
@@ -1053,3 +1055,34 @@ func TestExternalDataRoutes(t *testing.T) {
require.Equal(t, expectedExtendVals[i], fileInfo.FileMeta.ExtendData.Values)
}
}

func TestSampleFileCompressRatio(t *testing.T) {
s := newTestMydumpLoaderSuite(t)
store, err := storage.NewLocalStorage(s.sourceDir)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

byteArray := make([]byte, 0, 4096)
bf := bytes.NewBuffer(byteArray)
compressWriter := gzip.NewWriter(bf)
csvData := []byte("aaaa\n")
for i := 0; i < 1000; i++ {
_, err = compressWriter.Write(csvData)
require.NoError(t, err)
}
err = compressWriter.Flush()
require.NoError(t, err)

fileName := "test_1.t1.csv.gz"
err = store.WriteFile(ctx, fileName, bf.Bytes())
require.NoError(t, err)

ratio, err := md.SampleFileCompressRatio(ctx, md.SourceFileMeta{
Path: fileName,
Compression: md.CompressionGZ,
}, store)
require.NoError(t, err)
require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5)
}
28 changes: 11 additions & 17 deletions br/pkg/lightning/mydump/region.go
@@ -31,16 +31,14 @@ import (
)

const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the increment ratio of large CSV file size threshold by `region-split-size`
largeCSVLowerThresholdRation = 10
// TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency
// It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files.
TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
// compressDataRatio is a relatively maximum compress ratio for normal compressed data
// It's used to estimate rowIDMax, we use a large value to try to avoid overlapping
compressDataRatio = 500
// CompressSizeFactor is used to adjust compressed data size
CompressSizeFactor = 5
)

// TableRegion contains information for a table region during import.
Expand Down Expand Up @@ -303,11 +301,8 @@ func MakeSourceFileRegion(
rowIDMax := fileSize / divisor
// for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax.
// set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files.
// TODO: update progress bar calculation for compressed files.
if fi.FileMeta.Compression != CompressionNone {
// FIXME: this is not accurate. Need sample ratio in the future and use sampled ratio to compute rowIDMax
// currently we use 500 here. It's a relatively large value for most data.
rowIDMax = fileSize * compressDataRatio / divisor
rowIDMax = fi.FileMeta.RealSize * CompressSizeFactor / divisor
Contributor: Regarding "info.FileMeta.RealSize = int64(compressRatio * float64(info.FileMeta.FileSize))": the real size is the size of the decompressed file, so we don't need to multiply by CompressSizeFactor anymore?

Contributor Author: No. The real size is only an estimated size. In some cases the first few bytes of a compressed file have a lower compression ratio than the whole file, so we still multiply by this factor to keep the limit loose. I will add this to the comment.

Contributor: lgtm

fileSize = TableFileSizeINF
}
tableRegion := &TableRegion{
Expand All @@ -317,24 +312,23 @@ func MakeSourceFileRegion(
Chunk: Chunk{
Offset: 0,
EndOffset: fileSize,
RealOffset: 0,
PrevRowIDMax: 0,
RowIDMax: rowIDMax,
},
}

regionTooBig := false
if fi.FileMeta.Compression == CompressionNone {
regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold
} else {
regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold
regionSize := tableRegion.Size()
if fi.FileMeta.Compression != CompressionNone {
regionSize = fi.FileMeta.RealSize
}
if regionTooBig {
if regionSize > tableRegionSizeWarningThreshold {
log.FromContext(ctx).Warn(
"file is too big to be processed efficiently; we suggest splitting it at 256 MB each",
zap.String("file", fi.FileMeta.Path),
zap.Int64("size", dataFileSize))
zap.Int64("size", regionSize))
}
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.RealSize)}, nil
}

// because parquet files can't seek efficiently, there is no benefit in split.
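As a worked illustration of the review thread above, the sketch below shows how the sampled ratio feeds RealSize (loader.go) and then the rowIDMax estimate (region.go). The concrete numbers are hypothetical; only the factor of 5 (CompressSizeFactor) comes from the diff.

package main

import "fmt"

// compressSizeFactor mirrors CompressSizeFactor in region.go.
const compressSizeFactor = 5

func main() {
	// Hypothetical input: a 100 MiB compressed data file whose sampled ratio is 8x.
	fileSize := int64(100 * 1024 * 1024)
	sampledRatio := 8.0

	// loader.go: RealSize estimates the uncompressed size from the sampled ratio.
	realSize := int64(sampledRatio * float64(fileSize)) // roughly 800 MiB

	// region.go: rowIDMax is derived from RealSize; the extra factor keeps the
	// bound loose in case the sampled prefix has a lower compression ratio than
	// the rest of the file.
	divisor := int64(3) // e.g. a 3-column CSV
	rowIDMax := realSize * compressSizeFactor / divisor

	fmt.Println("estimated real size:", realSize, "rowIDMax:", rowIDMax)
}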
56 changes: 55 additions & 1 deletion br/pkg/lightning/mydump/region_test.go
@@ -199,7 +199,6 @@ func TestMakeSourceFileRegion(t *testing.T) {
store, err := storage.NewLocalStorage(".")
assert.NoError(t, err)

// test - no compression
fileInfo.FileMeta.Compression = CompressionNone
regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
@@ -221,6 +220,61 @@
assert.Len(t, regions[0].Chunk.Columns, 0)
}

func TestCompressedMakeSourceFileRegion(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
},
}
filePath := "./csv/split_large_file.csv.zst"
dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()

fileInfo := FileInfo{FileMeta: SourceFileMeta{
Path: filePath,
Type: SourceTypeCSV,
Compression: CompressionZStd,
FileSize: fileSize,
}}
colCnt := 3

ctx := context.Background()
ioWorkers := worker.NewPool(ctx, 4, "io")
store, err := storage.NewLocalStorage(".")
assert.NoError(t, err)
compressRatio, err := SampleFileCompressRatio(ctx, fileInfo.FileMeta, store)
require.NoError(t, err)
fileInfo.FileMeta.RealSize = int64(compressRatio * float64(fileInfo.FileMeta.FileSize))

regions, sizes, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
assert.Len(t, regions, 1)
assert.Equal(t, int64(0), regions[0].Chunk.Offset)
assert.Equal(t, int64(0), regions[0].Chunk.RealOffset)
assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
rowIDMax := fileInfo.FileMeta.RealSize * CompressSizeFactor / int64(colCnt)
assert.Equal(t, rowIDMax, regions[0].Chunk.RowIDMax)
assert.Len(t, regions[0].Chunk.Columns, 0)
assert.Equal(t, fileInfo.FileMeta.RealSize, int64(sizes[0]))
}

func TestSplitLargeFile(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/mock/mock.go
@@ -111,6 +111,7 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport
FileMeta: mydump.SourceFileMeta{
Path: tblDataFile.FileName,
FileSize: int64(fileSize),
RealSize: int64(fileSize),
},
}
fileName := tblDataFile.FileName
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/precheck.go
@@ -139,7 +139,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
case CheckLocalDiskPlacement:
return NewLocalDiskPlacementCheckItem(b.cfg), nil
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
default:
26 changes: 23 additions & 3 deletions br/pkg/lightning/restore/precheck_impl.go
@@ -434,7 +434,7 @@ func (ci *largeFileCheckItem) Check(ctx context.Context) (*CheckResult, error) {
for _, db := range ci.dbMetas {
for _, t := range db.Tables {
for _, f := range t.DataFiles {
if f.FileMeta.FileSize > defaultCSVSize {
if f.FileMeta.RealSize > defaultCSVSize {
theResult.Message = fmt.Sprintf("large csv: %s file exists and it will slow down import performance", f.FileMeta.Path)
theResult.Passed = false
}
@@ -484,23 +484,43 @@ func (ci *localDiskPlacementCheckItem) Check(ctx context.Context) (*CheckResult,
type localTempKVDirCheckItem struct {
cfg *config.Config
preInfoGetter PreRestoreInfoGetter
dbMetas []*mydump.MDDatabaseMeta
}

func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter) PrecheckItem {
func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem {
return &localTempKVDirCheckItem{
cfg: cfg,
preInfoGetter: preInfoGetter,
dbMetas: dbMetas,
}
}

func (ci *localTempKVDirCheckItem) GetCheckItemID() CheckItemID {
return CheckLocalTempKVDir
}

func (ci *localTempKVDirCheckItem) hasCompressedFiles() bool {
for _, dbMeta := range ci.dbMetas {
for _, tbMeta := range dbMeta.Tables {
for _, file := range tbMeta.DataFiles {
if file.FileMeta.Compression != mydump.CompressionNone {
return true
}
}
}
}
return false
}

func (ci *localTempKVDirCheckItem) Check(ctx context.Context) (*CheckResult, error) {
severity := Critical
// for cases that have compressed files, the estimated size may not be accurate, set severity to Warn to avoid failure
if ci.hasCompressedFiles() {
severity = Warn
}
theResult := &CheckResult{
Item: ci.GetCheckItemID(),
Severity: Critical,
Severity: severity,
}
storageSize, err := common.GetStorageSize(ci.cfg.TikvImporter.SortedKVDir)
if err != nil {