From 1a772de9a13f0f7f4d9a15ed7c20dce891df079f Mon Sep 17 00:00:00 2001 From: tbs60 Date: Tue, 7 Feb 2023 09:26:33 +0800 Subject: [PATCH] =?UTF-8?q?pump=E6=A8=A1=E5=BC=8F=E4=B8=8B=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=9A=84stat=E8=B0=83=E7=94=A8=E5=8E=BB=E9=87=8D,=20i?= =?UTF-8?q?ssue:=20#3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bk_dist/booster/command/command.go | 5 ++ .../bk_dist/booster/command/process.go | 1 + .../booster/bk_dist/booster/pkg/booster.go | 4 ++ src/backend/booster/bk_dist/common/env/env.go | 1 + .../booster/bk_dist/common/pump/pump.go | 8 ++- .../booster/bk_dist/common/types/booster.go | 1 + .../booster/bk_dist/handler/common/util.go | 62 +++++++++++++++++++ .../booster/bk_dist/handler/ue4/cc/error.go | 1 + .../booster/bk_dist/handler/ue4/cc/handler.go | 50 +++++++++------ .../booster/bk_dist/handler/ue4/cl/handler.go | 45 ++++++++------ .../booster/server/pkg/engine/queue.go | 6 +- 11 files changed, 143 insertions(+), 41 deletions(-) diff --git a/src/backend/booster/bk_dist/booster/command/command.go b/src/backend/booster/bk_dist/booster/command/command.go index 8f0a552a..5e5cbc2a 100644 --- a/src/backend/booster/bk_dist/booster/command/command.go +++ b/src/backend/booster/bk_dist/booster/command/command.go @@ -76,6 +76,7 @@ const ( FlagPumpCacheRemoveAll = "pump_cache_remove_all" FlagPumpBlackList = "pump_black_list" FlagPumpMinActionNum = "pump_min_action_num" + FlagPumpDisableStatCache = "pump_disable_stat_cache" FlagForceLocalList = "force_local_list" FlagNoWork = "no_work" FlagControllerNoWait = "controller_no_wait" @@ -316,6 +317,10 @@ var ( Name: "pump_min_action_num", Usage: "do not use pump if total actions less this", }, + commandCli.BoolFlag{ + Name: "pump_disable_stat_cache", + Usage: "whether disable pump depend file stat info cache, default is false", + }, commandCli.StringSliceFlag{ Name: "force_local_list, fll", Usage: "key list which will be force executed locally", diff --git a/src/backend/booster/bk_dist/booster/command/process.go b/src/backend/booster/bk_dist/booster/command/process.go index 9229fadd..8c9fbcb4 100644 --- a/src/backend/booster/bk_dist/booster/command/process.go +++ b/src/backend/booster/bk_dist/booster/command/process.go @@ -281,6 +281,7 @@ func newBooster(c *commandCli.Context) (*pkg.Booster, error) { PumpCacheRemoveAll: c.Bool(FlagPumpCacheRemoveAll), PumpBlackList: c.StringSlice(FlagPumpBlackList), PumpMinActionNum: int32(pumpMinActionNum), + PumpDisableStatCache: c.Bool(FlagPumpDisableStatCache), ForceLocalList: c.StringSlice(FlagForceLocalList), NoWork: c.Bool(FlagNoWork), WriteMemroy: c.Bool(FlagWriteMemroMemroy), diff --git a/src/backend/booster/bk_dist/booster/pkg/booster.go b/src/backend/booster/bk_dist/booster/pkg/booster.go index d0d9c765..673ea9b9 100644 --- a/src/backend/booster/bk_dist/booster/pkg/booster.go +++ b/src/backend/booster/bk_dist/booster/pkg/booster.go @@ -306,6 +306,10 @@ func (b *Booster) getWorkersEnv() map[string]string { } requiredEnv[env.KeyExecutorPumpMinActionNum] = strconv.Itoa(int(b.config.Works.PumpMinActionNum)) + if b.config.Works.PumpDisableStatCache { + requiredEnv[env.KeyExecutorPumpDisableStatCache] = envValueTrue + } + if b.config.Works.IOTimeoutSecs > 0 { requiredEnv[env.KeyExecutorIOTimeout] = strconv.Itoa(b.config.Works.IOTimeoutSecs) } diff --git a/src/backend/booster/bk_dist/common/env/env.go b/src/backend/booster/bk_dist/common/env/env.go index b0b577e6..5f8aa3a6 100644 --- a/src/backend/booster/bk_dist/common/env/env.go +++ b/src/backend/booster/bk_dist/common/env/env.go @@ -48,6 +48,7 @@ const ( KeyExecutorPumpCacheSizeMaxMB = "PUMP_CACHE_SIZE_MAX_MB" // cache pump inlude files KeyExecutorPumpBlackKeys = "PUMP_BLACK_KEYS" KeyExecutorPumpMinActionNum = "PUMP_MIN_ACTION_NUM" + KeyExecutorPumpDisableStatCache = "PUMP_DISABLE_STAT_CACHE" KeyExecutorForceLocalKeys = "FORCE_LOCAL_KEYS" KeyExecutorEnvProfile = "ENV_PROFILE" KeyExecutorWorkerSideCache = "WORKER_SIDE_CACHE" diff --git a/src/backend/booster/bk_dist/common/pump/pump.go b/src/backend/booster/bk_dist/common/pump/pump.go index ee0ae707..61abf464 100644 --- a/src/backend/booster/bk_dist/common/pump/pump.go +++ b/src/backend/booster/bk_dist/common/pump/pump.go @@ -150,8 +150,8 @@ func IsPump(env *env.Sandbox) bool { } func SupportPump(env *env.Sandbox) bool { - // return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin") - return IsPump(env) && runtime.GOOS == "windows" + return IsPump(env) && (runtime.GOOS == "windows" || runtime.GOOS == "darwin") + // return IsPump(env) && runtime.GOOS == "windows" } func IsPumpCache(env *env.Sandbox) bool { @@ -189,3 +189,7 @@ func PumpMinActionNum(env *env.Sandbox) int32 { return 0 } + +func SupportPumpStatCache(env *env.Sandbox) bool { + return env.GetEnv(dcEnv.KeyExecutorPumpDisableStatCache) == "" +} diff --git a/src/backend/booster/bk_dist/common/types/booster.go b/src/backend/booster/bk_dist/common/types/booster.go index 84e440a4..0630487e 100644 --- a/src/backend/booster/bk_dist/common/types/booster.go +++ b/src/backend/booster/bk_dist/common/types/booster.go @@ -111,6 +111,7 @@ type BoosterWorks struct { PumpCacheRemoveAll bool PumpBlackList []string PumpMinActionNum int32 + PumpDisableStatCache bool ForceLocalList []string diff --git a/src/backend/booster/bk_dist/handler/common/util.go b/src/backend/booster/bk_dist/handler/common/util.go index a773f0b2..77a0cfbf 100644 --- a/src/backend/booster/bk_dist/handler/common/util.go +++ b/src/backend/booster/bk_dist/handler/common/util.go @@ -14,12 +14,14 @@ import ( "os" "path" "strings" + "sync" "github.com/Tencent/bk-ci/src/booster/bk_dist/common/env" dcFile "github.com/Tencent/bk-ci/src/booster/bk_dist/common/file" "github.com/Tencent/bk-ci/src/booster/bk_dist/common/protocol" dcSyscall "github.com/Tencent/bk-ci/src/booster/bk_dist/common/syscall" "github.com/Tencent/bk-ci/src/booster/bk_dist/common/types" + "github.com/Tencent/bk-ci/src/booster/common/blog" ) // GetHandlerEnv get env by booster type @@ -58,3 +60,63 @@ func GetHandlerTmpDir(sandBox *dcSyscall.Sandbox) string { return "" } + +var ( + fileInfoCacheLock sync.RWMutex + fileInfoCache = map[string]*dcFile.Info{} +) + +// 支持并发read,但会有重复Stat操作,考虑并发和去重的平衡 +func GetFileInfo(fs []string, mustexisted bool, notdir bool) []*dcFile.Info { + // read + fileInfoCacheLock.RLock() + notfound := []string{} + is := make([]*dcFile.Info, 0, len(fs)) + for _, f := range fs { + i, ok := fileInfoCache[f] + if !ok { + notfound = append(notfound, f) + continue + } + + if mustexisted && !i.Exist() { + continue + } + if notdir && i.Basic().IsDir() { + continue + } + is = append(is, i) + } + fileInfoCacheLock.RUnlock() + + blog.Infof("common util: got %d file stat and %d not found", len(is), len(notfound)) + if len(notfound) == 0 { + return is + } + + // query + tempis := make(map[string]*dcFile.Info, len(notfound)) + for _, f := range notfound { + i := dcFile.Stat(f) + tempis[f] = i + + if mustexisted && !i.Exist() { + continue + } + if notdir && i.Basic().IsDir() { + continue + } + is = append(is, i) + } + + // write + go func(tempis *map[string]*dcFile.Info) { + fileInfoCacheLock.Lock() + for f, i := range *tempis { + fileInfoCache[f] = i + } + fileInfoCacheLock.Unlock() + }(&tempis) + + return is +} diff --git a/src/backend/booster/bk_dist/handler/ue4/cc/error.go b/src/backend/booster/bk_dist/handler/ue4/cc/error.go index b97045ba..3fca784d 100644 --- a/src/backend/booster/bk_dist/handler/ue4/cc/error.go +++ b/src/backend/booster/bk_dist/handler/ue4/cc/error.go @@ -38,6 +38,7 @@ var ( ErrorNotSupportGch = fmt.Errorf("output with .gch, must be local") ErrorNoPumpHeadFile = fmt.Errorf("pump head file not exist") ErrorNoDependFile = fmt.Errorf("depend file not exist") + ErrorInvalidDependFile = fmt.Errorf("depend file invalid") ErrorNotSupportRemote = fmt.Errorf("not support to remote execute") ErrorInPumpBlack = fmt.Errorf("in pump black list") ) diff --git a/src/backend/booster/bk_dist/handler/ue4/cc/handler.go b/src/backend/booster/bk_dist/handler/ue4/cc/handler.go index b3533aa5..a05ed633 100644 --- a/src/backend/booster/bk_dist/handler/ue4/cc/handler.go +++ b/src/backend/booster/bk_dist/handler/ue4/cc/handler.go @@ -236,23 +236,27 @@ func (cc *TaskCC) analyzeIncludes(f string, workdir string) ([]*dcFile.Info, err sep = "\r\n" } lines := strings.Split(string(data), sep) - includes := []*dcFile.Info{} uniqlines := uniqArr(lines) blog.Infof("cc: got %d uniq include file from file: %s", len(uniqlines), f) - for _, l := range uniqlines { - if !filepath.IsAbs(l) { - l, _ = filepath.Abs(filepath.Join(workdir, l)) - } - fstat := dcFile.Stat(l) - if fstat.Exist() && !fstat.Basic().IsDir() { - includes = append(includes, fstat) - } else { - blog.Infof("cc: do not deal include file: %s in file:%s for not existed or is dir", l, f) + if dcPump.SupportPumpStatCache(cc.sandbox.Env) { + return commonUtil.GetFileInfo(uniqlines, true, true), nil + } else { + includes := []*dcFile.Info{} + for _, l := range uniqlines { + if !filepath.IsAbs(l) { + l, _ = filepath.Abs(filepath.Join(workdir, l)) + } + fstat := dcFile.Stat(l) + if fstat.Exist() && !fstat.Basic().IsDir() { + includes = append(includes, fstat) + } else { + blog.Infof("cc: do not deal include file: %s in file:%s for not existed or is dir", l, f) + } } - } - return includes, nil + return includes, nil + } } func (cc *TaskCC) checkFstat(f string, workdir string) (*dcFile.Info, error) { @@ -300,8 +304,18 @@ func (cc *TaskCC) copyPumpHeadFile(workdir string) error { blog.Infof("cc: copy pump head got %d uniq include file from file: %s", len(includes), cc.sourcedependfile) + if len(includes) == 0 { + blog.Warnf("cl: depend file: %s data:[%s] is invalid", cc.sourcedependfile, string(data)) + return ErrorInvalidDependFile + } + + for i := range includes { + includes[i] = strings.Replace(includes[i], "\\", "/", -1) + } + uniqlines := uniqArr(includes) + // TODO : save to cc.pumpHeadFile - newdata := strings.Join(includes, sep) + newdata := strings.Join(uniqlines, sep) err = ioutil.WriteFile(cc.pumpHeadFile, []byte(newdata), os.ModePerm) if err != nil { blog.Warnf("cc: copy pump head failed to write file: %s with err:%v", cc.pumpHeadFile, err) @@ -767,15 +781,15 @@ ERROREND: } func (cc *TaskCC) finalExecute([]string) { - if cc.saveTemp() { - return + if cc.needcopypumpheadfile { + go cc.copyPumpHeadFile(cc.sandbox.Dir) } - if cc.needcopypumpheadfile { - cc.copyPumpHeadFile(cc.sandbox.Dir) + if cc.saveTemp() { + return } - cc.cleanTmpFile() + go cc.cleanTmpFile() } func (cc *TaskCC) saveTemp() bool { diff --git a/src/backend/booster/bk_dist/handler/ue4/cl/handler.go b/src/backend/booster/bk_dist/handler/ue4/cl/handler.go index f2369456..9ef94eb1 100644 --- a/src/backend/booster/bk_dist/handler/ue4/cl/handler.go +++ b/src/backend/booster/bk_dist/handler/ue4/cl/handler.go @@ -352,23 +352,27 @@ func (cl *TaskCL) analyzeIncludes(f string, workdir string) ([]*dcFile.Info, err } lines := strings.Split(string(data), "\r\n") - includes := []*dcFile.Info{} uniqlines := uniqArr(lines) blog.Infof("cl: got %d uniq include file from file: %s", len(uniqlines), f) - for _, l := range uniqlines { - if !filepath.IsAbs(l) { - l, _ = filepath.Abs(filepath.Join(workdir, l)) - } - fstat := dcFile.Stat(l) - if fstat.Exist() && !fstat.Basic().IsDir() { - includes = append(includes, fstat) - } else { - blog.Infof("cl: do not deal include file: %s in file:%s for not existed or is dir", l, f) + if dcPump.SupportPumpStatCache(cl.sandbox.Env) { + return commonUtil.GetFileInfo(uniqlines, true, true), nil + } else { + includes := []*dcFile.Info{} + for _, l := range uniqlines { + if !filepath.IsAbs(l) { + l, _ = filepath.Abs(filepath.Join(workdir, l)) + } + fstat := dcFile.Stat(l) + if fstat.Exist() && !fstat.Basic().IsDir() { + includes = append(includes, fstat) + } else { + blog.Infof("cl: do not deal include file: %s in file:%s for not existed or is dir", l, f) + } } - } - return includes, nil + return includes, nil + } } func (cl *TaskCL) checkFstat(f string, workdir string) (*dcFile.Info, error) { @@ -475,8 +479,13 @@ func (cl *TaskCL) copyPumpHeadFile(workdir string) error { return ErrorInvalidDependFile } + for i := range includes { + includes[i] = strings.Replace(includes[i], "/", "\\", -1) + } + uniqlines := uniqArr(includes) + // TODO : save to cc.pumpHeadFile - newdata := strings.Join(includes, sep) + newdata := strings.Join(uniqlines, sep) err = ioutil.WriteFile(cl.pumpHeadFile, []byte(newdata), os.ModePerm) if err != nil { blog.Warnf("cl: copy pump head failed to write file: %s with err:%v", cl.pumpHeadFile, err) @@ -993,15 +1002,15 @@ ERROREND: } func (cl *TaskCL) finalExecute([]string) { - if cl.saveTemp() { - return + if cl.needcopypumpheadfile { + go cl.copyPumpHeadFile(cl.sandbox.Dir) } - if cl.needcopypumpheadfile { - cl.copyPumpHeadFile(cl.sandbox.Dir) + if cl.saveTemp() { + return } - cl.cleanTmpFile() + go cl.cleanTmpFile() } func (cl *TaskCL) saveTemp() bool { diff --git a/src/backend/booster/server/pkg/engine/queue.go b/src/backend/booster/server/pkg/engine/queue.go index 90516434..351ad1d0 100644 --- a/src/backend/booster/server/pkg/engine/queue.go +++ b/src/backend/booster/server/pkg/engine/queue.go @@ -262,8 +262,8 @@ type QueueBriefInfo struct { type QueueShareType int const ( - QueueShareTypeAllAllowed QueueShareType = iota - QueueShareTypeOnlyTakeFromPublic - QueueShareTypeOnlyGiveToPublic + QueueShareTypeAllAllowed QueueShareType = iota + QueueShareTypeOnlyTakeFromPublic // 是否允许从公共队列中获取任务,即用当前资源来为公共队列中的任务进行加速 + QueueShareTypeOnlyGiveToPublic // 是否允许将当前处理不了的任务放置到公共队列中,即是否用其它组的资源来为当前组的任务进行加速 QueueShareTypeNoneAllowed )