Skip to content

Commit

Permalink
Merge pull request #135 from tbs60/dev_tming
Browse files Browse the repository at this point in the history
Dev tming
  • Loading branch information
tming authored Oct 30, 2023
2 parents 722a7d1 + 8ed021f commit 198cb6e
Show file tree
Hide file tree
Showing 18 changed files with 652 additions and 143 deletions.
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/booster/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
FlagEnableLib = "enable_lib"
FlagLongTCP = "long_tcp"
FlagUseDefaultWorker = "use_default_worker"
FlagDynamicPort = "dynamic_port"

EnvBuildIDOld = "TURBO_PLAN_BUILD_ID"
EnvBuildID = "TBS_BUILD_ID"
Expand Down Expand Up @@ -442,6 +443,10 @@ var (
Name: "use_default_worker",
Usage: "use default worker if worker id is empty",
},
commandCli.BoolFlag{
Name: "dynamic_port",
Usage: "controller will listen dynamic port if true",
},
}
)

Expand Down
7 changes: 7 additions & 0 deletions src/backend/booster/bk_dist/booster/command/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
remoteRetryTimes = c.Int(FlagRemoteRetryTimes)
}

withDynamicPort := c.Bool(FlagDynamicPort)
// bazel 暂时不支持动态端口,没办法解决参数变化的问题
if c.Bool(FlagBazel) || c.Bool(FlagBazelPlus) || c.Bool(FlagBazel4Plus) || c.Bool(FlagBazelNoLauncher) {
withDynamicPort = false
}

// generate a new booster.
cmdConfig := dcType.BoosterConfig{
Type: dcType.GetBoosterType(bt),
Expand Down Expand Up @@ -348,6 +354,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
EnableLib: c.Bool(FlagEnableLib),
LongTCP: c.Bool(FlagLongTCP),
UseDefaultWorker: c.Bool(FlagUseDefaultWorker) || c.Bool(FlagBazelNoLauncher),
DynamicPort: withDynamicPort,
},
}

Expand Down
9 changes: 7 additions & 2 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (b *Booster) getWorkersEnv() map[string]string {
if b.work != nil {
requiredEnv[env.KeyExecutorControllerWorkID] = b.workID
}

for k, v := range dcSDK.GetControllerConfigToEnv(b.config.Controller) {
requiredEnv[k] = v
}
Expand Down Expand Up @@ -545,12 +546,16 @@ func (b *Booster) registerWork() error {
}

blog.Debugf("booster: try to find controller or launch it")
pid, err := b.controller.EnsureServer()
pid, port, err := b.controller.EnsureServer()
if err != nil {
blog.Errorf("booster: ensure controller failed: %v", err)
return err
}
blog.Infof("booster: success to connect to controller: %s", b.config.Controller.Target())

blog.Infof("booster: success to connect to controller: %s, real port[%d]", b.config.Controller.Target(), port)
os.Setenv(env.GetEnvKey(env.KeyExecutorControllerPort), strconv.Itoa(port))
blog.Infof("booster: set env %s=%d]", env.GetEnvKey(env.KeyExecutorControllerPort), port)
b.config.Controller.Port = port

b.work, err = b.controller.Register(dcSDK.ControllerRegisterConfig{
BatchMode: b.config.BatchMode,
Expand Down
20 changes: 15 additions & 5 deletions src/backend/booster/bk_dist/common/sdk/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

// ControllerSDK describe the controller handler SDK
type ControllerSDK interface {
EnsureServer() (int, error)
// support dinamic listen port,return pid,port,error
EnsureServer() (int, int, error)
Register(config ControllerRegisterConfig) (ControllerWorkSDK, error)
GetWork(workID string) ControllerWorkSDK
SetConfig(config *CommonControllerConfig) error
Expand Down Expand Up @@ -114,10 +115,11 @@ func GetControllerConfigToEnv(config ControllerConfig) map[string]string {
// ControllerConfig describe the config of controller
type ControllerConfig struct {
// 需要传递给executor的信息
NoLocal bool
Scheme string
IP string
Port int
NoLocal bool
Scheme string
IP string
Port int
DynamicPort bool

// controller参数
Timeout time.Duration
Expand Down Expand Up @@ -205,6 +207,14 @@ type ControllerWorkSettings struct {
GlobalSlots bool
}

// ControllerProcessInfo describe the running controller process info
type ControllerProcessInfo struct {
ProcessID int `json:"process_id"`
ListenPort int `json:"listen_port"`
Success bool `json:"success"`
Message string `json:"message"`
}

// ControllerJobStats describe a single job's stats info
type ControllerJobStats struct {
ID string `json:"id"`
Expand Down
2 changes: 2 additions & 0 deletions src/backend/booster/bk_dist/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type ServerConfig struct {
LongTCP bool `json:"long_tcp" value:"false" usage:"if true, controller will connect to remote worker with long tcp connection"`

UseDefaultWorker bool `json:"use_default_worker" value:"true" usage:"if true, controller will use first worker available"`

DynamicPort bool `json:"dynamic_port" value:"false" usage:"if true, controller will listen dynamic port"`
}

// CertConfig configuration of Cert
Expand Down
174 changes: 162 additions & 12 deletions src/backend/booster/bk_dist/controller/pkg/api/v1/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ package v1

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -78,7 +80,10 @@ type sdk struct {
}

// EnsureServer make sure the controller is running and ready to work
func (s *sdk) EnsureServer() (int, error) {
func (s *sdk) EnsureServer() (int, int, error) {
if s.config.DynamicPort {
return s.ensureServerDynamicPort()
}
return s.ensureServer()
}

Expand All @@ -97,21 +102,152 @@ func (s *sdk) SetConfig(config *dcSDK.CommonControllerConfig) error {
return s.setConfig(config)
}

func (s *sdk) ensureServer() (int, error) {
var (
ControllerProcessfile = "bk-dist-controller-process.json"
ShaderProcessfile = "bk-shader-tool-process.json" // 文件名不能改,和ue源码中保持一致
)

func getProcessConfig(d, f string) (string, error) {
dir := d
if dir == "" {
dir = dcUtil.GetGlobalDir()
}
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return "", err
}
return filepath.Join(dir, f), nil
}

// save and load the controller process info
func saveProcessInfo(i *dcSDK.ControllerProcessInfo, d, f string) error {
f, err := getProcessConfig(d, f)
if err != nil {
blog.Infof("[controller]: get process config file error:%v\n", err)
return err
}

jsonData, err := json.Marshal(i)
if err != nil {
blog.Infof("[controller]: encode [%v] to json with error:%v\n", *i, err)
return err
}

err = ioutil.WriteFile(f, jsonData, os.ModePerm)
if err != nil {
blog.Infof("[controller]: save process config file[%f] with error:%v\n", f, err)
return err
}

return nil
}

func SaveControllerInfo(pid, port int, success bool, message string, d, f string) error {
i := &dcSDK.ControllerProcessInfo{
ProcessID: pid,
ListenPort: port,
Success: success,
Message: message,
}

return saveProcessInfo(i, d, f)
}

func loadProcessInfo(d, f string) (*dcSDK.ControllerProcessInfo, error) {
f, err := getProcessConfig(d, f)
if err != nil {
blog.Infof("[controller]: get process config file error:%v\n", err)
return nil, err
}

data, err := ioutil.ReadFile(f)
if err != nil {
blog.Infof("[controller]: read process config json file %s with error %v", f, err)
return nil, err
}

var t dcSDK.ControllerProcessInfo
if err = codec.DecJSON(data, &t); err != nil {
blog.Infof("[controller]: decode json content[%s] failed: %v", string(data), err)
return nil, err
}

return &t, nil
}

func getListenPort(d, f string) (int, error) {
i, err := loadProcessInfo(d, f)
if err != nil {
return 0, err
}

return i.ListenPort, nil
}

// support dynamic listen port
func (s *sdk) ensureServerDynamicPort() (int, int, error) {
blog.Infof("[controller]: ensure bk-dist-controller with dynamic port now")

// 0: first check
pid, port, err := s.checkServerByDynamicPort("", ControllerProcessfile)
if err == nil && pid > 0 && port > 0 {
return pid, port, err
}

// 1. launch with --dynamic_port
err = s.launchServer()
if err != nil {
return 0, s.config.Port, err
}

// 2. check from local config file
timeout := time.After(serverEnsureTime)

for ; ; time.Sleep(100 * time.Millisecond) {
select {
case <-timeout:
return 0, s.config.Port, fmt.Errorf("ensure server timeout")
default:
pid, port, err := s.checkServerByDynamicPort("", ControllerProcessfile)
if err == nil && pid > 0 && port > 0 {
return pid, port, err
}
}
}
}

func (s *sdk) checkServerByDynamicPort(d, f string) (int, int, error) {
var err error
var pid, port int
port, err = getListenPort(d, f)
if err == nil && port > 0 {
s.config.Port = port
pid, err = s.checkServerByPort()
if err == nil && pid > 0 {
return pid, port, nil
}
}

return 0, 0, err
}

// return pid,port,error
func (s *sdk) ensureServer() (int, int, error) {
blog.Infof("[controller]: ensure bk-dist-controller now")

launched := false
var launchedtime int64
timeout := time.After(serverEnsureTime)

for ; ; time.Sleep(100 * time.Millisecond) {
select {
case <-timeout:
return 0, fmt.Errorf("ensure server timeout")
return 0, s.config.Port, fmt.Errorf("ensure server timeout")
default:
pid, err := s.checkServer(launchedtime)
pid, err := s.checkServer(launchedtime, 30)

switch err {
case nil:
return pid, nil
return pid, s.config.Port, nil

case dcSDK.ErrControllerNotReady:
if launched {
Expand All @@ -120,7 +256,7 @@ func (s *sdk) ensureServer() (int, error) {

err = s.launchServer()
if err != nil {
return 0, err
return 0, s.config.Port, err
}
launched = true
launchedtime = time.Now().Unix()
Expand All @@ -129,33 +265,39 @@ func (s *sdk) ensureServer() (int, error) {
case dcSDK.ErrControllerKilled:
err = s.launchServer()
if err != nil {
return 0, err
return 0, s.config.Port, err
}
launched = true
launchedtime = time.Now().Unix()
continue

default:
return 0, err
return 0, s.config.Port, err
}
}
}
}

func (s *sdk) checkServer(launchedtime int64) (int, error) {
func (s *sdk) checkServer(launchedtime int64, waitsecs int64) (int, error) {
blog.Infof("sdk: check server...")

// ProcessExistTimeoutAndKill(windows.EnumProcesses) maybe block, call it carefully
// kill launched process if not succeed after 30 seconds
// kill launched process if not succeed after waitsecs seconds
if launchedtime > 0 {
nowsecs := time.Now().Unix()
if nowsecs-launchedtime > 30 {
if nowsecs-launchedtime > waitsecs {
blog.Infof("sdk: try kill existed server for unavailable after long time")
_ = dcUtil.ProcessExistTimeoutAndKill(controllerTarget(dcSDK.ControllerBinary), 1*time.Minute)
return 0, dcSDK.ErrControllerKilled
}
}

return s.checkServerByPort()
}

func (s *sdk) checkServerByPort() (int, error) {
blog.Infof("sdk: check server by port with target:%s", s.config.Target())

// check tcp port
conn, err := net.DialTimeout("tcp", s.config.Target(), 1*time.Second)
if err != nil {
Expand Down Expand Up @@ -258,6 +400,12 @@ func (s *sdk) launchServer() error {
useDefaultWorker = ""
}

dynamicPort := ""
if s.config.DynamicPort {
dynamicPort = "--dynamic_port"
s.config.Port = 0
}

return dcSyscall.RunServer(fmt.Sprintf("%s%s -a=%s -p=%d --log-dir=%s --v=%d --local_slots=%d "+
"--local_pre_slots=%d --local_exe_slots=%d --local_post_slots=%d --async_flush %s --remain_time=%d "+
"--use_local_cpu_percent=%d %s"+
Expand All @@ -267,7 +415,8 @@ func (s *sdk) launchServer() error {
" --net_error_limit=%d"+
" --remote_retry_times=%d"+
" %s %s"+
" %s %s",
" %s %s"+
" %s",
sudo,
ctrlPath,
s.config.IP,
Expand All @@ -292,6 +441,7 @@ func (s *sdk) launchServer() error {
enablelink,
longTCP,
useDefaultWorker,
dynamicPort,
))
}

Expand Down
Loading

0 comments on commit 198cb6e

Please sign in to comment.