From 152855941abb09a6fe71dea296cb48218edd2155 Mon Sep 17 00:00:00 2001 From: tbs60 Date: Sat, 14 Sep 2024 16:23:43 +0800 Subject: [PATCH] fix: opt pump cache, issue: #296 --- .../controller/pkg/manager/remote/mgr.go | 242 +++++++----------- 1 file changed, 99 insertions(+), 143 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index 26e72763..040f465c 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -687,6 +687,8 @@ func (m *Mgr) ensureFiles( r := make([]string, 0, 10) // cleaner := make([]dcSDK.FileDesc, 0, 10) corkFiles := make(map[string]*[]*corkFile, 0) + // 单文件发送模式,复用corkFile结构体 + singleFiles := make([]*corkFile, 0) // allServerCorkFiles := make(map[string]*[]*corkFile, 0) filesNum := len(fileDetails) for _, fd := range fileDetails { @@ -751,15 +753,14 @@ func (m *Mgr) ensureFiles( } count++ if !m.conf.SendCork { - go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) { - t := time.Now().Local() - err <- m.ensureSingleFile(handler, host, req, sandbox, retry) - d := time.Now().Local().Sub(t) - if d > 200*time.Millisecond { - blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s", - m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath) - } - }(wg, s, sender, retry) + // 复用corkFile结构体保存待发送文件列表 + singleFiles = append(singleFiles, &corkFile{ + handler: handler, + host: s, + sandbox: sandbox, + file: &f, + resultchan: nil, + }) } else { // for send cork cf := &corkFile{ @@ -772,7 +773,6 @@ func (m *Mgr) ensureFiles( l, ok := corkFiles[s.Server] if !ok { // 预先分配好队列,避免频繁内存分配 - // newl := []*corkFile{cf} newl := make([]*corkFile, 0, filesNum) newl = append(newl, cf) corkFiles[s.Server] = &newl @@ -781,156 +781,112 @@ func (m *Mgr) ensureFiles( } } } - - // // 分发额外的内容 - // for _, s := range servers { - // count++ - // if !m.conf.SendCork { - // go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender) { - // t := time.Now().Local() - // err <- m.ensureSingleFile(handler, host, req, sandbox) - // d := time.Now().Local().Sub(t) - // if d > 200*time.Millisecond { - // blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s", - // m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath) - // } - // }(wg, s, sender) - // } else { - // // for send cork - // cf := &corkFile{ - // handler: handler, - // host: s, - // sandbox: sandbox, - // file: &f, - // resultchan: nil, - // } - // l, ok := allServerCorkFiles[s.Server] - // if !ok { - // // 预先分配好队列,避免频繁内存分配 - // // newl := []*corkFile{cf} - // newl := make([]*corkFile, 0, filesNum) - // newl = append(newl, cf) - // allServerCorkFiles[s.Server] = &newl - // } else { - // *l = append(*l, cf) - // } - // } - // } } - if m.conf.SendCork { - blog.Debugf("remote: ready to ensure multi %d cork files for work(%s) from pid(%d) to server", - count, m.work.ID(), pid) - - for server, fs := range corkFiles { - totalFileNum := len(*fs) - descs := make([]*dcSDK.FileDesc, 0, totalFileNum) - for _, v := range *fs { - descs = append(descs, v.file) - } - results := m.checkOrLockCorkFiles(server, descs, retry) - blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server", - len(results), len(descs), count, m.work.ID(), pid) - needSendCorkFiles := make([]*corkFile, 0, totalFileNum) - for i, v := range results { - if v.match { - // 已发送完成的不启动协程了 - if v.info.SendStatus == types.FileSendSucceed { - wg <- nil - continue - } else if v.info.SendStatus == types.FileSendFailed { - wg <- types.ErrSendFileFailed - continue + if count > 0 { + receiveResult := make(chan error) + + // 启动接收协程 + go func() { + for i := 0; i < count; i++ { + if err = <-wg; err != nil { + blog.Warnf("remote: failed to ensure multi %d files for work(%s) from pid(%d) to server with err:%v", + count, m.work.ID(), pid, err) + + // 异常情况下启动一个协程将剩余消息收完,避免发送协程阻塞 + i++ + if i < count { + go func(i, count int, c <-chan error) { + for ; i < count; i++ { + _ = <-c + } + }(i, count, wg) } - } else { - // 不在缓存,意味着之前没有发送过 - (*fs)[i].resultchan = make(chan corkFileResult, 1) - needSendCorkFiles = append(needSendCorkFiles, (*fs)[i]) - } - // 启动协程跟踪未发送完成的文件 - c := (*fs)[i] - go func(err chan<- error, c *corkFile, r matchResult, retry bool) { - err <- m.ensureSingleCorkFile(c, r, retry) - }(wg, c, v, retry) + receiveResult <- err + return + } } + receiveResult <- nil + return + }() + + // 发送 + if m.conf.SendCork { + // 批量发送模式 + blog.Debugf("remote: ready to ensure multi %d cork files for work(%s) from pid(%d) to server", + count, m.work.ID(), pid) + + for server, fs := range corkFiles { + totalFileNum := len(*fs) + descs := make([]*dcSDK.FileDesc, 0, totalFileNum) + for _, v := range *fs { + descs = append(descs, v.file) + } + results := m.checkOrLockCorkFiles(server, descs, retry) + blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server", + len(results), len(descs), count, m.work.ID(), pid) + needSendCorkFiles := make([]*corkFile, 0, totalFileNum) + for i, v := range results { + if v.match { + // 已发送完成的不启动协程了 + if v.info.SendStatus == types.FileSendSucceed { + wg <- nil + continue + } else if v.info.SendStatus == types.FileSendFailed { + wg <- types.ErrSendFileFailed + continue + } + } else { + // 不在缓存,意味着之前没有发送过 + (*fs)[i].resultchan = make(chan corkFileResult, 1) + needSendCorkFiles = append(needSendCorkFiles, (*fs)[i]) + } - // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache - - blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles)) - // append to cork files queue - _ = m.appendCorkFiles(server, needSendCorkFiles) - - // notify send - m.sendCorkChan <- true - } - - // // same with corkFiles, but do not notify wg - // for server, fs := range allServerCorkFiles { - // totalFileNum := len(*fs) - // descs := make([]*dcSDK.FileDesc, 0, totalFileNum) - // for _, v := range *fs { - // descs = append(descs, v.file) - // } - // results := m.checkOrLockCorkFiles(server, descs) - // needSendCorkFiles := make([]*corkFile, 0, totalFileNum) - // for i, v := range results { - // if v.match { - // // 已发送完成的不启动协程了 - // if v.info.SendStatus == types.FileSendSucceed { - // wg <- nil - // continue - // } else if v.info.SendStatus == types.FileSendFailed { - // wg <- nil - // continue - // } - // } else { - // // 不在缓存,意味着之前没有发送过 - // (*fs)[i].resultchan = make(chan corkFileResult, 1) - // needSendCorkFiles = append(needSendCorkFiles, (*fs)[i]) - // } - - // // 启动协程跟踪未发送完成的文件 - // c := (*fs)[i] - // go func(err chan<- error, c *corkFile, r matchResult) { - // err <- m.ensureSingleCorkFile(c, r) - // }(wg, c, v) - // } - - // blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles)) - // // append to cork files queue - // _ = m.appendCorkFiles(server, needSendCorkFiles) - - // // notify send - // m.sendCorkChan <- true - // } - } + // 启动协程跟踪未发送完成的文件 + c := (*fs)[i] + go func(err chan<- error, c *corkFile, r matchResult, retry bool) { + err <- m.ensureSingleCorkFile(c, r, retry) + }(wg, c, v, retry) + } - for i := 0; i < count; i++ { - if err = <-wg; err != nil { - blog.Warnf("remote: failed to ensure multi %d files for work(%s) from pid(%d) to server with err:%v", - count, m.work.ID(), pid, err) + // TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache - // 异常情况下启动一个协程将消息收完,避免发送协程阻塞 - i++ - if i < count { - go func(i, count int, c <-chan error) { - for ; i < count; i++ { - _ = <-c + blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles)) + // append to cork files queue + _ = m.appendCorkFiles(server, needSendCorkFiles) + + // notify send + m.sendCorkChan <- true + } + } else { + // 单个文件发送模式 + for _, f := range singleFiles { + sender := &dcSDK.BKDistFileSender{Files: []dcSDK.FileDesc{*f.file}} + go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) { + t := time.Now().Local() + err <- m.ensureSingleFile(handler, host, req, sandbox, retry) + d := time.Now().Local().Sub(t) + if d > 200*time.Millisecond { + blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s", + m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath) } - }(i, count, wg) + }(wg, f.host, sender, retry) } + } + // 等待接收协程完成或者报错 + err := <-receiveResult + if err != nil { return nil, err } + + return r, nil } + blog.Infof("remote: success to ensure multi %d files for work(%s) from pid(%d) to server", count, m.work.ID(), pid) - // for _, f := range cleaner { - // go m.fileMessageBank.clean(f) - // } - return r, nil }