Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev tming #135

Merged
merged 7 commits into from
Oct 30, 2023
Merged
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/booster/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
FlagEnableLib = "enable_lib"
FlagLongTCP = "long_tcp"
FlagUseDefaultWorker = "use_default_worker"
FlagDynamicPort = "dynamic_port"

EnvBuildIDOld = "TURBO_PLAN_BUILD_ID"
EnvBuildID = "TBS_BUILD_ID"
Expand Down Expand Up @@ -442,6 +443,10 @@ var (
Name: "use_default_worker",
Usage: "use default worker if worker id is empty",
},
commandCli.BoolFlag{
Name: "dynamic_port",
Usage: "controller will listen dynamic port if true",
},
}
)

Expand Down
7 changes: 7 additions & 0 deletions src/backend/booster/bk_dist/booster/command/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
remoteRetryTimes = c.Int(FlagRemoteRetryTimes)
}

withDynamicPort := c.Bool(FlagDynamicPort)
// bazel 暂时不支持动态端口,没办法解决参数变化的问题
if c.Bool(FlagBazel) || c.Bool(FlagBazelPlus) || c.Bool(FlagBazel4Plus) || c.Bool(FlagBazelNoLauncher) {
withDynamicPort = false
}

// generate a new booster.
cmdConfig := dcType.BoosterConfig{
Type: dcType.GetBoosterType(bt),
Expand Down Expand Up @@ -348,6 +354,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
EnableLib: c.Bool(FlagEnableLib),
LongTCP: c.Bool(FlagLongTCP),
UseDefaultWorker: c.Bool(FlagUseDefaultWorker) || c.Bool(FlagBazelNoLauncher),
DynamicPort: withDynamicPort,
},
}

Expand Down
9 changes: 7 additions & 2 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (b *Booster) getWorkersEnv() map[string]string {
if b.work != nil {
requiredEnv[env.KeyExecutorControllerWorkID] = b.workID
}

for k, v := range dcSDK.GetControllerConfigToEnv(b.config.Controller) {
requiredEnv[k] = v
}
Expand Down Expand Up @@ -545,12 +546,16 @@ func (b *Booster) registerWork() error {
}

blog.Debugf("booster: try to find controller or launch it")
pid, err := b.controller.EnsureServer()
pid, port, err := b.controller.EnsureServer()
if err != nil {
blog.Errorf("booster: ensure controller failed: %v", err)
return err
}
blog.Infof("booster: success to connect to controller: %s", b.config.Controller.Target())

blog.Infof("booster: success to connect to controller: %s, real port[%d]", b.config.Controller.Target(), port)
os.Setenv(env.GetEnvKey(env.KeyExecutorControllerPort), strconv.Itoa(port))
blog.Infof("booster: set env %s=%d]", env.GetEnvKey(env.KeyExecutorControllerPort), port)
b.config.Controller.Port = port

b.work, err = b.controller.Register(dcSDK.ControllerRegisterConfig{
BatchMode: b.config.BatchMode,
Expand Down
20 changes: 15 additions & 5 deletions src/backend/booster/bk_dist/common/sdk/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

// ControllerSDK describe the controller handler SDK
type ControllerSDK interface {
EnsureServer() (int, error)
// support dinamic listen port,return pid,port,error
EnsureServer() (int, int, error)
Register(config ControllerRegisterConfig) (ControllerWorkSDK, error)
GetWork(workID string) ControllerWorkSDK
SetConfig(config *CommonControllerConfig) error
Expand Down Expand Up @@ -114,10 +115,11 @@ func GetControllerConfigToEnv(config ControllerConfig) map[string]string {
// ControllerConfig describe the config of controller
type ControllerConfig struct {
// 需要传递给executor的信息
NoLocal bool
Scheme string
IP string
Port int
NoLocal bool
Scheme string
IP string
Port int
DynamicPort bool

// controller参数
Timeout time.Duration
Expand Down Expand Up @@ -205,6 +207,14 @@ type ControllerWorkSettings struct {
GlobalSlots bool
}

// ControllerProcessInfo describe the running controller process info
type ControllerProcessInfo struct {
ProcessID int `json:"process_id"`
ListenPort int `json:"listen_port"`
Success bool `json:"success"`
Message string `json:"message"`
}

// ControllerJobStats describe a single job's stats info
type ControllerJobStats struct {
ID string `json:"id"`
Expand Down
2 changes: 2 additions & 0 deletions src/backend/booster/bk_dist/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type ServerConfig struct {
LongTCP bool `json:"long_tcp" value:"false" usage:"if true, controller will connect to remote worker with long tcp connection"`

UseDefaultWorker bool `json:"use_default_worker" value:"true" usage:"if true, controller will use first worker available"`

DynamicPort bool `json:"dynamic_port" value:"false" usage:"if true, controller will listen dynamic port"`
}

// CertConfig configuration of Cert
Expand Down
174 changes: 162 additions & 12 deletions src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ package v1

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -78,7 +80,10 @@ type sdk struct {
}

// EnsureServer make sure the controller is running and ready to work
func (s *sdk) EnsureServer() (int, error) {
func (s *sdk) EnsureServer() (int, int, error) {
if s.config.DynamicPort {
return s.ensureServerDynamicPort()
}
return s.ensureServer()
}

Expand All @@ -97,21 +102,152 @@ func (s *sdk) SetConfig(config *dcSDK.CommonControllerConfig) error {
return s.setConfig(config)
}

func (s *sdk) ensureServer() (int, error) {
var (
ControllerProcessfile = "bk-dist-controller-process.json"
ShaderProcessfile = "bk-shader-tool-process.json" // 文件名不能改,和ue源码中保持一致
)

func getProcessConfig(d, f string) (string, error) {
dir := d
if dir == "" {
dir = dcUtil.GetGlobalDir()
}
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return "", err
}
return filepath.Join(dir, f), nil
}

// save and load the controller process info
func saveProcessInfo(i *dcSDK.ControllerProcessInfo, d, f string) error {
f, err := getProcessConfig(d, f)
if err != nil {
blog.Infof("[controller]: get process config file error:%v\n", err)
return err
}

jsonData, err := json.Marshal(i)
if err != nil {
blog.Infof("[controller]: encode [%v] to json with error:%v\n", *i, err)
return err
}

err = ioutil.WriteFile(f, jsonData, os.ModePerm)
if err != nil {
blog.Infof("[controller]: save process config file[%f] with error:%v\n", f, err)
return err
}

return nil
}

func SaveControllerInfo(pid, port int, success bool, message string, d, f string) error {
i := &dcSDK.ControllerProcessInfo{
ProcessID: pid,
ListenPort: port,
Success: success,
Message: message,
}

return saveProcessInfo(i, d, f)
}

func loadProcessInfo(d, f string) (*dcSDK.ControllerProcessInfo, error) {
f, err := getProcessConfig(d, f)
if err != nil {
blog.Infof("[controller]: get process config file error:%v\n", err)
return nil, err
}

data, err := ioutil.ReadFile(f)
if err != nil {
blog.Infof("[controller]: read process config json file %s with error %v", f, err)
return nil, err
}

var t dcSDK.ControllerProcessInfo
if err = codec.DecJSON(data, &t); err != nil {
blog.Infof("[controller]: decode json content[%s] failed: %v", string(data), err)
return nil, err
}

return &t, nil
}

func getListenPort(d, f string) (int, error) {
i, err := loadProcessInfo(d, f)
if err != nil {
return 0, err
}

return i.ListenPort, nil
}

// support dynamic listen port
func (s *sdk) ensureServerDynamicPort() (int, int, error) {
blog.Infof("[controller]: ensure bk-dist-controller with dynamic port now")

// 0: first check
pid, port, err := s.checkServerByDynamicPort("", ControllerProcessfile)
if err == nil && pid > 0 && port > 0 {
return pid, port, err
}

// 1. launch with --dynamic_port
err = s.launchServer()
if err != nil {
return 0, s.config.Port, err
}

// 2. check from local config file
timeout := time.After(serverEnsureTime)

for ; ; time.Sleep(100 * time.Millisecond) {
select {
case <-timeout:
return 0, s.config.Port, fmt.Errorf("ensure server timeout")
default:
pid, port, err := s.checkServerByDynamicPort("", ControllerProcessfile)
if err == nil && pid > 0 && port > 0 {
return pid, port, err
}
}
}
}

func (s *sdk) checkServerByDynamicPort(d, f string) (int, int, error) {
var err error
var pid, port int
port, err = getListenPort(d, f)
if err == nil && port > 0 {
s.config.Port = port
pid, err = s.checkServerByPort()
if err == nil && pid > 0 {
return pid, port, nil
}
}

return 0, 0, err
}

// return pid,port,error
func (s *sdk) ensureServer() (int, int, error) {
blog.Infof("[controller]: ensure bk-dist-controller now")

launched := false
var launchedtime int64
timeout := time.After(serverEnsureTime)

for ; ; time.Sleep(100 * time.Millisecond) {
select {
case <-timeout:
return 0, fmt.Errorf("ensure server timeout")
return 0, s.config.Port, fmt.Errorf("ensure server timeout")
default:
pid, err := s.checkServer(launchedtime)
pid, err := s.checkServer(launchedtime, 30)

switch err {
case nil:
return pid, nil
return pid, s.config.Port, nil

case dcSDK.ErrControllerNotReady:
if launched {
Expand All @@ -120,7 +256,7 @@ func (s *sdk) ensureServer() (int, error) {

err = s.launchServer()
if err != nil {
return 0, err
return 0, s.config.Port, err
}
launched = true
launchedtime = time.Now().Unix()
Expand All @@ -129,33 +265,39 @@ func (s *sdk) ensureServer() (int, error) {
case dcSDK.ErrControllerKilled:
err = s.launchServer()
if err != nil {
return 0, err
return 0, s.config.Port, err
}
launched = true
launchedtime = time.Now().Unix()
continue

default:
return 0, err
return 0, s.config.Port, err
}
}
}
}

func (s *sdk) checkServer(launchedtime int64) (int, error) {
func (s *sdk) checkServer(launchedtime int64, waitsecs int64) (int, error) {
blog.Infof("sdk: check server...")

// ProcessExistTimeoutAndKill(windows.EnumProcesses) maybe block, call it carefully
// kill launched process if not succeed after 30 seconds
// kill launched process if not succeed after waitsecs seconds
if launchedtime > 0 {
nowsecs := time.Now().Unix()
if nowsecs-launchedtime > 30 {
if nowsecs-launchedtime > waitsecs {
blog.Infof("sdk: try kill existed server for unavailable after long time")
_ = dcUtil.ProcessExistTimeoutAndKill(controllerTarget(dcSDK.ControllerBinary), 1*time.Minute)
return 0, dcSDK.ErrControllerKilled
}
}

return s.checkServerByPort()
}

func (s *sdk) checkServerByPort() (int, error) {
blog.Infof("sdk: check server by port with target:%s", s.config.Target())

// check tcp port
conn, err := net.DialTimeout("tcp", s.config.Target(), 1*time.Second)
if err != nil {
Expand Down Expand Up @@ -258,6 +400,12 @@ func (s *sdk) launchServer() error {
useDefaultWorker = ""
}

dynamicPort := ""
if s.config.DynamicPort {
dynamicPort = "--dynamic_port"
s.config.Port = 0
}

return dcSyscall.RunServer(fmt.Sprintf("%s%s -a=%s -p=%d --log-dir=%s --v=%d --local_slots=%d "+
"--local_pre_slots=%d --local_exe_slots=%d --local_post_slots=%d --async_flush %s --remain_time=%d "+
"--use_local_cpu_percent=%d %s"+
Expand All @@ -267,7 +415,8 @@ func (s *sdk) launchServer() error {
" --net_error_limit=%d"+
" --remote_retry_times=%d"+
" %s %s"+
" %s %s",
" %s %s"+
" %s",
sudo,
ctrlPath,
s.config.IP,
Expand All @@ -292,6 +441,7 @@ func (s *sdk) launchServer() error {
enablelink,
longTCP,
useDefaultWorker,
dynamicPort,
))
}

Expand Down
Loading