diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index fcbc7178566..1008c4b71fe 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -36,6 +36,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" + "github.com/pingcap/tiflow/pkg/p2p" "github.com/pingcap/tiflow/pkg/pdtime" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/tikv" @@ -69,6 +70,20 @@ type Capture struct { tableActorSystem *system.System + // MessageServer is the receiver of the messages from the other nodes. + // It should be recreated each time the capture is restarted. + MessageServer *p2p.MessageServer + + // MessageRouter manages the clients to send messages to all peers. + MessageRouter p2p.MessageRouter + + // grpcService is a wrapper that can hold a MessageServer. + // The instance should last for the whole life of the server, + // regardless of server restarting. + // This design is to solve the problem that grpc-go cannot gracefully + // unregister a service. + grpcService *p2p.ServerWrapper + cancel context.CancelFunc newProcessorManager func() *processor.Manager @@ -76,12 +91,13 @@ type Capture struct { } // NewCapture returns a new Capture instance -func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient) *Capture { +func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture { return &Capture{ - pdClient: pdClient, - kvStorage: kvStorage, - etcdClient: etcdClient, - cancel: func() {}, + pdClient: pdClient, + kvStorage: kvStorage, + etcdClient: etcdClient, + grpcService: grpcService, + cancel: func() {}, newProcessorManager: processor.NewManager, newOwner: owner.NewOwner, @@ -159,11 +175,32 @@ func (c *Capture) reset(ctx context.Context) error { if c.grpcPool != nil { c.grpcPool.Close() } + + if config.SchedulerV2Enabled { + c.grpcService.Reset(nil) + + if c.MessageRouter != nil { + c.MessageRouter.Close() + c.MessageRouter.Wait() + c.MessageRouter = nil + } + } + c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security) if c.regionCache != nil { c.regionCache.Close() } c.regionCache = tikv.NewRegionCache(c.pdClient) + + if config.SchedulerV2Enabled { + messageServerConfig := conf.Debug.Messages.ToMessageServerConfig() + c.MessageServer = p2p.NewMessageServer(c.info.ID, messageServerConfig) + c.grpcService.Reset(c.MessageServer) + + messageClientConfig := conf.Debug.Messages.ToMessageClientConfig() + c.MessageRouter = p2p.NewMessageRouter(c.info.ID, conf.Security, messageClientConfig) + } + log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr)) @@ -219,6 +256,8 @@ func (c *Capture) run(stdCtx context.Context) error { TimeAcquirer: c.TimeAcquirer, TableActorSystem: c.tableActorSystem, SorterSystem: c.sorterSystem, + MessageServer: c.MessageServer, + MessageRouter: c.MessageRouter, }) err := c.register(ctx) if err != nil { @@ -232,8 +271,8 @@ func (c *Capture) run(stdCtx context.Context) error { cancel() }() wg := new(sync.WaitGroup) - wg.Add(4) - var ownerErr, processorErr error + var ownerErr, processorErr, messageServerErr error + wg.Add(1) go func() { defer wg.Done() defer c.AsyncClose() @@ -243,25 +282,49 @@ func (c *Capture) run(stdCtx context.Context) error { ownerErr = c.campaignOwner(ctx) log.Info("the owner routine has exited", zap.Error(ownerErr)) }() + wg.Add(1) go func() { defer wg.Done() defer c.AsyncClose() conf := config.GetGlobalServerConfig() processorFlushInterval := time.Duration(conf.ProcessorFlushInterval) + + globalState := orchestrator.NewGlobalState() + + if config.SchedulerV2Enabled { + globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) { + c.MessageRouter.AddPeer(captureID, addr) + }) + globalState.SetOnCaptureRemoved(func(captureID model.CaptureID) { + c.MessageRouter.RemovePeer(captureID) + }) + } + // when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors // (recoverable errors are intercepted in the processor tick) // so we should also stop the processor and let capture restart or exit - processorErr = c.runEtcdWorker(ctx, c.processorManager, orchestrator.NewGlobalState(), processorFlushInterval) + processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval) log.Info("the processor routine has exited", zap.Error(processorErr)) }() + wg.Add(1) go func() { defer wg.Done() c.TimeAcquirer.Run(ctx) }() + wg.Add(1) go func() { defer wg.Done() c.grpcPool.RecycleConn(ctx) }() + if config.SchedulerV2Enabled { + wg.Add(1) + go func() { + defer wg.Done() + defer c.AsyncClose() + defer c.grpcService.Reset(nil) + messageServerErr = c.MessageServer.Run(ctx) + }() + } wg.Wait() if ownerErr != nil { return errors.Annotate(ownerErr, "owner exited with error") @@ -269,6 +332,9 @@ func (c *Capture) run(stdCtx context.Context) error { if processorErr != nil { return errors.Annotate(processorErr, "processor exited with error") } + if messageServerErr != nil { + return errors.Annotate(messageServerErr, "message server exited with error") + } return nil } @@ -337,6 +403,18 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { owner := c.newOwner(c.pdClient) c.setOwner(owner) + + globalState := orchestrator.NewGlobalState() + + if config.SchedulerV2Enabled { + globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) { + c.MessageRouter.AddPeer(captureID, addr) + }) + globalState.SetOnCaptureRemoved(func(captureID model.CaptureID) { + c.MessageRouter.RemovePeer(captureID) + }) + } + err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval) c.setOwner(nil) log.Info("run owner exited", zap.Error(err)) @@ -461,6 +539,15 @@ func (c *Capture) AsyncClose() { } c.sorterSystem = nil } + if config.SchedulerV2Enabled { + c.grpcService.Reset(nil) + + if c.MessageRouter != nil { + c.MessageRouter.Close() + c.MessageRouter.Wait() + c.MessageRouter = nil + } + } } // WriteDebugInfo writes the debug info into writer. diff --git a/cdc/http_status.go b/cdc/http_status.go index f7e6e2d0e59..0c99a0fadc3 100644 --- a/cdc/http_status.go +++ b/cdc/http_status.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,7 +23,6 @@ import ( "os" "github.com/gin-gonic/gin" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/capture" @@ -38,10 +37,16 @@ import ( "go.uber.org/zap" ) -func (s *Server) startStatusHTTP() error { +// startStatusHTTP starts the HTTP server. +// `lis` is a listener that gives us plain-text HTTP requests. +// TODO can we decouple the HTTP server from the capture server? +func (s *Server) startStatusHTTP(lis net.Listener) error { conf := config.GetGlobalServerConfig() + + // OpenAPI handling logic is injected here. router := newRouter(capture.NewHTTPHandler(s.capture)) + // Inject the legacy API handlers. router.GET("/status", gin.WrapF(s.handleStatus)) router.GET("/debug/info", gin.WrapF(s.handleDebugInfo)) router.POST("/capture/owner/resign", gin.WrapF(s.handleResignOwner)) @@ -59,36 +64,12 @@ func (s *Server) startStatusHTTP() error { prometheus.DefaultGatherer = registry router.Any("/metrics", gin.WrapH(promhttp.Handler())) - // if CertAllowedCN was specified, we should add server's common name - // otherwise, https requests sent to non-owner capture can't be forward - // to owner - if len(conf.Security.CertAllowedCN) != 0 { - err := conf.Security.AddSelfCommonName() - if err != nil { - log.Error("status server set tls config failed", zap.Error(err)) - return errors.Trace(err) - } - } - - tlsConfig, err := conf.Security.ToTLSConfigWithVerify() - if err != nil { - log.Error("status server get tls config failed", zap.Error(err)) - return errors.Trace(err) - } - - s.statusServer = &http.Server{Addr: conf.Addr, Handler: router, TLSConfig: tlsConfig} + // No need to configure TLS because it is already handled by `s.tcpServer`. + s.statusServer = &http.Server{Handler: router} - ln, err := net.Listen("tcp", conf.Addr) - if err != nil { - return cerror.WrapError(cerror.ErrServeHTTP, err) - } go func() { log.Info("http server is running", zap.String("addr", conf.Addr)) - if tlsConfig != nil { - err = s.statusServer.ServeTLS(ln, conf.Security.CertPath, conf.Security.KeyPath) - } else { - err = s.statusServer.Serve(ln) - } + err := s.statusServer.Serve(lis) if err != nil && err != http.ErrServerClosed { log.Error("http server error", zap.Error(cerror.WrapError(cerror.ErrServeHTTP, err))) } diff --git a/cdc/http_status_test.go b/cdc/http_status_test.go index 9150a141ce1..5e2b5c7ea61 100644 --- a/cdc/http_status_test.go +++ b/cdc/http_status_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/pingcap/check" @@ -70,7 +71,19 @@ func (s *httpStatusSuite) TestHTTPStatus(c *check.C) { config.StoreGlobalServerConfig(conf) server, err := NewServer([]string{"http://127.0.0.1:2379"}) c.Assert(err, check.IsNil) - err = server.startStatusHTTP() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := server.tcpServer.Run(ctx) + c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*") + }() + + err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) c.Assert(err, check.IsNil) defer func() { c.Assert(server.statusServer.Close(), check.IsNil) @@ -85,6 +98,9 @@ func (s *httpStatusSuite) TestHTTPStatus(c *check.C) { testHandleMoveTable(c) testHandleChangefeedQuery(c) testHandleFailpoint(c) + + cancel() + wg.Wait() } func testPprof(c *check.C) { @@ -186,7 +202,7 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) { server, err := NewServer([]string{"https://127.0.0.1:2379"}) server.capture = capture.NewCapture4Test() c.Assert(err, check.IsNil) - err = server.startStatusHTTP() + err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) c.Assert(err, check.IsNil) defer func() { c.Assert(server.statusServer.Close(), check.IsNil) @@ -196,6 +212,14 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := server.tcpServer.Run(ctx) + c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*") + }() + // test cli sends request without a cert will success err = retry.Do(ctx, func() error { tr := &http.Transport{ @@ -236,6 +260,9 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) { return nil }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) c.Assert(err, check.IsNil) + + cancel() + wg.Wait() } // @@ -254,7 +281,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) { server, err := NewServer([]string{"https://127.0.0.1:2379"}) server.capture = capture.NewCapture4Test() c.Assert(err, check.IsNil) - err = server.startStatusHTTP() + err = server.startStatusHTTP(server.tcpServer.HTTP1Listener()) c.Assert(err, check.IsNil) defer func() { c.Assert(server.statusServer.Close(), check.IsNil) @@ -264,6 +291,14 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := server.tcpServer.Run(ctx) + c.Check(err, check.ErrorMatches, ".*ErrTCPServerClosed.*") + }() + // test cli sends request without a cert will fail err = retry.Do(ctx, func() error { tr := &http.Transport{ diff --git a/cdc/server.go b/cdc/server.go index 91b99f70c29..dcecb6a3e6d 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -33,8 +33,11 @@ import ( "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/fsutil" "github.com/pingcap/tiflow/pkg/httputil" + "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/tcpserver" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" + p2pProto "github.com/pingcap/tiflow/proto/p2p" "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" @@ -53,8 +56,12 @@ const ( ) // Server is the capture server +// TODO: we need to make Server more unit testable and add more test cases. +// Especially we need to decouple the HTTPServer out of Server. type Server struct { capture *capture.Capture + tcpServer tcpserver.TCPServer + grpcService *p2p.ServerWrapper statusServer *http.Server pdClient pd.Client etcdClient *etcd.CDCEtcdClient @@ -70,9 +77,31 @@ func NewServer(pdEndpoints []string) (*Server, error) { zap.Stringer("config", conf), ) + // This is to make communication between nodes possible. + // In other words, the nodes have to trust each other. + if len(conf.Security.CertAllowedCN) != 0 { + err := conf.Security.AddSelfCommonName() + if err != nil { + log.Error("status server set tls config failed", zap.Error(err)) + return nil, errors.Trace(err) + } + } + + // tcpServer is the unified frontend of the CDC server that serves + // both RESTful APIs and gRPC APIs. + // Note that we pass the TLS config to the tcpServer, so there is no need to + // configure TLS elsewhere. + tcpServer, err := tcpserver.NewTCPServer(conf.Addr, conf.Security) + if err != nil { + return nil, errors.Trace(err) + } + s := &Server{ pdEndpoints: pdEndpoints, + grpcService: p2p.NewServerWrapper(), + tcpServer: tcpServer, } + return s, nil } @@ -167,9 +196,9 @@ func (s *Server) Run(ctx context.Context) error { s.kvStorage = kvStore ctx = util.PutKVStorageInCtx(ctx, kvStore) - s.capture = capture.NewCapture(s.pdClient, s.kvStorage, s.etcdClient) + s.capture = capture.NewCapture(s.pdClient, s.kvStorage, s.etcdClient, s.grpcService) - err = s.startStatusHTTP() + err = s.startStatusHTTP(s.tcpServer.HTTP1Listener()) if err != nil { return err } @@ -241,6 +270,24 @@ func (s *Server) run(ctx context.Context) (err error) { return kv.RunWorkerPool(cctx) }) + wg.Go(func() error { + return s.tcpServer.Run(cctx) + }) + + if config.SchedulerV2Enabled { + grpcServer := grpc.NewServer() + p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService) + + wg.Go(func() error { + return grpcServer.Serve(s.tcpServer.GrpcListener()) + }) + wg.Go(func() error { + <-cctx.Done() + grpcServer.Stop() + return nil + }) + } + return wg.Wait() } @@ -256,6 +303,13 @@ func (s *Server) Close() { } s.statusServer = nil } + if s.tcpServer != nil { + err := s.tcpServer.Close() + if err != nil { + log.Error("close tcp server", zap.Error(err)) + } + s.tcpServer = nil + } } func (s *Server) initDir(ctx context.Context) error { diff --git a/pkg/tcpserver/tcp_server.go b/pkg/tcpserver/tcp_server.go index 24ce5b102ee..a3ed0937e1e 100644 --- a/pkg/tcpserver/tcp_server.go +++ b/pkg/tcpserver/tcp_server.go @@ -25,6 +25,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" "github.com/soheilhy/cmux" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -35,6 +36,8 @@ var cmuxReadTimeout = 10 * time.Second // serve both plain HTTP and gRPC at the same time. type TCPServer interface { // Run runs the TCPServer. + // For a given instance of TCPServer, Run is expected + // to be called only once. Run(ctx context.Context) error // GrpcListener returns the gRPC listener that // can be listened on by a gRPC server. @@ -43,6 +46,12 @@ type TCPServer interface { HTTP1Listener() net.Listener // IsTLSEnabled returns whether TLS has been enabled. IsTLSEnabled() bool + // Close closed the TCPServer. + // The listeners returned by GrpcListener and HTTP1Listener + // will be closed, which will force the consumers of these + // listeners to stop. This provides a reliable mechanism to + // cancel all related components. + Close() error } type tcpServerImpl struct { @@ -52,6 +61,8 @@ type tcpServerImpl struct { grpcListener net.Listener http1Listener net.Listener + isClosed atomic.Bool + credentials *security.Credential isTLSEnabled bool // read only } @@ -93,6 +104,16 @@ func NewTCPServer(address string, credentials *security.Credential) (TCPServer, // Run runs the mux. The mux has to be running to accept connections. func (s *tcpServerImpl) Run(ctx context.Context) error { + if s.isClosed.Load() { + return cerror.ErrTCPServerClosed.GenWithStackByArgs() + } + + defer func() { + s.isClosed.Store(true) + // Closing the rootListener provides a reliable way + // for telling downstream components to exit. + _ = s.rootListener.Close() + }() errg, ctx := errgroup.WithContext(ctx) errg.Go(func() error { @@ -128,6 +149,16 @@ func (s *tcpServerImpl) IsTLSEnabled() bool { return s.isTLSEnabled } +func (s *tcpServerImpl) Close() error { + if s.isClosed.Swap(true) { + // ignore double closing + return nil + } + // Closing the rootListener provides a reliable way + // for telling downstream components to exit. + return errors.Trace(s.rootListener.Close()) +} + // wrapTLSListener takes a plain Listener and security credentials, // and returns a listener that handles TLS connections. func wrapTLSListener(inner net.Listener, credentials *security.Credential) (net.Listener, error) { diff --git a/pkg/tcpserver/tcp_server_test.go b/pkg/tcpserver/tcp_server_test.go index 8e873219cc8..d4028347e5c 100644 --- a/pkg/tcpserver/tcp_server_test.go +++ b/pkg/tcpserver/tcp_server_test.go @@ -40,6 +40,10 @@ func TestTCPServerInsecureHTTP1(t *testing.T) { server, err := NewTCPServer(addr, &security.Credential{}) require.NoError(t, err) + defer func() { + err := server.Close() + require.NoError(t, err) + }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -73,6 +77,11 @@ func TestTCPServerTLSHTTP1(t *testing.T) { require.NoError(t, err) require.True(t, server.IsTLSEnabled()) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -104,6 +113,11 @@ func TestTCPServerInsecureGrpc(t *testing.T) { server, err := NewTCPServer(addr, &security.Credential{}) require.NoError(t, err) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -136,6 +150,11 @@ func TestTCPServerTLSGrpc(t *testing.T) { require.NoError(t, err) require.True(t, server.IsTLSEnabled()) + defer func() { + err := server.Close() + require.NoError(t, err) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -284,3 +303,61 @@ func testWithGrpcWorkload(ctx context.Context, t *testing.T, server TCPServer, a wg.Wait() } + +func TestTcpServerClose(t *testing.T) { + port, err := freeport.GetFreePort() + require.NoError(t, err) + addr := fmt.Sprintf("127.0.0.1:%d", port) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + server, err := NewTCPServer(addr, &security.Credential{}) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := server.Run(ctx) + require.Error(t, err) + require.Regexp(t, ".*ErrTCPServerClosed.*", err.Error()) + }() + + httpServer := &http.Server{} + http.HandleFunc("/", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(200) + _, err := writer.Write([]byte("ok")) + require.NoError(t, err) + }) + defer func() { + http.DefaultServeMux = http.NewServeMux() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := httpServer.Serve(server.HTTP1Listener()) + require.Error(t, err) + require.Regexp(t, ".*mux: server closed.*", err.Error()) + }() + + cli, err := httputil.NewClient(&security.Credential{}) + require.NoError(t, err) + + uri := fmt.Sprintf("http://%s/", addr) + resp, err := cli.Get(uri) + require.NoError(t, err) + defer func() { + _ = resp.Body.Close() + }() + require.Equal(t, 200, resp.StatusCode) + + // Close should be idempotent. + for i := 0; i < 3; i++ { + err := server.Close() + require.NoError(t, err) + } + + wg.Wait() +}