Skip to content

Commit

Permalink
cherry pick pingcap#18514 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
hanfei1991 authored and ti-srebot committed Jul 15, 2020
1 parent 66236a5 commit 1990a97
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 0 deletions.
36 changes: 36 additions & 0 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,42 @@ type mockCopStreamClient struct {
finished bool
}

type mockBathCopErrClient struct {
mockClientStream

*errorpb.Error
}

func (mock *mockBathCopErrClient) Recv() (*coprocessor.BatchResponse, error) {
return &coprocessor.BatchResponse{
OtherError: mock.Error.Message,
}, nil
}

type mockBatchCopDataClient struct {
mockClientStream

chunks []tipb.Chunk
idx int
}

func (mock *mockBatchCopDataClient) Recv() (*coprocessor.BatchResponse, error) {
if mock.idx < len(mock.chunks) {
res := tipb.SelectResponse{
Chunks: []tipb.Chunk{mock.chunks[mock.idx]},
}
raw, err := res.Marshal()
if err != nil {
return nil, errors.Trace(err)
}
mock.idx++
return &coprocessor.BatchResponse{
Data: raw,
}, nil
}
return nil, io.EOF
}

type mockCopStreamErrClient struct {
mockClientStream

Expand Down
77 changes: 77 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tipb/go-tipb"
)

// For gofail injection.
Expand Down Expand Up @@ -663,6 +664,44 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
return resp
}

func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) {
var chunk tipb.Chunk
for {
row, err := e.Next(ctx)
if err != nil {
return chunk, errors.Trace(err)
}
if row == nil {
return chunk, nil
}
for _, offset := range req.OutputOffsets {
chunk.RowsData = append(chunk.RowsData, row[offset]...)
}
}
}

func (h *rpcHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) {
client := &mockBatchCopDataClient{}
for _, ri := range req.Regions {
cop := coprocessor.Request{
Tp: kv.ReqTypeDAG,
Data: req.Data,
StartTs: req.StartTs,
Ranges: ri.Ranges,
}
_, exec, dagReq, err := h.buildDAGExecutor(&cop)
if err != nil {
return nil, errors.Trace(err)
}
chunk, err := drainRowsFromExecutor(ctx, exec, dagReq)
if err != nil {
return nil, errors.Trace(err)
}
client.chunks = append(client.chunks, chunk)
}
return client, nil
}

// Client is a client that sends RPC.
// This is same with tikv.Client, define again for avoid circle import.
type Client interface {
Expand Down Expand Up @@ -998,6 +1037,44 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
panic(fmt.Sprintf("unknown coprocessor request type: %v", r.GetTp()))
}
resp.Resp = res
case tikvrpc.CmdBatchCop:
failpoint.Inject("BatchCopCancelled", func(value failpoint.Value) {
if value.(bool) {
failpoint.Return(nil, context.Canceled)
}
})

failpoint.Inject("BatchCopRpcErr"+addr, func(value failpoint.Value) {
if value.(string) == addr {
failpoint.Return(nil, errors.New("rpc error"))
}
})
r := req.BatchCop()
if err := handler.checkRequestContext(reqCtx); err != nil {
resp.Resp = &tikvrpc.BatchCopStreamResponse{
Tikv_BatchCoprocessorClient: &mockBathCopErrClient{Error: err},
BatchResponse: &coprocessor.BatchResponse{
OtherError: err.Message,
},
}
}
ctx1, cancel := context.WithCancel(ctx)
batchCopStream, err := handler.handleBatchCopRequest(ctx1, r)
if err != nil {
cancel()
return nil, errors.Trace(err)
}
batchResp := &tikvrpc.BatchCopStreamResponse{Tikv_BatchCoprocessorClient: batchCopStream}
batchResp.Lease.Cancel = cancel
batchResp.Timeout = timeout
c.streamTimeout <- &batchResp.Lease

first, err := batchResp.Recv()
if err != nil {
return nil, errors.Trace(err)
}
batchResp.BatchResponse = first
resp.Resp = batchResp
case tikvrpc.CmdCopStream:
r := req.Cop()
if err := handler.checkRequestContext(reqCtx); err != nil {
Expand Down
144 changes: 144 additions & 0 deletions store/tikv/batch_coprocessor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
"context"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/cluster"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/testkit"
)

type testBatchCopSuite struct {
}

var _ = Suite(&testBatchCopSuite{})

func newStoreWithBootstrap(tiflashNum int) (kv.Storage, *domain.Domain, error) {
store, err := mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
mockCluster := c.(*mocktikv.Cluster)
_, _, region1 := mockstore.BootstrapWithSingleStore(c)
tiflashIdx := 0
for tiflashIdx < tiflashNum {
store2 := c.AllocID()
peer2 := c.AllocID()
addr2 := fmt.Sprintf("tiflash%d", tiflashIdx)
mockCluster.AddStore(store2, addr2)
mockCluster.UpdateStoreAddr(store2, addr2, &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
mockCluster.AddPeer(region1, store2, peer2)
tiflashIdx++
}
}),
mockstore.WithStoreType(mockstore.MockTiKV),
)

if err != nil {
return nil, nil, errors.Trace(err)
}

session.SetSchemaLease(0)
session.DisableStats4Test()

dom, err := session.BootstrapSession(store)
if err != nil {
return nil, nil, err
}

dom.SetStatsUpdating(true)
return store, dom, errors.Trace(err)
}

func testGetTableByName(c *C, ctx sessionctx.Context, db, table string) table.Table {
dom := domain.GetDomain(ctx)
// Make sure the table schema is the new schema.
err := dom.Reload()
c.Assert(err, IsNil)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table))
c.Assert(err, IsNil)
return tbl
}

func (s *testBatchCopSuite) TestStoreErr(c *C) {
store, dom, err := newStoreWithBootstrap(1)
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()

tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int not null, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/mocktikv/BatchCopCancelled", "1*return(true)"), IsNil)

err = tk.QueryToErr("select count(*) from t")
c.Assert(errors.Cause(err), Equals, context.Canceled)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/mocktikv/BatchCopRpcErrtiflash0", "1*return(\"tiflash0\")"), IsNil)

tk.MustQuery("select count(*) from t").Check(testkit.Rows("1"))

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/mocktikv/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)
err = tk.QueryToErr("select count(*) from t")
c.Assert(err, NotNil)
}

func (s *testBatchCopSuite) TestStoreSwitchPeer(c *C) {
store, dom, err := newStoreWithBootstrap(2)
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()

tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int not null, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/mocktikv/BatchCopRpcErrtiflash0", "return(\"tiflash0\")"), IsNil)

tk.MustQuery("select count(*) from t").Check(testkit.Rows("1"))

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/mocktikv/BatchCopRpcErrtiflash1", "return(\"tiflash1\")"), IsNil)
err = tk.QueryToErr("select count(*) from t")
c.Assert(err, NotNil)

}

0 comments on commit 1990a97

Please sign in to comment.