Skip to content

Commit

Permalink
Merge branch 'introduce-callerid-for-grpc' of https://github.com/okJi…
Browse files Browse the repository at this point in the history
…ang/pd; branch 'master' of https://github.com/tikv/pd into introduce-callerid-for-grpc

Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Nov 14, 2024
2 parents 550c343 + 40ae26c commit 9e3af20
Show file tree
Hide file tree
Showing 46 changed files with 347 additions and 2,574 deletions.
17 changes: 0 additions & 17 deletions .github/workflows/tso-consistency-test.yaml

This file was deleted.

20 changes: 4 additions & 16 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ SUBMODULES := $(filter $(shell find . -iname "go.mod" -exec dirname {} \;),\
test: install-tools
# testing all pkgs...
@$(FAILPOINT_ENABLE)
CGO_ENABLED=1 go test -tags tso_function_test,deadlock -timeout 20m -race -cover $(TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
CGO_ENABLED=1 go test -tags deadlock -timeout 20m -race -cover $(TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

basic-test: install-tools
Expand All @@ -257,24 +257,12 @@ ci-test-job: install-tools dashboard-ui pd-ut
./scripts/ci-subtask.sh $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso

test-tso: install-tools
# testing TSO function & consistency...
@$(FAILPOINT_ENABLE)
CGO_ENABLED=1 go test -race -tags without_dashboard,tso_full_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)
TSO_FUNCTION_TEST_PKGS := $(PD_PKG)/tests/server/tso

test-tso-function: install-tools
# testing TSO function...
@$(FAILPOINT_ENABLE)
CGO_ENABLED=1 go test -race -tags without_dashboard,tso_function_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

test-tso-consistency: install-tools
# testing TSO consistency...
@$(FAILPOINT_ENABLE)
CGO_ENABLED=1 go test -race -tags without_dashboard,tso_consistency_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
CGO_ENABLED=1 go test -race -tags without_dashboard,deadlock $(TSO_FUNCTION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realcluster
Expand Down Expand Up @@ -302,7 +290,7 @@ test-with-cover-parallel: install-tools dashboard-ui split

split:
# todo: it will remove server/api,/tests and tso packages after daily CI integrate all verify CI.
go list ./... | grep -v -E "github.com/tikv/pd/server/api|github.com/tikv/pd/tests/client|github.com/tikv/pd/tests/server/tso" > packages.list;\
go list ./... | grep -v -E "github.com/tikv/pd/server/api|github.com/tikv/pd/tests/client|$(TSO_FUNCTION_TEST_PKGS)" > packages.list;\
split packages.list -n r/${TASK_COUNT} packages_unit_ -a 1 --numeric-suffixes=1;\
cat packages_unit_${TASK_ID} |tr "\n" " " >package.list;\
rm packages*;
Expand Down
39 changes: 16 additions & 23 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/tikv/pd/client/caller"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/utils/tlsutil"
"github.com/tikv/pd/client/utils/tsoutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -590,18 +589,20 @@ func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, contex

// GetTSAsync implements the TSOClient interface.
func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}

// GetLocalTSAsync implements the TSOClient interface.
func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
defer trace.StartRegion(ctx, "pdclient.GetTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
span = span.Tracer().StartSpan("pdclient.GetTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.inner.dispatchTSORequestWithRetry(ctx)
}

return c.inner.dispatchTSORequestWithRetry(ctx, dcLocation)
// GetLocalTSAsync implements the TSOClient interface.
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTSAsync(ctx context.Context, _ string) TSFuture {
return c.GetTSAsync(ctx)
}

// GetTS implements the TSOClient interface.
Expand All @@ -611,9 +612,11 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
}

// GetLocalTS implements the TSOClient interface.
func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical int64, logical int64, err error) {
resp := c.GetLocalTSAsync(ctx, dcLocation)
return resp.Wait()
//
// Deprecated: Local TSO will be completely removed in the future. Currently, regardless of the
// parameters passed in, this method will default to returning the global TSO.
func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logical int64, err error) {
return c.GetTS(ctx)
}

// GetMinTS implements the TSOClient interface.
Expand Down Expand Up @@ -659,7 +662,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
}

minTS := resp.GetTimestamp()
return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
Expand Down Expand Up @@ -1444,16 +1447,6 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
return nil
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.inner.getTSOClient()
if tsoClient == nil {
return nil
}
return tsoClient.GetTSOAllocators()
}

// WithCallerID implements the RPCClient interface.
func (c *client) WithCallerID(callerID caller.ID) RPCClient {
newClient := *c
Expand Down
4 changes: 2 additions & 2 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
return cc, err
}

func (c *innerClient) dispatchTSORequestWithRetry(ctx context.Context, dcLocation string) TSFuture {
func (c *innerClient) dispatchTSORequestWithRetry(ctx context.Context) TSFuture {
var (
retryable bool
err error
Expand All @@ -226,7 +226,7 @@ func (c *innerClient) dispatchTSORequestWithRetry(ctx context.Context, dcLocatio
}
// Get a new request from the pool if it's nil or not from the current pool.
if req == nil || req.pool != tsoClient.tsoReqPool {
req = tsoClient.getTSORequest(ctx, dcLocation)
req = tsoClient.getTSORequest(ctx)
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
Expand Down
99 changes: 25 additions & 74 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pd
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"reflect"
"sort"
Expand All @@ -40,7 +41,6 @@ import (
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
Expand Down Expand Up @@ -383,21 +383,17 @@ func (c *pdServiceBalancer) get() (ret ServiceClient) {
}

type updateKeyspaceIDFunc func() error
type tsoLocalServURLsUpdatedFunc func(map[string]string) error
type tsoGlobalServURLUpdatedFunc func(string) error
type tsoLeaderURLUpdatedFunc func(string) error

type tsoAllocatorEventSource interface {
// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc)
// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc)
// tsoEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery.
type tsoEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

var (
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil)
_ ServiceDiscovery = (*pdServiceDiscovery)(nil)
_ tsoEventSource = (*pdServiceDiscovery)(nil)
)

// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based
Expand Down Expand Up @@ -426,12 +422,8 @@ type pdServiceDiscovery struct {
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator
// leader list is updated. The input is a map {DC Location -> Leader URL}
tsoLocalAllocLeadersUpdatedCb tsoLocalServURLsUpdatedFunc
// tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServURLUpdatedFunc
// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -801,22 +793,15 @@ func (c *pdServiceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func())
c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
}

// SetTSOLocalServURLsUpdatedCallback adds a callback which will be called when the local tso
// allocator leader list is updated.
func (c *pdServiceDiscovery) SetTSOLocalServURLsUpdatedCallback(callback tsoLocalServURLsUpdatedFunc) {
c.tsoLocalAllocLeadersUpdatedCb = callback
}

// SetTSOGlobalServURLUpdatedCallback adds a callback which will be called when the global tso
// allocator leader is updated.
func (c *pdServiceDiscovery) SetTSOGlobalServURLUpdatedCallback(callback tsoGlobalServURLUpdatedFunc) {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
func (c *pdServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso global service url update", zap.String("url", url), errs.ZapError(err))
log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err))
}
}
c.tsoGlobalAllocLeaderUpdatedCb = callback
c.tsoLeaderUpdatedCb = callback
}

// getLeaderURL returns the leader URL.
Expand Down Expand Up @@ -901,19 +886,16 @@ func (c *pdServiceDiscovery) updateMember() error {

members, err := c.getMembers(c.ctx, url, updateMemberTimeout)
// Check the cluster ID.
if err == nil && members.GetHeader().GetClusterId() != c.clusterID {
err = errs.ErrClientUpdateMember.FastGenByArgs("cluster id does not match")
updatedClusterID := members.GetHeader().GetClusterId()
if err == nil && updatedClusterID != c.clusterID {
log.Warn("[pd] cluster id does not match",
zap.Uint64("updated-cluster-id", updatedClusterID),
zap.Uint64("expected-cluster-id", c.clusterID))
err = errs.ErrClientUpdateMember.FastGenByArgs(fmt.Sprintf("cluster id does not match: %d != %d", updatedClusterID, c.clusterID))
}
// Check the TSO Allocator Leader.
var errTSO error
if err == nil {
if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}
// Still need to update TsoAllocatorLeaders, even if there is no PD leader
errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders())
if err == nil && (members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0) {
err = errs.ErrClientGetLeader.FastGenByArgs("leader url doesn't exist")
}

// Failed to get members
if err != nil {
log.Info("[pd] cannot update member from this url",
Expand All @@ -926,15 +908,9 @@ func (c *pdServiceDiscovery) updateMember() error {
continue
}
}

c.updateURLs(members.GetMembers())
if err := c.updateServiceClient(members.GetMembers(), members.GetLeader()); err != nil {
return err
}

// If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error,
// the error of `switchTSOAllocatorLeader` will be returned.
return errTSO
return c.updateServiceClient(members.GetMembers(), members.GetLeader())
}
return errs.ErrClientGetMember.FastGenByArgs()
}
Expand Down Expand Up @@ -1009,13 +985,12 @@ func (c *pdServiceDiscovery) switchLeader(url string) (bool, error) {
newConn, err := c.GetOrCreateGRPCConn(url)
// If gRPC connect is created successfully or leader is new, still saves.
if url != oldLeader.GetURL() || newConn != nil {
// Set PD leader and Global TSO Allocator (which is also the PD leader)
leaderClient := newPDServiceClient(url, url, newConn, true)
c.leader.Store(leaderClient)
}
// Run callbacks
if c.tsoGlobalAllocLeaderUpdatedCb != nil {
if err := c.tsoGlobalAllocLeaderUpdatedCb(url); err != nil {
if c.tsoLeaderUpdatedCb != nil {
if err := c.tsoLeaderUpdatedCb(url); err != nil {
return true, err
}
}
Expand Down Expand Up @@ -1102,30 +1077,6 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader
return err
}

func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error {
if len(allocatorMap) == 0 {
return nil
}

allocMap := make(map[string]string)
// Switch to the new one
for dcLocation, member := range allocatorMap {
if len(member.GetClientUrls()) == 0 {
continue
}
allocMap[dcLocation] = member.GetClientUrls()[0]
}

// Run the callback to reflect any possible change in the local tso allocators.
if c.tsoLocalAllocLeadersUpdatedCb != nil {
if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil {
return err
}
}

return nil
}

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL.
func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) {
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...)
Expand Down
Loading

0 comments on commit 9e3af20

Please sign in to comment.