diff --git a/src/backend/booster/bk_dist/booster/command/command.go b/src/backend/booster/bk_dist/booster/command/command.go index b59edd95..f4d99e2e 100644 --- a/src/backend/booster/bk_dist/booster/command/command.go +++ b/src/backend/booster/bk_dist/booster/command/command.go @@ -105,6 +105,7 @@ const ( FlagEnableLib = "enable_lib" FlagLongTCP = "long_tcp" FlagUseDefaultWorker = "use_default_worker" + FlagDynamicPort = "dynamic_port" EnvBuildIDOld = "TURBO_PLAN_BUILD_ID" EnvBuildID = "TBS_BUILD_ID" @@ -442,6 +443,10 @@ var ( Name: "use_default_worker", Usage: "use default worker if worker id is empty", }, + commandCli.BoolFlag{ + Name: "dynamic_port", + Usage: "controller will listen dynamic port if true", + }, } ) diff --git a/src/backend/booster/bk_dist/booster/command/process.go b/src/backend/booster/bk_dist/booster/command/process.go index 0e966c1c..49727fc5 100644 --- a/src/backend/booster/bk_dist/booster/command/process.go +++ b/src/backend/booster/bk_dist/booster/command/process.go @@ -236,6 +236,12 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) { remoteRetryTimes = c.Int(FlagRemoteRetryTimes) } + withDynamicPort := c.Bool(FlagDynamicPort) + // bazel 暂时不支持动态端口,没办法解决参数变化的问题 + if c.Bool(FlagBazel) || c.Bool(FlagBazelPlus) || c.Bool(FlagBazel4Plus) || c.Bool(FlagBazelNoLauncher) { + withDynamicPort = false + } + // generate a new booster. 
cmdConfig := dcType.BoosterConfig{ Type: dcType.GetBoosterType(bt), @@ -348,6 +354,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) { EnableLib: c.Bool(FlagEnableLib), LongTCP: c.Bool(FlagLongTCP), UseDefaultWorker: c.Bool(FlagUseDefaultWorker) || c.Bool(FlagBazelNoLauncher), + DynamicPort: withDynamicPort, }, } diff --git a/src/backend/booster/bk_dist/booster/pkg/booster.go b/src/backend/booster/bk_dist/booster/pkg/booster.go index 6eeb4e58..c1ce3d18 100644 --- a/src/backend/booster/bk_dist/booster/pkg/booster.go +++ b/src/backend/booster/bk_dist/booster/pkg/booster.go @@ -268,6 +268,7 @@ func (b *Booster) getWorkersEnv() map[string]string { if b.work != nil { requiredEnv[env.KeyExecutorControllerWorkID] = b.workID } + for k, v := range dcSDK.GetControllerConfigToEnv(b.config.Controller) { requiredEnv[k] = v } @@ -545,12 +546,16 @@ func (b *Booster) registerWork() error { } blog.Debugf("booster: try to find controller or launch it") - pid, err := b.controller.EnsureServer() + pid, port, err := b.controller.EnsureServer() if err != nil { blog.Errorf("booster: ensure controller failed: %v", err) return err } - blog.Infof("booster: success to connect to controller: %s", b.config.Controller.Target()) + + blog.Infof("booster: success to connect to controller: %s, real port[%d]", b.config.Controller.Target(), port) + os.Setenv(env.GetEnvKey(env.KeyExecutorControllerPort), strconv.Itoa(port)) + blog.Infof("booster: set env %s=%d", env.GetEnvKey(env.KeyExecutorControllerPort), port) + b.config.Controller.Port = port b.work, err = b.controller.Register(dcSDK.ControllerRegisterConfig{ BatchMode: b.config.BatchMode, diff --git a/src/backend/booster/bk_dist/common/sdk/controller.go b/src/backend/booster/bk_dist/common/sdk/controller.go index 7c953b3a..87985585 100644 --- a/src/backend/booster/bk_dist/common/sdk/controller.go +++ b/src/backend/booster/bk_dist/common/sdk/controller.go @@ -24,7 +24,8 @@ import ( // ControllerSDK describe the controller handler 
SDK type ControllerSDK interface { - EnsureServer() (int, error) + // support dinamic listen port,return pid,port,error + EnsureServer() (int, int, error) Register(config ControllerRegisterConfig) (ControllerWorkSDK, error) GetWork(workID string) ControllerWorkSDK SetConfig(config *CommonControllerConfig) error @@ -114,10 +115,11 @@ func GetControllerConfigToEnv(config ControllerConfig) map[string]string { // ControllerConfig describe the config of controller type ControllerConfig struct { // 需要传递给executor的信息 - NoLocal bool - Scheme string - IP string - Port int + NoLocal bool + Scheme string + IP string + Port int + DynamicPort bool // controller参数 Timeout time.Duration @@ -205,6 +207,14 @@ type ControllerWorkSettings struct { GlobalSlots bool } +// ControllerProcessInfo describe the running controller process info +type ControllerProcessInfo struct { + ProcessID int `json:"process_id"` + ListenPort int `json:"listen_port"` + Success bool `json:"success"` + Message string `json:"message"` +} + // ControllerJobStats describe a single job's stats info type ControllerJobStats struct { ID string `json:"id"` diff --git a/src/backend/booster/bk_dist/controller/config/config.go b/src/backend/booster/bk_dist/controller/config/config.go index 05751919..f77d3edc 100644 --- a/src/backend/booster/bk_dist/controller/config/config.go +++ b/src/backend/booster/bk_dist/controller/config/config.go @@ -53,6 +53,8 @@ type ServerConfig struct { LongTCP bool `json:"long_tcp" value:"false" usage:"if true, controller will connect to remote worker with long tcp connection"` UseDefaultWorker bool `json:"use_default_worker" value:"true" usage:"if true, controller will use first worker available"` + + DynamicPort bool `json:"dynamic_port" value:"false" usage:"if true, controller will listen dynamic port"` } // CertConfig configuration of Cert diff --git a/src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go b/src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go index 
26376ef8..abfcc4b1 100644 --- a/src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go +++ b/src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go @@ -11,7 +11,9 @@ package v1 import ( "context" + "encoding/json" "fmt" + "io/ioutil" "net" "net/http" "os" @@ -78,7 +80,10 @@ type sdk struct { } // EnsureServer make sure the controller is running and ready to work -func (s *sdk) EnsureServer() (int, error) { +func (s *sdk) EnsureServer() (int, int, error) { + if s.config.DynamicPort { + return s.ensureServerDynamicPort() + } return s.ensureServer() } @@ -97,7 +102,138 @@ func (s *sdk) SetConfig(config *dcSDK.CommonControllerConfig) error { return s.setConfig(config) } -func (s *sdk) ensureServer() (int, error) { +var ( + ControllerProcessfile = "bk-dist-controller-process.json" + ShaderProcessfile = "bk-shader-tool-process.json" // file name must not change; it has to match the UE source code +) + +func getProcessConfig(d, f string) (string, error) { + dir := d + if dir == "" { + dir = dcUtil.GetGlobalDir() + } + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return "", err + } + return filepath.Join(dir, f), nil +} + +// save and load the controller process info +func saveProcessInfo(i *dcSDK.ControllerProcessInfo, d, f string) error { + f, err := getProcessConfig(d, f) + if err != nil { + blog.Infof("[controller]: get process config file error:%v\n", err) + return err + } + + jsonData, err := json.Marshal(i) + if err != nil { + blog.Infof("[controller]: encode [%v] to json with error:%v\n", *i, err) + return err + } + + err = ioutil.WriteFile(f, jsonData, os.ModePerm) + if err != nil { + blog.Infof("[controller]: save process config file[%s] with error:%v\n", f, err) + return err + } + + return nil +} + +func SaveControllerInfo(pid, port int, success bool, message string, d, f string) error { + i := &dcSDK.ControllerProcessInfo{ + ProcessID: pid, + ListenPort: port, + Success: success, + Message: message, + } + + return saveProcessInfo(i, d, f) +} + +func loadProcessInfo(d, f string) 
(*dcSDK.ControllerProcessInfo, error) { + f, err := getProcessConfig(d, f) + if err != nil { + blog.Infof("[controller]: get process config file error:%v\n", err) + return nil, err + } + + data, err := ioutil.ReadFile(f) + if err != nil { + blog.Infof("[controller]: read process config json file %s with error %v", f, err) + return nil, err + } + + var t dcSDK.ControllerProcessInfo + if err = codec.DecJSON(data, &t); err != nil { + blog.Infof("[controller]: decode json content[%s] failed: %v", string(data), err) + return nil, err + } + + return &t, nil +} + +func getListenPort(d, f string) (int, error) { + i, err := loadProcessInfo(d, f) + if err != nil { + return 0, err + } + + return i.ListenPort, nil +} + +// support dynamic listen port +func (s *sdk) ensureServerDynamicPort() (int, int, error) { + blog.Infof("[controller]: ensure bk-dist-controller with dynamic port now") + + // 0: first check + pid, port, err := s.checkServerByDynamicPort("", ControllerProcessfile) + if err == nil && pid > 0 && port > 0 { + return pid, port, err + } + + // 1. launch with --dynamic_port + err = s.launchServer() + if err != nil { + return 0, s.config.Port, err + } + + // 2. 
check from local config file + timeout := time.After(serverEnsureTime) + + for ; ; time.Sleep(100 * time.Millisecond) { + select { + case <-timeout: + return 0, s.config.Port, fmt.Errorf("ensure server timeout") + default: + pid, port, err := s.checkServerByDynamicPort("", ControllerProcessfile) + if err == nil && pid > 0 && port > 0 { + return pid, port, err + } + } + } +} + +func (s *sdk) checkServerByDynamicPort(d, f string) (int, int, error) { + var err error + var pid, port int + port, err = getListenPort(d, f) + if err == nil && port > 0 { + s.config.Port = port + pid, err = s.checkServerByPort() + if err == nil && pid > 0 { + return pid, port, nil + } + } + + return 0, 0, err +} + +// return pid,port,error +func (s *sdk) ensureServer() (int, int, error) { + blog.Infof("[controller]: ensure bk-dist-controller now") + launched := false var launchedtime int64 timeout := time.After(serverEnsureTime) @@ -105,13 +241,13 @@ func (s *sdk) ensureServer() (int, error) { for ; ; time.Sleep(100 * time.Millisecond) { select { case <-timeout: - return 0, fmt.Errorf("ensure server timeout") + return 0, s.config.Port, fmt.Errorf("ensure server timeout") default: - pid, err := s.checkServer(launchedtime) + pid, err := s.checkServer(launchedtime, 30) switch err { case nil: - return pid, nil + return pid, s.config.Port, nil case dcSDK.ErrControllerNotReady: if launched { @@ -120,7 +256,7 @@ func (s *sdk) ensureServer() (int, error) { err = s.launchServer() if err != nil { - return 0, err + return 0, s.config.Port, err } launched = true launchedtime = time.Now().Unix() @@ -129,33 +265,39 @@ func (s *sdk) ensureServer() (int, error) { case dcSDK.ErrControllerKilled: err = s.launchServer() if err != nil { - return 0, err + return 0, s.config.Port, err } launched = true launchedtime = time.Now().Unix() continue default: - return 0, err + return 0, s.config.Port, err } } } } -func (s *sdk) checkServer(launchedtime int64) (int, error) { +func (s *sdk) checkServer(launchedtime int64, 
waitsecs int64) (int, error) { blog.Infof("sdk: check server...") // ProcessExistTimeoutAndKill(windows.EnumProcesses) maybe block, call it carefully - // kill launched process if not succeed after 30 seconds + // kill launched process if not succeed after waitsecs seconds if launchedtime > 0 { nowsecs := time.Now().Unix() - if nowsecs-launchedtime > 30 { + if nowsecs-launchedtime > waitsecs { blog.Infof("sdk: try kill existed server for unavailable after long time") _ = dcUtil.ProcessExistTimeoutAndKill(controllerTarget(dcSDK.ControllerBinary), 1*time.Minute) return 0, dcSDK.ErrControllerKilled } } + return s.checkServerByPort() +} + +func (s *sdk) checkServerByPort() (int, error) { + blog.Infof("sdk: check server by port with target:%s", s.config.Target()) + // check tcp port conn, err := net.DialTimeout("tcp", s.config.Target(), 1*time.Second) if err != nil { @@ -258,6 +400,12 @@ func (s *sdk) launchServer() error { useDefaultWorker = "" } + dynamicPort := "" + if s.config.DynamicPort { + dynamicPort = "--dynamic_port" + s.config.Port = 0 + } + return dcSyscall.RunServer(fmt.Sprintf("%s%s -a=%s -p=%d --log-dir=%s --v=%d --local_slots=%d "+ "--local_pre_slots=%d --local_exe_slots=%d --local_post_slots=%d --async_flush %s --remain_time=%d "+ "--use_local_cpu_percent=%d %s"+ @@ -267,7 +415,8 @@ func (s *sdk) launchServer() error { " --net_error_limit=%d"+ " --remote_retry_times=%d"+ " %s %s"+ - " %s %s", + " %s %s"+ + " %s", sudo, ctrlPath, s.config.IP, @@ -292,6 +441,7 @@ func (s *sdk) launchServer() error { enablelink, longTCP, useDefaultWorker, + dynamicPort, )) } diff --git a/src/backend/booster/bk_dist/controller/pkg/server.go b/src/backend/booster/bk_dist/controller/pkg/server.go index f792515a..efb05026 100644 --- a/src/backend/booster/bk_dist/controller/pkg/server.go +++ b/src/backend/booster/bk_dist/controller/pkg/server.go @@ -10,16 +10,20 @@ package pkg import ( + "fmt" + "net" + "net/http" "os" "path/filepath" + 
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/flock" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/config" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/api" // 初始化api资源 - _ "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/api/v1" + v1 "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/api/v1" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/dashboard" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/manager" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/types" @@ -28,7 +32,9 @@ import ( ) var ( - lockfile = "bk-dist-controller.lock" + lockfile = "bk-dist-controller.lock" + listenPortBlackList = []int{30118} // 先写死,后面如果有需要,考虑改成可配置的 + maxTryListenTimes = 10 ) func getLockFile() (string, error) { @@ -46,6 +52,17 @@ func lock() bool { return false } blog.Infof("[controller]: ready lock file: %s\n", f) + + if !file.Stat(f).Exist() { + newf, err := os.OpenFile(f, os.O_CREATE, os.ModePerm) + if err != nil { + blog.Infof("[controller]: create file: %s failed with error:%v", f, err) + } else { + blog.Infof("[controller]: created lock file: %s", f) + newf.Close() + } + } + flag, err := flock.TryLock(f) if err != nil { blog.Errorf("[controller]: failed to start with error:%v\n", err) @@ -75,7 +92,11 @@ func NewServer(conf *config.ServerConfig) (*Server, error) { s := &Server{conf: conf} // Http server - s.httpServer = httpserver.NewHTTPServer(s.conf.Port, s.conf.Address, "") + port := s.conf.Port + if s.conf.DynamicPort { + port = 0 + } + s.httpServer = httpserver.NewHTTPServer(port, s.conf.Address, "") if s.conf.ServerCert.IsSSL { s.httpServer.SetSSL( s.conf.ServerCert.CAFile, 
s.conf.ServerCert.CertFile, s.conf.ServerCert.KeyFile, s.conf.ServerCert.CertPwd) @@ -84,7 +105,7 @@ func NewServer(conf *config.ServerConfig) (*Server, error) { return s, nil } -// Start : start listen +// Start : start listen and serve func (server *Server) Start() error { var err error server.manager = manager.NewMgr(server.conf) @@ -105,26 +126,76 @@ func (server *Server) Start() error { return err } + if server.conf.DynamicPort { + return server.ListenAndServeWithDynamicPort() + } + return server.httpServer.ListenAndServe() } +func (server *Server) ListenAndServeWithDynamicPort() error { + srv, ln, port, err := server.tryListen() + if err != nil || srv == nil || ln == nil || port <= 0 { + v1.SaveControllerInfo(0, 0, false, fmt.Sprint(err), "", v1.ControllerProcessfile) + return fmt.Errorf("listen with dynamic port failed: %v", err) + } + + err = v1.SaveControllerInfo(os.Getpid(), port, true, "", "", v1.ControllerProcessfile) + if err != nil { + blog.Errorf("[controller]: save process info failed with error: %v", err) + return err + } + + return server.httpServer.Serve(srv, ln) +} + +func (server *Server) tryListen() (*http.Server, net.Listener, int, error) { + for i := 0; i < maxTryListenTimes; i++ { + srv, ln, err := server.httpServer.Listen() + if err != nil { + blog.Errorf("[controller]: listen failed with error: %v", err) + continue + } + + port := ln.Addr().(*net.TCPAddr).Port + blog.Infof("[controller]: listen with port %d", port) + + inBlack := false + for _, p := range listenPortBlackList { + if p == port { + inBlack = true + break + } + } + + if inBlack { + blog.Infof("[controller]: port %d in blacklist, ignore this port", port) + ln.Close() + continue + } + + return srv, ln, port, nil + } + + blog.Errorf("[controller]: listen failed after %d try!!", maxTryListenTimes) + return nil, nil, 0, fmt.Errorf("bind listen port failed") +} + // Run brings up the server func Run(conf *config.ServerConfig) error { - // if !conf.DisableFileLock { - // if !lock() { - // return types.ErrFileLock - // } - // defer 
unlock() - // } - - // if err := common.SavePid(conf.ProcessConfig); err != nil { - // blog.Errorf("save pid failed: %v", err) - // return err - // } + // avoid lauched multiple instances with diffrent ports + if conf.DynamicPort { + if !lock() { + return types.ErrFileLock + } + defer unlock() + + conf.Port = 0 + } server, err := NewServer(conf) if err != nil { - blog.Errorf("init server failed: %v", err) + blog.Errorf("[controller]: init server failed: %v", err) return err } diff --git a/src/backend/booster/bk_dist/shadertool/command/command.go b/src/backend/booster/bk_dist/shadertool/command/command.go index 58f055b8..aca66a6e 100644 --- a/src/backend/booster/bk_dist/shadertool/command/command.go +++ b/src/backend/booster/bk_dist/shadertool/command/command.go @@ -20,14 +20,15 @@ import ( // define const vars const ( - FlagLog = "log" - FlagLogDir = "log_dir" - FlagToolDir = "tool_dir" - FlagJobDir = "job_dir" - FlagJobJSONPrefix = "job_json_prefix" - FlagJobStartIndex = "job_start_index" - FlagCommitSuicide = "commit_suicide" - FlagPort = "port" + FlagLog = "log" + FlagLogDir = "log_dir" + FlagToolDir = "tool_dir" + FlagJobDir = "job_dir" + FlagJobJSONPrefix = "job_json_prefix" + FlagJobStartIndex = "job_start_index" + FlagCommitSuicide = "commit_suicide" + FlagPort = "port" + FlagProcessInfoFile = "process_info_file" ) // Run main entrance @@ -83,6 +84,10 @@ func GetApp(ct ClientType) *commandCli.App { Name: "port", Usage: "port to listen", }, + commandCli.StringFlag{ + Name: "process_info_file", + Usage: "full path of this process info", + }, } switch ct { diff --git a/src/backend/booster/bk_dist/shadertool/command/process.go b/src/backend/booster/bk_dist/shadertool/command/process.go index 787547d3..859d0fc2 100644 --- a/src/backend/booster/bk_dist/shadertool/command/process.go +++ b/src/backend/booster/bk_dist/shadertool/command/process.go @@ -14,9 +14,7 @@ import ( "os" "os/signal" "syscall" - "time" - 
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk" dcUtil "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/shadertool/common" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/shadertool/pkg" @@ -78,26 +76,15 @@ func sysSignalHandler(cancel context.CancelFunc, handle *pkg.ShaderTool) { func newCustomProcess(c *commandCli.Context) *pkg.ShaderTool { return pkg.NewShaderTool(&common.Flags{ - ToolDir: c.String(FlagToolDir), - JobDir: c.String(FlagJobDir), - JobJSONPrefix: c.String(FlagJobJSONPrefix), - JobStartIndex: int32(c.Int(FlagJobStartIndex)), - CommitSuicide: c.Bool(FlagCommitSuicide), - Port: int32(c.Int(FlagPort)), - }, sdk.ControllerConfig{ - NoLocal: false, - Scheme: ControllerScheme, - IP: ControllerIP, - Port: ControllerPort, - Timeout: 5 * time.Second, - LogDir: getLogDir(c.String(FlagLogDir)), - LogVerbosity: func() int { - // debug模式下, --v=3 - if c.String(FlagLog) == dcUtil.PrintDebug.String() { - return 3 - } - return 0 - }(), + ToolDir: c.String(FlagToolDir), + JobDir: c.String(FlagJobDir), + JobJSONPrefix: c.String(FlagJobJSONPrefix), + JobStartIndex: int32(c.Int(FlagJobStartIndex)), + CommitSuicide: c.Bool(FlagCommitSuicide), + Port: int32(c.Int(FlagPort)), + LogLevel: c.String(FlagLog), + LogDir: getLogDir(c.String(FlagLogDir)), + ProcessInfoFile: c.String(FlagProcessInfoFile), }) } diff --git a/src/backend/booster/bk_dist/shadertool/common/types.go b/src/backend/booster/bk_dist/shadertool/common/types.go index 9b5e8bb8..abaf1815 100644 --- a/src/backend/booster/bk_dist/shadertool/common/types.go +++ b/src/backend/booster/bk_dist/shadertool/common/types.go @@ -30,12 +30,15 @@ type AvailableResp struct { // Flags define flags needed by shader tool type Flags struct { - ToolDir string - JobDir string - JobJSONPrefix string - JobStartIndex int32 - CommitSuicide bool - Port int32 + ToolDir string + JobDir string + 
JobJSONPrefix string + JobStartIndex int32 + CommitSuicide bool + Port int32 + LogLevel string + LogDir string + ProcessInfoFile string } // Action define shader action @@ -62,6 +65,8 @@ type ApplyParameters struct { BatchMode bool `json:"batch_mode"` WorkerList []string `json:"specific_host_list"` NeedApply bool `json:"need_apply"` + ControllerDynamicPort bool `json:"controller_dynamic_port" value:"false" usage:"if true, controller will listen dynamic port"` + ShaderDynamicPort bool `json:"shader_dynamic_port" value:"false" usage:"if true, shader will listen dynamic port"` BuildID string `json:"build_id"` ShaderToolIdleRunSeconds int `json:"shader_tool_idle_run_seconds"` ControllerIdleRunSeconds int `json:"controller_idle_run_seconds" value:"120" usage:"controller remain time after there is no active work (seconds)"` diff --git a/src/backend/booster/bk_dist/shadertool/pkg/server.go b/src/backend/booster/bk_dist/shadertool/pkg/server.go new file mode 100644 index 00000000..3e1d8c52 --- /dev/null +++ b/src/backend/booster/bk_dist/shadertool/pkg/server.go @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2021 THL A29 Limited, a Tencent company. 
All rights reserved + * + * This source code file is licensed under the MIT License, you may obtain a copy of the License at + * + * http://opensource.org/licenses/MIT + * + */ + +package pkg + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "path/filepath" + + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/flock" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util" + v1 "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/api/v1" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/types" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog" + "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/http/httpserver" +) + +var ( + lockfile = "bk-shader-tool.lock" + listenPortBlackList = []int{30117} // 先写死,后面如果有需要,考虑改成可配置的 + maxTryListenTimes = 10 +) + +func getLockFile() (string, error) { + dir := util.GetGlobalDir() + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return "", err + } + return filepath.Join(dir, lockfile), nil +} + +func lock() bool { + f, err := getLockFile() + if err != nil { + blog.Errorf("[shadertool]: failed to start with error:%v\n", err) + return false + } + blog.Infof("[shadertool]: ready lock file: %s\n", f) + + if !file.Stat(f).Exist() { + newf, err := os.OpenFile(f, os.O_CREATE, os.ModePerm) + if err != nil { + blog.Infof("[shadertool]: create file: %s failed with error:%v", f, err) + } else { + blog.Infof("[shadertool]: created lock file: %s", f) + newf.Close() + } + } + + flag, err := flock.TryLock(f) + if err != nil { + blog.Errorf("[shadertool]: failed to start with error:%v\n", err) + return false + } + if !flag { + blog.Infof("[shadertool]: program is maybe running for lock file has been locked \n") + return false + } + + return true +} + +func unlock() { + flock.Unlock() +} + +func (h *ShaderTool) 
getProcessFilePath() (string, string) { + if h.flags.ProcessInfoFile != "" { + d := filepath.Dir(h.flags.ProcessInfoFile) + f := filepath.Base(h.flags.ProcessInfoFile) + return d, f + } + + return "", v1.ShaderProcessfile +} + +func (h *ShaderTool) ListenAndServeWithDynamicPort() error { + d, f := h.getProcessFilePath() + + srv, ln, port, err := h.tryListen() + if err != nil || srv == nil || ln == nil || port <= 0 { + v1.SaveControllerInfo(0, 0, false, fmt.Sprint(err), d, f) + return fmt.Errorf("listen with dynamic port failed: %v", err) + } + + err = v1.SaveControllerInfo(os.Getpid(), port, true, "", d, f) + if err != nil { + blog.Errorf("[shadertool]: save process info failed with error: %v", err) + return err + } + + return h.httpserver.Serve(srv, ln) +} + +func (h *ShaderTool) tryListen() (*http.Server, net.Listener, int, error) { + for i := 0; i < maxTryListenTimes; i++ { + srv, ln, err := h.httpserver.Listen() + if err != nil { + blog.Errorf("[shadertool]: listen failed with error: %v", err) + continue + } + + port := ln.Addr().(*net.TCPAddr).Port + blog.Infof("[shadertool]: listen with port %d", port) + + inBlack := false + for _, p := range listenPortBlackList { + if p == port { + inBlack = true + break + } + } + + if inBlack { + blog.Infof("[shadertool]: port %d in blacklist, ignore this port", port) + ln.Close() + continue + } + + return srv, ln, port, nil + } + + blog.Errorf("[shadertool]: listen failed after %d try!!", maxTryListenTimes) + return nil, nil, 0, fmt.Errorf("bind listen port failed") +} + +func (h *ShaderTool) server(ctx context.Context) error { + blog.Infof("[shadertool]: server") + + withDynamicPort := h.settings.ShaderDynamicPort || h.flags.Port == 0 + + if withDynamicPort { + if !lock() { + return types.ErrFileLock + } + defer unlock() + + h.flags.Port = 0 + } + + h.httpserver = httpserver.NewHTTPServer(uint(h.flags.Port), "127.0.0.1", "") + var err error + + h.httphandle, err = NewHTTPHandle(h) + if h.httphandle == nil || err != nil { + return ErrInitHTTPHandle + } + + 
h.httpserver.RegisterWebServer(PathV1, nil, h.httphandle.GetActions()) + + if withDynamicPort { + return h.ListenAndServeWithDynamicPort() + } + + return h.httpserver.ListenAndServe() +} diff --git a/src/backend/booster/bk_dist/shadertool/pkg/shadertool.go b/src/backend/booster/bk_dist/shadertool/pkg/shadertool.go index f27a3601..250ca65b 100644 --- a/src/backend/booster/bk_dist/shadertool/pkg/shadertool.go +++ b/src/backend/booster/bk_dist/shadertool/pkg/shadertool.go @@ -64,19 +64,19 @@ var ( ) // NewShaderTool get a new ShaderTool -func NewShaderTool(flagsparam *common.Flags, config dcSDK.ControllerConfig) *ShaderTool { - blog.Debugf("ShaderTool: new helptool with config:%+v", config) +func NewShaderTool(flagsparam *common.Flags) *ShaderTool { + blog.Debugf("ShaderTool: new helptool with flags:%+v", *flagsparam) return &ShaderTool{ flags: flagsparam, // controller: v1.NewSDK(config), - controllerconfig: config, - actionlist: list.New(), - finishednumber: 0, - runningnumber: 0, - maxjobs: 0, - actionchan: nil, - resourcestatus: common.ResourceInit, + // controllerconfig: config, + actionlist: list.New(), + finishednumber: 0, + runningnumber: 0, + maxjobs: 0, + actionchan: nil, + resourcestatus: common.ResourceInit, // removelist: []string{}, } } @@ -86,7 +86,7 @@ type ShaderTool struct { flags *common.Flags pCtx context.Context - controllerconfig dcSDK.ControllerConfig + controllerconfig *dcSDK.ControllerConfig controller dcSDK.ControllerSDK booster *pkg.Booster executor *Executor @@ -165,6 +165,40 @@ func (h *ShaderTool) run(pCtx context.Context) (int, error) { return 0, nil } +func (h *ShaderTool) getControllerConfig() dcSDK.ControllerConfig { + if h.controllerconfig != nil { + return *h.controllerconfig + } + + h.controllerconfig = &dcSDK.ControllerConfig{ + NoLocal: false, + Scheme: common.ControllerScheme, + IP: common.ControllerIP, + Port: common.ControllerPort, + Timeout: 5 * time.Second, + LogDir: h.flags.LogDir, + LogVerbosity: func() int { + // 
debug模式下, --v=3 + if h.flags.LogLevel == dcUtil.PrintDebug.String() { + return 3 + } + return 0 + }(), + RemainTime: h.settings.ControllerIdleRunSeconds, + NoWait: h.settings.ControllerNoBatchWait, + SendCork: h.settings.ControllerSendCork, + SendFileMemoryLimit: h.settings.ControllerSendFileMemoryLimit, + NetErrorLimit: h.settings.ControllerNetErrorLimit, + RemoteRetryTimes: h.settings.ControllerRemoteRetryTimes, + EnableLink: h.settings.ControllerEnableLink, + EnableLib: h.settings.ControllerEnableLib, + LongTCP: h.settings.ControllerLongTCP, + DynamicPort: h.settings.ControllerDynamicPort, + } + + return *h.controllerconfig +} + func (h *ShaderTool) initsettings() error { var err error h.projectSettingFile, err = h.getProjectSettingFile() @@ -191,32 +225,31 @@ func (h *ShaderTool) initsettings() error { } os.Setenv(DevOPSProcessTreeKillKey, "true") + h.controller = v1.NewSDK(h.getControllerConfig()) + return nil } func (h *ShaderTool) launchController() error { if h.controller == nil { - h.controllerconfig.RemainTime = h.settings.ControllerIdleRunSeconds - h.controllerconfig.NoWait = h.settings.ControllerNoBatchWait - h.controllerconfig.SendCork = h.settings.ControllerSendCork - h.controllerconfig.SendFileMemoryLimit = h.settings.ControllerSendFileMemoryLimit - h.controllerconfig.NetErrorLimit = h.settings.ControllerNetErrorLimit - h.controllerconfig.RemoteRetryTimes = h.settings.ControllerRemoteRetryTimes - h.controllerconfig.EnableLink = h.settings.ControllerEnableLink - h.controllerconfig.EnableLib = h.settings.ControllerEnableLib - h.controllerconfig.LongTCP = h.settings.ControllerLongTCP - h.controller = v1.NewSDK(h.controllerconfig) + h.controller = v1.NewSDK(h.getControllerConfig()) } var err error + var port int for i := 0; i < 4; i++ { blog.Infof("ShaderTool: try launch controller for the [%d] times", i) - _, err = h.controller.EnsureServer() + _, port, err = h.controller.EnsureServer() if err != nil { blog.Warnf("ShaderTool: ensure controller failed 
with error: %v for the [%d] times", err, i) continue } - blog.Infof("ShaderTool: success to launch controller for the [%d] times", i) + + blog.Infof("ShaderTool: success to launch controller port[%d] for the [%d] times", port, i) + h.controllerconfig.Port = port + os.Setenv(env.GetEnvKey(env.KeyExecutorControllerPort), strconv.Itoa(port)) + blog.Infof("ShaderTool: set env %s=%d", env.GetEnvKey(env.KeyExecutorControllerPort), port) + return nil } return err @@ -324,22 +357,6 @@ func (h *ShaderTool) checkQuit(ctx context.Context) error { } } -func (h *ShaderTool) server(ctx context.Context) error { - blog.Infof("ShaderTool: server") - - h.httpserver = httpserver.NewHTTPServer(uint(h.flags.Port), "127.0.0.1", "") - var err error - - h.httphandle, err = NewHTTPHandle(h) - if h.httphandle == nil || err != nil { - return ErrInitHTTPHandle - } - - h.httpserver.RegisterWebServer(PathV1, nil, h.httphandle.GetActions()) - - return h.httpserver.ListenAndServe() -} - // execute actions got from ready queue func (h *ShaderTool) tryExecuteActions(ctx context.Context) error { if h.actionlist.Len() <= 0 { @@ -365,6 +382,11 @@ func (h *ShaderTool) tryExecuteActions(ctx context.Context) error { } } + if h.controllerconfig != nil && h.controllerconfig.DynamicPort && h.controllerconfig.Port > 0 { + os.Setenv(env.GetEnvKey(env.KeyExecutorControllerPort), strconv.Itoa(h.controllerconfig.Port)) + blog.Infof("ShaderTool: set env %s=%d", env.GetEnvKey(env.KeyExecutorControllerPort), h.controllerconfig.Port) + } + h.executor = NewExecutor() h.executor.usewebsocket = h.usewebsocket } @@ -700,11 +722,12 @@ func (h *ShaderTool) newBooster() (*pkg.Booster, error) { }, Controller: sdk.ControllerConfig{ - NoLocal: false, - Scheme: common.ControllerScheme, - IP: common.ControllerIP, - Port: common.ControllerPort, - Timeout: 5 * time.Second, + NoLocal: false, + Scheme: common.ControllerScheme, + IP: common.ControllerIP, + Port: common.ControllerPort, + Timeout: 5 * time.Second, + DynamicPort: h.settings.ControllerDynamicPort, 
}, } @@ -889,7 +912,9 @@ func (h *ShaderTool) dealWorkNotFound(retcode int, retmsg string) error { if workerid.NewWorkID != "" { // update local workid with new workid env.SetEnv(env.KeyExecutorControllerWorkID, workerid.NewWorkID) - h.executor.Update() + if h.executor != nil { + h.executor.Update() + } // set tool chain with new workid h.setToolChain() diff --git a/src/backend/booster/bk_dist/ubttool/command/process.go b/src/backend/booster/bk_dist/ubttool/command/process.go index 04a0a42d..b0505551 100644 --- a/src/backend/booster/bk_dist/ubttool/command/process.go +++ b/src/backend/booster/bk_dist/ubttool/command/process.go @@ -14,10 +14,8 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/env" - "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/sdk" dcUtil "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/ubttool/common" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/ubttool/pkg" @@ -79,20 +77,8 @@ func newCustomProcess(c *commandCli.Context) *pkg.UBTTool { ActionChainFile: c.String(FlagActionJSONFile), ToolChainFile: c.String(FlagToolChainJSONFile), MostDepentFirst: c.Bool(FlagMostDependFirst), - }, sdk.ControllerConfig{ - NoLocal: false, - Scheme: ControllerScheme, - IP: ControllerIP, - Port: ControllerPort, - Timeout: 5 * time.Second, - LogDir: getLogDir(c.String(FlagLogDir)), - LogVerbosity: func() int { - // debug模式下, --v=3 - if c.String(FlagLog) == dcUtil.PrintDebug.String() { - return 3 - } - return 0 - }(), + LogLevel: c.String(FlagLog), + LogDir: getLogDir(c.String(FlagLogDir)), }) } diff --git a/src/backend/booster/bk_dist/ubttool/command/types.go b/src/backend/booster/bk_dist/ubttool/command/types.go index 8dbdd234..26b1e52f 100644 --- a/src/backend/booster/bk_dist/ubttool/command/types.go +++ b/src/backend/booster/bk_dist/ubttool/command/types.go @@ -15,10 
+15,6 @@ type ClientType string // define vars var ( ClientBKUBTTool ClientType = "bk-ubt-tool" - - ControllerScheme = "http" - ControllerIP = "127.0.0.1" - ControllerPort = 30117 ) // const vars diff --git a/src/backend/booster/bk_dist/ubttool/common/types.go b/src/backend/booster/bk_dist/ubttool/common/types.go index dccfb36d..db9b7602 100644 --- a/src/backend/booster/bk_dist/ubttool/common/types.go +++ b/src/backend/booster/bk_dist/ubttool/common/types.go @@ -15,11 +15,19 @@ import ( "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog" ) +var ( + ControllerScheme = "http" + ControllerIP = "127.0.0.1" + ControllerPort = 30117 +) + // Flags to desc flag of this tool type Flags struct { ActionChainFile string ToolChainFile string MostDepentFirst bool + LogLevel string + LogDir string } // Action to desc single ubt action diff --git a/src/backend/booster/bk_dist/ubttool/pkg/executor.go b/src/backend/booster/bk_dist/ubttool/pkg/executor.go index 5961f384..9a91951d 100644 --- a/src/backend/booster/bk_dist/ubttool/pkg/executor.go +++ b/src/backend/booster/bk_dist/ubttool/pkg/executor.go @@ -45,6 +45,10 @@ type Executor struct { // NewExecutor return new Executor func NewExecutor() *Executor { + // for debug by tomtian + c := dcSDK.GetControllerConfigFromEnv() + blog.Debugf("ubtexecutor: got config [%+v],env:%+v", c, os.Environ()) + return &Executor{ bt: dcTypes.GetBoosterType(env.GetEnv(env.BoosterType)), work: v1.NewSDK(dcSDK.GetControllerConfigFromEnv()).GetWork(env.GetEnv(env.KeyExecutorControllerWorkID)), diff --git a/src/backend/booster/bk_dist/ubttool/pkg/ubttool.go b/src/backend/booster/bk_dist/ubttool/pkg/ubttool.go index 892a2819..65b582b6 100644 --- a/src/backend/booster/bk_dist/ubttool/pkg/ubttool.go +++ b/src/backend/booster/bk_dist/ubttool/pkg/ubttool.go @@ -30,6 +30,8 @@ import ( dcUtil "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util" 
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/api" v1 "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/controller/pkg/api/v1" + + // "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/ubttool/command" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/ubttool/common" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog" "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/codec" @@ -50,12 +52,14 @@ const ( ) // NewUBTTool get a new UBTTool -func NewUBTTool(flagsparam *common.Flags, config dcSDK.ControllerConfig) *UBTTool { - blog.Infof("UBTTool: new ubt tool with config:%+v,flags:%+v", config, *flagsparam) +// func NewUBTTool(flagsparam *common.Flags, config dcSDK.ControllerConfig) *UBTTool { +func NewUBTTool(flagsparam *common.Flags) *UBTTool { + // blog.Infof("UBTTool: new ubt tool with config:%+v,flags:%+v", config, *flagsparam) + blog.Infof("UBTTool: new ubt tool with flags:%+v", *flagsparam) return &UBTTool{ - flags: flagsparam, - controller: v1.NewSDK(config), + flags: flagsparam, + // controller: v1.NewSDK(config), allactions: []common.Action{}, readyactions: []common.Action{}, finishednumber: 0, @@ -63,7 +67,7 @@ func NewUBTTool(flagsparam *common.Flags, config dcSDK.ControllerConfig) *UBTToo maxjobs: 0, finished: false, actionchan: nil, - executor: NewExecutor(), + // executor: NewExecutor(), moduleselected: make(map[string]int, 0), } } @@ -116,12 +120,20 @@ func (h *UBTTool) run(pCtx context.Context) (int, error) { } blog.Infof("UBTTool: try to find controller or launch it") - _, err = h.controller.EnsureServer() + // support dynamic listen port + var port int + _, port, err = h.controller.EnsureServer() if err != nil { blog.Errorf("UBTTool: ensure controller failed: %v", err) return 1, err } - blog.Infof("UBTTool: success to connect to controller") + + blog.Infof("UBTTool: success to connect to controller with port[%d]", port) + 
os.Setenv(env.GetEnvKey(env.KeyExecutorControllerPort), strconv.Itoa(port)) + blog.Infof("UBTTool: set env %s=%d", env.GetEnvKey(env.KeyExecutorControllerPort), port) + + // the executor depends on the dynamic port, so create it only after the port env is set + h.executor = NewExecutor() if !h.executor.Valid() { blog.Errorf("UBTTool: ensure controller failed: %v", ErrorInvalidWorkID) @@ -559,7 +571,35 @@ func (h *UBTTool) dump() { blog.Infof("UBTTool: -------------------dump end-----------------------") } -// ---------------------------------to support set tool chain---------------------------------------------------------- +//---------------------------------to support set tool chain---------------------------------------------------------- +func (h *UBTTool) getControllerConfig() dcSDK.ControllerConfig { + return dcSDK.ControllerConfig{ + NoLocal: false, + Scheme: common.ControllerScheme, + IP: common.ControllerIP, + Port: common.ControllerPort, + Timeout: 5 * time.Second, + LogDir: h.flags.LogDir, + LogVerbosity: func() int { + // in debug mode, use --v=3 + if h.flags.LogLevel == dcUtil.PrintDebug.String() { + return 3 + } + return 0 + }(), + RemainTime: h.settings.ControllerIdleRunSeconds, + NoWait: h.settings.ControllerNoBatchWait, + SendCork: h.settings.ControllerSendCork, + SendFileMemoryLimit: h.settings.ControllerSendFileMemoryLimit, + NetErrorLimit: h.settings.ControllerNetErrorLimit, + RemoteRetryTimes: h.settings.ControllerRemoteRetryTimes, + EnableLink: h.settings.ControllerEnableLink, + EnableLib: h.settings.ControllerEnableLib, + LongTCP: h.settings.ControllerLongTCP, + DynamicPort: h.settings.ControllerDynamicPort, + } +} + func (h *UBTTool) initsettings() error { var err error h.projectSettingFile, err = h.getProjectSettingFile() @@ -586,6 +626,8 @@ func (h *UBTTool) initsettings() error { } os.Setenv(DevOPSProcessTreeKillKey, "true") + h.controller = v1.NewSDK(h.getControllerConfig()) + return nil } @@ -672,6 +714,7 @@ func (h *UBTTool) newBooster() (*pkg.Booster, error) { CommitSuicideCheckTick: 5 * time.Second, }, + // got 
controller listen port from local file Controller: sdk.ControllerConfig{ NoLocal: false, Scheme: shaderToolComm.ControllerScheme, diff --git a/src/backend/booster/common/http/httpserver/server.go b/src/backend/booster/common/http/httpserver/server.go index 1bf96144..4cc5bc88 100644 --- a/src/backend/booster/common/http/httpserver/server.go +++ b/src/backend/booster/common/http/httpserver/server.go @@ -160,3 +160,36 @@ func (s *HTTPServer) ListenAndServe() error { return <-chError } + +// 将 ListenAndServe 拆分成 listen和server,方便支持绑定动态端口,并获取真正的端口 +func (s *HTTPServer) Listen() (*http.Server, net.Listener, error) { + addrport := net.JoinHostPort(s.addr, strconv.FormatUint(uint64(s.port), 10)) + srv := &http.Server{Addr: addrport, Handler: s.webContainer} + ln, err := net.Listen("tcp", srv.Addr) + if err != nil { + return nil, nil, err + } + + return srv, ln, nil +} + +func (s *HTTPServer) Serve(srv *http.Server, ln net.Listener) error { + var chError = make(chan error) + go func() { + if s.isSSL { + defer ln.Close() + tlsConf, err := ssl.ServerTSLConf(s.caFile, s.certFile, s.keyFile, s.certPasswd) + if err != nil { + blog.Error("fail to load certfile, err:%s", err.Error()) + chError <- fmt.Errorf("fail to load certfile") + return + } + srv.TLSConfig = tlsConf + chError <- srv.ServeTLS(ln, "", "") + } else { + chError <- srv.Serve(ln) + } + }() + + return <-chError +}