diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 869f255e55705..310de50149eeb 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -19,6 +19,7 @@ import (
     "sort"
 
     "github.com/pingcap/errors"
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/parser/mysql"
     "github.com/pingcap/tidb/ddl/placement"
     "github.com/pingcap/tidb/infoschema"
@@ -235,6 +236,15 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
         builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
     }
     builder.txnScope = sv.TxnCtx.TxnScope
+    builder.IsStaleness = sv.TxnCtx.IsStaleness
+    if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope {
+        builder.MatchStoreLabels = []*metapb.StoreLabel{
+            {
+                Key:   placement.DCLabelKey,
+                Value: builder.txnScope,
+            },
+        }
+    }
     return builder
 }
 
diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go
index 5ea0c2712ec8e..ca4416658306b 100644
--- a/executor/batch_point_get.go
+++ b/executor/batch_point_get.go
@@ -20,12 +20,15 @@ import (
     "sync/atomic"
 
     "github.com/pingcap/failpoint"
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/parser/model"
     "github.com/pingcap/parser/mysql"
+    "github.com/pingcap/tidb/ddl/placement"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/pingcap/tidb/store/tikv"
+    "github.com/pingcap/tidb/store/tikv/oracle"
     "github.com/pingcap/tidb/tablecodec"
     "github.com/pingcap/tidb/types"
     "github.com/pingcap/tidb/util/chunk"
@@ -113,6 +116,16 @@ func (e *BatchPointGetExec) Open(context.Context) error {
         snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
     }
     snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
+    isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
+    snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
+    if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
+        snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
+            {
+                Key:   placement.DCLabelKey,
+                Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
+            },
+        })
+    }
     var batchGetter kv.BatchGetter = snapshot
     if txn.Valid() {
         lock := e.tblInfo.Lock
diff --git a/executor/executor_test.go b/executor/executor_test.go
index 43d96b1a06730..b3c917323e5d8 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -7634,6 +7634,54 @@ func (s *testSerialSuite) TestStalenessTransaction(c *C) {
     }
 }
 
+func (s *testSerialSuite) TestStaleReadKVRequest(c *C) {
+    c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
+    defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
+    tk := testkit.NewTestKit(c, s.store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t (id int primary key);")
+    defer tk.MustExec(`drop table if exists t`)
+    testcases := []struct {
+        name     string
+        sql      string
+        txnScope string
+        zone     string
+    }{
+        {
+            name:     "coprocessor read",
+            sql:      "select * from t",
+            txnScope: "local",
+            zone:     "sh",
+        },
+        {
+            name:     "point get read",
+            sql:      "select * from t where id = 1",
+            txnScope: "local",
+            zone:     "bj",
+        },
+        {
+            name:     "batch point get read",
+            sql:      "select * from t where id in (1,2,3)",
+            txnScope: "local",
+            zone:     "hz",
+        },
+    }
+    for _, testcase := range testcases {
+        c.Log(testcase.name)
+        tk.MustExec(fmt.Sprintf("set @@txn_scope=%v", testcase.txnScope))
+        failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone))
+        failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v")`, testcase.txnScope))
+        failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`)
+        tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`)
+        tk.MustQuery(testcase.sql)
+        tk.MustExec(`commit`)
+        failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
+        failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
+        failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag")
+    }
+}
+
 func (s *testSuite) TestStalenessAndHistoryRead(c *C) {
     c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
     defer func() {
diff --git a/executor/point_get.go b/executor/point_get.go
index 91369979ed32f..0ca261b439efe 100644
--- a/executor/point_get.go
+++ b/executor/point_get.go
@@ -19,9 +19,11 @@ import (
 
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/parser/model"
     "github.com/pingcap/parser/mysql"
     "github.com/pingcap/tidb/ddl"
+    "github.com/pingcap/tidb/ddl/placement"
     "github.com/pingcap/tidb/distsql"
     "github.com/pingcap/tidb/expression"
     "github.com/pingcap/tidb/infoschema"
@@ -148,6 +150,16 @@ func (e *PointGetExecutor) Open(context.Context) error {
         e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
     }
     e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
+    isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
+    e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
+    if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
+        e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
+            {
+                Key:   placement.DCLabelKey,
+                Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
+            },
+        })
+    }
     return nil
 }
 
diff --git a/kv/kv.go b/kv/kv.go
index 37f5f83951cdc..bcebff808ca1f 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -19,6 +19,7 @@ import (
     "sync"
     "time"
 
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/parser/model"
     "github.com/pingcap/tidb/config"
     "github.com/pingcap/tidb/store/tikv/oracle"
@@ -70,6 +71,8 @@ const (
     TxnScope
     // StalenessReadOnly indicates whether the transaction is staleness read only transaction
     IsStalenessReadOnly
+    // MatchStoreLabels indicates the labels the store should be matched
+    MatchStoreLabels
 )
 
 // Priority value for transaction priority.
@@ -418,6 +421,10 @@
     TaskID uint64
     // TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
     TiDBServerID uint64
+    // IsStaleness indicates whether the request read staleness data
+    IsStaleness bool
+    // MatchStoreLabels indicates the labels the store should be matched
+    MatchStoreLabels []*metapb.StoreLabel
 }
 
 // ResultSubset represents a result subset from a single storage unit.
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 4b891da6f8433..90427eed8d217 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -703,7 +703,14 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas
     if worker.Stats == nil {
         worker.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
     }
-    resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, task.storeType, task.storeAddr)
+    if worker.req.IsStaleness {
+        req.EnableStaleRead()
+    }
+    var ops []tikv.StoreSelectorOption
+    if len(worker.req.MatchStoreLabels) > 0 {
+        ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
+    }
+    resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, tikv.ReadTimeoutMedium, task.storeType, task.storeAddr, ops...)
     if err != nil {
         if task.storeType == kv.TiDB {
             err = worker.handleTiDBSendReqErr(err, task, ch)
diff --git a/store/tikv/client_helper.go b/store/tikv/client_helper.go
index 8f1ee3cb1b331..aea3e7137e878 100644
--- a/store/tikv/client_helper.go
+++ b/store/tikv/client_helper.go
@@ -16,6 +16,7 @@ package tikv
 import (
     "time"
 
+    "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/store/tikv/tikvrpc"
     "github.com/pingcap/tidb/store/tikv/util"
@@ -73,13 +74,20 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
 }
 
 // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
-func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string) (*tikvrpc.Response, *RPCContext, string, error) {
+func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sType kv.StoreType, directStoreAddr string, opts ...StoreSelectorOption) (*tikvrpc.Response, *RPCContext, string, error) {
     sender := NewRegionRequestSender(ch.regionCache, ch.client)
     if len(directStoreAddr) > 0 {
         sender.SetStoreAddr(directStoreAddr)
     }
     sender.Stats = ch.Stats
     req.Context.ResolvedLocks = ch.resolvedLocks.GetAll()
-    resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
+    failpoint.Inject("assertStaleReadFlag", func(val failpoint.Value) {
+        if val.(bool) {
+            if len(opts) > 0 && !req.StaleRead {
+                panic("req.StaleRead shouldn't be false when opts is not empty")
+            }
+        }
+    })
+    resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType, opts...)
     return resp, ctx, sender.GetStoreAddr(), err
 }
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 8ac86a9b0af2d..ba99e18d3d882 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -29,6 +29,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
     "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/tidb/ddl/placement"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/store/tikv/config"
     "github.com/pingcap/tidb/store/tikv/logutil"
@@ -380,6 +381,21 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
     for _, op := range opts {
         op(options)
     }
+    failpoint.Inject("assertStoreLabels", func(val failpoint.Value) {
+        if len(opts) > 0 {
+            value := val.(string)
+            v := ""
+            for _, label := range options.labels {
+                if label.Key == placement.DCLabelKey {
+                    v = label.Value
+                    break
+                }
+            }
+            if v != value {
+                panic(fmt.Sprintf("StoreSelectorOption's label %v is not %v", v, value))
+            }
+        }
+    })
     switch replicaRead {
     case kv.ReplicaReadFollower:
         store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed, options)
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index 74290ad3efbc9..a3ae52c38b82c 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -193,6 +193,7 @@ func (s *RegionRequestSender) getRPCContext(
     req *tikvrpc.Request,
     regionID RegionVerID,
     sType kv.StoreType,
+    opts ...StoreSelectorOption,
 ) (*RPCContext, error) {
     switch sType {
     case kv.TiKV:
@@ -200,7 +201,7 @@ func (s *RegionRequestSender) getRPCContext(
         if req.ReplicaReadSeed != nil {
             seed = *req.ReplicaReadSeed
         }
-        return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed)
+        return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...)
     case kv.TiFlash:
         return s.regionCache.GetTiFlashRPCContext(bo, regionID)
     case kv.TiDB:
@@ -217,6 +218,7 @@ func (s *RegionRequestSender) SendReqCtx(
     regionID RegionVerID,
     timeout time.Duration,
     sType kv.StoreType,
+    opts ...StoreSelectorOption,
 ) (
     resp *tikvrpc.Response,
     rpcCtx *RPCContext,
@@ -262,7 +264,7 @@ func (s *RegionRequestSender) SendReqCtx(
             logutil.Logger(bo.ctx).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes))
         }
 
-        rpcCtx, err = s.getRPCContext(bo, req, regionID, sType)
+        rpcCtx, err = s.getRPCContext(bo, req, regionID, sType, opts...)
         if err != nil {
             return nil, nil, err
         }
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
index 21c3000a459b9..2585835d8d88c 100644
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -29,6 +29,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
     pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/store/tikv/logutil"
     "github.com/pingcap/tidb/store/tikv/metrics"
@@ -60,7 +61,10 @@ type tikvSnapshot struct {
     keyOnly         bool
     vars            *kv.Variables
     replicaReadSeed uint32
-    resolvedLocks   *util.TSSet
+    isStaleness     bool
+    // MatchStoreLabels indicates the labels the store should be matched
+    matchStoreLabels []*metapb.StoreLabel
+    resolvedLocks    *util.TSSet
 
     // Cache the result of BatchGet.
     // The invariance is that calling BatchGet multiple times using the same start ts,
@@ -282,8 +286,14 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
         TaskId:       s.mu.taskID,
     })
     s.mu.RUnlock()
-
-    resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "")
+    var ops []StoreSelectorOption
+    if s.isStaleness {
+        req.EnableStaleRead()
+    }
+    if len(s.matchStoreLabels) > 0 {
+        ops = append(ops, WithMatchLabels(s.matchStoreLabels))
+    }
+    resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, ReadTimeoutMedium, kv.TiKV, "", ops...)
     if err != nil {
         return errors.Trace(err)
     }
@@ -430,13 +440,19 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
         TaskId:       s.mu.taskID,
     })
     s.mu.RUnlock()
-
+    var ops []StoreSelectorOption
+    if s.isStaleness {
+        req.EnableStaleRead()
+    }
+    if len(s.matchStoreLabels) > 0 {
+        ops = append(ops, WithMatchLabels(s.matchStoreLabels))
+    }
     for {
         loc, err := s.store.regionCache.LocateKey(bo, k)
         if err != nil {
             return nil, errors.Trace(err)
         }
-        resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV, "")
+        resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, kv.TiKV, "", ops...)
         if err != nil {
             return nil, errors.Trace(err)
         }
@@ -552,6 +568,12 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
         s.mu.Unlock()
     case kv.SampleStep:
         s.sampleStep = val.(uint32)
+    case kv.IsStalenessReadOnly:
+        s.mu.Lock()
+        s.isStaleness = val.(bool)
+        s.mu.Unlock()
+    case kv.MatchStoreLabels:
+        s.matchStoreLabels = val.([]*metapb.StoreLabel)
     case kv.TxnScope:
         s.txnScope = val.(string)
     }
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index 3d19d7d64bb44..c695f8ecb3827 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -389,6 +389,13 @@ func (req *Request) TxnHeartBeat() *kvrpcpb.TxnHeartBeatRequest {
     return req.Req.(*kvrpcpb.TxnHeartBeatRequest)
 }
 
+// EnableStaleRead enables stale read
+func (req *Request) EnableStaleRead() {
+    req.StaleRead = true
+    req.ReplicaReadType = kv.ReplicaReadMixed
+    req.ReplicaRead = false
+}
+
 // ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
 func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request {
     switch req.Type {
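For reference, here is a minimal usage sketch (not part of the patch) of the two snapshot options the diff introduces. It mirrors what `PointGetExecutor.Open` and `BatchPointGetExec.Open` do above: the stale-read flag is set from the transaction context, and the DC label constraint is added only for a non-global `txn_scope`. The package name and the `configureStaleRead` helper are hypothetical; `snap`, `txnScope`, and `isStaleness` are assumed to be supplied by the caller.

```go
// Usage sketch only: configureStaleRead is a hypothetical helper, not part of the patch.
package example

import (
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/ddl/placement"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv/oracle"
)

// configureStaleRead marks a snapshot as a stale read and, for a non-global
// txn_scope, restricts store selection to stores carrying the matching DC label.
func configureStaleRead(snap kv.Snapshot, txnScope string, isStaleness bool) {
	snap.SetOption(kv.IsStalenessReadOnly, isStaleness)
	if isStaleness && txnScope != oracle.GlobalTxnScope {
		snap.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
			{Key: placement.DCLabelKey, Value: txnScope},
		})
	}
}
```

At execution time the snapshot turns these options into `req.EnableStaleRead()` and a `WithMatchLabels` store-selector option, as the `store/tikv/snapshot.go` hunks above show.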