Skip to content

Commit

Permalink
etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661)
Browse files Browse the repository at this point in the history
close #9565
  • Loading branch information
asddongmen authored Sep 8, 2023
1 parent 2e8893a commit 4452688
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 89 deletions.
9 changes: 6 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,6 +82,7 @@ type captureImpl struct {
liveness model.Liveness
config *config.ServerConfig

pdClient pd.Client
pdEndpoints []string
ownerMu sync.Mutex
owner owner.Owner
Expand Down Expand Up @@ -128,6 +130,7 @@ func NewCapture(pdEndpoints []string,
etcdClient etcd.CDCEtcdClient,
grpcService *p2p.ServerWrapper,
sortEngineMangerFactory *factory.SortEngineFactory,
pdClient pd.Client,
) Capture {
conf := config.GetGlobalServerConfig()
return &captureImpl{
Expand All @@ -142,8 +145,8 @@ func NewCapture(pdEndpoints []string,
newController: controller.NewController,
info: &model.CaptureInfo{},
sortEngineFactory: sortEngineMangerFactory,

migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
pdClient: pdClient,
}
}

Expand Down Expand Up @@ -227,7 +230,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
c.upstreamManager.Close()
}
c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient)
if err != nil {
return errors.Trace(err)
}
Expand Down
112 changes: 56 additions & 56 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,11 @@ import (
"github.com/pingcap/tiflow/pkg/util"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -78,11 +74,15 @@ type Server interface {
// TODO: we need to make server more unit testable and add more test cases.
// Especially we need to decouple the HTTPServer out of server.
type server struct {
capture capture.Capture
tcpServer tcpserver.TCPServer
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
capture capture.Capture
tcpServer tcpserver.TCPServer
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
// pdClient is the default upstream PD client.
// The PD acts as a metadata management service for TiCDC.
pdClient pd.Client
pdAPIClient pdutil.PDAPIClient
pdEndpoints []string
sortEngineFactory *factory.SortEngineFactory
}
Expand Down Expand Up @@ -125,35 +125,21 @@ func New(pdEndpoints []string) (*server, error) {
func (s *server) prepare(ctx context.Context) error {
conf := config.GetGlobalServerConfig()

grpcTLSOption, err := conf.Security.ToGRPCDialOption()
tlsConfig, err := conf.Security.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}

tlsConfig, err := conf.Security.ToTLSConfig()
grpcTLSOption, err := conf.Security.ToGRPCDialOption()
if err != nil {
return errors.Trace(err)
}

logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
AutoSyncInterval: 30 * time.Second,
DialOptions: []grpc.DialOption{
log.Info("create pd client", zap.Strings("endpoints", s.pdEndpoints))
s.pdClient, err = pd.NewClientWithContext(
ctx, s.pdEndpoints, conf.Security.PDSecurityOption(),
// the default `timeout` is 3s, maybe too small if the pd is busy,
// set to 10s to avoid frequent timeout.
pd.WithCustomTimeoutOption(10*time.Second),
pd.WithGRPCDialOptions(
grpcTLSOption,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand All @@ -165,12 +151,24 @@ func (s *server) prepare(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
))
if err != nil {
return errors.Trace(err)
}
s.pdAPIClient, err = pdutil.NewPDAPIClient(s.pdClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to create a the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, grpcTLSOption, s.pdEndpoints...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -181,6 +179,15 @@ func (s *server) prepare(ctx context.Context) error {
}
s.etcdClient = cdcEtcdClient

// Collect all endpoints from pd here to make the server more robust.
// Because in some scenarios, the deployer may only provide one pd endpoint,
// this will cause the TiCDC server to fail to restart when some pd node is down.
allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
return errors.Trace(err)
}
s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)

err = s.initDir(ctx)
if err != nil {
return errors.Trace(err)
Expand All @@ -192,9 +199,8 @@ func (s *server) prepare(ctx context.Context) error {
return errors.Trace(err)
}

s.capture = capture.NewCapture(
s.pdEndpoints, cdcEtcdClient, s.grpcService, s.sortEngineFactory)

s.capture = capture.NewCapture(s.pdEndpoints, cdcEtcdClient,
s.grpcService, s.sortEngineFactory, s.pdClient)
return nil
}

Expand Down Expand Up @@ -289,18 +295,7 @@ func (s *server) startStatusHTTP(lis net.Listener) error {
return nil
}

func (s *server) etcdHealthChecker(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption())
if err != nil {
return errors.Trace(err)
}
pc, err := pdutil.NewPDAPIClient(grpcClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
defer pc.Close()

func (s *server) upstreamPDHealthChecker(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

Expand All @@ -309,15 +304,15 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
endpoints, err := pc.CollectMemberEndpoints(ctx)
endpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
log.Warn("etcd health check: cannot collect all members", zap.Error(err))
continue
}
for _, endpoint := range endpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if err := pc.Healthy(ctx, endpoint); err != nil {
if err := s.pdAPIClient.Healthy(ctx, endpoint); err != nil {
log.Warn("etcd health check error",
zap.String("endpoint", endpoint), zap.Error(err))
}
Expand All @@ -338,6 +333,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
func (s *server) run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer s.pdAPIClient.Close()

eg, egCtx := errgroup.WithContext(ctx)

Expand All @@ -346,7 +342,7 @@ func (s *server) run(ctx context.Context) (err error) {
})

eg.Go(func() error {
return s.etcdHealthChecker(egCtx)
return s.upstreamPDHealthChecker(egCtx)
})

eg.Go(func() error {
Expand Down Expand Up @@ -401,6 +397,10 @@ func (s *server) Close() {
}
s.tcpServer = nil
}

if s.pdClient != nil {
s.pdClient.Close()
}
}

func (s *server) closeSortEngineFactory() {
Expand Down
Loading

0 comments on commit 4452688

Please sign in to comment.