Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capture(cdc), tcpserver: integrate P2P with server & capture #3932

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 95 additions & 8 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -69,19 +70,34 @@ type Capture struct {

tableActorSystem *system.System

// MessageServer is the receiver of the messages from the other nodes.
// It should be recreated each time the capture is restarted.
MessageServer *p2p.MessageServer

// MessageRouter manages the clients to send messages to all peers.
MessageRouter p2p.MessageRouter

// grpcService is a wrapper that can hold a MessageServer.
// The instance should last for the whole life of the server,
// regardless of server restarting.
// This design is to solve the problem that grpc-go cannot gracefully
// unregister a service.
grpcService *p2p.ServerWrapper

cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func(pd.Client) *owner.Owner
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient) *Capture {
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture {
return &Capture{
pdClient: pdClient,
kvStorage: kvStorage,
etcdClient: etcdClient,
cancel: func() {},
pdClient: pdClient,
kvStorage: kvStorage,
etcdClient: etcdClient,
grpcService: grpcService,
cancel: func() {},

newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
Expand Down Expand Up @@ -159,11 +175,32 @@ func (c *Capture) reset(ctx context.Context) error {
if c.grpcPool != nil {
c.grpcPool.Close()
}

if config.SchedulerV2Enabled {
c.grpcService.Reset(nil)

if c.MessageRouter != nil {
c.MessageRouter.Close()
c.MessageRouter.Wait()
Comment on lines +183 to +184
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we wait in Close?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separating Close and Wait in the design of the interface is for two reasons:

  1. To provide a finer-grained control over the timing of execution to achieve better unit-test coverage.
  2. To avoid introducing deadlock bugs accidentally in the future. The pattern of close-wait-close-wait instead of close-close-wait-wait has created deadlock bugs before.

c.MessageRouter = nil
}
}

c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.pdClient)

if config.SchedulerV2Enabled {
messageServerConfig := conf.Debug.Messages.ToMessageServerConfig()
c.MessageServer = p2p.NewMessageServer(c.info.ID, messageServerConfig)
c.grpcService.Reset(c.MessageServer)

messageClientConfig := conf.Debug.Messages.ToMessageClientConfig()
c.MessageRouter = p2p.NewMessageRouter(c.info.ID, conf.Security, messageClientConfig)
}

log.Info("init capture",
zap.String("capture-id", c.info.ID),
zap.String("capture-addr", c.info.AdvertiseAddr))
Expand Down Expand Up @@ -219,6 +256,8 @@ func (c *Capture) run(stdCtx context.Context) error {
TimeAcquirer: c.TimeAcquirer,
TableActorSystem: c.tableActorSystem,
SorterSystem: c.sorterSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
})
err := c.register(ctx)
if err != nil {
Expand All @@ -232,8 +271,8 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(4)
var ownerErr, processorErr error
var ownerErr, processorErr, messageServerErr error
wg.Add(1)
go func() {
defer wg.Done()
defer c.AsyncClose()
Expand All @@ -243,32 +282,59 @@ func (c *Capture) run(stdCtx context.Context) error {
ownerErr = c.campaignOwner(ctx)
log.Info("the owner routine has exited", zap.Error(ownerErr))
}()
wg.Add(1)
go func() {
defer wg.Done()
defer c.AsyncClose()
conf := config.GetGlobalServerConfig()
processorFlushInterval := time.Duration(conf.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState()

if config.SchedulerV2Enabled {
globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
})
globalState.SetOnCaptureRemoved(func(captureID model.CaptureID) {
c.MessageRouter.RemovePeer(captureID)
})
}

// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, orchestrator.NewGlobalState(), processorFlushInterval)
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
}()
if config.SchedulerV2Enabled {
wg.Add(1)
go func() {
defer wg.Done()
defer c.AsyncClose()
defer c.grpcService.Reset(nil)
messageServerErr = c.MessageServer.Run(ctx)
}()
}
wg.Wait()
if ownerErr != nil {
return errors.Annotate(ownerErr, "owner exited with error")
}
if processorErr != nil {
return errors.Annotate(processorErr, "processor exited with error")
}
if messageServerErr != nil {
return errors.Annotate(messageServerErr, "message server exited with error")
}
return nil
}

Expand Down Expand Up @@ -337,6 +403,18 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {

owner := c.newOwner(c.pdClient)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState()

if config.SchedulerV2Enabled {
globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
})
globalState.SetOnCaptureRemoved(func(captureID model.CaptureID) {
c.MessageRouter.RemovePeer(captureID)
})
}

err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval)
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
Expand Down Expand Up @@ -461,6 +539,15 @@ func (c *Capture) AsyncClose() {
}
c.sorterSystem = nil
}
if config.SchedulerV2Enabled {
c.grpcService.Reset(nil)

if c.MessageRouter != nil {
c.MessageRouter.Close()
c.MessageRouter.Wait()
c.MessageRouter = nil
}
}
}

// WriteDebugInfo writes the debug info into writer.
Expand Down
41 changes: 11 additions & 30 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,6 @@ import (
"os"

"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
Expand All @@ -38,10 +37,16 @@ import (
"go.uber.org/zap"
)

func (s *Server) startStatusHTTP() error {
// startStatusHTTP starts the HTTP server.
// `lis` is a listener that gives us plain-text HTTP requests.
// TODO can we decouple the HTTP server from the capture server?
func (s *Server) startStatusHTTP(lis net.Listener) error {
conf := config.GetGlobalServerConfig()

// OpenAPI handling logic is injected here.
router := newRouter(capture.NewHTTPHandler(s.capture))

// Inject the legacy API handlers.
router.GET("/status", gin.WrapF(s.handleStatus))
router.GET("/debug/info", gin.WrapF(s.handleDebugInfo))
router.POST("/capture/owner/resign", gin.WrapF(s.handleResignOwner))
Expand All @@ -59,36 +64,12 @@ func (s *Server) startStatusHTTP() error {
prometheus.DefaultGatherer = registry
router.Any("/metrics", gin.WrapH(promhttp.Handler()))

// if CertAllowedCN was specified, we should add server's common name
// otherwise, https requests sent to non-owner capture can't be forward
// to owner
if len(conf.Security.CertAllowedCN) != 0 {
err := conf.Security.AddSelfCommonName()
if err != nil {
log.Error("status server set tls config failed", zap.Error(err))
return errors.Trace(err)
}
}

tlsConfig, err := conf.Security.ToTLSConfigWithVerify()
if err != nil {
log.Error("status server get tls config failed", zap.Error(err))
return errors.Trace(err)
}

s.statusServer = &http.Server{Addr: conf.Addr, Handler: router, TLSConfig: tlsConfig}
// No need to configure TLS because it is already handled by `s.tcpServer`.
s.statusServer = &http.Server{Handler: router}

ln, err := net.Listen("tcp", conf.Addr)
if err != nil {
return cerror.WrapError(cerror.ErrServeHTTP, err)
}
go func() {
log.Info("http server is running", zap.String("addr", conf.Addr))
if tlsConfig != nil {
err = s.statusServer.ServeTLS(ln, conf.Security.CertPath, conf.Security.KeyPath)
} else {
err = s.statusServer.Serve(ln)
}
err := s.statusServer.Serve(lis)
if err != nil && err != http.ErrServerClosed {
log.Error("http server error", zap.Error(cerror.WrapError(cerror.ErrServeHTTP, err)))
}
Expand Down
41 changes: 38 additions & 3 deletions cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/pingcap/check"
Expand Down Expand Up @@ -70,7 +71,19 @@ func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
config.StoreGlobalServerConfig(conf)
server, err := NewServer([]string{"http://127.0.0.1:2379"})
c.Assert(err, check.IsNil)
err = server.startStatusHTTP()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := server.tcpServer.Run(ctx)
c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*")
}()

err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
c.Assert(err, check.IsNil)
defer func() {
c.Assert(server.statusServer.Close(), check.IsNil)
Expand All @@ -85,6 +98,9 @@ func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
testHandleMoveTable(c)
testHandleChangefeedQuery(c)
testHandleFailpoint(c)

cancel()
wg.Wait()
}

func testPprof(c *check.C) {
Expand Down Expand Up @@ -186,7 +202,7 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) {
server, err := NewServer([]string{"https://127.0.0.1:2379"})
server.capture = capture.NewCapture4Test()
c.Assert(err, check.IsNil)
err = server.startStatusHTTP()
err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
c.Assert(err, check.IsNil)
defer func() {
c.Assert(server.statusServer.Close(), check.IsNil)
Expand All @@ -196,6 +212,14 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := server.tcpServer.Run(ctx)
c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*")
}()

// test cli sends request without a cert will success
err = retry.Do(ctx, func() error {
tr := &http.Transport{
Expand Down Expand Up @@ -236,6 +260,9 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) {
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
c.Assert(err, check.IsNil)

cancel()
wg.Wait()
}

//
Expand All @@ -254,7 +281,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
server, err := NewServer([]string{"https://127.0.0.1:2379"})
server.capture = capture.NewCapture4Test()
c.Assert(err, check.IsNil)
err = server.startStatusHTTP()
err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
c.Assert(err, check.IsNil)
defer func() {
c.Assert(server.statusServer.Close(), check.IsNil)
Expand All @@ -264,6 +291,14 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := server.tcpServer.Run(ctx)
c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*")
}()

// test cli sends request without a cert will fail
err = retry.Do(ctx, func() error {
tr := &http.Transport{
Expand Down
Loading