Skip to content

Commit

Permalink
*: allow TiDB to use TiKV's RC for high performance read (#30943)
Browse files Browse the repository at this point in the history
close #30942
  • Loading branch information
you06 authored Dec 30, 2021
1 parent 9d50d0a commit 0ab0dad
Show file tree
Hide file tree
Showing 19 changed files with 154 additions and 16 deletions.
6 changes: 6 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *Req
return builder
}

// SetIsolationLevel sets "IsolationLevel" for "kv.Request".
func (builder *RequestBuilder) SetIsolationLevel(level kv.IsoLevel) *RequestBuilder {
builder.Request.IsolationLevel = level
return builder
}

const estimatedRegionRowCount = 100000

// SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data.
Expand Down
3 changes: 2 additions & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
for i, key := range keys {
val := values[string(key)]
if len(val) == 0 {
if e.idxInfo != nil && (!e.tblInfo.IsCommonHandle || !e.idxInfo.Primary) {
if e.idxInfo != nil && (!e.tblInfo.IsCommonHandle || !e.idxInfo.Primary) &&
!e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
return kv.ErrNotExist.GenWithStack("inconsistent extra index %s, handle %d not found in table",
e.idxInfo.Name.O, e.handles[i])
}
Expand Down
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3936,6 +3936,9 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
if err != nil {
return nil, err
}
if builder.ctx.GetSessionVars().StmtCtx.WeakConsistency {
reqBuilderWithRange.SetIsolationLevel(kv.RC)
}
kvReq, err := reqBuilderWithRange.
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
Expand Down
9 changes: 8 additions & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down Expand Up @@ -556,6 +559,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
PushedLimit: e.PushedLimit,
}
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
Expand Down Expand Up @@ -1254,7 +1260,8 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
sort.Sort(task)
}

if handleCnt != len(task.rows) && !util.HasCancelled(ctx) {
if handleCnt != len(task.rows) && !util.HasCancelled(ctx) &&
!w.idxLookup.ctx.GetSessionVars().StmtCtx.WeakConsistency {
if len(w.idxLookup.tblPlans) == 1 {
obtainedHandlesMap := kv.NewHandleMap()
for _, row := range task.rows {
Expand Down
12 changes: 12 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,6 +1733,11 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
if topsqlstate.TopSQLEnabled() && prepareStmt.SQLDigest != nil {
topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil, vars.InRestrictedSQL)
}
if s, ok := prepareStmt.PreparedAst.Stmt.(*ast.SelectStmt); ok {
if s.LockInfo == nil {
sc.WeakConsistency = isWeakConsistencyRead(ctx, execStmt)
}
}
}
// execute missed stmtID uses empty sql
sc.OriginalSQL = s.Text()
Expand Down Expand Up @@ -1808,6 +1813,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.Priority = opts.Priority
sc.NotFillCache = !opts.SQLCache
}
sc.WeakConsistency = isWeakConsistencyRead(ctx, stmt)
case *ast.SetOprStmt:
sc.InSelectStmt = true
sc.OverflowAsWarning = true
Expand Down Expand Up @@ -1924,3 +1930,9 @@ func setRPCInterceptorOfExecCounterForTxn(vars *variable.SessionVars, snapshot k
snapshot.SetOption(kv.RPCInterceptor, vars.StmtCtx.KvExecCounter.RPCInterceptor())
}
}

func isWeakConsistencyRead(ctx sessionctx.Context, node ast.Node) bool {
sessionVars := ctx.GetSessionVars()
return sessionVars.ConnectionID > 0 && sessionVars.ReadConsistency.IsWeak() &&
plannercore.IsAutoCommitTxn(ctx) && plannercore.IsReadOnly(node, sessionVars)
}
3 changes: 3 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
}

var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetDAGRequest(e.dagPBs[workID]).
SetStartTS(e.startTS).
SetDesc(e.descs[workID]).
Expand Down
3 changes: 2 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
if len(val) == 0 {
if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) {
if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) &&
!e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
return kv.ErrNotExist.GenWithStack("inconsistent extra index %s, handle %d not found in table",
e.idxInfo.Name.O, e.handle)
}
Expand Down
6 changes: 6 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
return nil, err
}
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
reqBuilder := builder.SetKeyRanges(kvRange)
kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
Expand Down Expand Up @@ -354,6 +357,9 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
reqBuilder.SetIsolationLevel(kv.RC)
}
reqBuilder.
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211223062159-300275dee63e
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211223062159-300275dee63e h1:UildvukO7gTs4/bW+h6jNnpv6syWmh2VMQxD5sMm9II=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211223062159-300275dee63e/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7 h1:megVHSEVsedArz10MwJTlrTtBNhORupPbuVAUhHbSko=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI=
Expand Down
13 changes: 6 additions & 7 deletions metrics/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/metrics"
)

// Metrics for the GC worker.
Expand Down Expand Up @@ -69,11 +70,9 @@ var (
Help: "Counter of gc scan lock request more than once in the same region.",
})

GCUnsafeDestroyRangeFailuresCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "gc_unsafe_destroy_range_failures",
Help: "Counter of unsafe destroyrange failures",
}, []string{"type"})
GCUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
)

func init() {
GCUnsafeDestroyRangeFailuresCounterVec = metrics.TiKVUnsafeDestroyRangeFailuresCounterVec
}
1 change: 0 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func RegisterMetrics() {
prometheus.MustRegister(GCJobFailureCounter)
prometheus.MustRegister(GCRegionTooManyLocksCounter)
prometheus.MustRegister(GCWorkerCounter)
prometheus.MustRegister(GCUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TotalQueryProcHistogram)
prometheus.MustRegister(TotalCopProcHistogram)
prometheus.MustRegister(TotalCopWaitHistogram)
Expand Down
5 changes: 4 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2206,6 +2206,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.txn.SetOption(kv.ReplicaRead, readReplicaType)
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
if s.GetSessionVars().StmtCtx.WeakConsistency {
s.txn.SetOption(kv.IsolationLevel, kv.RC)
}
}
return &s.txn, nil
}
Expand Down Expand Up @@ -2895,7 +2898,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error {
return nil
}

// PrepareTxnCtx starts a goroutine to begin a transaction if needed, and creates a new transaction context.
// PrepareTxnCtx begins a transaction, and creates a new transaction context.
// It is called before we execute a sql query.
func (s *session) PrepareTxnCtx(ctx context.Context) error {
s.currentCtx = ctx
Expand Down
6 changes: 5 additions & 1 deletion sessionctx/stmtctx/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ import (

func TestMain(m *testing.M) {
testbridge.SetupForCommonTest()
goleak.VerifyTestMain(m)
opts := []goleak.Option{
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
goleak.VerifyTestMain(m, opts...)
}
3 changes: 3 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type StatementContext struct {
// Its life cycle is limited to this execution, and a new KvExecCounter is
// always created during each statement execution.
KvExecCounter *stmtstats.KvExecCounter

// WeakConsistency is true when read consistency is weak and in a read statement and not in a transaction.
WeakConsistency bool
}

// StmtHints are SessionVars related sql hints.
Expand Down
52 changes: 52 additions & 0 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/execdetails"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -90,3 +91,54 @@ func TestStatementContextPushDownFLags(t *testing.T) {
require.Equal(t, tt.out, got)
}
}

func TestWeakConsistencyRead(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

lastWeakConsistency := func(tk *testkit.TestKit) bool {
return tk.Session().GetSessionVars().StmtCtx.WeakConsistency
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, c int, c1 int, unique index i(c))")
// strict
tk.MustExec("insert into t values(1, 1, 1)")
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
require.False(t, lastWeakConsistency(tk))
tk.MustExec("prepare s from 'select * from t'")
tk.MustExec("prepare u from 'update t set c1 = id + 1'")
tk.MustQuery("execute s").Check(testkit.Rows("1 1 1"))
require.False(t, lastWeakConsistency(tk))
tk.MustExec("execute u")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("admin check table t")
require.False(t, lastWeakConsistency(tk))
// weak
tk.MustExec("set tidb_read_consistency = weak")
tk.MustExec("insert into t values(2, 2, 2)")
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2", "2 2 2"))
require.True(t, lastWeakConsistency(tk))
tk.MustQuery("execute s").Check(testkit.Rows("1 1 2", "2 2 2"))
require.True(t, lastWeakConsistency(tk))
tk.MustExec("execute u")
require.False(t, lastWeakConsistency(tk))
// non-read-only queries should be strict
tk.MustExec("admin check table t")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("update t set c = c + 1 where id = 2")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("delete from t where id = 2")
require.False(t, lastWeakConsistency(tk))
// in-transaction queries should be strict
tk.MustExec("begin")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2"))
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("execute s").Check(testkit.Rows("1 1 2"))
require.False(t, lastWeakConsistency(tk))
tk.MustExec("rollback")
}
27 changes: 27 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,30 @@ const (
oneShotUse
)

// ReadConsistencyLevel is the level of read consistency.
type ReadConsistencyLevel string

const (
// ReadConsistencyStrict means read by strict consistency, default value.
ReadConsistencyStrict ReadConsistencyLevel = "strict"
// ReadConsistencyWeak means read can be weak consistency.
ReadConsistencyWeak ReadConsistencyLevel = "weak"
)

// IsWeak returns true only if it's a weak-consistency read.
func (r ReadConsistencyLevel) IsWeak() bool {
return r == ReadConsistencyWeak
}

func validateReadConsistencyLevel(val string) error {
switch v := ReadConsistencyLevel(strings.ToLower(val)); v {
case ReadConsistencyStrict, ReadConsistencyWeak:
return nil
default:
return ErrWrongTypeForVar.GenWithStackByArgs(TiDBReadConsistency)
}
}

// SessionVars is to handle user-defined or global variables in the current session.
type SessionVars struct {
Concurrency
Expand Down Expand Up @@ -980,6 +1004,9 @@ type SessionVars struct {
// all the local data in each session, and finally report them to the remote
// regularly.
StmtStats *stmtstats.StatementStats

// ReadConsistency indicates the read consistency requirement.
ReadConsistency ReadConsistencyLevel
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,15 @@ var defaultSysVars = []*SysVar{
EnableColumnTracking.Store(v)
return nil
}},
{Scope: ScopeSession, Name: TiDBReadConsistency, Value: string(ReadConsistencyStrict), Type: TypeStr, Hidden: true,
Validation: func(_ *SessionVars, normalized string, _ string, _ ScopeFlag) (string, error) {
return normalized, validateReadConsistencyLevel(normalized)
},
SetSession: func(s *SessionVars, val string) error {
s.ReadConsistency = ReadConsistencyLevel(val)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ const (

// TiDBEnablePaging indicates whether paging is enabled in coprocessor requests.
TiDBEnablePaging = "tidb_enable_paging"

// TiDBReadConsistency indicates whether the autocommit read statement goes through TiKV RC.
TiDBReadConsistency = "tidb_read_consistency"
)

// TiDB system variable names that both in session and global scope.
Expand Down

0 comments on commit 0ab0dad

Please sign in to comment.