Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pick #1250 to tidb-7.5 #1325

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
retryCount++
}
// Make sure the store's min resolved ts is not initialized.
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
Expand Down
9 changes: 9 additions & 0 deletions internal/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package logutil

import (
"context"
"testing"

"github.com/pingcap/log"
"go.uber.org/zap"
Expand All @@ -60,3 +61,11 @@ type ctxLogKeyType struct{}
// CtxLogKey is the key to retrieve logger from context.
// It can be assigned to another value.
var CtxLogKey interface{} = ctxLogKeyType{}

// AssertWarn reports msg (with fields) through logger as a soft assertion.
// When the binary is running under `go test` (testing.Testing() is true) the
// message is sent through logger.Panic so the violation fails loudly; in any
// other binary it is only emitted at Warn level.
func AssertWarn(logger *zap.Logger, msg string, fields ...zap.Field) {
	if !testing.Testing() {
		logger.Warn(msg, fields...)
		return
	}
	logger.Panic(msg, fields...)
	// Only reached if the Panic call above returns; the warning is then
	// emitted as well, exactly as on the non-test path.
	logger.Warn(msg, fields...)
}
64 changes: 52 additions & 12 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ type KVStore struct {
wg sync.WaitGroup
close atomicutil.Bool
gP Pool

testingKnobs struct {
mockGetMinResolvedTSByStoresIDs atomic.Pointer[func(ctx context.Context, ids []string) (uint64, map[uint64]uint64, error)]
}
}

// setGetMinResolvedTSByStoresIDs installs f in the testing knobs as the
// function getMinResolvedTSByStoresIDs calls instead of the PD HTTP client.
// Passing a nil f removes any previously installed replacement.
func (s *KVStore) setGetMinResolvedTSByStoresIDs(f func(ctx context.Context, ids []string) (uint64, map[uint64]uint64, error)) {
	if f == nil {
		// &f is never nil, even when f itself is. Storing it would make the
		// knob look installed (Load() != nil) and the later (*f)(ctx, ids)
		// call would panic on a nil func, so clear the knob instead.
		s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Store(nil)
		return
	}
	s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Store(&f)
}

// getMinResolvedTSByStoresIDs forwards to the PD HTTP client's
// GetMinResolvedTSByStoresIDs, unless a replacement has been stored in the
// testing knobs, in which case the replacement is called with the same
// arguments and its results are returned unchanged.
func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, ids []string) (uint64, map[uint64]uint64, error) {
	mock := s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Load()
	if mock == nil {
		return s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, ids)
	}
	return (*mock)(ctx, ids)
}

// Go run the function in a separate goroutine.
Expand Down Expand Up @@ -497,6 +512,15 @@ func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
return 0
}

// setMinSafeTS stores safeTS as the min safe ts of txnScope.
// math.MaxUint64 is never stored: such a call is reported through
// logutil.AssertWarn (with the txn scope and a stack trace) and otherwise
// ignored, leaving the previously stored value in place.
func (s *KVStore) setMinSafeTS(txnScope string, safeTS uint64) {
	if safeTS != math.MaxUint64 {
		s.minSafeTS.Store(txnScope, safeTS)
		return
	}
	logutil.AssertWarn(logutil.BgLogger(), "skip setting min-safe-ts to max uint64", zap.String("txnScope", txnScope), zap.Stack("stack"))
}

// Ctx returns ctx.
func (s *KVStore) Ctx() context.Context {
return s.ctx
Expand Down Expand Up @@ -532,14 +556,22 @@ func (s *KVStore) getSafeTS(storeID uint64) (bool, uint64) {

// setSafeTS stores safeTS as the safe ts of store storeID; it is also used by tests.
// math.MaxUint64 is never stored: such a call is reported through
// logutil.AssertWarn (with the store ID and a stack trace) and otherwise
// ignored, leaving the previously stored value in place.
func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
	if safeTS != math.MaxUint64 {
		s.safeTSMap.Store(storeID, safeTS)
		return
	}
	logutil.AssertWarn(logutil.BgLogger(), "skip setting safe-ts to max uint64", zap.Uint64("storeID", storeID), zap.Stack("stack"))
}

func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
minSafeTS := uint64(math.MaxUint64)
// when there is no store, return 0 in order to let minStartTS become startTS directly
// actually storeIDs won't be empty since updateMinSafeTS is only called by updateSafeTS and updateSafeTS builds
// txnScopeMap with non-empty values. here we check it to make the logic more robust.
if len(storeIDs) < 1 {
s.minSafeTS.Store(txnScope, 0)
s.setMinSafeTS(txnScope, 0)
return
}
for _, store := range storeIDs {
ok, safeTS := s.getSafeTS(store)
Expand All @@ -551,7 +583,11 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
minSafeTS = 0
}
}
s.minSafeTS.Store(txnScope, minSafeTS)
// if minSafeTS is still math.MaxUint64, that means all store safe ts are 0, then we set minSafeTS to 0.
if minSafeTS == math.MaxUint64 {
minSafeTS = 0
}
s.setMinSafeTS(txnScope, minSafeTS)
}

func (s *KVStore) safeTSUpdater() {
Expand Down Expand Up @@ -591,11 +627,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
if s.pdHttpClient != nil {
if s.pdHttpClient != nil || s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Load() != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
_, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
Expand All @@ -612,8 +648,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
defer wg.Done()

var safeTS uint64
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
// If getting the minimum resolved timestamp from PD failed or returned 0/MaxUint64, try to get it from TiKV.
if storeMinResolvedTSs == nil || !isValidSafeTS(storeMinResolvedTSs[storeID]) || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
Expand Down Expand Up @@ -675,21 +711,21 @@ var (
func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
if (s.pdHttpClient != nil || s.testingKnobs.mockGetMinResolvedTSByStoresIDs.Load() != nil) && isGlobal {
clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
} else if isValidSafeTS(clusterMinSafeTS) {
// Update ts and metrics.
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
// If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized.
// preClusterMinSafeTS is guaranteed to be less than math.MaxUint64 (by this method and setMinSafeTS)
// related to https://github.com/tikv/client-go/issues/991
if preClusterMinSafeTS != math.MaxUint64 && preClusterMinSafeTS > clusterMinSafeTS {
if preClusterMinSafeTS > clusterMinSafeTS {
skipSafeTSUpdateCounter.Inc()
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
} else {
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
s.setMinSafeTS(oracle.GlobalTxnScope, clusterMinSafeTS)
successSafeTSUpdateCounter.Inc()
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
Expand All @@ -701,6 +737,10 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
return false
}

// isValidSafeTS reports whether ts can be used as a safe ts. The two values
// 0 and math.MaxUint64 are rejected; every other value is accepted.
func isValidSafeTS(ts uint64) bool {
	switch ts {
	case 0, math.MaxUint64:
		return false
	default:
		return true
	}
}

// EnableResourceControl enables the resource control.
func EnableResourceControl() {
client.ResourceControlSwitch.Store(true)
Expand Down
Loading