Skip to content

Commit

Permalink
pd http: support api to get store min resolved ts (#793)
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 12, 2023
1 parent c946782 commit adb48af
Show file tree
Hide file tree
Showing 9 changed files with 514 additions and 30 deletions.
148 changes: 148 additions & 0 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
"context"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
)

func TestPDAPI(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestPDAPI because with-tikv is not enabled")
}
suite.Run(t, new(apiTestSuite))
}

type apiTestSuite struct {
suite.Suite
store *tikv.KVStore
}

func (s *apiTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().Nil(err)
rpcClient := tikv.NewRPCClient()
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
}

func (s *apiTestSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}

type storeSafeTsMockClient struct {
tikv.Client
requestCount int32
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150}
return resp, nil
}

func (c *storeSafeTsMockClient) Close() error {
return c.Client.Close()
}

func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the store from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the store from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
}
}
8 changes: 4 additions & 4 deletions integration_tests/raw/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
pd "github.com/tikv/pd/client"
)

func TestApi(t *testing.T) {
func TestAPI(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestApi because with-tikv is not enabled")
t.Skip("skipping TestAPI because with-tikv is not enabled")
}
suite.Run(t, new(apiTestSuite))
}
Expand Down Expand Up @@ -517,10 +517,10 @@ func (s *apiTestSuite) TestEmptyValue() {

func (s *apiTestSuite) TearDownTest() {
if s.client != nil {
_ = s.client.Close()
s.Require().Nil(s.client.Close())
}
if s.clientForCas != nil {
_ = s.clientForCas.Close()
s.Require().Nil(s.clientForCas.Close())
}
if s.pdClient != nil {
s.pdClient.Close()
Expand Down
9 changes: 5 additions & 4 deletions integration_tests/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ func (s *testTiclientSuite) TearDownSuite() {
// Clean all data, or it may pollute other data.
txn := s.beginTxn()
scanner, err := txn.Iter(encodeKey(s.prefix, ""), nil)
s.Require().Nil(err)
s.Require().NotNil(scanner)
require := s.Require()
require.Nil(err)
require.NotNil(scanner)
for scanner.Valid() {
k := scanner.Key()
err = txn.Delete(k)
s.Require().Nil(err)
scanner.Next()
}
err = txn.Commit(context.Background())
s.Require().Nil(err)
require.Nil(err)
err = s.store.Close()
s.Require().Nil(err)
require.Nil(err)
}

func (s *testTiclientSuite) beginTxn() *tikv.KVTxn {
Expand Down
4 changes: 2 additions & 2 deletions internal/resourcecontrol/resource_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -115,7 +115,7 @@ func MakeResponseInfo(resp *tikvrpc.Response) *ResponseInfo {
// TODO: using a more accurate size rather than using the whole response size as the read bytes.
readBytes = uint64(r.Size())
default:
log.Debug("[kv resource] unknown response type to collect the info", zap.Any("type", reflect.TypeOf(r)))
logutil.BgLogger().Debug("[kv resource] unknown response type to collect the info", zap.Any("type", reflect.TypeOf(r)))
return &ResponseInfo{}
}
// Try to get read bytes from the `detailsV2`.
Expand Down
2 changes: 1 addition & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Future interface {
const (
physicalShiftBits = 18
logicalBits = (1 << physicalShiftBits) - 1
// GlobalTxnScope is the default transaction scope for a Oracle service.
// GlobalTxnScope is the default transaction scope for an Oracle service.
GlobalTxnScope = "global"
)

Expand Down
61 changes: 44 additions & 17 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient *util.PDHTTPClient
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
Expand Down Expand Up @@ -193,6 +194,13 @@ func WithPool(gp Pool) Option {
}
}

// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
return func(o *KVStore) {
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
}
}

// loadOption load KVStore option into KVStore.
func loadOption(store *KVStore, opt ...Option) {
for _, f := range opt {
Expand Down Expand Up @@ -353,6 +361,9 @@ func (s *KVStore) Close() error {

s.oracle.Close()
s.pdClient.Close()
if s.pdHttpClient != nil {
s.pdHttpClient.Close()
}
s.lockResolver.Close()

if err := s.GetTiKVClient().Close(); err != nil {
Expand Down Expand Up @@ -385,7 +396,7 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) {
return startTS, nil
}

// GetTimestampWithRetry returns latest timestamp.
// GetTimestampWithRetry returns the latest timestamp.
func (s *KVStore) GetTimestampWithRetry(bo *Backoffer, scope string) (uint64, error) {
return s.getTimestampWithRetry(bo, scope)
}
Expand Down Expand Up @@ -581,25 +592,41 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,

var (
safeTS uint64
err error
)
storeIDStr := strconv.Itoa(int(storeID))
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
}
}
safeTS := resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
}

_, preSafeTS := s.getSafeTS(storeID)
if preSafeTS > safeTS {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", storeIDStr).Inc()
Expand Down
4 changes: 2 additions & 2 deletions tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *CodecClient) SendRequest(ctx context.Context, addr string, req *tikvrpc
}

// NewTestTiKVStore creates a test store with Option
func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Client) Client, pdClientHijack func(pd.Client) pd.Client, txnLocalLatches uint) (*KVStore, error) {
func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Client) Client, pdClientHijack func(pd.Client) pd.Client, txnLocalLatches uint, opt ...Option) (*KVStore, error) {
codec := apicodec.NewCodecV1(apicodec.ModeTxn)
client = &CodecClient{
Client: client,
Expand All @@ -84,7 +84,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
// Make sure the uuid is unique.
uid := uuid.New().String()
spkv := NewMockSafePointKV()
tikvStore, err := NewKVStore(uid, pdCli, spkv, client)
tikvStore, err := NewKVStore(uid, pdCli, spkv, client, opt...)

if txnLocalLatches > 0 {
tikvStore.EnableTxnLocalLatches(txnLocalLatches)
Expand Down
Loading

0 comments on commit adb48af

Please sign in to comment.