Skip to content

Commit

Permalink
Merge pull request #5 from tbs60/dev_tming
Browse files Browse the repository at this point in the history
pump模式下文件的stat调用去重, issue: #3
  • Loading branch information
tming authored Feb 7, 2023
2 parents 1b8b2f8 + 1a772de commit fa34d78
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 41 deletions.
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/booster/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
FlagPumpCacheRemoveAll = "pump_cache_remove_all"
FlagPumpBlackList = "pump_black_list"
FlagPumpMinActionNum = "pump_min_action_num"
FlagPumpDisableStatCache = "pump_disable_stat_cache"
FlagForceLocalList = "force_local_list"
FlagNoWork = "no_work"
FlagControllerNoWait = "controller_no_wait"
Expand Down Expand Up @@ -316,6 +317,10 @@ var (
Name: "pump_min_action_num",
Usage: "do not use pump if total actions less this",
},
commandCli.BoolFlag{
Name: "pump_disable_stat_cache",
Usage: "whether disable pump depend file stat info cache, default is false",
},
commandCli.StringSliceFlag{
Name: "force_local_list, fll",
Usage: "key list which will be force executed locally",
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/booster/command/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) {
PumpCacheRemoveAll: c.Bool(FlagPumpCacheRemoveAll),
PumpBlackList: c.StringSlice(FlagPumpBlackList),
PumpMinActionNum: int32(pumpMinActionNum),
PumpDisableStatCache: c.Bool(FlagPumpDisableStatCache),
ForceLocalList: c.StringSlice(FlagForceLocalList),
NoWork: c.Bool(FlagNoWork),
WriteMemroy: c.Bool(FlagWriteMemroMemroy),
Expand Down
4 changes: 4 additions & 0 deletions src/backend/booster/bk_dist/booster/pkg/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ func (b *Booster) getWorkersEnv() map[string]string {
}
requiredEnv[env.KeyExecutorPumpMinActionNum] = strconv.Itoa(int(b.config.Works.PumpMinActionNum))

if b.config.Works.PumpDisableStatCache {
requiredEnv[env.KeyExecutorPumpDisableStatCache] = envValueTrue
}

if b.config.Works.IOTimeoutSecs > 0 {
requiredEnv[env.KeyExecutorIOTimeout] = strconv.Itoa(b.config.Works.IOTimeoutSecs)
}
Expand Down
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/common/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
KeyExecutorPumpCacheSizeMaxMB = "PUMP_CACHE_SIZE_MAX_MB" // cache pump inlude files
KeyExecutorPumpBlackKeys = "PUMP_BLACK_KEYS"
KeyExecutorPumpMinActionNum = "PUMP_MIN_ACTION_NUM"
KeyExecutorPumpDisableStatCache = "PUMP_DISABLE_STAT_CACHE"
KeyExecutorForceLocalKeys = "FORCE_LOCAL_KEYS"
KeyExecutorEnvProfile = "ENV_PROFILE"
KeyExecutorWorkerSideCache = "WORKER_SIDE_CACHE"
Expand Down
8 changes: 6 additions & 2 deletions src/backend/booster/bk_dist/common/pump/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func IsPump(env *env.Sandbox) bool {
}

func SupportPump(env *env.Sandbox) bool {
// return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin")
return IsPump(env) && runtime.GOOS == "windows"
return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin")
// return IsPump(env) && runtime.GOOS == "windows"
}

func IsPumpCache(env *env.Sandbox) bool {
Expand Down Expand Up @@ -189,3 +189,7 @@ func PumpMinActionNum(env *env.Sandbox) int32 {

return 0
}

// SupportPumpStatCache reports whether the pump stat-info cache may be used.
// It returns true when the value read from the sandbox for
// dcEnv.KeyExecutorPumpDisableStatCache is the empty string, and false for
// any non-empty value.
func SupportPumpStatCache(env *env.Sandbox) bool {
	disabled := env.GetEnv(dcEnv.KeyExecutorPumpDisableStatCache)
	return len(disabled) == 0
}
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/common/types/booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type BoosterWorks struct {
PumpCacheRemoveAll bool
PumpBlackList []string
PumpMinActionNum int32
PumpDisableStatCache bool

ForceLocalList []string

Expand Down
62 changes: 62 additions & 0 deletions src/backend/booster/bk_dist/handler/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
"os"
"path"
"strings"
"sync"

"github.com/Tencent/bk-ci/src/booster/bk_dist/common/env"
dcFile "github.com/Tencent/bk-ci/src/booster/bk_dist/common/file"
"github.com/Tencent/bk-ci/src/booster/bk_dist/common/protocol"
dcSyscall "github.com/Tencent/bk-ci/src/booster/bk_dist/common/syscall"
"github.com/Tencent/bk-ci/src/booster/bk_dist/common/types"
"github.com/Tencent/bk-ci/src/booster/common/blog"
)

// GetHandlerEnv get env by booster type
Expand Down Expand Up @@ -58,3 +60,63 @@ func GetHandlerTmpDir(sandBox *dcSyscall.Sandbox) string {

return ""
}

var (
	// fileInfoCacheLock guards every access to fileInfoCache.
	fileInfoCacheLock sync.RWMutex

	// fileInfoCache maps a file path, exactly as it was passed to
	// GetFileInfo, to the stat result obtained for it. Nothing in the code
	// visible here evicts or refreshes an entry once it is stored.
	fileInfoCache = make(map[string]*dcFile.Info)
)

// GetFileInfo returns the stat info for the paths in fs, answering from the
// package-level fileInfoCache where possible and calling dcFile.Stat only
// for paths that are not cached yet.
//
// Filtering applied to every entry:
//   - mustexisted: entries whose Exist() reports false are dropped.
//   - notdir: entries that exist and are directories are dropped.
//
// The result lists cache hits first (in fs order) followed by the freshly
// stat'ed paths (in fs order), so it does not necessarily follow the order
// of fs.
//
// Concurrent callers are supported: the cache is read under a shared lock
// and new entries are written back asynchronously. Callers racing on the
// same uncached path may therefore each Stat it once; this is a deliberate
// balance between read concurrency and de-duplicating Stat calls.
//
// NOTE(review): paths are used verbatim as cache keys and are handed to
// dcFile.Stat unchanged, so callers presumably need to pass absolute paths
// for entries to be shared safely across working directories — confirm.
func GetFileInfo(fs []string, mustexisted bool, notdir bool) []*dcFile.Info {
	// keep reports whether a stat result passes the caller's filters.
	keep := func(i *dcFile.Info) bool {
		if !i.Exist() {
			// Only consult Basic() for files that exist; presumably it
			// carries no usable info for a missing file — TODO confirm.
			return !mustexisted
		}
		return !(notdir && i.Basic().IsDir())
	}

	// Read phase: collect cache hits and remember the misses.
	var notfound []string
	is := make([]*dcFile.Info, 0, len(fs))

	fileInfoCacheLock.RLock()
	for _, f := range fs {
		i, ok := fileInfoCache[f]
		if !ok {
			notfound = append(notfound, f)
			continue
		}
		if keep(i) {
			is = append(is, i)
		}
	}
	fileInfoCacheLock.RUnlock()

	blog.Infof("common util: got %d file stat and %d not found", len(is), len(notfound))
	if len(notfound) == 0 {
		return is
	}

	// Query phase: stat every miss outside of any lock. A path that appears
	// more than once in fs is stat'ed only once within this call.
	fresh := make(map[string]*dcFile.Info, len(notfound))
	for _, f := range notfound {
		i, ok := fresh[f]
		if !ok {
			i = dcFile.Stat(f)
			fresh[f] = i
		}
		if keep(i) {
			is = append(is, i)
		}
	}

	// Write phase: publish the new entries without making the caller wait
	// for the exclusive lock. fresh is not modified after this point, so the
	// goroutine can read it safely.
	go func() {
		fileInfoCacheLock.Lock()
		defer fileInfoCacheLock.Unlock()
		for f, i := range fresh {
			fileInfoCache[f] = i
		}
	}()

	return is
}
1 change: 1 addition & 0 deletions src/backend/booster/bk_dist/handler/ue4/cc/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
ErrorNotSupportGch = fmt.Errorf("output with .gch, must be local")
ErrorNoPumpHeadFile = fmt.Errorf("pump head file not exist")
ErrorNoDependFile = fmt.Errorf("depend file not exist")
ErrorInvalidDependFile = fmt.Errorf("depend file invalid")
ErrorNotSupportRemote = fmt.Errorf("not support to remote execute")
ErrorInPumpBlack = fmt.Errorf("in pump black list")
)
50 changes: 32 additions & 18 deletions src/backend/booster/bk_dist/handler/ue4/cc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,23 +236,27 @@ func (cc *TaskCC) analyzeIncludes(f string, workdir string) ([]*dcFile.Info, err
sep = "\r\n"
}
lines := strings.Split(string(data), sep)
includes := []*dcFile.Info{}
uniqlines := uniqArr(lines)
blog.Infof("cc: got %d uniq include file from file: %s", len(uniqlines), f)

for _, l := range uniqlines {
if !filepath.IsAbs(l) {
l, _ = filepath.Abs(filepath.Join(workdir, l))
}
fstat := dcFile.Stat(l)
if fstat.Exist() && !fstat.Basic().IsDir() {
includes = append(includes, fstat)
} else {
blog.Infof("cc: do not deal include file: %s in file:%s for not existed or is dir", l, f)
if dcPump.SupportPumpStatCache(cc.sandbox.Env) {
return commonUtil.GetFileInfo(uniqlines, true, true), nil
} else {
includes := []*dcFile.Info{}
for _, l := range uniqlines {
if !filepath.IsAbs(l) {
l, _ = filepath.Abs(filepath.Join(workdir, l))
}
fstat := dcFile.Stat(l)
if fstat.Exist() && !fstat.Basic().IsDir() {
includes = append(includes, fstat)
} else {
blog.Infof("cc: do not deal include file: %s in file:%s for not existed or is dir", l, f)
}
}
}

return includes, nil
return includes, nil
}
}

func (cc *TaskCC) checkFstat(f string, workdir string) (*dcFile.Info, error) {
Expand Down Expand Up @@ -300,8 +304,18 @@ func (cc *TaskCC) copyPumpHeadFile(workdir string) error {

blog.Infof("cc: copy pump head got %d uniq include file from file: %s", len(includes), cc.sourcedependfile)

if len(includes) == 0 {
blog.Warnf("cl: depend file: %s data:[%s] is invalid", cc.sourcedependfile, string(data))
return ErrorInvalidDependFile
}

for i := range includes {
includes[i] = strings.Replace(includes[i], "\\", "/", -1)
}
uniqlines := uniqArr(includes)

// TODO : save to cc.pumpHeadFile
newdata := strings.Join(includes, sep)
newdata := strings.Join(uniqlines, sep)
err = ioutil.WriteFile(cc.pumpHeadFile, []byte(newdata), os.ModePerm)
if err != nil {
blog.Warnf("cc: copy pump head failed to write file: %s with err:%v", cc.pumpHeadFile, err)
Expand Down Expand Up @@ -767,15 +781,15 @@ ERROREND:
}

func (cc *TaskCC) finalExecute([]string) {
if cc.saveTemp() {
return
if cc.needcopypumpheadfile {
go cc.copyPumpHeadFile(cc.sandbox.Dir)
}

if cc.needcopypumpheadfile {
cc.copyPumpHeadFile(cc.sandbox.Dir)
if cc.saveTemp() {
return
}

cc.cleanTmpFile()
go cc.cleanTmpFile()
}

func (cc *TaskCC) saveTemp() bool {
Expand Down
45 changes: 27 additions & 18 deletions src/backend/booster/bk_dist/handler/ue4/cl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,23 +352,27 @@ func (cl *TaskCL) analyzeIncludes(f string, workdir string) ([]*dcFile.Info, err
}

lines := strings.Split(string(data), "\r\n")
includes := []*dcFile.Info{}
uniqlines := uniqArr(lines)
blog.Infof("cl: got %d uniq include file from file: %s", len(uniqlines), f)

for _, l := range uniqlines {
if !filepath.IsAbs(l) {
l, _ = filepath.Abs(filepath.Join(workdir, l))
}
fstat := dcFile.Stat(l)
if fstat.Exist() && !fstat.Basic().IsDir() {
includes = append(includes, fstat)
} else {
blog.Infof("cl: do not deal include file: %s in file:%s for not existed or is dir", l, f)
if dcPump.SupportPumpStatCache(cl.sandbox.Env) {
return commonUtil.GetFileInfo(uniqlines, true, true), nil
} else {
includes := []*dcFile.Info{}
for _, l := range uniqlines {
if !filepath.IsAbs(l) {
l, _ = filepath.Abs(filepath.Join(workdir, l))
}
fstat := dcFile.Stat(l)
if fstat.Exist() && !fstat.Basic().IsDir() {
includes = append(includes, fstat)
} else {
blog.Infof("cl: do not deal include file: %s in file:%s for not existed or is dir", l, f)
}
}
}

return includes, nil
return includes, nil
}
}

func (cl *TaskCL) checkFstat(f string, workdir string) (*dcFile.Info, error) {
Expand Down Expand Up @@ -475,8 +479,13 @@ func (cl *TaskCL) copyPumpHeadFile(workdir string) error {
return ErrorInvalidDependFile
}

for i := range includes {
includes[i] = strings.Replace(includes[i], "/", "\\", -1)
}
uniqlines := uniqArr(includes)

// TODO : save to cc.pumpHeadFile
newdata := strings.Join(includes, sep)
newdata := strings.Join(uniqlines, sep)
err = ioutil.WriteFile(cl.pumpHeadFile, []byte(newdata), os.ModePerm)
if err != nil {
blog.Warnf("cl: copy pump head failed to write file: %s with err:%v", cl.pumpHeadFile, err)
Expand Down Expand Up @@ -993,15 +1002,15 @@ ERROREND:
}

func (cl *TaskCL) finalExecute([]string) {
if cl.saveTemp() {
return
if cl.needcopypumpheadfile {
go cl.copyPumpHeadFile(cl.sandbox.Dir)
}

if cl.needcopypumpheadfile {
cl.copyPumpHeadFile(cl.sandbox.Dir)
if cl.saveTemp() {
return
}

cl.cleanTmpFile()
go cl.cleanTmpFile()
}

func (cl *TaskCL) saveTemp() bool {
Expand Down
6 changes: 3 additions & 3 deletions src/backend/booster/server/pkg/engine/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ type QueueBriefInfo struct {
type QueueShareType int

const (
QueueShareTypeAllAllowed QueueShareType = iota
QueueShareTypeOnlyTakeFromPublic
QueueShareTypeOnlyGiveToPublic
QueueShareTypeAllAllowed QueueShareType = iota
QueueShareTypeOnlyTakeFromPublic // 是否允许从公共队列中获取任务,即用当前资源来为公共队列中的任务进行加速
QueueShareTypeOnlyGiveToPublic // 是否允许将当前处理不了的任务放置到公共队列中,即是否用其它组的资源来为当前组的任务进行加速
QueueShareTypeNoneAllowed
)

0 comments on commit fa34d78

Please sign in to comment.