From ad27babbc0c21f54a8b8c14cb43df19aabefb70b Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Wed, 12 Jul 2023 17:56:13 +0800
Subject: [PATCH] mcs: add scheduling microservice framework (#6542)

ref tikv/pd#5839

Signed-off-by: Ryan Leung
---
 pkg/mcs/scheduling/server/apis/v1/api.go  |  94 ++++
 pkg/mcs/scheduling/server/config.go       | 172 ++++++++
 pkg/mcs/scheduling/server/grpc_service.go |  71 +++
 pkg/mcs/scheduling/server/metrics.go      |  37 ++
 pkg/mcs/scheduling/server/server.go       | 500 ++++++++++++++++++++++
 pkg/mcs/utils/constant.go                 |   2 +
 pkg/storage/endpoint/key_path.go          |   6 +
 server/cluster/cluster.go                 |   1 +
 8 files changed, 883 insertions(+)
 create mode 100644 pkg/mcs/scheduling/server/apis/v1/api.go
 create mode 100644 pkg/mcs/scheduling/server/config.go
 create mode 100644 pkg/mcs/scheduling/server/grpc_service.go
 create mode 100644 pkg/mcs/scheduling/server/metrics.go
 create mode 100644 pkg/mcs/scheduling/server/server.go

diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go
new file mode 100644
index 00000000000..26eab6d9424
--- /dev/null
+++ b/pkg/mcs/scheduling/server/apis/v1/api.go
@@ -0,0 +1,94 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apis
+
+import (
+	"net/http"
+	"sync"
+
+	"github.com/gin-contrib/cors"
+	"github.com/gin-contrib/gzip"
+	"github.com/gin-contrib/pprof"
+	"github.com/gin-gonic/gin"
+	"github.com/joho/godotenv"
+	scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
+	"github.com/tikv/pd/pkg/mcs/utils"
+	"github.com/tikv/pd/pkg/utils/apiutil"
+	"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
+	"github.com/unrolled/render"
+)
+
+// APIPathPrefix is the prefix of the API path.
+const APIPathPrefix = "/scheduling/api/v1/"
+
+var (
+	once            sync.Once
+	apiServiceGroup = apiutil.APIServiceGroup{
+		Name:       "scheduling",
+		Version:    "v1",
+		IsCore:     false,
+		PathPrefix: APIPathPrefix,
+	}
+)
+
+func init() {
+	scheserver.SetUpRestHandler = func(srv *scheserver.Service) (http.Handler, apiutil.APIServiceGroup) {
+		s := NewService(srv)
+		return s.apiHandlerEngine, apiServiceGroup
+	}
+}
+
+// Service is the scheduling service.
+type Service struct {
+	apiHandlerEngine *gin.Engine
+	root             *gin.RouterGroup
+
+	srv *scheserver.Service
+	rd  *render.Render
+}
+
+func createIndentRender() *render.Render {
+	return render.New(render.Options{
+		IndentJSON: true,
+	})
+}
+
+// NewService returns a new Service.
+func NewService(srv *scheserver.Service) *Service {
+	once.Do(func() {
+		// These global modifications take effect only on the first invocation.
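+		// Both calls below mutate process-wide state (the environment and the
+		// global gin mode), so they are guarded by sync.Once instead of being
+		// repeated for every Service instance.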
+ _ = godotenv.Load() + gin.SetMode(gin.ReleaseMode) + }) + apiHandlerEngine := gin.New() + apiHandlerEngine.Use(gin.Recovery()) + apiHandlerEngine.Use(cors.Default()) + apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) + apiHandlerEngine.Use(func(c *gin.Context) { + c.Set(multiservicesapi.ServiceContextKey, srv) + c.Next() + }) + apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) + apiHandlerEngine.GET("metrics", utils.PromHandler()) + pprof.Register(apiHandlerEngine) + root := apiHandlerEngine.Group(APIPathPrefix) + s := &Service{ + srv: srv, + apiHandlerEngine: apiHandlerEngine, + root: root, + rd: createIndentRender(), + } + return s +} diff --git a/pkg/mcs/scheduling/server/config.go b/pkg/mcs/scheduling/server/config.go new file mode 100644 index 00000000000..be628f4519d --- /dev/null +++ b/pkg/mcs/scheduling/server/config.go @@ -0,0 +1,172 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/spf13/pflag" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/configutil" + "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "go.uber.org/zap" +) + +const ( + defaultName = "Scheduling" + defaultBackendEndpoints = "http://127.0.0.1:2379" + defaultListenAddr = "http://127.0.0.1:3379" +) + +// Config is the configuration for the scheduling. +type Config struct { + BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"` + ListenAddr string `toml:"listen-addr" json:"listen-addr"` + AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"` + Name string `toml:"name" json:"name"` + DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring + EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it + + Metric metricutil.MetricConfig `toml:"metric" json:"metric"` + + // Log related config. + Log log.Config `toml:"log" json:"log"` + Logger *zap.Logger + LogProps *log.ZapProperties + + Security configutil.SecurityConfig `toml:"security" json:"security"` + + // WarningMsgs contains all warnings during parsing. + WarningMsgs []string + + // LeaderLease defines the time within which a Scheduling primary/leader must + // update its TTL in etcd, otherwise etcd will expire the leader key and other servers + // can campaign the primary/leader again. Etcd only supports seconds TTL, so here is + // second too. + LeaderLease int64 `toml:"lease" json:"lease"` +} + +// NewConfig creates a new config. +func NewConfig() *Config { + return &Config{} +} + +// Parse parses flag definitions from the argument list. +func (c *Config) Parse(flagSet *pflag.FlagSet) error { + // Load config file if specified. 
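+	// The config file (if any) is decoded first; flags that were explicitly set
+	// on the command line are applied afterwards and take precedence. For
+	// example, a hypothetical invocation like
+	//   scheduling-server --config=scheduling.toml --listen-addr=http://127.0.0.1:3379
+	// keeps the TOML values except for listen-addr.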
+ var ( + meta *toml.MetaData + err error + ) + if configFile, _ := flagSet.GetString("config"); configFile != "" { + meta, err = configutil.ConfigFromFile(c, configFile) + if err != nil { + return err + } + } + + // Ignore the error check here + configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level") + configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file") + configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr") + configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert") + configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert") + configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key") + configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints") + configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr") + configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr") + + return c.Adjust(meta, false) +} + +// Adjust is used to adjust the scheduling configurations. +func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { + configMetaData := configutil.NewConfigMetadata(meta) + if err := configMetaData.CheckUndecoded(); err != nil { + c.WarningMsgs = append(c.WarningMsgs, err.Error()) + } + + if c.Name == "" { + hostname, err := os.Hostname() + if err != nil { + return err + } + configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) + } + configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) + configutil.AdjustPath(&c.DataDir) + + if err := c.Validate(); err != nil { + return err + } + + configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints) + configutil.AdjustString(&c.ListenAddr, defaultListenAddr) + configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr) + + if !configMetaData.IsDefined("enable-grpc-gateway") { + c.EnableGRPCGateway = utils.DefaultEnableGRPCGateway + } + + c.adjustLog(configMetaData.Child("log")) + c.Security.Encryption.Adjust() + + if len(c.Log.Format) == 0 { + c.Log.Format = utils.DefaultLogFormat + } + + configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) + + return nil +} + +func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { + if !meta.IsDefined("disable-error-verbose") { + c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose + } +} + +// GetTLSConfig returns the TLS config. +func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { + return &c.Security.TLSConfig +} + +// Validate is used to validate if some configurations are right. +func (c *Config) Validate() error { + dataDir, err := filepath.Abs(c.DataDir) + if err != nil { + return errors.WithStack(err) + } + logFile, err := filepath.Abs(c.Log.File.Filename) + if err != nil { + return errors.WithStack(err) + } + rel, err := filepath.Rel(dataDir, filepath.Dir(logFile)) + if err != nil { + return errors.WithStack(err) + } + if !strings.HasPrefix(rel, "..") { + return errors.New("log directory shouldn't be the subdirectory of data directory") + } + + return nil +} diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go new file mode 100644 index 00000000000..3b0e51f1f66 --- /dev/null +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -0,0 +1,71 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"net/http"
+
+	"github.com/pingcap/log"
+	bs "github.com/tikv/pd/pkg/basicserver"
+	"github.com/tikv/pd/pkg/mcs/registry"
+	"github.com/tikv/pd/pkg/utils/apiutil"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// gRPC errors
+var (
+	ErrNotStarted        = status.Errorf(codes.Unavailable, "server not started")
+	ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
+)
+
+// SetUpRestHandler is a hook to set up the REST service.
+var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) {
+	return dummyRestService{}, apiutil.APIServiceGroup{}
+}
+
+type dummyRestService struct{}
+
+func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	w.WriteHeader(http.StatusNotImplemented)
+	w.Write([]byte("not implemented"))
+}
+
+// Service is the scheduling grpc service.
+type Service struct {
+	*Server
+}
+
+// NewService creates a new scheduling service.
+func NewService(svr bs.Server) registry.RegistrableService {
+	server, ok := svr.(*Server)
+	if !ok {
+		log.Fatal("create scheduling server failed")
+	}
+	return &Service{
+		Server: server,
+	}
+}
+
+// RegisterGRPCService registers the service to gRPC server.
+func (s *Service) RegisterGRPCService(g *grpc.Server) {
+}
+
+// RegisterRESTHandler registers the service to REST server.
+func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
+	handler, group := SetUpRestHandler(s)
+	apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
+}
diff --git a/pkg/mcs/scheduling/server/metrics.go b/pkg/mcs/scheduling/server/metrics.go
new file mode 100644
index 00000000000..b3f5b7b41de
--- /dev/null
+++ b/pkg/mcs/scheduling/server/metrics.go
@@ -0,0 +1,37 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import "github.com/prometheus/client_golang/prometheus"
+
+const (
+	namespace       = "scheduling"
+	serverSubsystem = "server"
+)
+
+var (
+	// Meta & Server info.
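+	// serverInfo is set once at startup: the gauge value is the start
+	// timestamp in seconds and the labels carry the build version and hash.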
+ serverInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: serverSubsystem, + Name: "info", + Help: "Indicate the scheduling server info, and the value is the start timestamp (s).", + }, []string{"version", "hash"}) +) + +func init() { + prometheus.MustRegister(serverInfo) +} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go new file mode 100644 index 00000000000..7b564e3433e --- /dev/null +++ b/pkg/mcs/scheduling/server/server.go @@ -0,0 +1,500 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "path" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/log" + "github.com/pingcap/sysutil" + "github.com/soheilhy/cmux" + "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "github.com/tikv/pd/pkg/versioninfo" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/types" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// Server is the scheduling server, and it implements bs.Server. +type Server struct { + diagnosticspb.DiagnosticsServer + // Server state. 0 is not running, 1 is running. + isRunning int64 + // Server start timestamp + startTimestamp int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + cfg *Config + clusterID uint64 + name string + listenURL *url.URL + + // for the primary election of scheduling + participant *member.Participant + etcdClient *clientv3.Client + httpClient *http.Client + + muxListener net.Listener + service *Service + + // Callback functions for different stages + // startCallbacks will be called after the server is started. + startCallbacks []func() + // primaryCallbacks will be called after the server becomes leader. + primaryCallbacks []func(context.Context) + + serviceRegister *discovery.ServiceRegister +} + +// Name returns the unique etcd name for this server in etcd cluster. +func (s *Server) Name() string { + return s.name +} + +// Context returns the context. +func (s *Server) Context() context.Context { + return s.ctx +} + +// GetAddr returns the server address. +func (s *Server) GetAddr() string { + return s.cfg.ListenAddr +} + +// Run runs the Scheduling server. 
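+// It initializes the etcd and HTTP clients, starts the gRPC/HTTP listeners and
+// the service registration, and then enters the primary election loop.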
+func (s *Server) Run() (err error) { + if err = s.initClient(); err != nil { + return err + } + if err = s.startServer(); err != nil { + return err + } + + s.startServerLoop() + + return nil +} + +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.primaryElectionLoop() +} + +func (s *Server) primaryElectionLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, exit scheduling primary election loop") + return + } + + primary, checkAgain := s.participant.CheckLeader() + if checkAgain { + continue + } + if primary != nil { + log.Info("start to watch the primary", zap.Stringer("scheduling-primary", primary)) + // Watch will keep looping and never return unless the primary/leader has changed. + primary.Watch(s.serverLoopCtx) + log.Info("the scheduling primary has changed, try to re-campaign a primary") + } + + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", + zap.String("campaign-scheduling-primary-name", s.participant.Name())) + } else { + log.Error("campaign scheduling primary meets error due to etcd error", + zap.String("campaign-scheduling-primary-name", s.participant.Name()), + errs.ZapError(err)) + } + return + } + + // Start keepalive the leadership and enable Scheduling service. + ctx, cancel := context.WithCancel(s.serverLoopCtx) + var resetLeaderOnce sync.Once + defer resetLeaderOnce.Do(func() { + cancel() + s.participant.ResetLeader() + }) + + // maintain the leadership, after this, Scheduling could be ready to provide service. + s.participant.KeepLeader(ctx) + log.Info("campaign scheduling primary ok", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + s.participant.EnableLeader() + log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) + + leaderTicker := time.NewTicker(utils.LeaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a primary/leader because lease has expired, the scheduling primary/leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +// Close closes the server. +func (s *Server) Close() { + if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { + // server is already closed + return + } + + log.Info("closing scheduling server ...") + s.serviceRegister.Deregister() + s.muxListener.Close() + s.serverLoopCancel() + s.serverLoopWg.Wait() + + if s.etcdClient != nil { + if err := s.etcdClient.Close(); err != nil { + log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } + + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + + log.Info("scheduling server is closed") +} + +// GetClient returns builtin etcd client. 
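+// The same client is also used by the election participant and the service
+// registry; it is closed by Server.Close.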
+func (s *Server) GetClient() *clientv3.Client { + return s.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return s.httpClient +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Server) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) IsServing() bool { + return !s.IsClosed() && s.participant.IsLeader() +} + +// IsClosed checks if the server loop is closed +func (s *Server) IsClosed() bool { + return s != nil && atomic.LoadInt64(&s.isRunning) == 0 +} + +// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) +} + +func (s *Server) initClient() error { + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) + if err != nil { + return err + } + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) + return err +} + +func (s *Server) startGRPCServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + gs := grpc.NewServer() + s.service.RegisterGRPCService(gs) + err := gs.Serve(l) + log.Info("gRPC server stop serving") + + // Attempt graceful stop (waits for pending RPCs), but force a stop if + // it doesn't happen in a reasonable amount of time. + done := make(chan struct{}) + go func() { + defer logutil.LogPanic() + log.Info("try to gracefully stop the server now") + gs.GracefulStop() + close(done) + }() + timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout) + defer timer.Stop() + select { + case <-done: + case <-timer.C: + log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout)) + gs.Stop() + } + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startHTTPServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + handler, _ := SetUpRestHandler(s.service) + hs := &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ReadHeaderTimeout: 5 * time.Second, + } + err := hs.Serve(l) + log.Info("http server stop serving") + + ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + if err := hs.Shutdown(ctx); err != nil { + log.Error("http server shutdown encountered problem", errs.ZapError(err)) + } else { + log.Info("all http(s) requests finished") + } + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startGRPCAndHTTPServers(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + mux := cmux.New(l) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + httpL := mux.Match(cmux.Any()) + + s.serverLoopWg.Add(2) + go s.startGRPCServer(grpcL) + go s.startHTTPServer(httpL) + + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stop serving", 
errs.ZapError(err)) + } else { + log.Fatal("mux stop serving unexpectedly", errs.ZapError(err)) + } + } +} + +// GetLeaderListenUrls gets service endpoints from the leader in election group. +func (s *Server) GetLeaderListenUrls() []string { + return s.participant.GetLeaderListenUrls() +} + +func (s *Server) startServer() (err error) { + if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { + return err + } + log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + // The independent Scheduling service still reuses PD version info since PD and Scheduling are just + // different service modes provided by the same pd-server binary + serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) + + uniqueName := s.cfg.ListenAddr + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) + schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID) + s.participant = member.NewParticipant(s.etcdClient) + s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), + utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr) + + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } + if tlsConfig != nil { + s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) + } else { + s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) + } + if err != nil { + return err + } + + s.serverLoopWg.Add(1) + go s.startGRPCAndHTTPServers(s.muxListener) + + // Run callbacks + log.Info("triggering the start callback functions") + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. 
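+	// Register this instance under the service discovery prefix of the cluster
+	// so that clients can locate the scheduling service; the registration is
+	// kept alive through an etcd lease (see discovery.DefaultLeaseInSeconds).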
+ entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} + serializedEntry, err := entry.Serialize() + if err != nil { + return err + } + s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + utils.SchedulingServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) + if err := s.serviceRegister.Register(); err != nil { + log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err)) + return err + } + atomic.StoreInt64(&s.isRunning, 1) + return nil +} + +// CreateServer creates the Server +func CreateServer(ctx context.Context, cfg *Config) *Server { + svr := &Server{ + DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), + startTimestamp: time.Now().Unix(), + cfg: cfg, + ctx: ctx, + } + return svr +} + +// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server +func CreateServerWrapper(cmd *cobra.Command, args []string) { + cmd.Flags().Parse(args) + cfg := NewConfig() + flagSet := cmd.Flags() + err := cfg.Parse(flagSet) + defer logutil.LogPanic() + + if err != nil { + cmd.Println(err) + return + } + + if printVersion, err := flagSet.GetBool("version"); err != nil { + cmd.Println(err) + return + } else if printVersion { + versioninfo.Print() + exit(0) + } + + // New zap logger + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err == nil { + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + } else { + log.Fatal("initialize logger error", errs.ZapError(err)) + } + // Flushing any buffered log entries + defer log.Sync() + + versioninfo.Log("Scheduling") + log.Info("Scheduling config", zap.Reflect("config", cfg)) + + grpcprometheus.EnableHandlingTimeHistogram() + metricutil.Push(&cfg.Metric) + + ctx, cancel := context.WithCancel(context.Background()) + svr := CreateServer(ctx, cfg) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", errs.ZapError(err)) + } + + <-ctx.Done() + log.Info("got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + log.Sync() + os.Exit(code) +} diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 26c0f369819..259b7dc166f 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -57,6 +57,8 @@ const ( TSOServiceName = "tso" // ResourceManagerServiceName is the name of resource manager server. ResourceManagerServiceName = "resource_manager" + // SchedulingServiceName is the name of scheduling server. + SchedulingServiceName = "scheduling" // KeyspaceGroupsKey is the path component of keyspace groups. KeyspaceGroupsKey = "keyspace_groups" // KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups. diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 9c16916ca5e..8b1a63b3751 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -281,6 +281,12 @@ func ResourceManagerSvcRootPath(clusterID uint64) string { return svcRootPath(clusterID, utils.ResourceManagerServiceName) } +// SchedulingSvcRootPath returns the root path of scheduling service. 
+// Path: /ms/{cluster_id}/scheduling +func SchedulingSvcRootPath(clusterID uint64) string { + return svcRootPath(clusterID, utils.SchedulingServiceName) +} + // TSOSvcRootPath returns the root path of tso service. // Path: /ms/{cluster_id}/tso func TSOSvcRootPath(clusterID uint64) string { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 8cdc8a14ff3..9be12f6e244 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -593,6 +593,7 @@ func (c *RaftCluster) runUpdateStoreStats() { } } +// runCoordinator runs the main scheduling loop. func (c *RaftCluster) runCoordinator() { defer logutil.LogPanic() defer c.wg.Done()