Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd http: support api to get store min resolved ts #793

Merged
merged 10 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Future interface {
const (
physicalShiftBits = 18
logicalBits = (1 << physicalShiftBits) - 1
// GlobalTxnScope is the default transaction scope for a Oracle service.
// GlobalTxnScope is the default transaction scope for an Oracle service.
GlobalTxnScope = "global"
)

Expand Down
47 changes: 30 additions & 17 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient util.PdController
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
Expand Down Expand Up @@ -193,6 +194,13 @@ func WithPool(gp Pool) Option {
}
}

// WithPDAddrsAndTls set the pd addrs and tls for pdHttpClient.
func WithPDAddrsAndTls(tlsConf *tls.Config, pdaddrs []string) Option {
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
return func(o *KVStore) {
o.pdHttpClient = *util.NewPdController(tlsConf, pdaddrs)
}
}

// loadOption load KVStore option into KVStore.
func loadOption(store *KVStore, opt ...Option) {
for _, f := range opt {
Expand Down Expand Up @@ -581,25 +589,30 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)

safeTS, err := s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
storeIDStr := strconv.Itoa(int(storeID))
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
}
safeTS := resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()

_, preSafeTS := s.getSafeTS(storeID)
if preSafeTS > safeTS {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", storeIDStr).Inc()
Expand Down
204 changes: 204 additions & 0 deletions util/pd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/execdetails.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"syscall"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

// pd request retry time when connection fail
const (
pdRequestRetryTime = 10

HuSharp marked this conversation as resolved.
Show resolved Hide resolved
storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
)

// PdController manage get/update config from pd.
type PdController struct {
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
addrs []string
cli *http.Client
}

func NewPdController(
tlsConf *tls.Config,
pdAddrs []string,
) *PdController {
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, "http") {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
pdAddrs[i] = addr
}
}

return &PdController{
addrs: pdAddrs,
cli: httpClient(tlsConf),
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd
func (p *PdController) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/store_id=%d", storeMinResolvedTSPrefix, storeID)
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
if e != nil {
log.Warn("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
continue
}
log.Info("store min resolved ts", zap.String("resp", string(v)))
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
d := struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
}{}
err = json.Unmarshal(v, &d)
if err != nil {
return 0, errors.Trace(err)
}
if !d.IsRealTime {
message := "store min resolved ts not enabled"
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
log.Error(message, zap.String("addr", addr))
return 0, errors.Trace(errors.New(message))
}
return d.MinResolvedTS, nil
}
return 0, errors.Trace(err)
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
}

// pdRequest is a func to send an HTTP to pd and return the result bytes.
func pdRequest(
ctx context.Context,
addr string, prefix string,
cli *http.Client, method string, body io.Reader) ([]byte, error) {
_, respBody, err := pdRequestWithCode(ctx, addr, prefix, cli, method, body)
return respBody, err
}

func pdRequestWithCode(
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
addr string, prefix string,
cli *http.Client, method string, body io.Reader) (int, []byte, error) {
u, err := url.Parse(addr)
if err != nil {
return 0, nil, errors.Trace(err)
}
reqURL := fmt.Sprintf("%s/%s", u, prefix)
req, err := http.NewRequestWithContext(ctx, method, reqURL, body)
if err != nil {
return 0, nil, errors.Trace(err)
}
var resp *http.Response
count := 0
for {
resp, err = cli.Do(req)
count++

if _, e := EvalFailpoint("InjectClosed"); e == nil {
resp = nil
err = &url.Error{
Op: "read",
Err: os.NewSyscallError("connect", syscall.ECONNREFUSED),
}
return 0, nil, errors.Trace(err)
}

if count > pdRequestRetryTime || (resp != nil && resp.StatusCode < 500) ||
err != nil {
break
}
if resp != nil {
_ = resp.Body.Close()
}
time.Sleep(pdRequestRetryInterval())
}
if err != nil {
return 0, nil, errors.Trace(err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
res, _ := io.ReadAll(resp.Body)
return resp.StatusCode, nil, errors.Errorf("[%d] %s %s", resp.StatusCode, res, reqURL)
}

r, err := io.ReadAll(resp.Body)
if err != nil {
return resp.StatusCode, nil, errors.Trace(err)
}
return resp.StatusCode, r, nil
}

func pdRequestRetryInterval() time.Duration {
if _, e := EvalFailpoint("FastRetry"); e == nil {
return 0
}
println("pd request retry interval 1s")

return time.Second
}

// httpClient returns an HTTP(s) client.
func httpClient(tlsConf *tls.Config) *http.Client {
// defaultTimeout for non-context requests.
const defaultTimeout = 30 * time.Second
cli := &http.Client{Timeout: defaultTimeout}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}
return cli
}
98 changes: 98 additions & 0 deletions util/pd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/rate_limit_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
)

func TestPDRequestRetry(t *testing.T) {
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
EnableFailpoints()
ctx := context.Background()
require := require.New(t)

require.Nil(failpoint.Enable("tikvclient/FastRetry", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/FastRetry"))
}()

count := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count <= pdRequestRetryTime-1 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
w.WriteHeader(http.StatusOK)
}))
cli := http.DefaultClient
taddr := ts.URL
_, reqErr := pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Nil(reqErr)
ts.Close()

count = 0
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count <= pdRequestRetryTime+1 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
w.WriteHeader(http.StatusOK)
}))
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Error(reqErr)
ts.Close()

require.Nil(failpoint.Enable("tikvclient/InjectClosed", fmt.Sprintf("return(%d)", 0)))
defer func() {
require.Nil(failpoint.Disable("tikvclient/InjectClosed"))
}()
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Error(reqErr)
ts.Close()
}