Skip to content

Commit

Permalink
executor: fix data race in IndexNestedLoopHashJoin (#55824) (#55911)
Browse files Browse the repository at this point in the history
close #49692
  • Loading branch information
ti-chi-bot authored Sep 10, 2024
1 parent 441a913 commit 854bf3c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
35 changes: 31 additions & 4 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"runtime"
"runtime/trace"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -74,7 +75,11 @@ type IndexNestedLoopHashJoin struct {
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
panicErr struct {
sync.Mutex
atomic.Bool
error
}
ctxWithCancel context.Context
}

Expand Down Expand Up @@ -186,13 +191,22 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
if r != nil {
e.IndexLookUpJoin.finished.Store(true)
err := errors.New(fmt.Sprintf("%v", r))

if !e.panicErr.Load() {
e.panicErr.Lock()
if !e.panicErr.Load() {
e.panicErr.error = err
e.panicErr.Store(true)
}
e.panicErr.Unlock()
}

if !e.keepOuterOrder {
e.resultCh <- &indexHashJoinResult{err: err}
} else {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down Expand Up @@ -226,7 +240,10 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
for {
if e.isDryUpTasks(ctx) {
return e.panicErr
if e.panicErr.Load() {
return e.panicErr.error
}
return nil
}
if e.curTask.err != nil {
return e.curTask.err
Expand Down Expand Up @@ -282,7 +299,17 @@ func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resu
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
failpoint.Inject("TestIssue49692", func() {
for !e.panicErr.Load() {
runtime.Gosched()
}
})

err := error(nil)
if e.panicErr.Load() {
err = e.panicErr.error
}

if err == nil {
err = ctx.Err()
}
Expand Down
6 changes: 6 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2751,6 +2751,12 @@ func TestIssue30211(t *testing.T) {
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(a int, index(a));")
tk.MustExec("create table t2(a int, index(a));")
fpName2 := "github.com/pingcap/tidb/executor/TestIssue49692"
require.NoError(t, failpoint.Enable(fpName2, `return`))
defer func() {
require.NoError(t, failpoint.Disable(fpName2))
}()

func() {
fpName := "github.com/pingcap/tidb/executor/TestIssue30211"
require.NoError(t, failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`))
Expand Down

0 comments on commit 854bf3c

Please sign in to comment.