Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 10, 2023
1 parent 5c324b7 commit acb6745
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 18 deletions.
2 changes: 1 addition & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Future interface {
const (
physicalShiftBits = 18
logicalBits = (1 << physicalShiftBits) - 1
// GlobalTxnScope is the default transaction scope for a Oracle service.
// GlobalTxnScope is the default transaction scope for an Oracle service.
GlobalTxnScope = "global"
)

Expand Down
47 changes: 30 additions & 17 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient util.PdController
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
Expand Down Expand Up @@ -193,6 +194,13 @@ func WithPool(gp Pool) Option {
}
}

// WithPDAddrsAndTls set the pd addrs and tls for pdHttpClient.
func WithPDAddrsAndTls(tlsConf *tls.Config, pdaddrs []string) Option {
return func(o *KVStore) {
o.pdHttpClient = *util.NewPdController(tlsConf, pdaddrs)
}
}

// loadOption load KVStore option into KVStore.
func loadOption(store *KVStore, opt ...Option) {
for _, f := range opt {
Expand Down Expand Up @@ -581,25 +589,30 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)

safeTS, err := s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
storeIDStr := strconv.Itoa(int(storeID))
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
}
safeTS := resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()

_, preSafeTS := s.getSafeTS(storeID)
if preSafeTS > safeTS {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", storeIDStr).Inc()
Expand Down
204 changes: 204 additions & 0 deletions util/pd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/execdetails.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
"io"
"net/http"
"net/url"
"os"
"strings"
"syscall"
"time"
)

// pd request retry time when connection fail
const (
pdRequestRetryTime = 10

clusterVersionPrefix = "pd/api/v1/config/cluster-version"
storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
)

// PdController manage get/update config from pd.
type PdController struct {
addrs []string
cli *http.Client
}

func NewPdController(
tlsConf *tls.Config,
pdAddrs []string,
) *PdController {
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, "http") {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
pdAddrs[i] = addr
}
}

return &PdController{
addrs: pdAddrs,
cli: httpClient(tlsConf),
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd
func (p *PdController) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/store_id=%d", storeMinResolvedTSPrefix, storeID)
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
if e != nil {
log.Warn("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
continue
}
log.Info("store min resolved ts", zap.String("resp", string(v)))
d := struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
}{}
err = json.Unmarshal(v, &d)
if err != nil {
return 0, errors.Trace(err)
}
if !d.IsRealTime {
message := "store min resolved ts not enabled"
log.Error(message, zap.String("addr", addr))
return 0, errors.Trace(errors.New(message))
}
return d.MinResolvedTS, nil
}
return 0, errors.Trace(err)
}

// pdRequest is a func to send an HTTP to pd and return the result bytes.
func pdRequest(
ctx context.Context,
addr string, prefix string,
cli *http.Client, method string, body io.Reader) ([]byte, error) {
_, respBody, err := pdRequestWithCode(ctx, addr, prefix, cli, method, body)
return respBody, err
}

func pdRequestWithCode(
ctx context.Context,
addr string, prefix string,
cli *http.Client, method string, body io.Reader) (int, []byte, error) {
u, err := url.Parse(addr)
if err != nil {
return 0, nil, errors.Trace(err)
}
reqURL := fmt.Sprintf("%s/%s", u, prefix)
req, err := http.NewRequestWithContext(ctx, method, reqURL, body)
if err != nil {
return 0, nil, errors.Trace(err)
}
var resp *http.Response
count := 0
for {
resp, err = cli.Do(req)
count++

if _, e := EvalFailpoint("InjectClosed"); e == nil {
resp = nil
err = &url.Error{
Op: "read",
Err: os.NewSyscallError("connect", syscall.ECONNREFUSED),
}
return 0, nil, errors.Trace(err)
}

if count > pdRequestRetryTime || (resp != nil && resp.StatusCode < 500) ||
err != nil {
break
}
if resp != nil {
_ = resp.Body.Close()
}
time.Sleep(pdRequestRetryInterval())
}
if err != nil {
return 0, nil, errors.Trace(err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
res, _ := io.ReadAll(resp.Body)
return resp.StatusCode, nil, errors.Errorf("[%d] %s %s", resp.StatusCode, res, reqURL)
}

r, err := io.ReadAll(resp.Body)
if err != nil {
return resp.StatusCode, nil, errors.Trace(err)
}
return resp.StatusCode, r, nil
}

func pdRequestRetryInterval() time.Duration {
if _, e := EvalFailpoint("FastRetry"); e == nil {
return 0
}
println("pd request retry interval 1s")

return time.Second
}

// httpClient returns an HTTP(s) client.
func httpClient(tlsConf *tls.Config) *http.Client {
// defaultTimeout for non-context requests.
const defaultTimeout = 30 * time.Second
cli := &http.Client{Timeout: defaultTimeout}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}
return cli
}
97 changes: 97 additions & 0 deletions util/pd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/rate_limit_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"fmt"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
"testing"
)

func TestPDRequestRetry(t *testing.T) {
EnableFailpoints()
ctx := context.Background()
require := require.New(t)

require.Nil(failpoint.Enable("tikvclient/FastRetry", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/FastRetry"))
}()

count := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count <= pdRequestRetryTime-1 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
w.WriteHeader(http.StatusOK)
}))
cli := http.DefaultClient
taddr := ts.URL
_, reqErr := pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Nil(reqErr)
ts.Close()

count = 0
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count <= pdRequestRetryTime+1 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
w.WriteHeader(http.StatusOK)
}))
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Error(reqErr)
ts.Close()

require.Nil(failpoint.Enable("tikvclient/InjectClosed", fmt.Sprintf("return(%d)", 0)))
defer func() {
require.Nil(failpoint.Disable("tikvclient/InjectClosed"))
}()
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Error(reqErr)
ts.Close()
}

0 comments on commit acb6745

Please sign in to comment.