diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go index 1a2d90ec..4721b7b8 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/executor.go @@ -321,7 +321,7 @@ func (e *executor) realExecuteLocalTask(locallockweight int32) *types.LocalTaskE var outBuf, errBuf bytes.Buffer sandbox.Stdout = &outBuf sandbox.Stderr = &errBuf - blog.Infof("executor:ready run cmd:%v", e.req.Commands) + blog.Infof("executor: ready from pid(%d) run cmd:%v", e.req.Pid, e.req.Commands) cmd := e.req.Commands[0] if strings.HasSuffix(cmd, "cmd.exe") || strings.HasSuffix(cmd, "Cmd.exe") { arg := strings.Join(e.req.Commands, " ") diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index e7232116..2e054d18 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -39,6 +39,8 @@ const ( corkMaxSize = 1024 * 1024 * 10 // corkMaxSize = 1024 * 1024 * 1024 largeFileSize = 1024 * 1024 * 100 // 100MB + + fileMaxFailCount = 5 ) // NewMgr get a new Remote Mgr @@ -130,11 +132,11 @@ type Mgr struct { } type fileSendMap struct { - sync.Mutex + sync.RWMutex cache map[string]*[]*types.FileInfo } -func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.FileInfo, bool) { +func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, query bool) (*types.FileInfo, bool) { fsm.Lock() defer fsm.Unlock() @@ -162,9 +164,10 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.F for _, ci := range *c { if ci.Match(desc) { - //if worker is retrying and send failed before, try to set send status to retrying - if retry && ci.SendStatus == types.FileSendFailed { - ci.SendStatus = types.FileSendRetrying + //if worker is send failed before, try to send it again + if ci.SendStatus == types.FileSendFailed && !query { + blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount) + ci.SendStatus = types.FileSending return ci, false } return ci, true @@ -175,7 +178,7 @@ func (fsm *fileSendMap) matchOrInsert(desc dcSDK.FileDesc, retry bool) (*types.F return info, false } -func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc, retry bool) []matchResult { +func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc) []matchResult { fsm.Lock() defer fsm.Unlock() @@ -211,10 +214,11 @@ func (fsm *fileSendMap) matchOrInserts(descs []*dcSDK.FileDesc, retry bool) []ma for _, ci := range *c { if ci.Match(*desc) { fileMatched := true - //if worker is retrying and send failed before, try to set send status to retrying - if retry && ci.SendStatus == types.FileSendFailed { + //if file is send failed before, try to send it again + if ci.SendStatus == types.FileSendFailed { + blog.Debugf("file: retry send file %s, fail count %d", desc.FilePath, ci.FailCount) fileMatched = false - ci.SendStatus = types.FileSendRetrying + ci.SendStatus = types.FileSending } result = append(result, matchResult{ info: ci, @@ -255,6 +259,7 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS FileMode: desc.Filemode, LinkTarget: desc.LinkTarget, SendStatus: status, + FailCount: 0, } c, ok := fsm.cache[desc.FilePath] @@ -267,6 +272,12 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS for _, ci := range *c { if ci.Match(desc) { ci.SendStatus = status + if status == types.FileSendFailed { + ci.FailCount++ + } + if status == types.FileSendSucceed { + ci.FailCount = 0 + } return } } @@ -275,6 +286,30 @@ func (fsm *fileSendMap) updateStatus(desc dcSDK.FileDesc, status types.FileSendS return } +func (fsm *fileSendMap) hasReachedFailCount(descs []dcSDK.FileDesc) bool { + fsm.RLock() + defer fsm.RUnlock() + + if fsm.cache == nil { + return false + } + for _, desc := range descs { + c, ok := fsm.cache[desc.FilePath] + if !ok || c == nil || len(*c) == 0 { + continue + } + for _, ci := range *fsm.cache[desc.FilePath] { + if ci.Match(desc) { + if ci.FailCount > fileMaxFailCount { + return true + } + } + } + } + + return false +} + // Init do the initialization for remote manager // !! only call once !! func (m *Mgr) Init() { @@ -527,6 +562,9 @@ func (m *Mgr) ExecuteTask(req *types.RemoteTaskExecuteRequest) (*types.RemoteTas return nil, err } + if m.isFilesAlreadySendFailed(req.Server.Server, req.Req.Commands) { + return nil, fmt.Errorf("remote: no need to send files for work(%s) from pid(%d) to server(%s)", m.work.ID(), req.Pid, req.Server.Server) + } remoteDirs, err := m.ensureFilesWithPriority(handler, req.Pid, req.Sandbox, getFileDetailsFromExecuteRequest(req)) if err != nil { req.BanWorkerList = append(req.BanWorkerList, req.Server) @@ -602,6 +640,24 @@ func (m *Mgr) SendFiles(req *types.RemoteTaskSendFileRequest) ([]string, error) ) } +// check if files send to remote worker failed and no need to send again +func (m *Mgr) isFilesAlreadySendFailed(server string, commands []dcSDK.BKCommand) bool { + m.fileSendMutex.Lock() + target, ok := m.fileSendMap[server] + if !ok { + m.fileSendMutex.Unlock() + return false + } + m.fileSendMutex.Unlock() + + for _, c := range commands { + if target.hasReachedFailCount(c.Inputfiles) { + return true + } + } + return false +} + func (m *Mgr) ensureFilesWithPriority( handler dcSDK.RemoteWorkerHandler, pid int, @@ -650,7 +706,7 @@ func (m *Mgr) ensureFilesWithPriority( blog.Infof("remote: try to ensure priority(%d) files(%d) for work(%s) from pid(%d) dir(%s) to server", i, len(*f), m.work.ID(), pid, sandbox.Dir) - r, err := m.ensureFiles(handler, pid, sandbox, *f, false) + r, err := m.ensureFiles(handler, pid, sandbox, *f) if err != nil { return nil, err } @@ -669,8 +725,7 @@ func (m *Mgr) ensureFiles( handler dcSDK.RemoteWorkerHandler, pid int, sandbox *dcSyscall.Sandbox, - fileDetails []*types.FilesDetails, - retry bool) ([]string, error) { + fileDetails []*types.FilesDetails) ([]string, error) { settings := m.work.Basic().Settings() blog.Infof("remote: try to ensure multi %d files for work(%s) from pid(%d) dir(%s) to server", @@ -823,7 +878,7 @@ func (m *Mgr) ensureFiles( for _, v := range *fs { descs = append(descs, v.file) } - results := m.checkOrLockCorkFiles(server, descs, retry) + results := m.checkOrLockCorkFiles(server, descs) blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server", len(results), len(descs), count, m.work.ID(), pid) needSendCorkFiles := make([]*corkFile, 0, totalFileNum) @@ -845,9 +900,9 @@ func (m *Mgr) ensureFiles( // 启动协程跟踪未发送完成的文件 c := (*fs)[i] - go func(err chan<- error, c *corkFile, r matchResult, retry bool) { - err <- m.ensureSingleCorkFile(c, r, retry) - }(wg, c, v, retry) + go func(err chan<- error, c *corkFile, r matchResult) { + err <- m.ensureSingleCorkFile(c, r) + }(wg, c, v) } // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache @@ -863,15 +918,15 @@ func (m *Mgr) ensureFiles( // 单个文件发送模式 for _, f := range singleFiles { sender := &dcSDK.BKDistFileSender{Files: []dcSDK.FileDesc{*f.file}} - go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) { + go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender) { t := time.Now().Local() - err <- m.ensureSingleFile(handler, host, req, sandbox, retry) + err <- m.ensureSingleFile(handler, host, req, sandbox) d := time.Now().Local().Sub(t) if d > 200*time.Millisecond { blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s", m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath) } - }(wg, f.host, sender, retry) + }(wg, f.host, sender) } } @@ -899,30 +954,29 @@ func (m *Mgr) ensureSingleFile( handler dcSDK.RemoteWorkerHandler, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, - sandbox *dcSyscall.Sandbox, - retry bool) (err error) { + sandbox *dcSyscall.Sandbox) (err error) { if len(req.Files) == 0 { return fmt.Errorf("empty files") } req.Files = req.Files[:1] desc := req.Files[0] - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s) with retry %t", - desc.FilePath, m.work.ID(), host.Server, retry) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s)", + desc.FilePath, m.work.ID(), host.Server) - status, ok := m.checkOrLockSendFile(host.Server, desc, retry) + status, ok := m.checkOrLockSendFile(host.Server, desc, false) // 已经有人发送了文件, 等待文件就绪 if ok { blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), "+ - "some one is sending this file with retry %t", desc.FilePath, m.work.ID(), host.Server, retry) + "some one is sending this file", desc.FilePath, m.work.ID(), host.Server) tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() - for status == types.FileSending || (retry && status == types.FileSendRetrying) { + for status == types.FileSending { select { case <-tick.C: - // 不是发送文件的goroutine,不需要发送failed文件 - status, _ = m.checkOrLockSendFile(host.Server, desc, false) + // 不是发送文件的goroutine,不需要修改状态,仅查询状态 + status, _ = m.checkOrLockSendFile(host.Server, desc, true) } } @@ -935,10 +989,6 @@ func (m *Mgr) ensureSingleFile( blog.Debugf("remote: success to ensure single file(%s) for work(%s) to server(%s)", desc.FilePath, m.work.ID(), host.Server) return nil - case types.FileSendRetrying: - blog.Warnf("remote: single file(%s) for work(%s) to server(%s) is retrying now", - desc.FilePath, m.work.ID(), host.Server) - return types.ErrSendFileRetrying default: return fmt.Errorf("unknown file send status: %s", status.String()) } @@ -962,8 +1012,8 @@ func (m *Mgr) ensureSingleFile( // } // } - blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file with retry %t", - desc.FilePath, m.work.ID(), host.Server, retry) + blog.Debugf("remote: try to ensure single file(%s) for work(%s) to server(%s), going to send this file", + desc.FilePath, m.work.ID(), host.Server) req.Messages = m.fileMessageBank.get(desc) // 同步发送文件 @@ -988,8 +1038,8 @@ func (m *Mgr) ensureSingleFile( } if err != nil { - blog.Errorf("remote: execute send file(%s) for work(%s) to server(%s) failed with retry %t: %v", - desc.FilePath, m.work.ID(), host.Server, retry, err) + blog.Errorf("remote: execute send file(%s) for work(%s) to server(%s) failed: %v", + desc.FilePath, m.work.ID(), host.Server, err) return err } @@ -998,13 +1048,13 @@ func (m *Mgr) ensureSingleFile( desc.FilePath, m.work.ID(), host.Server, retCode) } - blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s) with retry %t", - desc.FilePath, m.work.ID(), host.Server, retry) + blog.Debugf("remote: success to execute send file(%s) for work(%s) to server(%s)", + desc.FilePath, m.work.ID(), host.Server) return nil } // ensureSingleCorkFile 保证给到的第一个文件被正确分发到目标机器上, 若给到的文件多于一个, 多余的部分会被忽略 -func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err error) { +func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult) (err error) { status := r.info.SendStatus host := c.host desc := c.file @@ -1019,11 +1069,11 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err tick := time.NewTicker(m.checkSendFileTick) defer tick.Stop() - for status == types.FileSending || (retry && status == types.FileSendRetrying) { + for status == types.FileSending { select { case <-tick.C: // 不是发送文件的goroutine,不能修改状态 - status, _ = m.checkOrLockSendFile(host.Server, *desc, false) + status, _ = m.checkOrLockSendFile(host.Server, *desc, true) } } @@ -1036,9 +1086,6 @@ func (m *Mgr) ensureSingleCorkFile(c *corkFile, r matchResult, retry bool) (err blog.Debugf("remote: end ensure single cork file(%s) for work(%s) to server(%s) succeed", desc.FilePath, m.work.ID(), host.Server) return nil - case types.FileSendRetrying: - blog.Warnf("remote: single cork file(%s) for work(%s) to server(%s) is retrying now", desc.FilePath, m.work.ID(), host.Server) - return types.ErrSendFileRetrying default: blog.Errorf("remote: end ensure single cork file(%s) for work(%s) to server(%s), "+ " with unknown status", desc.FilePath, m.work.ID(), host.Server) @@ -1137,7 +1184,7 @@ func (m *Mgr) checkBatchCache( } // checkOrLockFile 检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, retry bool) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, query bool) (types.FileSendStatus, bool) { t1 := time.Now().Local() m.fileSendMutex.Lock() @@ -1159,7 +1206,7 @@ func (m *Mgr) checkOrLockSendFile(server string, desc dcSDK.FileDesc, retry bool } m.fileSendMutex.Unlock() - info, match := target.matchOrInsert(desc, retry) + info, match := target.matchOrInsert(desc, query) return info.SendStatus, match } @@ -1169,7 +1216,7 @@ type matchResult struct { } // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry bool) []matchResult { +func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { m.fileSendMutex.Lock() target, ok := m.fileSendMap[server] if !ok { @@ -1178,7 +1225,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc, retry } m.fileSendMutex.Unlock() - return target.matchOrInserts(descs, retry) + return target.matchOrInserts(descs) } func (m *Mgr) updateSendFile(server string, desc dcSDK.FileDesc, status types.FileSendStatus) { @@ -1198,7 +1245,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote // TODO : update all file path for p2p fileCollections := m.getToolChainFromExecuteRequest(req) if fileCollections != nil && len(fileCollections) > 0 { - err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) + err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1216,7 +1263,7 @@ func (m *Mgr) sendToolchain(handler dcSDK.RemoteWorkerHandler, req *types.Remote if fileCollections != nil && len(fileCollections) > 0 { blog.Infof("remote: found tool chain changed, send toolchain to server[%s] again", req.Server.Server) - err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, false) + err = m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections) if err != nil { blog.Errorf("remote: execute remote task for work(%s) from pid(%d) to server(%s), "+ "ensure tool chain files failed: %v", m.work.ID(), req.Pid, req.Server.Server, err) @@ -1282,7 +1329,7 @@ func (m *Mgr) retrySendToolChain(handler dcSDK.RemoteWorkerHandler, req *types.R continue } - if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections, true); err != nil { + if err := m.sendFileCollectionOnce(handler, req.Pid, req.Sandbox, req.Server, fileCollections); err != nil { blog.Errorf("remote: retry to send tool chain for work(%s) for the %dth times from pid(%d) to server(%s), "+ "send tool chain files failed: %v", m.work.ID(), i, req.Pid, req.Server.Server, err) time.Sleep(m.toolChainRetryTick) @@ -1309,8 +1356,7 @@ func (m *Mgr) sendFileCollectionOnce( pid int, sandbox *dcSyscall.Sandbox, server *dcProtocol.Host, - filecollections []*types.FileCollectionInfo, - retry bool) error { + filecollections []*types.FileCollectionInfo) error { blog.Infof("remote: try to send %d file collection for work(%s) from pid(%d) dir(%s) to server", len(filecollections), m.work.ID(), pid, sandbox.Dir) @@ -1319,10 +1365,10 @@ func (m *Mgr) sendFileCollectionOnce( count := 0 for _, fc := range filecollections { count++ - go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo, retry bool) { - err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox, retry) + go func(err chan<- error, host *dcProtocol.Host, filecollection *types.FileCollectionInfo) { + err <- m.ensureOneFileCollection(handler, pid, host, filecollection, sandbox) // err <- m.ensureOneFileCollectionByFiles(handler, pid, host, filecollection, sandbox) - }(wg, server, fc, retry) + }(wg, server, fc) } for i := 0; i < count; i++ { @@ -1352,12 +1398,11 @@ func (m *Mgr) ensureOneFileCollection( pid int, host *dcProtocol.Host, fc *types.FileCollectionInfo, - sandbox *dcSyscall.Sandbox, - retry bool) (err error) { + sandbox *dcSyscall.Sandbox) (err error) { blog.Infof("remote: try to ensure one file collection(%s) for work(%s) to server(%s)", fc.UniqID, m.work.ID(), host.Server) - status, ok := m.checkOrLockFileCollection(host.Server, fc, retry) + status, ok := m.checkOrLockFileCollection(host.Server, fc) // 已经有人发送了文件, 等待文件就绪 if ok { @@ -1401,8 +1446,8 @@ func (m *Mgr) ensureOneFileCollection( } blog.Infof("remote: try to ensure one file collection(%s) timestamp(%d) filenum(%d) cache-hit(%d) "+ - "for work(%s) to server(%s), going to send this collection with retry:%t", - fc.UniqID, fc.Timestamp, len(needSentFiles), hit, m.work.ID(), host.Server, retry) + "for work(%s) to server(%s), going to send this collection", + fc.UniqID, fc.Timestamp, len(needSentFiles), hit, m.work.ID(), host.Server) // !! 这个地方不需要了,需要注释掉,影响性能 // req := &dcSDK.BKDistFileSender{Files: needSentFiles} @@ -1452,27 +1497,24 @@ func (m *Mgr) ensureOneFileCollection( }() if err != nil { - blog.Errorf("remote: execute send file collection(%s) for work(%s) to server(%s) failed with retry %t: %v ", - fc.UniqID, m.work.ID(), host.Server, retry, err) + blog.Errorf("remote: execute send file collection(%s) for work(%s) to server(%s) failed : %v ", + fc.UniqID, m.work.ID(), host.Server, err) return err } blog.Debugf("remote: success to execute send file collection(%s) files(%+v) timestamp(%d) filenum(%d) "+ - "for work(%s) to server(%s) with retry %t", fc.UniqID, fc.Files, fc.Timestamp, len(fc.Files), m.work.ID(), host.Server, retry) + "for work(%s) to server(%s)", fc.UniqID, fc.Files, fc.Timestamp, len(fc.Files), m.work.ID(), host.Server) return nil } // checkOrLockFileCollection 检查目标file collection的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, // 则将其置于sending, 并返回false -func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo, retry bool) (types.FileSendStatus, bool) { +func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionInfo) (types.FileSendStatus, bool) { m.fileCollectionSendMutex.Lock() defer m.fileCollectionSendMutex.Unlock() target, ok := m.fileCollectionSendMap[server] if !ok { - if retry { - blog.Warnf("remote: file collection(%s) not found in cache with retry %t", fc.UniqID, retry) - } filecollections := make([]*types.FileCollectionInfo, 0, 10) m.fileCollectionSendMap[server] = &filecollections target = m.fileCollectionSendMap[server] @@ -1480,8 +1522,8 @@ func (m *Mgr) checkOrLockFileCollection(server string, fc *types.FileCollectionI for _, f := range *target { if f.UniqID == fc.UniqID { - // if retry, set status to sending if fc send failed - if retry && f.SendStatus == types.FileSendFailed { + // set status to sending if fc send failed + if f.SendStatus == types.FileSendFailed { f.SendStatus = types.FileSending return f.SendStatus, false } diff --git a/src/backend/booster/bk_dist/controller/pkg/types/manager.go b/src/backend/booster/bk_dist/controller/pkg/types/manager.go index 5c7cfad1..f37cb0c3 100644 --- a/src/backend/booster/bk_dist/controller/pkg/types/manager.go +++ b/src/backend/booster/bk_dist/controller/pkg/types/manager.go @@ -236,18 +236,16 @@ const ( FileSending FileSendSucceed FileSendFailed - FileSendRetrying FileSendUnknown = 99 ) var ( fileStatusMap = map[FileSendStatus]string{ - FileSendInit: "sendinit", - FileSending: "sending", - FileSendSucceed: "sendsucceed", - FileSendFailed: "sendfailed", - FileSendRetrying: "sendretrying", - FileSendUnknown: "unknown", + FileSendInit: "sendinit", + FileSending: "sending", + FileSendSucceed: "sendsucceed", + FileSendFailed: "sendfailed", + FileSendUnknown: "unknown", } ) @@ -282,6 +280,7 @@ type FileInfo struct { FileMode uint32 `json:"file_mode"` LinkTarget string `json:"link_target"` SendStatus FileSendStatus `json:"send_status"` + FailCount int `json:"fail_count"` } // Match check if the FileDesc is point to some file as this FileInfo