lightning: fix MakeTableRegions may panic when context is canceled (#…
lance6716 authored Apr 20, 2023
1 parent 3a881e4 commit d547306
Showing 3 changed files with 56 additions and 73 deletions.
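Context for the change: the old MakeTableRegions hand-rolled a worker pool (fileChan feeding worker goroutines, resultChan feeding a collector that filled a plain map, errChan signalling completion). If the caller's context was canceled mid-flight, a worker could exit on <-ctx.Done() without publishing its result while the collector still reported success, so the result map could silently be missing entries for some data files. One plausible way that turns into a panic downstream, as a minimal hypothetical sketch (names are illustrative, not from the patch):

    package main

    import "fmt"

    // fileResult stands in for fileRegionRes; names are illustrative only.
    type fileResult struct{ regions []string }

    func main() {
        results := map[string]fileResult{}
        // A canceled worker never stored an entry for "a.csv", yet the old
        // collector could still signal success via errChan <- nil.
        r := results["a.csv"] // missing key: zero value, r.regions == nil
        // Any later code that assumes at least one region then panics:
        fmt.Println(r.regions[0]) // panic: runtime error: index out of range
    }

The fix below replaces the hand-rolled pool with golang.org/x/sync/errgroup and turns a missing map entry into an explicit error instead of a silent zero value.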
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/BUILD.bazel
@@ -37,6 +37,7 @@ go_library(
         "@com_github_xitongsys_parquet_go//reader",
         "@com_github_xitongsys_parquet_go//source",
         "@org_golang_x_exp//slices",
+        "@org_golang_x_sync//errgroup",
         "@org_golang_x_text//encoding",
         "@org_golang_x_text//encoding/charmap",
         "@org_golang_x_text//encoding/simplifiedchinese",
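The new Bazel dependency mirrors the golang.org/x/sync/errgroup import added to region.go below; TiDB's Bazel build requires every Go import to be listed in the deps of the corresponding go_library target.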
121 changes: 48 additions & 73 deletions br/pkg/lightning/mydump/region.go
@@ -28,6 +28,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/util/mathutil"
     "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
 )
 
 const (
@@ -217,84 +218,53 @@ func MakeTableRegions(
 
     start := time.Now()
 
-    execCtx, cancel := context.WithCancel(ctx)
-    defer cancel()
-
     concurrency := mathutil.Max(cfg.Concurrency, 2)
-    fileChan := make(chan FileInfo, concurrency)
-    resultChan := make(chan fileRegionRes, concurrency)
-    var wg sync.WaitGroup
-    for i := 0; i < concurrency; i++ {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            for info := range fileChan {
-                var (
-                    regions []*TableRegion
-                    sizes   []float64
-                    err     error
-                )
-                dataFileSize := info.FileMeta.FileSize
-                if info.FileMeta.Type == SourceTypeParquet {
-                    regions, sizes, err = makeParquetFileRegion(ctx, cfg, info)
-                } else if info.FileMeta.Type == SourceTypeCSV && cfg.StrictFormat &&
-                    info.FileMeta.Compression == CompressionNone &&
-                    dataFileSize > cfg.MaxChunkSize+cfg.MaxChunkSize/largeCSVLowerThresholdRation {
-                    // If a csv file is overlarge, we need to split it into multiple regions.
-                    // Note: We can only split a csv file whose format is strict.
-                    // We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
-                    // like dumpling might slightly exceed the threshold when it equals `max-region-size`, so we can
-                    // avoid splitting a lot of small chunks.
-                    // If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
-                    regions, sizes, err = SplitLargeCSV(ctx, cfg, info)
-                } else {
-                    regions, sizes, err = MakeSourceFileRegion(execCtx, cfg, info)
-                }
-                select {
-                case resultChan <- fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}:
-                case <-ctx.Done():
-                    return
-                }
-                if err != nil {
-                    log.FromContext(ctx).Error("make source file region error", zap.Error(err), zap.String("file_path", info.FileMeta.Path))
-                    break
-                }
-            }
-        }()
-    }
+    var fileRegionsMap sync.Map
 
-    go func() {
-        wg.Wait()
-        close(resultChan)
-    }()
-
-    errChan := make(chan error, 1)
+    eg, egCtx := errgroup.WithContext(ctx)
+    eg.SetLimit(concurrency)
     meta := cfg.TableMeta
-    fileRegionsMap := make(map[string]fileRegionRes, len(meta.DataFiles))
-    go func() {
-        for res := range resultChan {
-            if res.err != nil {
-                errChan <- res.err
-                return
-            }
-            fileRegionsMap[res.info.FileMeta.Path] = res
-        }
-        errChan <- nil
-    }()
-
-    for _, dataFile := range meta.DataFiles {
-        select {
-        case fileChan <- dataFile:
-        case <-ctx.Done():
-            close(fileChan)
-            return nil, ctx.Err()
-        case err := <-errChan:
-            return nil, err
-        }
-    }
-    close(fileChan)
-    err := <-errChan
-    if err != nil {
+    for _, info := range meta.DataFiles {
+        info := info
+        eg.Go(func() error {
+            select {
+            case <-egCtx.Done():
+                return nil
+            default:
+            }
+
+            var (
+                regions []*TableRegion
+                sizes   []float64
+                err     error
+            )
+            dataFileSize := info.FileMeta.FileSize
+            if info.FileMeta.Type == SourceTypeParquet {
+                regions, sizes, err = makeParquetFileRegion(egCtx, cfg, info)
+            } else if info.FileMeta.Type == SourceTypeCSV && cfg.StrictFormat &&
+                info.FileMeta.Compression == CompressionNone &&
+                dataFileSize > cfg.MaxChunkSize+cfg.MaxChunkSize/largeCSVLowerThresholdRation {
+                // If a csv file is overlarge, we need to split it into multiple regions.
+                // Note: We can only split a csv file whose format is strict.
+                // We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
+                // like dumpling might slightly exceed the threshold when it equals `max-region-size`, so we can
+                // avoid splitting a lot of small chunks.
+                // If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
+                regions, sizes, err = SplitLargeCSV(egCtx, cfg, info)
+            } else {
+                regions, sizes, err = MakeSourceFileRegion(egCtx, cfg, info)
+            }
+            if err != nil {
+                log.FromContext(egCtx).Error("make source file region error", zap.Error(err), zap.String("file_path", info.FileMeta.Path))
+                return err
+            }
+            result := fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}
+            fileRegionsMap.Store(info.FileMeta.Path, result)
+            return nil
+        })
+    }
+
+    if err := eg.Wait(); err != nil {
         return nil, err
     }

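For reference, a self-contained sketch (not from the patch) of the errgroup idiom the new code adopts: errgroup.WithContext cancels the derived context as soon as any task returns an error, SetLimit caps how many goroutines run at once, and a sync.Map collects results safely across goroutines. The early select on egCtx.Done() mirrors the patch's way of cheaply draining queued tasks after a failure or cancellation.

    package main

    import (
        "context"
        "fmt"
        "sync"

        "golang.org/x/sync/errgroup"
    )

    // processAll is a stand-in for MakeTableRegions' fan-out phase.
    func processAll(ctx context.Context, paths []string) (map[string]int, error) {
        var results sync.Map
        eg, egCtx := errgroup.WithContext(ctx)
        eg.SetLimit(4) // at most 4 tasks run concurrently

        for _, p := range paths {
            p := p // capture the loop variable (pre-Go 1.22 idiom, as in the patch)
            eg.Go(func() error {
                // Skip queued tasks once a task has failed or the caller canceled.
                select {
                case <-egCtx.Done():
                    return nil
                default:
                }
                results.Store(p, len(p)) // stand-in for the real per-file work
                return nil
            })
        }
        if err := eg.Wait(); err != nil {
            return nil, err
        }

        out := make(map[string]int, len(paths))
        for _, p := range paths {
            v, ok := results.Load(p)
            if !ok { // e.g. the context was canceled before this path was processed
                return nil, fmt.Errorf("no result for %s", p)
            }
            out[p] = v.(int)
        }
        return out, nil
    }

    func main() {
        m, err := processAll(context.Background(), []string{"a.csv", "b.csv"})
        fmt.Println(m, err)
    }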
@@ -303,7 +273,12 @@ func MakeTableRegions(
     // rebase row-id for all chunk
     rowIDBase := int64(0)
     for _, dataFile := range meta.DataFiles {
-        fileRegionsRes := fileRegionsMap[dataFile.FileMeta.Path]
+        v, ok := fileRegionsMap.Load(dataFile.FileMeta.Path)
+        if !ok {
+            return nil, errors.Errorf("file %s not found in MakeTableRegions", dataFile.FileMeta.Path)
+        }
+        //nolint: forcetypeassert
+        fileRegionsRes := v.(fileRegionRes)
         for _, region := range fileRegionsRes.regions {
             region.Chunk.PrevRowIDMax += rowIDBase
             region.Chunk.RowIDMax += rowIDBase
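Because sync.Map stores values as interface{}, Load returns (any, bool); the new !ok branch converts what used to be a silent zero value into an explicit error. The //nolint: forcetypeassert directive acknowledges the unchecked assertion, which is safe here since only fileRegionRes values are ever stored in the map.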
7 changes: 7 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
@@ -221,6 +221,13 @@ func TestMakeTableRegionsSplitLargeFile(t *testing.T) {
     assert.Equal(t, int64(0), regions[0].Chunk.Offset)
     assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
     assert.Len(t, regions[0].Chunk.Columns, 0)
+
+    // test canceled context will not panic
+    ctx, cancel := context.WithCancel(context.Background())
+    cancel()
+    for i := 0; i < 20; i++ {
+        _, _ = MakeTableRegions(ctx, divideConfig)
+    }
 }
 
 func TestCompressedMakeSourceFileRegion(t *testing.T) {
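The regression test cancels the context before calling MakeTableRegions and repeats the call 20 times, presumably because the original panic depended on goroutine timing and a single invocation might not trigger it. With the fix, each call is expected to return an error rather than panic, so the return values are deliberately ignored.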
