Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store, executor: support stale read for tikv RPCContext #22176

Merged
merged 44 commits into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
5ed556e
support staleness context
Yisaer Jan 5, 2021
864c2f0
enable staleread
Yisaer Jan 5, 2021
39cc9f1
use EnableStaleRead
Yisaer Jan 5, 2021
0d1a750
use match labels
Yisaer Jan 5, 2021
6566224
fix fmt
Yisaer Jan 5, 2021
ac661d3
revise judge
Yisaer Jan 6, 2021
dfc0842
add test
Yisaer Jan 6, 2021
28040eb
fix test
Yisaer Jan 6, 2021
266765b
Update executor_test.go
Yisaer Jan 6, 2021
a4725ef
Update executor_test.go
Yisaer Jan 7, 2021
66706cc
add info
Yisaer Jan 7, 2021
3135a99
fix test
Yisaer Jan 7, 2021
6215804
add debug log
Yisaer Jan 8, 2021
7835b2c
Update client.go
Yisaer Jan 8, 2021
5149542
Revert "Update client.go"
Yisaer Jan 11, 2021
6ffb15a
Revert "add debug log"
Yisaer Jan 11, 2021
ff83775
Merge remote-tracking branch 'upstream/master' into stale_read
Yisaer Jan 13, 2021
38a9631
add assert case
Yisaer Jan 20, 2021
69c62b7
Merge branch 'master' into stale_read
Yisaer Jan 21, 2021
0e645e6
update kvproto
Yisaer Feb 25, 2021
e114580
Merge remote-tracking branch 'upstream/master' into stale_read
Yisaer Feb 25, 2021
ac7239a
update SendReqCtx
Yisaer Feb 25, 2021
1760bf2
remove error code
Yisaer Feb 25, 2021
9a9835e
update go.mod
Yisaer Mar 1, 2021
cea84d9
make fmt
Yisaer Mar 1, 2021
d763be2
Merge remote-tracking branch 'upstream/master' into stale_read
Yisaer Mar 1, 2021
0135c0b
update tidy
Yisaer Mar 1, 2021
6dbd542
fix mockTikvGrpcServer
Yisaer Mar 1, 2021
0f51647
fix test
Yisaer Mar 1, 2021
11ab014
Revert "fix test"
Yisaer Mar 1, 2021
f3e9fae
Merge branch 'master' into stale_read
Yisaer Mar 1, 2021
4dff7a1
address the comment
Yisaer Mar 1, 2021
6ae290d
Merge remote-tracking branch 'upstream/master' into stale_read
Yisaer Mar 4, 2021
b630959
address the comment
Yisaer Mar 4, 2021
798ecb6
address the comment
Yisaer Mar 4, 2021
a0ab8ac
Merge remote-tracking branch 'upstream/master' into stale_read
Yisaer Mar 4, 2021
f03e716
Merge remote-tracking branch 'upstream/master' into stale_read
Yisaer Mar 5, 2021
fdc8e68
make tidy
Yisaer Mar 5, 2021
5775100
Merge branch 'master' into stale_read
ti-chi-bot Mar 5, 2021
c985b5c
Merge branch 'master' into stale_read
ti-chi-bot Mar 5, 2021
001a7cb
Merge branch 'master' into stale_read
ti-chi-bot Mar 5, 2021
932dd12
fix race
Yisaer Mar 5, 2021
3c869e0
Merge branch 'master' into stale_read
ti-chi-bot Mar 5, 2021
cb14023
Merge branch 'master' into stale_read
ti-chi-bot Mar 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -235,6 +236,15 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
}
builder.txnScope = sv.TxnCtx.TxnScope
builder.IsStaleness = sv.TxnCtx.IsStaleness
if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't global scope support stale read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global scope also support stale read. If the txnScope is local here, we should pick store which match the label.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is it possible to also replace build.txnScope with builder.MatchStoreLabels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is unnecessary, build.txnScope is also responsible for (builder *RequestBuilder) verifyTxnScope().

builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: builder.txnScope,
},
}
}
return builder
}

Expand Down
13 changes: 13 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"sync/atomic"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -113,6 +116,16 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
},
})
}
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
Expand Down
48 changes: 48 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7634,6 +7634,54 @@ func (s *testSerialSuite) TestStalenessTransaction(c *C) {
}
}

func (s *testSerialSuite) TestStaleReadKVRequest(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")
defer tk.MustExec(`drop table if exists t`)
testcases := []struct {
name string
sql string
txnScope string
zone string
}{
{
name: "coprocessor read",
sql: "select * from t",
txnScope: "local",
zone: "sh",
},
{
name: "point get read",
sql: "select * from t where id = 1",
txnScope: "local",
zone: "bj",
},
{
name: "batch point get read",
sql: "select * from t where id in (1,2,3)",
txnScope: "local",
zone: "hz",
},
}
for _, testcase := range testcases {
c.Log(testcase.name)
tk.MustExec(fmt.Sprintf("set @@txn_scope=%v", testcase.txnScope))
failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone))
failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v")`, testcase.txnScope))
failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag")
}
}

func (s *testSuite) TestStalenessAndHistoryRead(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer func() {
Expand Down
12 changes: 12 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -148,6 +150,16 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
},
})
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -70,6 +71,8 @@ const (
TxnScope
// StalenessReadOnly indicates whether the transaction is staleness read only transaction
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
)

// Priority value for transaction priority.
Expand Down Expand Up @@ -418,6 +421,10 @@ type Request struct {
TaskID uint64
// TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
TiDBServerID uint64
// IsStaleness indicates whether the request read staleness data
IsStaleness bool
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels []*metapb.StoreLabel
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
9 changes: 8 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,14 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas
if worker.Stats == nil {
worker.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
}
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, task.storeType, task.storeAddr)
if worker.req.IsStaleness {
req.EnableStaleRead()
}
var ops []tikv.StoreSelectorOption
if len(worker.req.MatchStoreLabels) > 0 {
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, task.storeType, task.storeAddr, ops...)
if err != nil {
if task.storeType == kv.TiDB {
err = worker.handleTiDBSendReqErr(err, task, ch)
Expand Down
12 changes: 10 additions & 2 deletions store/tikv/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
Expand Down Expand Up @@ -73,13 +74,20 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
}

// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string) (*tikvrpc.Response, *RPCContext, string, error) {
func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string, opts ...StoreSelectorOption) (*tikvrpc.Response, *RPCContext, string, error) {
sender := NewRegionRequestSender(ch.regionCache, ch.client)
if len(directStoreAddr) > 0 {
sender.SetStoreAddr(directStoreAddr)
}
sender.Stats = ch.Stats
req.Context.ResolvedLocks = ch.resolvedLocks.GetAll()
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
failpoint.Inject("assertStaleReadFlag", func(val failpoint.Value) {
if val.(bool) {
if len(opts) > 0 && !req.StaleRead {
panic("req.StaleRead shouldn't be false when opts is not empty")
}
}
})
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType, opts...)
return resp, ctx, sender.GetStoreAddr(), err
}
16 changes: 16 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
Expand Down Expand Up @@ -380,6 +381,21 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
for _, op := range opts {
op(options)
}
failpoint.Inject("assertStoreLabels", func(val failpoint.Value) {
if len(opts) > 0 {
value := val.(string)
v := ""
for _, label := range options.labels {
if label.Key == placement.DCLabelKey {
v = label.Value
break
}
}
if v != value {
panic(fmt.Sprintf("StoreSelectorOption's label %v is not %v", v, value))
}
}
})
switch replicaRead {
case kv.ReplicaReadFollower:
store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed, options)
Expand Down
6 changes: 4 additions & 2 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,15 @@ func (s *RegionRequestSender) getRPCContext(
req *tikvrpc.Request,
regionID RegionVerID,
sType kv.StoreType,
opts ...StoreSelectorOption,
) (*RPCContext, error) {
switch sType {
case kv.TiKV:
var seed uint32
if req.ReplicaReadSeed != nil {
seed = *req.ReplicaReadSeed
}
return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed)
return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...)
case kv.TiFlash:
return s.regionCache.GetTiFlashRPCContext(bo, regionID)
case kv.TiDB:
Expand All @@ -217,6 +218,7 @@ func (s *RegionRequestSender) SendReqCtx(
regionID RegionVerID,
timeout time.Duration,
sType kv.StoreType,
opts ...StoreSelectorOption,
) (
resp *tikvrpc.Response,
rpcCtx *RPCContext,
Expand Down Expand Up @@ -262,7 +264,7 @@ func (s *RegionRequestSender) SendReqCtx(
logutil.Logger(bo.ctx).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes))
}

rpcCtx, err = s.getRPCContext(bo, req, regionID, sType)
rpcCtx, err = s.getRPCContext(bo, req, regionID, sType, opts...)
if err != nil {
return nil, nil, err
}
Expand Down
32 changes: 27 additions & 5 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand Down Expand Up @@ -60,7 +61,10 @@ type tikvSnapshot struct {
keyOnly bool
vars *kv.Variables
replicaReadSeed uint32
resolvedLocks *util.TSSet
isStaleness bool
// MatchStoreLabels indicates the labels the store should be matched
matchStoreLabels []*metapb.StoreLabel
resolvedLocks *util.TSSet

// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
Expand Down Expand Up @@ -282,8 +286,14 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
TaskId: s.mu.taskID,
})
s.mu.RUnlock()

resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "")
var ops []StoreSelectorOption
if s.isStaleness {
req.EnableStaleRead()
}
if len(s.matchStoreLabels) > 0 {
ops = append(ops, WithMatchLabels(s.matchStoreLabels))
}
resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "", ops...)

if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -430,13 +440,19 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
TaskId: s.mu.taskID,
})
s.mu.RUnlock()

var ops []StoreSelectorOption
if s.isStaleness {
req.EnableStaleRead()
}
if len(s.matchStoreLabels) > 0 {
ops = append(ops, WithMatchLabels(s.matchStoreLabels))
}
for {
loc, err := s.store.regionCache.LocateKey(bo, k)
if err != nil {
return nil, errors.Trace(err)
}
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV, "")
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV, "", ops...)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -552,6 +568,12 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
s.mu.Unlock()
case kv.SampleStep:
s.sampleStep = val.(uint32)
case kv.IsStalenessReadOnly:
s.mu.Lock()
s.isStaleness = val.(bool)
s.mu.Unlock()
case kv.MatchStoreLabels:
s.matchStoreLabels = val.([]*metapb.StoreLabel)
case kv.TxnScope:
s.txnScope = val.(string)
}
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,13 @@ func (req *Request) TxnHeartBeat() *kvrpcpb.TxnHeartBeatRequest {
return req.Req.(*kvrpcpb.TxnHeartBeatRequest)
}

// EnableStaleRead enables stale read
func (req *Request) EnableStaleRead() {
req.StaleRead = true
req.ReplicaReadType = kv.ReplicaReadMixed
djshow832 marked this conversation as resolved.
Show resolved Hide resolved
req.ReplicaRead = false
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request {
switch req.Type {
Expand Down