Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: try normal accelate when pump failed, issue: #116 #117

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,39 @@ func (e *executor) remoteTryTimes() int {
return 1
}

func (e *executor) onRemoteFail() (*dcSDK.BKDistCommand, error) {
blog.Infof("executor: try to execute onRemoteFail from pid(%d)", e.req.Pid)
// defer e.mgr.work.Basic().UpdateJobStats(e.stats)

// dcSDK.StatsTimeNow(&e.stats.PreWorkEnterTime)
// defer dcSDK.StatsTimeNow(&e.stats.PreWorkLeaveTime)
// e.mgr.work.Basic().UpdateJobStats(e.stats)

if e.handler.PreExecuteNeedLock(e.req.Commands) {
weight := e.handler.PreLockWeight(e.req.Commands)
blog.Infof("executor: try to execute onRemoteFail from pid(%d) lockweight(%d)", e.req.Pid, weight)
if !e.lock(dcSDK.JobUsageLocalPre, weight) {
return nil, types.ErrSlotsLockFailed
}
// dcSDK.StatsTimeNow(&e.stats.PreWorkLockTime)
// defer dcSDK.StatsTimeNow(&e.stats.PreWorkUnlockTime)
defer e.unlock(dcSDK.JobUsageLocalPre, weight)
// e.mgr.work.Basic().UpdateJobStats(e.stats)
}

// dcSDK.StatsTimeNow(&e.stats.PreWorkStartTime)
// e.mgr.work.Basic().UpdateJobStats(e.stats)
r, err := e.handler.OnRemoteFail(e.req.Commands)
// dcSDK.StatsTimeNow(&e.stats.PreWorkEndTime)
if err != nil {
return nil, err
}

// e.stats.PreWorkSuccess = true
blog.Infof("executor: success to execute onRemoteFail from pid(%d)", e.req.Pid)
return r, nil
}

func (e *executor) executePostTask(result *dcSDK.BKDistResult) error {
blog.Infof("executor: try to execute post-task from pid(%d)", e.req.Pid)
defer e.mgr.work.Basic().UpdateJobStats(e.stats)
Expand Down
66 changes: 66 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ func (m *Mgr) ExecuteTask(
blog.Warnf("local: execute post-task for work(%s) from pid(%d) failed: %v", m.work.ID(), req.Pid, err)
req.Stats.RemoteErrorMessage = err.Error()

lr, err := m.retryOnRemoteFail(req, globalWork, e)
if err == nil && lr != nil {
return lr, err
}

if !e.skipLocalRetry() {
return e.executeLocalTask(), nil
}
Expand Down Expand Up @@ -242,6 +247,67 @@ func (m *Mgr) ExecuteTask(
}, nil
}

// 远程失败后调用handle的特殊处理函数,方便支持某些特殊流程
func (m *Mgr) retryOnRemoteFail(
req *types.LocalTaskExecuteRequest,
globalWork *types.Work,
e *executor) (*types.LocalTaskExecuteResult, error) {
blog.Infof("local: onRemoteFail with task(%s) for work(%s) from pid(%d) ",
strings.Join(req.Commands, " "), m.work.ID(), req.Pid)

m.work.Basic().Info().IncPrepared()
m.work.Remote().IncRemoteJobs()

// 重新走流程,比如预处理
cnew, errnew := e.onRemoteFail()
if cnew != nil && errnew == nil {
remoteReqNew := &types.RemoteTaskExecuteRequest{
Pid: req.Pid,
Req: cnew,
Stats: req.Stats,
Sandbox: e.sandbox,
IOTimeout: e.ioTimeout,
BanWorkerList: []*protocol.Host{},
}
// 重新远程执行命令
r, err := m.work.Remote().ExecuteTask(remoteReqNew)

m.work.Basic().Info().DecPrepared()
m.work.Remote().DecRemoteJobs()

if err != nil {
blog.Warnf("local: failed to remote in onRemoteFail from work(%s) from pid(%d) with error(%v)", m.work.ID(), req.Pid, err)
return nil, err
}

blog.Infof("local: succeed to remote in onRemoteFail from work(%s) from pid(%d)", m.work.ID(), req.Pid)
// 重新post环节
err = e.executePostTask(r.Result)
if err != nil {
blog.Warnf("local: execute post-task in onRemoteFail for work(%s) from pid(%d) failed: %v", m.work.ID(), req.Pid, err)
return nil, err
}

req.Stats.Success = true
m.work.Basic().UpdateJobStats(req.Stats)
blog.Infof("local: success to execute post task in onRemoteFail for work(%s) from pid(%d) in env(%v) dir(%s)",
m.work.ID(), req.Pid, req.Environments, req.Dir)
return &types.LocalTaskExecuteResult{
Result: &dcSDK.LocalTaskResult{
ExitCode: 0,
Stdout: e.Stdout(),
Stderr: e.Stderr(),
Message: "success to process all steps",
},
}, nil
}

m.work.Basic().Info().DecPrepared()
m.work.Remote().DecRemoteJobs()

return nil, nil
}

// Slots get current total and occupied slots
func (m *Mgr) Slots() (int, int) {
return m.resource.GetStatus()
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/cc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (cc *TaskCC) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (cc *TaskCC) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (cc *TaskCC) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/custom/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ func (c *Custom) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (c *Custom) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (c *Custom) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/echo/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ func (c *Echo) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (c *Echo) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (c *Echo) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/find/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ func (c *Finder) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (c *Finder) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (c *Finder) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
3 changes: 3 additions & 0 deletions src/backend/booster/bk_dist/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type Handler interface {
// RemoteRetryTimes will return the remote retry times
RemoteRetryTimes() int

// OnRemoteFail give chance to try other way if failed to remote execute
OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error)

// PostExecuteNeedLock decide whether executor should lock before post execution
PostExecuteNeedLock(result *dcSDK.BKDistResult) bool

Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/tc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ func (tc *TextureCompressor) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (tc *TextureCompressor) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (tc *TextureCompressor) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/astc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ func (tc *TextureCompressor) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (tc *TextureCompressor) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (tc *TextureCompressor) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
17 changes: 16 additions & 1 deletion src/backend/booster/bk_dist/handler/ue4/cc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type TaskCC struct {
forcedepend bool
pumpremote bool
needcopypumpheadfile bool
pumpremotefailed bool

pchFileDesc *dcSDK.FileDesc

Expand Down Expand Up @@ -157,6 +158,20 @@ func (cc *TaskCC) RemoteRetryTimes() int {
return 0
}

// TODO : OnRemoteFail give chance to try other way if failed to remote execute
func (cc *TaskCC) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
blog.Infof("cc: start OnRemoteFail for: %v", command)

if cc.pumpremote {
blog.Infof("cc: set pumpremotefailed to true now")
cc.pumpremotefailed = true
cc.needcopypumpheadfile = true
cc.pumpremote = false
return cc.preExecute(command)
}
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (cc *TaskCC) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down Expand Up @@ -649,7 +664,7 @@ func (cc *TaskCC) preExecute(command []string) (*dcSDK.BKDistCommand, error) {
cc.originArgs = command

// ++ try with pump,only support windows now
if dcPump.SupportPump(cc.sandbox.Env) {
if !cc.pumpremotefailed && dcPump.SupportPump(cc.sandbox.Env) {
if satisfied, _ := cc.isPumpActionNumSatisfied(); satisfied {
req, err, notifyerr := cc.trypump(command)
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion src/backend/booster/bk_dist/handler/ue4/cl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type TaskCL struct {
forcedepend bool
pumpremote bool
needcopypumpheadfile bool
pumpremotefailed bool

// how to save result file
customSave bool
Expand Down Expand Up @@ -254,6 +255,20 @@ func (cl *TaskCL) RemoteRetryTimes() int {
return 0
}

// TODO : OnRemoteFail give chance to try other way if failed to remote execute
func (cl *TaskCL) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
blog.Infof("cl: start OnRemoteFail for: %v", command)

if cl.pumpremote {
blog.Infof("cl: set pumpremotefailed to true now")
cl.pumpremotefailed = true
cl.needcopypumpheadfile = true
cl.pumpremote = false
return cl.preExecute(command)
}
return nil, nil
}

// LocalLockWeight decide local-execute lock weight, default 1
func (cl *TaskCL) LocalLockWeight(command []string) int32 {
return 1
Expand Down Expand Up @@ -762,7 +777,7 @@ func (cl *TaskCL) preExecute(command []string) (*dcSDK.BKDistCommand, error) {
cl.originArgs = command

// ++ try with pump,only support windows now
if dcPump.SupportPump(cl.sandbox.Env) {
if !cl.pumpremotefailed && dcPump.SupportPump(cl.sandbox.Env) {
if satisfied, _ := cl.isPumpActionNumSatisfied(); satisfied {
req, err, notifyerr := cl.trypump(command)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/clfilter/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ func (cf *TaskCLFilter) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (cf *TaskCLFilter) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
if cf.clhandle != nil {
return cf.clhandle.OnRemoteFail(command)
}

return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (cf *TaskCLFilter) PostLockWeight(result *dcSDK.BKDistResult) int32 {
if cf.clhandle != nil {
Expand Down
9 changes: 9 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ func (u *UE4) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (u *UE4) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
if u.innerhandler != nil {
return u.innerhandler.OnRemoteFail(command)
}

return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (u *UE4) PostLockWeight(result *dcSDK.BKDistResult) int32 {
if u.innerhandler != nil {
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/lib/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (l *TaskLib) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (l *TaskLib) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (l *TaskLib) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/link/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (l *TaskLink) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (l *TaskLink) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (l *TaskLink) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/linkfilter/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (lf *TaskLinkFilter) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (lf *TaskLinkFilter) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (lf *TaskLinkFilter) PostLockWeight(result *dcSDK.BKDistResult) int32 {
if lf.handle != nil {
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/ue4/shader/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ func (u *UE4Shader) RemoteRetryTimes() int {
return 1
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (u *UE4Shader) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (u *UE4Shader) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
5 changes: 5 additions & 0 deletions src/backend/booster/bk_dist/handler/winclangcl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func (cc *WinClangCl) RemoteRetryTimes() int {
return 0
}

// OnRemoteFail give chance to try other way if failed to remote execute
func (cc *WinClangCl) OnRemoteFail(command []string) (*dcSDK.BKDistCommand, error) {
return nil, nil
}

// PostLockWeight decide post-execute lock weight, default 1
func (cc *WinClangCl) PostLockWeight(result *dcSDK.BKDistResult) int32 {
return 1
Expand Down
Loading