fix: Use singleton delete pool and avoid goroutine leakage
Related to milvus-io#36887

Previously, creating a new pool for every request could leak goroutines.
This PR changes that behavior by using a singleton delete pool. Sharing
one pool also gives better concurrency control over delete memory usage.

Signed-off-by: Congqi Xia <[email protected]>
congqixia committed Oct 28, 2024
1 parent 1e75a42 commit d314894
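
The leak described in the commit message comes from building a fresh worker pool on every request and never releasing it. The sketch below is a minimal before/after illustration, not repository code: the function names are hypothetical, and the import paths and the Submit/Release methods are assumptions inferred from the conc usage visible in this diff.

```go
// Minimal sketch of the two patterns; names and methods outside this diff are assumed.
package example

import (
	"runtime"

	"github.com/milvus-io/milvus/internal/querynodev2/segments"
	"github.com/milvus-io/milvus/pkg/util/conc"
)

// Before: a fresh pool per request. Each pool owns worker goroutines, and
// without an explicit release they are never torn down, so workers pile up.
func deletePerRequestPool(tasks []func() (struct{}, error)) {
	pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0)) // new workers per request
	for _, task := range tasks {
		pool.Submit(task)
	}
	// missing pool.Release() -> leaked goroutines
}

// After: every request borrows the shared singleton pool, so the number of
// delete workers stays bounded at GOMAXPROCS for the whole process.
func deleteSharedPool(tasks []func() (struct{}, error)) {
	pool := segments.GetDeletePool()
	for _, task := range tasks {
		pool.Submit(task)
	}
}
```
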
Showing 2 changed files with 16 additions and 2 deletions.
15 changes: 15 additions & 0 deletions internal/querynodev2/segments/pool.go
@@ -46,6 +46,9 @@ var (
 	warmupPool atomic.Pointer[conc.Pool[any]]
 	warmupOnce sync.Once
 
+	deletePool atomic.Pointer[conc.Pool[struct{}]]
+	deletePoolOnce sync.Once
+
 	bfPool atomic.Pointer[conc.Pool[any]]
 	bfApplyOnce sync.Once
 )
@@ -131,6 +134,13 @@ func initBFApplyPool() {
 	})
 }
 
+func initDeletePool() {
+	deletePoolOnce.Do(func() {
+		pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
+		deletePool.Store(pool)
+	})
+}
+
 // GetSQPool returns the singleton pool instance for search/query operations.
 func GetSQPool() *conc.Pool[any] {
 	initSQPool()
@@ -158,6 +168,11 @@ func GetBFApplyPool() *conc.Pool[any] {
 	return bfPool.Load()
 }
 
+func GetDeletePool() *conc.Pool[struct{}] {
+	initDeletePool()
+	return deletePool.Load()
+}
+
 func ResizeSQPool(evt *config.Event) {
 	if evt.HasUpdated {
 		pt := paramtable.Get()
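
A hypothetical caller of the new accessor could look like the sketch below. Submit and Await are assumed to follow the usual conc-pool shape (a task func returning (T, error) and a future that blocks until completion), and applyDelete is a stand-in for the per-segment delete routine, not a real API.

```go
// Sketch: fan delete work out over the shared pool and wait for completion.
package example

import (
	"github.com/milvus-io/milvus/internal/querynodev2/segments"
	"github.com/milvus-io/milvus/pkg/util/conc"
)

// deleteOnSegments runs applyDelete for each segment ID on the shared delete
// pool and returns the IDs whose delete failed.
func deleteOnSegments(segIDs []int64, applyDelete func(int64) error) []int64 {
	pool := segments.GetDeletePool()
	futures := make([]*conc.Future[struct{}], 0, len(segIDs))
	failed := make([]int64, 0)

	for _, segID := range segIDs {
		segID := segID // capture loop variable for the closure
		futures = append(futures, pool.Submit(func() (struct{}, error) {
			return struct{}{}, applyDelete(segID)
		}))
	}
	for i, future := range futures {
		if _, err := future.Await(); err != nil {
			failed = append(failed, segIDs[i]) // futures were appended in segID order
		}
	}
	return failed
}
```
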
3 changes: 1 addition & 2 deletions internal/querynodev2/services.go
@@ -19,7 +19,6 @@ package querynodev2
 import (
 	"context"
 	"fmt"
-	"runtime"
 	"strconv"
 	"sync"
 	"time"
@@ -1423,7 +1422,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch
 
 	// control the execution batch parallel with P number
 	// maybe it shall be lower in case of heavy CPU usage may impacting search/query
-	pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
+	pool := segments.GetDeletePool()
 	futures := make([]*conc.Future[struct{}], 0, len(segs))
 	errSet := typeutil.NewConcurrentSet[int64]()
 
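
Because deletePoolOnce guards the Store, every GetDeletePool call returns the same instance, so concurrent DeleteBatch requests now share one GOMAXPROCS-sized worker budget instead of each creating their own. A hypothetical test for that singleton property:

```go
package segments_test

import (
	"testing"

	"github.com/milvus-io/milvus/internal/querynodev2/segments"
)

// Hypothetical test, assuming only what the diff shows: deletePoolOnce makes
// initDeletePool store exactly one pool, so every call returns that pointer.
func TestDeletePoolIsSingleton(t *testing.T) {
	p1 := segments.GetDeletePool()
	p2 := segments.GetDeletePool()
	if p1 != p2 {
		t.Fatal("expected GetDeletePool to return the same shared pool")
	}
}
```
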
