Skip to content

Commit

Permalink
fix: opt pump cache, issue: TencentBlueKing#296
Browse files Browse the repository at this point in the history
  • Loading branch information
tbs60 committed Sep 14, 2024
1 parent 0325847 commit 1528559
Showing 1 changed file with 99 additions and 143 deletions.
242 changes: 99 additions & 143 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ func (m *Mgr) ensureFiles(
r := make([]string, 0, 10)
// cleaner := make([]dcSDK.FileDesc, 0, 10)
corkFiles := make(map[string]*[]*corkFile, 0)
// 单文件发送模式,复用corkFile结构体
singleFiles := make([]*corkFile, 0)
// allServerCorkFiles := make(map[string]*[]*corkFile, 0)
filesNum := len(fileDetails)
for _, fd := range fileDetails {
Expand Down Expand Up @@ -751,15 +753,14 @@ func (m *Mgr) ensureFiles(
}
count++
if !m.conf.SendCork {
go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) {
t := time.Now().Local()
err <- m.ensureSingleFile(handler, host, req, sandbox, retry)
d := time.Now().Local().Sub(t)
if d > 200*time.Millisecond {
blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s",
m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath)
}
}(wg, s, sender, retry)
// 复用corkFile结构体保存待发送文件列表
singleFiles = append(singleFiles, &corkFile{
handler: handler,
host: s,
sandbox: sandbox,
file: &f,
resultchan: nil,
})
} else {
// for send cork
cf := &corkFile{
Expand All @@ -772,7 +773,6 @@ func (m *Mgr) ensureFiles(
l, ok := corkFiles[s.Server]
if !ok {
// 预先分配好队列,避免频繁内存分配
// newl := []*corkFile{cf}
newl := make([]*corkFile, 0, filesNum)
newl = append(newl, cf)
corkFiles[s.Server] = &newl
Expand All @@ -781,156 +781,112 @@ func (m *Mgr) ensureFiles(
}
}
}

// // 分发额外的内容
// for _, s := range servers {
// count++
// if !m.conf.SendCork {
// go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender) {
// t := time.Now().Local()
// err <- m.ensureSingleFile(handler, host, req, sandbox)
// d := time.Now().Local().Sub(t)
// if d > 200*time.Millisecond {
// blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s",
// m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath)
// }
// }(wg, s, sender)
// } else {
// // for send cork
// cf := &corkFile{
// handler: handler,
// host: s,
// sandbox: sandbox,
// file: &f,
// resultchan: nil,
// }
// l, ok := allServerCorkFiles[s.Server]
// if !ok {
// // 预先分配好队列,避免频繁内存分配
// // newl := []*corkFile{cf}
// newl := make([]*corkFile, 0, filesNum)
// newl = append(newl, cf)
// allServerCorkFiles[s.Server] = &newl
// } else {
// *l = append(*l, cf)
// }
// }
// }
}

if m.conf.SendCork {
blog.Debugf("remote: ready to ensure multi %d cork files for work(%s) from pid(%d) to server",
count, m.work.ID(), pid)

for server, fs := range corkFiles {
totalFileNum := len(*fs)
descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
for _, v := range *fs {
descs = append(descs, v.file)
}
results := m.checkOrLockCorkFiles(server, descs, retry)
blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server",
len(results), len(descs), count, m.work.ID(), pid)
needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
for i, v := range results {
if v.match {
// 已发送完成的不启动协程了
if v.info.SendStatus == types.FileSendSucceed {
wg <- nil
continue
} else if v.info.SendStatus == types.FileSendFailed {
wg <- types.ErrSendFileFailed
continue
if count > 0 {
receiveResult := make(chan error)

// 启动接收协程
go func() {
for i := 0; i < count; i++ {
if err = <-wg; err != nil {
blog.Warnf("remote: failed to ensure multi %d files for work(%s) from pid(%d) to server with err:%v",
count, m.work.ID(), pid, err)

// 异常情况下启动一个协程将剩余消息收完,避免发送协程阻塞
i++
if i < count {
go func(i, count int, c <-chan error) {
for ; i < count; i++ {
_ = <-c
}
}(i, count, wg)
}
} else {
// 不在缓存,意味着之前没有发送过
(*fs)[i].resultchan = make(chan corkFileResult, 1)
needSendCorkFiles = append(needSendCorkFiles, (*fs)[i])
}

// 启动协程跟踪未发送完成的文件
c := (*fs)[i]
go func(err chan<- error, c *corkFile, r matchResult, retry bool) {
err <- m.ensureSingleCorkFile(c, r, retry)
}(wg, c, v, retry)
receiveResult <- err
return
}
}
receiveResult <- nil
return
}()

// 发送
if m.conf.SendCork {
// 批量发送模式
blog.Debugf("remote: ready to ensure multi %d cork files for work(%s) from pid(%d) to server",
count, m.work.ID(), pid)

for server, fs := range corkFiles {
totalFileNum := len(*fs)
descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
for _, v := range *fs {
descs = append(descs, v.file)
}
results := m.checkOrLockCorkFiles(server, descs, retry)
blog.Debugf("remote: got %d results for %d cork files count:%d for work(%s) from pid(%d) to server",
len(results), len(descs), count, m.work.ID(), pid)
needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
for i, v := range results {
if v.match {
// 已发送完成的不启动协程了
if v.info.SendStatus == types.FileSendSucceed {
wg <- nil
continue
} else if v.info.SendStatus == types.FileSendFailed {
wg <- types.ErrSendFileFailed
continue
}
} else {
// 不在缓存,意味着之前没有发送过
(*fs)[i].resultchan = make(chan corkFileResult, 1)
needSendCorkFiles = append(needSendCorkFiles, (*fs)[i])
}

// TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache

blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles))
// append to cork files queue
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
m.sendCorkChan <- true
}

// // same with corkFiles, but do not notify wg
// for server, fs := range allServerCorkFiles {
// totalFileNum := len(*fs)
// descs := make([]*dcSDK.FileDesc, 0, totalFileNum)
// for _, v := range *fs {
// descs = append(descs, v.file)
// }
// results := m.checkOrLockCorkFiles(server, descs)
// needSendCorkFiles := make([]*corkFile, 0, totalFileNum)
// for i, v := range results {
// if v.match {
// // 已发送完成的不启动协程了
// if v.info.SendStatus == types.FileSendSucceed {
// wg <- nil
// continue
// } else if v.info.SendStatus == types.FileSendFailed {
// wg <- nil
// continue
// }
// } else {
// // 不在缓存,意味着之前没有发送过
// (*fs)[i].resultchan = make(chan corkFileResult, 1)
// needSendCorkFiles = append(needSendCorkFiles, (*fs)[i])
// }

// // 启动协程跟踪未发送完成的文件
// c := (*fs)[i]
// go func(err chan<- error, c *corkFile, r matchResult) {
// err <- m.ensureSingleCorkFile(c, r)
// }(wg, c, v)
// }

// blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles))
// // append to cork files queue
// _ = m.appendCorkFiles(server, needSendCorkFiles)

// // notify send
// m.sendCorkChan <- true
// }
}
// 启动协程跟踪未发送完成的文件
c := (*fs)[i]
go func(err chan<- error, c *corkFile, r matchResult, retry bool) {
err <- m.ensureSingleCorkFile(c, r, retry)
}(wg, c, v, retry)
}

for i := 0; i < count; i++ {
if err = <-wg; err != nil {
blog.Warnf("remote: failed to ensure multi %d files for work(%s) from pid(%d) to server with err:%v",
count, m.work.ID(), pid, err)
// TODO : 检查是否在server端有缓存了,如果有,则无需发送,调用 checkBatchCache

// 异常情况下启动一个协程将消息收完,避免发送协程阻塞
i++
if i < count {
go func(i, count int, c <-chan error) {
for ; i < count; i++ {
_ = <-c
blog.Debugf("total %d cork files, need send %d files", totalFileNum, len(needSendCorkFiles))
// append to cork files queue
_ = m.appendCorkFiles(server, needSendCorkFiles)

// notify send
m.sendCorkChan <- true
}
} else {
// 单个文件发送模式
for _, f := range singleFiles {
sender := &dcSDK.BKDistFileSender{Files: []dcSDK.FileDesc{*f.file}}
go func(err chan<- error, host *dcProtocol.Host, req *dcSDK.BKDistFileSender, retry bool) {
t := time.Now().Local()
err <- m.ensureSingleFile(handler, host, req, sandbox, retry)
d := time.Now().Local().Sub(t)
if d > 200*time.Millisecond {
blog.Debugf("remote: single file cost time for work(%s) from pid(%d) to server(%s): %s, %s",
m.work.ID(), pid, host.Server, d.String(), req.Files[0].FilePath)
}
}(i, count, wg)
}(wg, f.host, sender, retry)
}
}

// 等待接收协程完成或者报错
err := <-receiveResult
if err != nil {
return nil, err
}

return r, nil
}

blog.Infof("remote: success to ensure multi %d files for work(%s) from pid(%d) to server",
count, m.work.ID(), pid)

// for _, f := range cleaner {
// go m.fileMessageBank.clean(f)
// }

return r, nil
}

Expand Down

0 comments on commit 1528559

Please sign in to comment.