From 831d9c16a1b31d1f20a3b3438eed765354ef045d Mon Sep 17 00:00:00 2001
From: Shenghui Wu <793703860@qq.com>
Date: Fri, 13 Sep 2024 16:57:42 +0800
Subject: [PATCH] executor: clear the disk files after sort spilling when exit unexpected (#56070)

---
 executor/executor_pkg_test.go | 75 +++++++++++++++++++++++++++++++++++
 executor/sort.go              | 24 +++++++----
 2 files changed, 91 insertions(+), 8 deletions(-)

diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go
index 787dd4312f29e..0c724456fa06d 100644
--- a/executor/executor_pkg_test.go
+++ b/executor/executor_pkg_test.go
@@ -16,13 +16,17 @@ package executor
 
 import (
 	"context"
+	"os"
+	"path/filepath"
 	"runtime"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 	"unsafe"
 
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/executor/aggfuncs"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
@@ -31,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/collate"
+	"github.com/pingcap/tidb/util/disk"
 	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/ranger"
@@ -408,3 +413,73 @@ func TestSortSpillDisk(t *testing.T) {
 	err = exec.Close()
 	require.NoError(t, err)
 }
+
+func TestSortSpillDiskReturnErrAndClearFile(t *testing.T) { // Checks that an injected fetch error surfaces from Next and that no "chunk" temp files remain after Close.
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"))
+	}()
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/errInSortExecFetchRowChunks", "1*return(0)->1*return(1)->return(2)"))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/errInSortExecFetchRowChunks"))
+	}()
+	disk.InitializeTempDir() // Clear all files left in the temp storage dir before the test runs.
+	ctx := mock.NewContext()
+	ctx.GetSessionVars().MemQuota.MemQuotaQuery = 1 // Tiny quota; presumably forces the sort to spill to disk (TODO confirm).
+	ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize
+	ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
+	ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 1)
+	ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
+	ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
+	cas := &sortCase{rows: 20480, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
+	opt := mockDataSourceParameters{
+		schema: expression.NewSchema(cas.columns()...),
+		rows:   cas.rows,
+		ctx:    cas.ctx,
+		ndvs:   cas.ndvs,
+	}
+	dataSource := buildMockDataSource(opt)
+	exec := &SortExec{
+		baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, 0, dataSource),
+		ByItems:      make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)),
+		schema:       dataSource.schema,
+	}
+	for _, idx := range cas.orderByIdx {
+		exec.ByItems = append(exec.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]})
+	}
+	tmpCtx := context.Background()
+	chk := newFirstChunk(exec)
+	dataSource.prepareChunks()
+	err := exec.Open(tmpCtx)
+	require.NoError(t, err)
+
+	for {
+		err = exec.Next(tmpCtx, chk)
+		if chk.NumRows() == 0 || err != nil {
+			break
+		}
+	}
+	require.ErrorContains(t, err, "mockError") // The error injected through errInSortExecFetchRowChunks must surface from Next.
+
+	err = exec.Close()
+	require.NoError(t, err)
+
+	// Check that the temp files are deleted.
+	path := config.GetGlobalConfig().TempStoragePath
+	// The temp storage path itself must still exist.
+	_, err = os.Stat(path)
+	require.False(t, os.IsNotExist(err))
+
+	err = filepath.Walk(path, func(filePath string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if !info.IsDir() {
+			require.False(t, strings.Contains(filePath, "chunk")) // The temp files written by chunk.ListInDisk must have been deleted.
+		}
+		return nil
+	})
+
+	require.NoError(t, err)
+}
diff --git a/executor/sort.go b/executor/sort.go
index cb2c97e68a8e4..cdd0f0f01a927 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -172,7 +172,7 @@ func (e *SortExec) externalSorting(req *chunk.Chunk) (err error) {
 	return nil
 }
 
-func (e *SortExec) fetchRowChunks(ctx context.Context) error {
+func (e *SortExec) fetchRowChunks(ctx context.Context) (err error) { // err is a named result so the deferred block below can read and set it.
 	fields := retTypes(e)
 	byItemsDesc := make([]bool, len(e.ByItems))
 	for i, byItem := range e.ByItems {
@@ -193,9 +193,24 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
 		e.rowChunks.GetDiskTracker().AttachTo(e.diskTracker)
 		e.rowChunks.GetDiskTracker().SetLabel(memory.LabelForRowChunks)
 	}
+	defer func() { // Runs on every return path, including the early error returns in the loop below.
+		if e.rowChunks.NumRow() > 0 {
+			if err == nil { // Skip the final sort when fetching has already failed.
+				err = e.rowChunks.Sort()
+			}
+			e.partitionList = append(e.partitionList, e.rowChunks) // Recorded even when err != nil; presumably so Close can release its spill files (TODO confirm).
+		}
+	}()
 	for {
 		chk := tryNewCacheChunk(e.children[0])
 		err := Next(ctx, e.children[0], chk)
+		failpoint.Inject("errInSortExecFetchRowChunks", func(val failpoint.Value) {
+			switch val.(int) { // Only the value 1 injects an error; the test's failpoint terms yield 0 once, then 1 once, then 2.
+			case 1:
+				err = errors.New("mockError") // Sets the loop-local err declared by `err := Next(...)` just above.
+			default:
+			}
+		})
 		if err != nil {
 			return err
 		}
@@ -233,13 +248,6 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
 			}
 		}
 	})
-	if e.rowChunks.NumRow() > 0 {
-		err := e.rowChunks.Sort()
-		if err != nil {
-			return err
-		}
-		e.partitionList = append(e.partitionList, e.rowChunks)
-	}
 	return nil
 }
 