英文 | 中文
ants
是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
go get -u github.com/panjf2000/ants
go get -u github.com/panjf2000/ants/v2
写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 ants
,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500 {
panic("the final result is wrong!!!")
}
// Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited.
// If you use -1 as the pool size parameter, the size will be unlimited.
// There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin)
defer mp.ReleaseTimeout(5 * time.Second)
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = mp.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", mp.Running())
fmt.Printf("finish all tasks.\n")
// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
myFunc(i)
wg.Done()
}, ants.LeastTasks)
defer mpf.ReleaseTimeout(5 * time.Second)
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = mpf.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", mpf.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
if sum != 499500*2 {
panic("the final result is wrong!!!")
}
}
// Option represents the optional function.
type Option func(opts *Options)
// Options contains all options which will be applied when instantiating a ants pool.
type Options struct {
// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
// used for more than `ExpiryDuration`.
ExpiryDuration time.Duration
// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
Logger Logger
}
// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
// WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
// WithPreAlloc indicates whether it should malloc for workers.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
// WithNonblocking indicates that pool will return nil when there is no available workers.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
// WithPanicHandler sets up panic handler.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
// WithLogger sets up a customized logger.
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
通过在调用NewPool
/NewPoolWithFunc
之时使用各种 optional function,可以设置ants.Options
中各个配置项的值,然后用它来定制化 goroutine pool.
ants
支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 NewPool
方法可以实例化一个新的带有指定容量的 Pool ,如下:
p, _ := ants.NewPool(10000)
提交任务通过调用 ants.Submit(func())
方法:
ants.Submit(func(){})
需要动态调整 goroutine 池容量可以通过调用Tune(int)
:
pool.Tune(1000) // Tune its capacity to 1000
pool.Tune(100000) // Tune its capacity to 100000
该方法是线程安全的。
ants
允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。
// ants will pre-malloc the whole capacity of pool when you invoke this function
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
pool.Release()
// 只要调用 Reboot() 方法,就可以重新激活一个之前已经被销毁掉的池,并且投入使用。
pool.Reboot()
ants
并不保证提交的任务被执行的顺序,执行的顺序也不是和提交的顺序保持一致,因为在 ants
是并发地处理所有提交的任务,提交的任务会被分派到正在并发运行的 workers 上去,因此那些任务将会被并发且无序地被执行。
-
BenchmarkGoroutine-4 代表原生 goroutine
-
BenchmarkPoolGroutine-4 代表使用 goroutine 池
ants
这里为了模拟大规模 goroutine 的场景,两次测试的并发次数分别是 100w 和 1000w,前两个测试分别是执行 100w 个并发任务不使用 Pool 和使用了ants
的 Goroutine Pool 的性能,后两个则是 1000w 个任务下的表现,可以直观的看出在执行速度和内存使用上,ants
的 Pool 都占有明显的优势。100w 的任务量,使用ants
,执行速度与原生 goroutine 相当甚至略快,但只实际使用了不到 5w 个 goroutine 完成了全部任务,且内存消耗仅为原生并发的 40%;而当任务量达到 1000w,优势则更加明显了:用了 70w 左右的 goroutine 完成全部任务,执行速度比原生 goroutine 提高了 100%,且内存消耗依旧保持在不使用 Pool 的 40% 左右。
因为PoolWithFunc
这个 Pool 只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于Pool
对原生 goroutine 在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生 goroutine 的 300%,而内存消耗的优势已经达到了两位数的差距,原生 goroutine 的内存消耗达到了ants
的35倍且原生 goroutine 的每次执行的内存分配次数也达到了ants
45倍,1000w 的任务量,ants
的初始分配容量是 5w,因此它完成了所有的任务依旧只使用了 5w 个 goroutine!事实上,ants
的 Goroutine Pool 的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。
从该 demo 测试吞吐性能对比可以看出,使用ants
的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。
请在提 PR 之前仔细阅读 Contributing Guidelines,感谢那些为 ants
贡献过代码的开发者!
ants
的源码允许用户在遵循 MIT 开源证书 规则的前提下使用。
- Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池
- Visually Understanding Worker Pool
- The Case For A Go Worker Pool
- Go Concurrency - GoRoutines, Worker Pools and Throttling Made Simple
以下公司/组织在生产环境上使用了 ants
。
这些开源项目借助 ants
进行并发编程。
- gnet: gnet 是一个高性能、轻量级、非阻塞的事件驱动 Go 网络框架。
- milvus: 一个高度灵活、可靠且速度极快的云原生开源向量数据库。
- nps: 一款轻量级、高性能、功能强大的内网穿透代理服务器。
- siyuan: 思源笔记是一款本地优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。
- osmedeus: A Workflow Engine for Offensive Security.
- jitsu: An open-source Segment alternative. Fully-scriptable data ingestion engine for modern data teams. Set-up a real-time data pipeline in minutes, not days.
- triangula: Generate high-quality triangulated and polygonal art from images.
- teler: Real-time HTTP Intrusion Detection.
- bsc: A Binance Smart Chain client based on the go-ethereum fork.
- jaeles: The Swiss Army knife for automated Web Application Testing.
- devlake: The open-source dev data platform & dashboard for your DevOps tools.
- matrixone: MatrixOne 是一款面向未来的超融合异构云原生数据库,通过超融合数据引擎支持事务/分析/流处理等混合工作负载,通过异构云原生架构支持跨机房协同/多地协同/云边协同。简化开发运维,消简数据碎片,打破数据的系统、位置和创新边界。
- bk-bcs: 蓝鲸容器管理平台(Blueking Container Service)定位于打造云原生技术和业务实际应用场景之间的桥梁;聚焦于复杂应用场景的容器化部署技术方案的研发、整合和产品化;致力于为游戏等复杂应用提供一站式、低门槛的容器编排和服务治理服务。
- trueblocks-core: TrueBlocks improves access to blockchain data for any EVM-compatible chain (particularly Ethereum mainnet) while remaining entirely local.
- openGemini: openGemini 是华为云开源的一款云原生分布式时序数据库,可广泛应用于物联网、车联网、运维监控、工业互联网等业务场景,具备卓越的读写性能和高效的数据分析能力,采用类SQL查询语言,无第三方软件依赖、安装简单、部署灵活、运维便捷。
- AdGuardDNS: AdGuard DNS is an alternative solution for tracker blocking, privacy protection, and parental control.
- WatchAD2.0: WatchAD2.0 是 360 信息安全中心开发的一款针对域安全的日志分析与监控系统,它可以收集所有域控上的事件日志、网络流量,通过特征匹配、协议分析、历史行为、敏感操作和蜜罐账户等方式来检测各种已知与未知威胁,功能覆盖了大部分目前的常见内网域渗透手法。
- vanus: Vanus is a Serverless, event streaming system with processing capabilities. It easily connects SaaS, Cloud Services, and Databases to help users build next-gen Event-driven Applications.
如果你的项目也在使用 ants
,欢迎给我提 Pull Request 来更新这份用户案例列表。
ants
项目一直以来都是在 JetBrains 公司旗下的 GoLand 集成开发环境中进行开发,基于 free JetBrains Open Source license(s) 正版免费授权,在此表达我的谢意。
如果有意向,可以通过每个月定量的少许捐赠来支持这个项目。
每月定量捐赠 10 刀即可成为本项目的赞助者,届时您的 logo 或者 link 可以展示在本项目的 README 上。
当您通过以下方式进行捐赠时,请务必留下姓名、GitHub 账号或其他社交媒体账号,以便我将其添加到捐赠者名单中,以表谢意。