Skip to content

Commit

Permalink
mcs: add scheduling microservice framework (tikv#6542)
Browse files Browse the repository at this point in the history
ref tikv#5839

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 30, 2023
1 parent 5f72ae8 commit ad27bab
Show file tree
Hide file tree
Showing 8 changed files with 883 additions and 0 deletions.
94 changes: 94 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apis

import (
"net/http"
"sync"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
)

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1/"

var (
once sync.Once
apiServiceGroup = apiutil.APIServiceGroup{
Name: "scheduling",
Version: "v1",
IsCore: false,
PathPrefix: APIPathPrefix,
}
)

func init() {
scheserver.SetUpRestHandler = func(srv *scheserver.Service) (http.Handler, apiutil.APIServiceGroup) {
s := NewService(srv)
return s.apiHandlerEngine, apiServiceGroup
}
}

// Service is the tso service.
type Service struct {
apiHandlerEngine *gin.Engine
root *gin.RouterGroup

srv *scheserver.Service
rd *render.Render
}

func createIndentRender() *render.Render {
return render.New(render.Options{
IndentJSON: true,
})
}

// NewService returns a new Service.
func NewService(srv *scheserver.Service) *Service {
once.Do(func() {
// These global modification will be effective only for the first invoke.
_ = godotenv.Load()
gin.SetMode(gin.ReleaseMode)
})
apiHandlerEngine := gin.New()
apiHandlerEngine.Use(gin.Recovery())
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
pprof.Register(apiHandlerEngine)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
srv: srv,
apiHandlerEngine: apiHandlerEngine,
root: root,
rd: createIndentRender(),
}
return s
}
172 changes: 172 additions & 0 deletions pkg/mcs/scheduling/server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.uber.org/zap"
)

const (
defaultName = "Scheduling"
defaultBackendEndpoints = "http://127.0.0.1:2379"
defaultListenAddr = "http://127.0.0.1:3379"
)

// Config is the configuration for the scheduling.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger
LogProps *log.ZapProperties

Security configutil.SecurityConfig `toml:"security" json:"security"`

// WarningMsgs contains all warnings during parsing.
WarningMsgs []string

// LeaderLease defines the time within which a Scheduling primary/leader must
// update its TTL in etcd, otherwise etcd will expire the leader key and other servers
// can campaign the primary/leader again. Etcd only supports seconds TTL, so here is
// second too.
LeaderLease int64 `toml:"lease" json:"lease"`
}

// NewConfig creates a new config.
func NewConfig() *Config {
return &Config{}
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(flagSet *pflag.FlagSet) error {
// Load config file if specified.
var (
meta *toml.MetaData
err error
)
if configFile, _ := flagSet.GetString("config"); configFile != "" {
meta, err = configutil.ConfigFromFile(c, configFile)
if err != nil {
return err
}
}

// Ignore the error check here
configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level")
configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file")
configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr")
configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert")
configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert")
configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key")
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")

return c.Adjust(meta, false)
}

// Adjust is used to adjust the scheduling configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := configutil.NewConfigMetadata(meta)
if err := configMetaData.CheckUndecoded(); err != nil {
c.WarningMsgs = append(c.WarningMsgs, err.Error())
}

if c.Name == "" {
hostname, err := os.Hostname()
if err != nil {
return err
}
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname))
}
configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name))
configutil.AdjustPath(&c.DataDir)

if err := c.Validate(); err != nil {
return err
}

configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints)
configutil.AdjustString(&c.ListenAddr, defaultListenAddr)
configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr)

if !configMetaData.IsDefined("enable-grpc-gateway") {
c.EnableGRPCGateway = utils.DefaultEnableGRPCGateway
}

c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
c.Log.Format = utils.DefaultLogFormat
}

configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)

return nil
}

func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose
}
}

// GetTLSConfig returns the TLS config.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
return &c.Security.TLSConfig
}

// Validate is used to validate if some configurations are right.
func (c *Config) Validate() error {
dataDir, err := filepath.Abs(c.DataDir)
if err != nil {
return errors.WithStack(err)
}
logFile, err := filepath.Abs(c.Log.File.Filename)
if err != nil {
return errors.WithStack(err)
}
rel, err := filepath.Rel(dataDir, filepath.Dir(logFile))
if err != nil {
return errors.WithStack(err)
}
if !strings.HasPrefix(rel, "..") {
return errors.New("log directory shouldn't be the subdirectory of data directory")
}

return nil
}
71 changes: 71 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"net/http"

"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/utils/apiutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// gRPC errors
var (
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
)

// SetUpRestHandler is a hook to sets up the REST service.
var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) {
return dummyRestService{}, apiutil.APIServiceGroup{}
}

type dummyRestService struct{}

func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("not implemented"))
}

// Service is the scheduling grpc service.
type Service struct {
*Server
}

// NewService creates a new TSO service.
func NewService(svr bs.Server) registry.RegistrableService {
server, ok := svr.(*Server)
if !ok {
log.Fatal("create scheduling server failed")
}
return &Service{
Server: server,
}
}

// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
handler, group := SetUpRestHandler(s)
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}
37 changes: 37 additions & 0 deletions pkg/mcs/scheduling/server/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import "github.com/prometheus/client_golang/prometheus"

const (
namespace = "scheduling"
serverSubsystem = "server"
)

var (
// Meta & Server info.
serverInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: serverSubsystem,
Name: "info",
Help: "Indicate the scheduling server info, and the value is the start timestamp (s).",
}, []string{"version", "hash"})
)

func init() {
prometheus.MustRegister(serverInfo)
}
Loading

0 comments on commit ad27bab

Please sign in to comment.