Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd http: support api to get store min resolved ts #793

Merged
merged 10 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
"context"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
)

func TestPDAPI(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestPDAPI because with-tikv is not enabled")
}
suite.Run(t, new(apiTestSuite))
}

type apiTestSuite struct {
suite.Suite
store *tikv.KVStore
}

func (s *apiTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().Nil(err)
rpcClient := tikv.NewRPCClient()
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
}

func (s *apiTestSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}

type storeSafeTsMockClient struct {
tikv.Client
requestCount int32
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150}
return resp, nil
}

func (c *storeSafeTsMockClient) Close() error {
return c.Client.Close()
}

func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the store from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the store from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
}
}
8 changes: 4 additions & 4 deletions integration_tests/raw/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
pd "github.com/tikv/pd/client"
)

func TestApi(t *testing.T) {
func TestAPI(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestApi because with-tikv is not enabled")
t.Skip("skipping TestAPI because with-tikv is not enabled")
}
suite.Run(t, new(apiTestSuite))
}
Expand Down Expand Up @@ -517,10 +517,10 @@ func (s *apiTestSuite) TestEmptyValue() {

func (s *apiTestSuite) TearDownTest() {
if s.client != nil {
_ = s.client.Close()
s.Require().Nil(s.client.Close())
}
if s.clientForCas != nil {
_ = s.clientForCas.Close()
s.Require().Nil(s.clientForCas.Close())
}
if s.pdClient != nil {
s.pdClient.Close()
Expand Down
9 changes: 5 additions & 4 deletions integration_tests/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ func (s *testTiclientSuite) TearDownSuite() {
// Clean all data, or it may pollute other data.
txn := s.beginTxn()
scanner, err := txn.Iter(encodeKey(s.prefix, ""), nil)
s.Require().Nil(err)
s.Require().NotNil(scanner)
require := s.Require()
require.Nil(err)
require.NotNil(scanner)
for scanner.Valid() {
k := scanner.Key()
err = txn.Delete(k)
s.Require().Nil(err)
scanner.Next()
}
err = txn.Commit(context.Background())
s.Require().Nil(err)
require.Nil(err)
err = s.store.Close()
s.Require().Nil(err)
require.Nil(err)
}

func (s *testTiclientSuite) beginTxn() *tikv.KVTxn {
Expand Down
2 changes: 1 addition & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Future interface {
const (
physicalShiftBits = 18
logicalBits = (1 << physicalShiftBits) - 1
// GlobalTxnScope is the default transaction scope for a Oracle service.
// GlobalTxnScope is the default transaction scope for an Oracle service.
GlobalTxnScope = "global"
)

Expand Down
58 changes: 41 additions & 17 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient *util.PDHTTPClient
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
Expand Down Expand Up @@ -193,6 +194,13 @@ func WithPool(gp Pool) Option {
}
}

// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
return func(o *KVStore) {
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
}
}

// loadOption load KVStore option into KVStore.
func loadOption(store *KVStore, opt ...Option) {
for _, f := range opt {
Expand Down Expand Up @@ -353,6 +361,9 @@ func (s *KVStore) Close() error {

s.oracle.Close()
s.pdClient.Close()
if s.pdHttpClient != nil {
s.pdHttpClient.Close()
}
s.lockResolver.Close()

if err := s.GetTiKVClient().Close(); err != nil {
Expand Down Expand Up @@ -385,7 +396,7 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) {
return startTS, nil
}

// GetTimestampWithRetry returns latest timestamp.
// GetTimestampWithRetry returns the latest timestamp.
func (s *KVStore) GetTimestampWithRetry(bo *Backoffer, scope string) (uint64, error) {
return s.getTimestampWithRetry(bo, scope)
}
Expand Down Expand Up @@ -581,25 +592,38 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,

var (
safeTS uint64
err error
)
storeIDStr := strconv.Itoa(int(storeID))
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a debug log to output safeTS and err when it's not nil.

}
safeTS := resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
}

_, preSafeTS := s.getSafeTS(storeID)
if preSafeTS > safeTS {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", storeIDStr).Inc()
Expand Down
4 changes: 2 additions & 2 deletions tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *CodecClient) SendRequest(ctx context.Context, addr string, req *tikvrpc
}

// NewTestTiKVStore creates a test store with Option
func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Client) Client, pdClientHijack func(pd.Client) pd.Client, txnLocalLatches uint) (*KVStore, error) {
func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Client) Client, pdClientHijack func(pd.Client) pd.Client, txnLocalLatches uint, opt ...Option) (*KVStore, error) {
codec := apicodec.NewCodecV1(apicodec.ModeTxn)
client = &CodecClient{
Client: client,
Expand All @@ -84,7 +84,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
// Make sure the uuid is unique.
uid := uuid.New().String()
spkv := NewMockSafePointKV()
tikvStore, err := NewKVStore(uid, pdCli, spkv, client)
tikvStore, err := NewKVStore(uid, pdCli, spkv, client, opt...)

if txnLocalLatches > 0 {
tikvStore.EnableTxnLocalLatches(txnLocalLatches)
Expand Down
Loading