Skip to content

Commit

Permalink
copIterator: return context error to avoid return incorrect result on…
Browse files Browse the repository at this point in the history
… context cancel/timeout (#53489) (#55637)

close #50089
  • Loading branch information
ti-chi-bot committed Sep 14, 2024
1 parent 831d9c1 commit 5e9d45a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
36 changes: 36 additions & 0 deletions executor/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
package executor_test

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
)

func TestQueryTime(t *testing.T) {
Expand Down Expand Up @@ -52,3 +58,33 @@ func TestFormatSQL(t *testing.T) {
val = executor.FormatSQL("aaaaaaaaaaaaaaaaaaaa")
require.Equal(t, "aaaaa(len:20)", val.String())
}

func TestContextCancelWhenReadFromCopIterator(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/CtxCancelBeforeReceive", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/CtxCancelBeforeReceive"))
}()
ctx := context.WithValue(context.Background(), "TestContextCancel", "test")
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx = util.WithInternalSourceType(ctx, "scheduler")
rs, err := tk.Session().ExecuteInternal(ctx, "select * from test.t")
require.NoError(t, err)
_, err2 := session.ResultSetToStringSlice(ctx, tk.Session(), rs)
require.ErrorIs(t, err2, context.Canceled)
}()
<-copr.GlobalSyncChForTest
cancelFunc()
copr.GlobalSyncChForTest <- struct{}{}
wg.Wait()
}
15 changes: 12 additions & 3 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,16 @@ func (sender *copIteratorTaskSender) run(connID uint64) {
}
}

// GlobalSyncChForTest is a global channel for test.
var GlobalSyncChForTest = make(chan struct{})

func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copResponse) (resp *copResponse, ok bool, exit bool) {
failpoint.Inject("CtxCancelBeforeReceive", func(_ failpoint.Value) {
if ctx.Value("TestContextCancel") == "test" {
GlobalSyncChForTest <- struct{}{}
<-GlobalSyncChForTest
}
})
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -1036,7 +1045,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
if !ok || closed {
it.actionOnExceed.close()
return nil, nil
return nil, errors.Trace(ctx.Err())
}
if resp == finCopResp {
it.actionOnExceed.destroyTokenIfNeeded(func() {
Expand All @@ -1054,8 +1063,8 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
task := it.tasks[it.curr]
resp, ok, closed = it.recvFromRespCh(ctx, task.respChan)
if closed {
// Close() is already called, so Next() is invalid.
return nil, nil
// Close() is called or context cancelled/timeout, so Next() is invalid.
return nil, errors.Trace(ctx.Err())
}
if ok {
break
Expand Down

0 comments on commit 5e9d45a

Please sign in to comment.