Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove stale-read flag after resolving lock #792

Merged
merged 9 commits into from
May 26, 2023
Merged
6 changes: 6 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ func (req *Request) EnableStaleRead() {
req.ReplicaRead = false
}

// DisableStaleReadMeetLock is called when stale-read fallbacks to leader read after meeting key-is-locked error.
func (req *Request) DisableStaleReadMeetLock() {
req.StaleRead = false
req.ReplicaReadType = kv.ReplicaReadLeader
}

// IsGlobalStaleRead checks if the request is a global stale read request.
func (req *Request) IsGlobalStaleRead() bool {
return req.ReadReplicaScope == oracle.GlobalTxnScope &&
Expand Down
15 changes: 13 additions & 2 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mergeRegionRequestStats(cli.Stats)
}()
}
isStaleness := s.mu.isStaleness
busyThresholdMs := s.mu.busyThreshold.Milliseconds()
s.mu.RUnlock()

pending := batch.keys
Expand All @@ -400,13 +402,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: s.mu.resourceGroupName,
},
BusyThresholdMs: uint32(s.mu.busyThreshold.Milliseconds()),
BusyThresholdMs: uint32(busyThresholdMs),
})
if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
s.mu.resourceGroupTagger(req)
}
scope := s.mu.readReplicaScope
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
replicaAdjuster := s.mu.replicaReadAdjuster
s.mu.RUnlock()
Expand Down Expand Up @@ -504,6 +505,11 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
// we need to read from leader after resolving the lock.
if isStaleness {
isStaleness = false
busyThresholdMs = 0
}
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: s.version,
Locks: locks,
Expand Down Expand Up @@ -694,6 +700,11 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
return nil, err
}
if firstLock == nil {
// we need to read from leader after resolving the lock.
if isStaleness {
req.DisableStaleReadMeetLock()
req.BusyThresholdMs = 0
}
firstLock = lock
} else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID {
// If it is an autocommit point get, it needs to be blocked only
Expand Down