Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: retry TSO RPC #24682

Merged
merged 9 commits into from
May 18, 2021
14 changes: 5 additions & 9 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,15 +739,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
return
}
bo := retry.NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars)
now, err := c.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
now, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
err1 := bo.Backoff(retry.BoPDRPC, err)
if err1 != nil {
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}
continue
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
Expand Down Expand Up @@ -999,7 +995,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// from PD and plus one as our MinCommitTS.
if commitTSMayBeCalculated && c.needLinearizability() {
failpoint.Inject("getMinCommitTSFromTSO", nil)
latestTS, err := c.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
latestTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope())
// If we fail to get a timestamp from PD, we just propagate the failure
// instead of falling back to the normal 2PC because a normal 2PC will
// also be likely to fail due to the same timestamp issue.
Expand Down
9 changes: 2 additions & 7 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// locks have been cleaned before GC.
expiredLocks := locks

callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return false, errors.Trace(err)
}

txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
Expand All @@ -243,7 +238,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}
Expand All @@ -257,7 +252,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true, l)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l)
if err != nil {
return false, err
}
Expand Down