Skip to content

Commit

Permalink
Merge branch 'master' into cleanup-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed Jul 14, 2023
2 parents a2cda87 + c8f88e4 commit 37a653a
Show file tree
Hide file tree
Showing 64 changed files with 1,583 additions and 284 deletions.
13 changes: 8 additions & 5 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)
default: static tidy test

test:
CGO_ENABLE=1 go test -race -cover
CGO_ENABLE=1 go test ./... -race -cover

basic-test:
CGO_ENABLE=1 go test
CGO_ENABLE=1 go test ./...

ci-test-job:
CGO_ENABLED=1 go test -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client
CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client

install-tools:
cd .. && $(MAKE) install-tools

static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ golangci-lint run -c ../.golangci.yml ./...
@ revive -formatter friendly -config ../revive.toml .
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./...
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

tidy:
@ go mod tidy
Expand Down
4 changes: 3 additions & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ const (
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
)

Expand Down Expand Up @@ -91,6 +92,7 @@ var (
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
)

// ErrClientGetResourceGroup is the error type for getting resource group.
type ErrClientGetResourceGroup struct {
ResourceGroupName string
Cause string
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc=
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
86 changes: 44 additions & 42 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// KeyspaceClient manages keyspace metadata.
type KeyspaceClient interface {
// LoadKeyspace load and return target keyspace's metadata.
LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error)
// WatchKeyspaces watches keyspace meta changes.
WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
// UpdateKeyspaceState updates target keyspace's state.
UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error)
// WatchKeyspaces watches keyspace meta changes.
WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
// GetAllKeyspaces get all keyspace's metadata.
GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error)
}

// keyspaceClient returns the KeyspaceClient from current PD leader.
Expand Down Expand Up @@ -75,44 +75,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
return resp.Keyspace, nil
}

// WatchKeyspaces watches keyspace meta changes.
// It returns a stream of slices of keyspace metadata.
// The first message in stream contains all current keyspaceMeta,
// all subsequent messages contains new put events for all keyspaces.
func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) {
keyspaceWatcherChan := make(chan []*keyspacepb.KeyspaceMeta)
req := &keyspacepb.WatchKeyspacesRequest{
Header: c.requestHeader(),
}
stream, err := c.keyspaceClient().WatchKeyspaces(ctx, req)
if err != nil {
close(keyspaceWatcherChan)
return nil, err
}
go func() {
defer func() {
close(keyspaceWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in keyspace client `WatchKeyspaces`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
return
}
keyspaceWatcherChan <- resp.Keyspaces
}
}
}()
return keyspaceWatcherChan, err
}

// UpdateKeyspaceState attempts to update the keyspace specified by ID to the target state,
// it will also record StateChangedAt for the given keyspace if a state change took place.
// Currently, legal operations includes:
Expand Down Expand Up @@ -153,3 +115,43 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

return resp.Keyspace, nil
}

// WatchKeyspaces watches keyspace meta changes.
// It returns a stream of slices of keyspace metadata.
// The first message in stream contains all current keyspaceMeta,
// all subsequent messages contains new put events for all keyspaces.
func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) {
return nil, errors.Errorf("WatchKeyspaces unimplemented")
}

// GetAllKeyspaces get all keyspaces metadata.
func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &keyspacepb.GetAllKeyspacesRequest{
Header: c.requestHeader(),
StartId: startID,
Limit: limit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req)
cancel()

if err != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String())
}

return resp.Keyspaces, nil
}
2 changes: 2 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var (
cmdDurationSplitAndScatterRegions prometheus.Observer
cmdDurationLoadKeyspace prometheus.Observer
cmdDurationUpdateKeyspaceState prometheus.Observer
cmdDurationGetAllKeyspaces prometheus.Observer
cmdDurationGet prometheus.Observer
cmdDurationPut prometheus.Observer
cmdDurationUpdateGCSafePointV2 prometheus.Observer
Expand Down Expand Up @@ -184,6 +185,7 @@ func initCmdDurations() {
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationGetAllKeyspaces = cmdDuration.WithLabelValues("get_all_keyspaces")
cmdDurationGet = cmdDuration.WithLabelValues("get")
cmdDurationPut = cmdDuration.WithLabelValues("put")
cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
Expand Down
28 changes: 14 additions & 14 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ const (
defaultAvgBatchProportion = 0.7
)

// ControllerConfig is the configuration of the resource manager controller which includes some option for client needed.
type ControllerConfig struct {
// Config is the configuration of the resource manager controller which includes some option for client needed.
type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

Expand All @@ -87,9 +87,9 @@ type ControllerConfig struct {
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

// DefaultControllerConfig returns the default resource manager controller configuration.
func DefaultControllerConfig() *ControllerConfig {
return &ControllerConfig{
// DefaultConfig returns the default resource manager controller configuration.
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
RequestUnit: DefaultRequestUnitConfig(),
}
Expand Down Expand Up @@ -130,10 +130,10 @@ func DefaultRequestUnitConfig() RequestUnitConfig {
}
}

// Config is the configuration of the resource units, which gives the read/write request
// RUConfig is the configuration of the resource units, which gives the read/write request
// units or request resource cost standards. It should be calculated by a given `RequestUnitConfig`
// or `RequestResourceConfig`.
type Config struct {
type RUConfig struct {
// RU model config
ReadBaseCost RequestUnit
ReadPerBatchBaseCost RequestUnit
Expand All @@ -148,16 +148,16 @@ type Config struct {
DegradedModeWaitDuration time.Duration
}

// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
return GenerateConfig(
DefaultControllerConfig(),
// DefaultRUConfig returns the default configuration.
func DefaultRUConfig() *RUConfig {
return GenerateRUConfig(
DefaultConfig(),
)
}

// GenerateConfig generates the configuration by the given request unit configuration.
func GenerateConfig(config *ControllerConfig) *Config {
cfg := &Config{
// GenerateRUConfig generates the configuration by the given request unit configuration.
func GenerateRUConfig(config *Config) *RUConfig {
cfg := &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
Expand Down
Loading

0 comments on commit 37a653a

Please sign in to comment.