Skip to content

Commit

Permalink
Fix batch client batchSendLoop panic (#1021) (#1023)
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
crazycs520 authored Oct 18, 2023
1 parent 884157d commit 65f7812
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 39 deletions.
12 changes: 8 additions & 4 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,18 @@ func (a *batchConn) fetchMorePendingRequests(

const idleTimeout = 3 * time.Minute

// BatchSendLoopPanicCounter is only used for testing.
var BatchSendLoopPanicCounter int64 = 0

func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
defer func() {
if r := recover(); r != nil {
metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchSendLoop).Inc()
logutil.BgLogger().Error("batchSendLoop",
zap.Reflect("r", r),
zap.Any("r", r),
zap.Stack("stack"))
logutil.BgLogger().Info("restart batchSendLoop")
atomic.AddInt64(&BatchSendLoopPanicCounter, 1)
logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicCounter)))
go a.batchSendLoop(cfg)
}
}()
Expand Down Expand Up @@ -430,7 +434,7 @@ func (s *batchCommandsStream) recv() (resp *tikvpb.BatchCommandsResponse, err er
if r := recover(); r != nil {
metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
logutil.BgLogger().Error("batchCommandsClient.recv panic",
zap.Reflect("r", r),
zap.Any("r", r),
zap.Stack("stack"))
err = errors.New("batch conn recv paniced")
}
Expand Down Expand Up @@ -598,7 +602,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
if r := recover(); r != nil {
metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
logutil.BgLogger().Error("batchRecvLoop",
zap.Reflect("r", r),
zap.Any("r", r),
zap.Stack("stack"))
logutil.BgLogger().Info("restart batchRecvLoop")
go c.batchRecvLoop(cfg, tikvTransportLayerLoad, streamClient)
Expand Down
10 changes: 5 additions & 5 deletions internal/client/client_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ package client

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand All @@ -47,18 +46,19 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestPanicInRecvLoop(t *testing.T) {
require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`))
require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`))

server, port := startMockTikvService()
server, port := mock_server.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()

addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
addr := server.Addr()
rpcClient := NewRPCClient()
rpcClient.option.dialTimeout = time.Second / 3

Expand All @@ -82,10 +82,10 @@ func TestPanicInRecvLoop(t *testing.T) {
}

func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
server, port := startMockTikvService()
server, port := mock_server.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
addr := server.Addr()

// Enable batch and limit the connection count to 1 so that
// there is only one BatchCommands stream for each host or forwarded host.
Expand Down
23 changes: 12 additions & 11 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
Expand Down Expand Up @@ -116,11 +117,11 @@ func TestCancelTimeoutRetErr(t *testing.T) {
}

func TestSendWhenReconnect(t *testing.T) {
server, port := startMockTikvService()
server, port := mock_server.StartMockTikvService()
require.True(t, port > 0)

rpcClient := NewRPCClient()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
addr := server.Addr()
conn, err := rpcClient.getConnArray(addr, true)
assert.Nil(t, err)

Expand Down Expand Up @@ -240,7 +241,7 @@ func TestCollapseResolveLock(t *testing.T) {
}

func TestForwardMetadataByUnaryCall(t *testing.T) {
server, port := startMockTikvService()
server, port := mock_server.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
Expand All @@ -255,7 +256,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {

var checkCnt uint64
// Check no corresponding metadata if ForwardedHost is empty.
server.setMetaChecker(func(ctx context.Context) error {
server.SetMetaChecker(func(ctx context.Context) error {
atomic.AddUint64(&checkCnt, 1)
// gRPC may set some metadata by default, e.g. "context-type".
md, ok := metadata.FromIncomingContext(ctx)
Expand Down Expand Up @@ -283,7 +284,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
checkCnt = 0
forwardedHost := "127.0.0.1:6666"
// Check the metadata exists.
server.setMetaChecker(func(ctx context.Context) error {
server.SetMetaChecker(func(ctx context.Context) error {
atomic.AddUint64(&checkCnt, 1)
// gRPC may set some metadata by default, e.g. "context-type".
md, ok := metadata.FromIncomingContext(ctx)
Expand All @@ -308,10 +309,10 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
}

func TestForwardMetadataByBatchCommands(t *testing.T) {
server, port := startMockTikvService()
server, port := mock_server.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
addr := server.Addr()

// Enable batch and limit the connection count to 1 so that
// there is only one BatchCommands stream for each host or forwarded host.
Expand All @@ -324,7 +325,7 @@ func TestForwardMetadataByBatchCommands(t *testing.T) {

var checkCnt uint64
setCheckHandler := func(forwardedHost string) {
server.setMetaChecker(func(ctx context.Context) error {
server.SetMetaChecker(func(ctx context.Context) error {
atomic.AddUint64(&checkCnt, 1)
md, ok := metadata.FromIncomingContext(ctx)
if forwardedHost == "" {
Expand Down Expand Up @@ -641,10 +642,10 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 128
})()

server, port := startMockTikvService()
server, port := mock_server.StartMockTikvService()
require.True(t, port > 0)
require.True(t, server.IsRunning())
addr := server.addr
addr := server.Addr()
client := NewRPCClient()
defer func() {
err := client.Close()
Expand Down Expand Up @@ -681,7 +682,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
logutil.BgLogger().Info("restart mock tikv server")
server.Start(addr)
require.True(t, server.IsRunning())
require.Equal(t, addr, server.addr)
require.Equal(t, addr, server.Addr())

// Wait batch client to auto reconnect.
start := time.Now()
Expand Down
1 change: 1 addition & 0 deletions internal/client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.newBackoffFn.func1"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
}
goleak.VerifyTestMain(m, opts...)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go
//

package client
package mock_server

Check failure on line 21 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

don't use an underscore in package name (golint)

import (
"context"
Expand All @@ -36,7 +36,7 @@ import (
"google.golang.org/grpc"
)

type server struct {
type MockServer struct {

Check failure on line 39 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported type `MockServer` should have comment or be unexported (golint)
tikvpb.TikvServer
grpcServer *grpc.Server
addr string
Expand All @@ -49,21 +49,28 @@ type server struct {
}
}

func (s *server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {

Check failure on line 52 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.KvGet` should have comment or be unexported (golint)
if err := s.checkMetadata(ctx); err != nil {
return nil, err
}
return &kvrpcpb.GetResponse{}, nil
}

func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {

Check failure on line 59 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.KvPrewrite` should have comment or be unexported (golint)
if err := s.checkMetadata(ctx); err != nil {
return nil, err
}
return &kvrpcpb.PrewriteResponse{}, nil
}

func (s *server) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {
func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {

Check failure on line 66 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.CoprocessorStream` should have comment or be unexported (golint)
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
return ss.Send(&coprocessor.Response{})
}

func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {

Check failure on line 73 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.BatchCommands` should have comment or be unexported (golint)
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
Expand Down Expand Up @@ -94,13 +101,13 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
}
}

func (s *server) setMetaChecker(check func(context.Context) error) {
func (s *MockServer) SetMetaChecker(check func(context.Context) error) {

Check failure on line 104 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.SetMetaChecker` should have comment or be unexported (golint)
s.metaChecker.Lock()
s.metaChecker.check = check
s.metaChecker.Unlock()
}

func (s *server) checkMetadata(ctx context.Context) error {
func (s *MockServer) checkMetadata(ctx context.Context) error {
s.metaChecker.Lock()
defer s.metaChecker.Unlock()
if s.metaChecker.check != nil {
Expand All @@ -109,16 +116,20 @@ func (s *server) checkMetadata(ctx context.Context) error {
return nil
}

func (s *server) IsRunning() bool {
func (s *MockServer) IsRunning() bool {

Check failure on line 119 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.IsRunning` should have comment or be unexported (golint)
return atomic.LoadInt64(&s.running) == 1
}

func (s *server) Stop() {
func (s *MockServer) Addr() string {

Check failure on line 123 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.Addr` should have comment or be unexported (golint)
return s.addr
}

func (s *MockServer) Stop() {

Check failure on line 127 in internal/client/mock_server/mock_tikv_service.go

View workflow job for this annotation

GitHub Actions / golangci

exported method `MockServer.Stop` should have comment or be unexported (golint)
s.grpcServer.Stop()
atomic.StoreInt64(&s.running, 0)
}

func (s *server) Start(addr string) int {
func (s *MockServer) Start(addr string) int {
if addr == "" {
addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0)
}
Expand Down Expand Up @@ -148,9 +159,9 @@ func (s *server) Start(addr string) int {
return port
}

// Try to start a gRPC server and retrun the server instance and binded port.
func startMockTikvService() (*server, int) {
server := &server{}
// StartMockTikvService tries to start a gRPC server and returns the server instance and the bound port.
func StartMockTikvService() (*MockServer, int) {
server := &MockServer{}
port := server.Start("")
return server, port
}
2 changes: 1 addition & 1 deletion internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
r := recover()
if r != nil {
logutil.BgLogger().Error("panic in the checkAndResolve goroutine",
zap.Reflect("r", r),
zap.Any("r", r),
zap.Stack("stack trace"))
}
}()
Expand Down
53 changes: 53 additions & 0 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ package locate
import (
"context"
"fmt"
"math/rand"
"net"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
Expand All @@ -51,7 +53,9 @@ import (
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -659,3 +663,52 @@ func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
}

// TestBatchClientSendLoopPanic sends many coprocessor requests concurrently
// through a real RPC client to a mock TiKV server, cancelling each request's
// context shortly after it is issued, and then verifies that the batch send
// loop never had to recover from a panic.
func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
	// This test should use `go test -race` to run.
	//
	// The function returned by UpdateGlobal must be deferred rather than
	// invoked immediately; calling it right away (presumably a restore hook --
	// confirm against the config package) would undo the override before any
	// request is sent.
	defer config.UpdateGlobal(func(conf *config.Config) {
		conf.TiKVClient.MaxBatchSize = 128
	})()

	server, port := mock_server.StartMockTikvService()
	s.True(port > 0)
	rpcClient := client.NewRPCClient()
	// Route every request to the mock server regardless of the address the
	// region cache resolved.
	sendClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
	}}
	// Liveness probe stub: always report the store as reachable.
	tf := func(_ *Store, _ *retry.Backoffer) livenessState {
		return reachable
	}

	defer func() {
		s.NoError(rpcClient.Close())
		server.Stop()
	}()

	const (
		workers           = 100
		requestsPerWorker = 100
	)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < requestsPerWorker; j++ {
				ctx, cancel := context.WithCancel(context.Background())
				bo := retry.NewBackofferWithVars(ctx, int(client.ReadTimeoutShort.Milliseconds()), nil)
				region, err := s.cache.LocateRegionByID(bo, s.region)
				if !s.Nil(err) || !s.NotNil(region) {
					// Without a region there is nothing to send; bail out
					// instead of dereferencing a nil location below.
					cancel()
					return
				}
				go func() {
					// mock for kill query execution or timeout.
					time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1))
					cancel()
				}()
				req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1})
				regionRequestSender := NewRegionRequestSender(s.cache, sendClient)
				regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
				// The result is intentionally ignored: the context is cancelled
				// at a random moment, so the request may fail; the test only
				// checks the panic counter afterwards.
				regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
			}
		}()
	}
	wg.Wait()
	// batchSendLoop should not panic.
	s.Equal(int64(0), atomic.LoadInt64(&client.BatchSendLoopPanicCounter))
}
15 changes: 11 additions & 4 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,20 @@ type MPPStreamResponse struct {

// SetContext sets the Context field for the given req to the specified ctx.
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
ctx := &req.Context
if region != nil {
ctx.RegionId = region.Id
ctx.RegionEpoch = region.RegionEpoch
req.Context.RegionId = region.Id
req.Context.RegionEpoch = region.RegionEpoch
}
ctx.Peer = peer
req.Context.Peer = peer

// Shallow copy the context to avoid concurrent modification.
return AttachContext(req, req.Context)
}

// AttachContext attaches the given request context to the request.
// The parameter `rpcCtx` is a `kvrpcpb.Context` value rather than a `*kvrpcpb.Context` so that the request gets its own shallow copy, which avoids concurrent modification.
func AttachContext(req *Request, rpcCtx kvrpcpb.Context) error {
ctx := &rpcCtx
switch req.Type {
case CmdGet:
req.Get().Context = ctx
Expand Down
Loading

0 comments on commit 65f7812

Please sign in to comment.