Skip to content

Commit

Permalink
executor: clear the disk files after sort spilling when exit unexpect…
Browse files Browse the repository at this point in the history
…ed (#56070)
  • Loading branch information
wshwsh12 committed Sep 13, 2024
1 parent 87569e8 commit 831d9c1
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 8 deletions.
75 changes: 75 additions & 0 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package executor

import (
"context"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand All @@ -31,6 +35,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
Expand Down Expand Up @@ -408,3 +413,73 @@ func TestSortSpillDisk(t *testing.T) {
err = exec.Close()
require.NoError(t, err)
}

func TestSortSpillDiskReturnErrAndClearFile(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/errInSortExecFetchRowChunks", "1*return(0)->1*return(1)->return(2)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/errInSortExecFetchRowChunks"))
}()
disk.InitializeTempDir() // Clear all files
ctx := mock.NewContext()
ctx.GetSessionVars().MemQuota.MemQuotaQuery = 1
ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, 1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)
cas := &sortCase{rows: 20480, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
rows: cas.rows,
ctx: cas.ctx,
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, 0, dataSource),
ByItems: make([]*plannerutil.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
for _, idx := range cas.orderByIdx {
exec.ByItems = append(exec.ByItems, &plannerutil.ByItems{Expr: cas.columns()[idx]})
}
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource.prepareChunks()
err := exec.Open(tmpCtx)
require.NoError(t, err)

for {
err = exec.Next(tmpCtx, chk)
if chk.NumRows() == 0 || err != nil {
break
}
}
require.ErrorContains(t, err, "mockError")

err = exec.Close()
require.NoError(t, err)

// Check the temp files are deleted.
path := config.GetGlobalConfig().TempStoragePath
// Path is existed.
_, err = os.Stat(path)
require.False(t, os.IsNotExist(err))

err = filepath.Walk(path, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return err
}

if !info.IsDir() {
require.False(t, strings.Contains(filePath, "chunk")) // Check the temp files chunk.ListInDisk are deleted.
}
return nil
})

require.NoError(t, err)
}
24 changes: 16 additions & 8 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (e *SortExec) externalSorting(req *chunk.Chunk) (err error) {
return nil
}

func (e *SortExec) fetchRowChunks(ctx context.Context) error {
func (e *SortExec) fetchRowChunks(ctx context.Context) (err error) {
fields := retTypes(e)
byItemsDesc := make([]bool, len(e.ByItems))
for i, byItem := range e.ByItems {
Expand All @@ -193,9 +193,24 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
e.rowChunks.GetDiskTracker().AttachTo(e.diskTracker)
e.rowChunks.GetDiskTracker().SetLabel(memory.LabelForRowChunks)
}
defer func() {
if e.rowChunks.NumRow() > 0 {
if err == nil {
err = e.rowChunks.Sort()
}
e.partitionList = append(e.partitionList, e.rowChunks)
}
}()
for {
chk := tryNewCacheChunk(e.children[0])
err := Next(ctx, e.children[0], chk)
failpoint.Inject("errInSortExecFetchRowChunks", func(val failpoint.Value) {
switch val.(int) {
case 1:
err = errors.New("mockError")
default:
}
})
if err != nil {
return err
}
Expand Down Expand Up @@ -233,13 +248,6 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
}
}
})
if e.rowChunks.NumRow() > 0 {
err := e.rowChunks.Sort()
if err != nil {
return err
}
e.partitionList = append(e.partitionList, e.rowChunks)
}
return nil
}

Expand Down

0 comments on commit 831d9c1

Please sign in to comment.