Skip to content

Commit

Permalink
Merge branch 'master' into fix-rule
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 8, 2023
2 parents 73d253a + 03081c6 commit cb30f33
Show file tree
Hide file tree
Showing 69 changed files with 2,429 additions and 3,744 deletions.
30 changes: 20 additions & 10 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type baseClient struct {
// dc-location -> TSO allocator leader URL
allocators sync.Map // Store as map[string]string

checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
updateConnectionCtxsCh chan struct{}
checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
updateConnectionCtxsCh chan struct{}
updateTokenConnectionCh chan struct{}

wg sync.WaitGroup
ctx context.Context
Expand All @@ -81,13 +82,14 @@ type SecurityOption struct {
func newBaseClient(ctx context.Context, urls []string, security SecurityOption) *baseClient {
clientCtx, clientCancel := context.WithCancel(ctx)
bc := &baseClient{
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
updateConnectionCtxsCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
security: security,
option: newOption(),
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
updateConnectionCtxsCh: make(chan struct{}, 1),
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
security: security,
option: newOption(),
}
bc.urls.Store(urls)
return bc
Expand Down Expand Up @@ -168,6 +170,13 @@ func (c *baseClient) scheduleUpdateConnectionCtxs() {
}
}

func (c *baseClient) scheduleUpdateTokenConnection() {
select {
case c.updateTokenConnectionCh <- struct{}{}:
default:
}
}

// GetClusterID returns the ClusterID.
func (c *baseClient) GetClusterID(context.Context) uint64 {
return c.clusterID
Expand Down Expand Up @@ -375,6 +384,7 @@ func (c *baseClient) switchLeader(addrs []string) error {
// Set PD leader and Global TSO Allocator (which is also the PD leader)
c.leader.Store(addr)
c.allocators.Store(globalDCLocation, addr)
c.scheduleUpdateTokenConnection()
log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader))
return nil
}
Expand Down
29 changes: 21 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type GlobalConfigItem struct {
EventType pdpb.EventType
Name string
Value string
PayLoad []byte
}

// Client is a PD (Placement Driver) client.
Expand Down Expand Up @@ -122,7 +123,7 @@ type Client interface {
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

// LoadGlobalConfig gets the global config from etcd
LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
Expand Down Expand Up @@ -419,7 +420,7 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
}
// Start the daemons.
c.updateTSODispatcher()
c.createTokenispatcher()
c.createTokenDispatcher()
c.wg.Add(3)
go c.tsLoop()
go c.tsCancelLoop()
Expand Down Expand Up @@ -1820,16 +1821,22 @@ func trimHTTPPrefix(str string) string {
return str
}

func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath})
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath})
if err != nil {
return nil, 0, err
}

res := make([]GlobalConfigItem, len(resp.GetItems()))
for i, item := range resp.GetItems() {
cfg := GlobalConfigItem{Name: item.GetName()}
cfg.Value = item.GetValue()
cfg := GlobalConfigItem{Name: item.GetName(), EventType: item.GetKind(), PayLoad: item.GetPayload()}
if item.GetValue() == "" {
// We need to keep the Value field for CDC compatibility.
// But if you not use `Names`, will only have `Payload` field.
cfg.Value = string(item.GetPayload())
} else {
cfg.Value = item.GetValue()
}
res[i] = cfg
}
return res, resp.GetRevision(), nil
Expand All @@ -1838,7 +1845,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]Glo
func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType}
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad}
}
_, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
if err != nil {
Expand Down Expand Up @@ -1874,7 +1881,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue()}
// We need to keep the Value field for CDC compatibility.
// But if you not use `Names`, will only have `Payload` field.
if i.GetValue() == "" {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), string(i.GetPayload()), i.GetPayload()}
} else {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue(), i.GetPayload()}
}
}
select {
case <-ctx.Done():
Expand Down
26 changes: 24 additions & 2 deletions client/go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
module github.com/tikv/pd/client

go 1.16
go 1.19

require (
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.51.0
)

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit cb30f33

Please sign in to comment.