diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index b47d294e9..553a958a5 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -37,6 +37,7 @@ package tikv_test import ( "context" "fmt" + "math" "strings" "sync/atomic" "testing" @@ -208,6 +209,55 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) } +func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { + util.EnableFailpoints() + require := s.Require() + require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + defer func() { + require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) + }() + // Try to get the minimum resolved timestamp of the cluster from PD. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) + mockClient := storeSafeTsMockClient{ + Client: s.store.GetTiKVClient(), + } + s.store.SetTiKVClient(&mockClient) + // Mock safeTS is not initialized. + s.store.SetStoreSafeTS(uint64(1), 0) + s.store.SetMinSafeTS(oracle.GlobalTxnScope, 0) + require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + + var retryCount int + for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { + require.NotEqual(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + time.Sleep(100 * time.Millisecond) + if retryCount > 5 { + break + } + retryCount++ + } + require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + + // Try to get the minimum resolved timestamp of the cluster from TiKV. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) + // Mock safeTS is not initialized. + s.store.SetStoreSafeTS(uint64(1), 0) + s.store.SetMinSafeTS(oracle.GlobalTxnScope, 0) + require.Equal(uint64(0), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + + for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { + require.NotEqual(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + time.Sleep(100 * time.Millisecond) + if retryCount > 5 { + break + } + retryCount++ + } + require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) +} + func (s *apiTestSuite) TearDownTest() { if s.store != nil { s.Require().Nil(s.store.Close()) diff --git a/tikv/kv.go b/tikv/kv.go index 506bfd67d..0345904ed 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -500,6 +500,9 @@ func (s *KVStore) GetTiKVClient() (client Client) { // GetMinSafeTS return the minimal safeTS of the storage with given txnScope. func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { if val, ok := s.minSafeTS.Load(txnScope); ok { + if val.(uint64) == uint64(math.MaxUint64) { + return 0 + } return val.(uint64) } return 0 @@ -850,3 +853,15 @@ type SchemaVer = transaction.SchemaVer // MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit. // We use it to abort the transaction to guarantee GC worker will not influence it. const MaxTxnTimeUse = transaction.MaxTxnTimeUse + +// SetMinSafeTS set the minimal safeTS of the storage with given txnScope. +// Note: it is only used for test. +func (s *KVStore) SetMinSafeTS(txnScope string, safeTS uint64) { + s.minSafeTS.Store(txnScope, safeTS) +} + +// SetStoreSafeTS set the safeTS of the store with given storeID. +// Note: it is only used for test. +func (s *KVStore) SetStoreSafeTS(storeID, safeTS uint64) { + s.safeTSMap.Store(storeID, safeTS) +}