Skip to content

Commit

Permalink
Merge pull request #3 from djshow832/session
Browse files Browse the repository at this point in the history
add session variable `tidb_read_staleness`
  • Loading branch information
nolouch authored Mar 12, 2021
2 parents ea4d960 + 4848951 commit 3547322
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 5 deletions.
19 changes: 14 additions & 5 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1829,14 +1829,23 @@ func (s *session) NewTxn(ctx context.Context) error {
zap.String("txnScope", txnScope))
}

txn, err := s.store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
txnOpt := kv.TransactionOption{}.SetTxnScope(s.sessionVars.CheckAndGetTxnScope())
isStaleness := false
if s.sessionVars.ReadStaleness > 0 {
isStaleness = true
txnOpt.SetPrevSec(uint64(s.sessionVars.ReadStaleness))
}
txn, err := s.store.BeginWithOption(txnOpt)
if err != nil {
return err
}
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().GetReplicaRead().IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if isStaleness {
txn.SetOption(kv.IsStalenessReadOnly, true)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
Expand All @@ -1845,7 +1854,7 @@ func (s *session) NewTxn(ctx context.Context) error {
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
IsStaleness: false,
IsStaleness: isStaleness,
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
}
return nil
Expand Down Expand Up @@ -2638,13 +2647,13 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
// NewTxnWithStalenessOption create a transaction with Staleness option
func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error {
if s.txn.Valid() {
txnID := s.txn.StartTS()
txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope).(string)
//txnID := s.txn.StartTS()
//txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope).(string)
err := s.CommitTxn(ctx)
if err != nil {
return err
}
vars := s.GetSessionVars()
//vars := s.GetSessionVars()
//logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit",
// zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion),
// zap.Uint64("txnStartTS", txnID),
Expand Down
2 changes: 2 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
var tsFuture oracle.Future
if s.sessionVars.LowResolutionTSO {
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
} else if s.sessionVars.ReadStaleness > 0 {
tsFuture = oracleStore.GetStaleTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()}, uint64(s.sessionVars.ReadStaleness))
} else {
tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
}
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,9 @@ type SessionVars struct {

// EnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
EnableTiFlashFallbackTiKV bool

// ReadStaleness is the staleness for stale read.
ReadStaleness int
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
Expand Down Expand Up @@ -1751,6 +1754,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.TiDBEnableExchangePartition = TiDBOptOn(val)
case TiDBEnableTiFlashFallbackTiKV:
s.EnableTiFlashFallbackTiKV = TiDBOptOn(val)
case TiDBReadStaleness:
s.ReadStaleness = tidbOptPositiveInt32(val, 0)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal, Name: TiDBGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64},
{Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true},
{Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "PHYSICAL", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}},
{Scope: ScopeSession, Name: TiDBReadStaleness, Value: "0", Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt16, AllowAutoValue: true},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,9 @@ const (

// TiDBEnableTiFlashFallbackTiKV indicates whether to fallback to TiKV when TiFlash is unavailable.
TiDBEnableTiFlashFallbackTiKV = "tidb_enable_tiflash_fallback_tikv"

// TiDBReadStaleness is the staleness for stale read.
TiDBReadStaleness = "tidb_read_staleness"
)

// TiDB vars that have only global scope
Expand Down
1 change: 1 addition & 0 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Oracle interface {
GetLowResolutionTimestamp(ctx context.Context, opt *Option) (uint64, error)
GetLowResolutionTimestampAsync(ctx context.Context, opt *Option) Future
GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (uint64, error)
GetStaleTimestampAsync(ctx context.Context, opt *Option, prevSecond uint64) Future
IsExpired(lockTimestamp, TTL uint64, opt *Option) bool
UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64
Close()
Expand Down
8 changes: 8 additions & 0 deletions store/tikv/oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (l *localOracle) GetStaleTimestamp(ctx context.Context, txnScope string, pr
return ts, nil
}

func (l *localOracle) GetStaleTimestampAsync(ctx context.Context, opt *oracle.Option, prevSecond uint64) oracle.Future {
ts, err := l.GetStaleTimestamp(ctx, opt.TxnScope, prevSecond)
return lowResolutionTsFuture{
ts: ts,
err: err,
}
}

type future struct {
ctx context.Context
l *localOracle
Expand Down
11 changes: 11 additions & 0 deletions store/tikv/oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,14 @@ func (o *pdOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevS
}
return ts, nil
}

func (o *pdOracle) GetStaleTimestampAsync(ctx context.Context, opt *oracle.Option, prevSecond uint64) oracle.Future {
ts, err := o.getStaleTimestamp(opt.TxnScope, prevSecond)
if err != nil {
return o.GetTimestampAsync(ctx, opt)
}
return lowResolutionTsFuture{
ts: ts,
err: nil,
}
}

0 comments on commit 3547322

Please sign in to comment.