-
Notifications
You must be signed in to change notification settings - Fork 721
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
mcs: add scheduling microservice framework (#6542)
ref #5839 Signed-off-by: Ryan Leung <[email protected]>
- Loading branch information
Showing
8 changed files
with
883 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package apis | ||
|
||
import ( | ||
"net/http" | ||
"sync" | ||
|
||
"github.com/gin-contrib/cors" | ||
"github.com/gin-contrib/gzip" | ||
"github.com/gin-contrib/pprof" | ||
"github.com/gin-gonic/gin" | ||
"github.com/joho/godotenv" | ||
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" | ||
"github.com/tikv/pd/pkg/mcs/utils" | ||
"github.com/tikv/pd/pkg/utils/apiutil" | ||
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" | ||
"github.com/unrolled/render" | ||
) | ||
|
||
// APIPathPrefix is the prefix of the API path. | ||
const APIPathPrefix = "/scheduling/api/v1/" | ||
|
||
var ( | ||
once sync.Once | ||
apiServiceGroup = apiutil.APIServiceGroup{ | ||
Name: "scheduling", | ||
Version: "v1", | ||
IsCore: false, | ||
PathPrefix: APIPathPrefix, | ||
} | ||
) | ||
|
||
func init() { | ||
scheserver.SetUpRestHandler = func(srv *scheserver.Service) (http.Handler, apiutil.APIServiceGroup) { | ||
s := NewService(srv) | ||
return s.apiHandlerEngine, apiServiceGroup | ||
} | ||
} | ||
|
||
// Service is the tso service. | ||
type Service struct { | ||
apiHandlerEngine *gin.Engine | ||
root *gin.RouterGroup | ||
|
||
srv *scheserver.Service | ||
rd *render.Render | ||
} | ||
|
||
func createIndentRender() *render.Render { | ||
return render.New(render.Options{ | ||
IndentJSON: true, | ||
}) | ||
} | ||
|
||
// NewService returns a new Service. | ||
func NewService(srv *scheserver.Service) *Service { | ||
once.Do(func() { | ||
// These global modification will be effective only for the first invoke. | ||
_ = godotenv.Load() | ||
gin.SetMode(gin.ReleaseMode) | ||
}) | ||
apiHandlerEngine := gin.New() | ||
apiHandlerEngine.Use(gin.Recovery()) | ||
apiHandlerEngine.Use(cors.Default()) | ||
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) | ||
apiHandlerEngine.Use(func(c *gin.Context) { | ||
c.Set(multiservicesapi.ServiceContextKey, srv) | ||
c.Next() | ||
}) | ||
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) | ||
apiHandlerEngine.GET("metrics", utils.PromHandler()) | ||
pprof.Register(apiHandlerEngine) | ||
root := apiHandlerEngine.Group(APIPathPrefix) | ||
s := &Service{ | ||
srv: srv, | ||
apiHandlerEngine: apiHandlerEngine, | ||
root: root, | ||
rd: createIndentRender(), | ||
} | ||
return s | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package server | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/BurntSushi/toml" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/log" | ||
"github.com/spf13/pflag" | ||
"github.com/tikv/pd/pkg/mcs/utils" | ||
"github.com/tikv/pd/pkg/utils/configutil" | ||
"github.com/tikv/pd/pkg/utils/grpcutil" | ||
"github.com/tikv/pd/pkg/utils/metricutil" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
defaultName = "Scheduling" | ||
defaultBackendEndpoints = "http://127.0.0.1:2379" | ||
defaultListenAddr = "http://127.0.0.1:3379" | ||
) | ||
|
||
// Config is the configuration for the scheduling. | ||
type Config struct { | ||
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"` | ||
ListenAddr string `toml:"listen-addr" json:"listen-addr"` | ||
AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"` | ||
Name string `toml:"name" json:"name"` | ||
DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring | ||
EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it | ||
|
||
Metric metricutil.MetricConfig `toml:"metric" json:"metric"` | ||
|
||
// Log related config. | ||
Log log.Config `toml:"log" json:"log"` | ||
Logger *zap.Logger | ||
LogProps *log.ZapProperties | ||
|
||
Security configutil.SecurityConfig `toml:"security" json:"security"` | ||
|
||
// WarningMsgs contains all warnings during parsing. | ||
WarningMsgs []string | ||
|
||
// LeaderLease defines the time within which a Scheduling primary/leader must | ||
// update its TTL in etcd, otherwise etcd will expire the leader key and other servers | ||
// can campaign the primary/leader again. Etcd only supports seconds TTL, so here is | ||
// second too. | ||
LeaderLease int64 `toml:"lease" json:"lease"` | ||
} | ||
|
||
// NewConfig creates a new config. | ||
func NewConfig() *Config { | ||
return &Config{} | ||
} | ||
|
||
// Parse parses flag definitions from the argument list. | ||
func (c *Config) Parse(flagSet *pflag.FlagSet) error { | ||
// Load config file if specified. | ||
var ( | ||
meta *toml.MetaData | ||
err error | ||
) | ||
if configFile, _ := flagSet.GetString("config"); configFile != "" { | ||
meta, err = configutil.ConfigFromFile(c, configFile) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// Ignore the error check here | ||
configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level") | ||
configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file") | ||
configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr") | ||
configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert") | ||
configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert") | ||
configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key") | ||
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints") | ||
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr") | ||
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr") | ||
|
||
return c.Adjust(meta, false) | ||
} | ||
|
||
// Adjust is used to adjust the scheduling configurations. | ||
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { | ||
configMetaData := configutil.NewConfigMetadata(meta) | ||
if err := configMetaData.CheckUndecoded(); err != nil { | ||
c.WarningMsgs = append(c.WarningMsgs, err.Error()) | ||
} | ||
|
||
if c.Name == "" { | ||
hostname, err := os.Hostname() | ||
if err != nil { | ||
return err | ||
} | ||
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) | ||
} | ||
configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) | ||
configutil.AdjustPath(&c.DataDir) | ||
|
||
if err := c.Validate(); err != nil { | ||
return err | ||
} | ||
|
||
configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints) | ||
configutil.AdjustString(&c.ListenAddr, defaultListenAddr) | ||
configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr) | ||
|
||
if !configMetaData.IsDefined("enable-grpc-gateway") { | ||
c.EnableGRPCGateway = utils.DefaultEnableGRPCGateway | ||
} | ||
|
||
c.adjustLog(configMetaData.Child("log")) | ||
c.Security.Encryption.Adjust() | ||
|
||
if len(c.Log.Format) == 0 { | ||
c.Log.Format = utils.DefaultLogFormat | ||
} | ||
|
||
configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) | ||
|
||
return nil | ||
} | ||
|
||
func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { | ||
if !meta.IsDefined("disable-error-verbose") { | ||
c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose | ||
} | ||
} | ||
|
||
// GetTLSConfig returns the TLS config. | ||
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { | ||
return &c.Security.TLSConfig | ||
} | ||
|
||
// Validate is used to validate if some configurations are right. | ||
func (c *Config) Validate() error { | ||
dataDir, err := filepath.Abs(c.DataDir) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
logFile, err := filepath.Abs(c.Log.File.Filename) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
rel, err := filepath.Rel(dataDir, filepath.Dir(logFile)) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
if !strings.HasPrefix(rel, "..") { | ||
return errors.New("log directory shouldn't be the subdirectory of data directory") | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package server | ||
|
||
import ( | ||
"net/http" | ||
|
||
"github.com/pingcap/log" | ||
bs "github.com/tikv/pd/pkg/basicserver" | ||
"github.com/tikv/pd/pkg/mcs/registry" | ||
"github.com/tikv/pd/pkg/utils/apiutil" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
// gRPC errors | ||
var ( | ||
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") | ||
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") | ||
) | ||
|
||
// SetUpRestHandler is a hook to sets up the REST service. | ||
var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) { | ||
return dummyRestService{}, apiutil.APIServiceGroup{} | ||
} | ||
|
||
type dummyRestService struct{} | ||
|
||
func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusNotImplemented) | ||
w.Write([]byte("not implemented")) | ||
} | ||
|
||
// Service is the scheduling grpc service. | ||
type Service struct { | ||
*Server | ||
} | ||
|
||
// NewService creates a new TSO service. | ||
func NewService(svr bs.Server) registry.RegistrableService { | ||
server, ok := svr.(*Server) | ||
if !ok { | ||
log.Fatal("create scheduling server failed") | ||
} | ||
return &Service{ | ||
Server: server, | ||
} | ||
} | ||
|
||
// RegisterGRPCService registers the service to gRPC server. | ||
func (s *Service) RegisterGRPCService(g *grpc.Server) { | ||
} | ||
|
||
// RegisterRESTHandler registers the service to REST server. | ||
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { | ||
handler, group := SetUpRestHandler(s) | ||
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package server | ||
|
||
import "github.com/prometheus/client_golang/prometheus" | ||
|
||
const ( | ||
namespace = "scheduling" | ||
serverSubsystem = "server" | ||
) | ||
|
||
var ( | ||
// Meta & Server info. | ||
serverInfo = prometheus.NewGaugeVec( | ||
prometheus.GaugeOpts{ | ||
Namespace: namespace, | ||
Subsystem: serverSubsystem, | ||
Name: "info", | ||
Help: "Indicate the scheduling server info, and the value is the start timestamp (s).", | ||
}, []string{"version", "hash"}) | ||
) | ||
|
||
func init() { | ||
prometheus.MustRegister(serverInfo) | ||
} |
Oops, something went wrong.