From 4f3292a2dd13c2150d1ada99ac637b2831a9e69d Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 14 Jul 2017 00:20:55 +0800 Subject: [PATCH 1/7] util/goroutine_pool: add a goroutine pool package utilities --- util/goroutine_pool/gp.go | 181 +++++++++++++++++++++++++++++++++ util/goroutine_pool/gp_test.go | 131 ++++++++++++++++++++++++ 2 files changed, 312 insertions(+) create mode 100644 util/goroutine_pool/gp.go create mode 100644 util/goroutine_pool/gp_test.go diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go new file mode 100644 index 0000000000000..7287f0bb7fc7d --- /dev/null +++ b/util/goroutine_pool/gp.go @@ -0,0 +1,181 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package gp + +import ( + "sync" + "time" +) + +// Pool is a struct to represent goroutine pool. +type Pool struct { + head goroutine + tail *goroutine + count int + idleTimeout time.Duration + sync.Mutex + + // gcWorker marks whether there is a gcWorker currently. + // only gc worker goroutine can modify it, others just read it. + gcWorker struct { + sync.RWMutex + value bool + } +} + +// goroutine is actually a background goroutine, with a channel binded for communication. +type goroutine struct { + ch chan func() + lastRun time.Time + pool *Pool + next *goroutine +} + +// New returns a new *Pool object. +func New(idleTimeout time.Duration) *Pool { + pool := &Pool{ + idleTimeout: idleTimeout, + } + pool.tail = &pool.head + return pool +} + +// Go works like go func(), but goroutines are pooled for reusing. +// This strategy can avoid runtime.morestack, because pooled goroutine is already enlarged. +func (pool *Pool) Go(f func()) { + g := pool.get() + g.ch <- f + // When the goroutine finish f(), it will be put back to pool automatically, + // so it doesn't need to call pool.put() here. +} + +func (pool *Pool) get() *goroutine { + pool.Lock() + head := &pool.head + if head.next == nil { + pool.Unlock() + return pool.alloc() + } + + ret := head.next + head.next = ret.next + if ret == pool.tail { + pool.tail = head + } + pool.count-- + pool.Unlock() + ret.next = nil + return ret +} + +func (pool *Pool) put(p *goroutine) { + p.next = nil + pool.Lock() + pool.tail.next = p + pool.tail = p + pool.count++ + pool.Unlock() + + pool.gcWorker.RLock() + gcWorker := pool.gcWorker.value + pool.gcWorker.RUnlock() + if !gcWorker { + go pool.gcLoop() + } +} + +func (pool *Pool) alloc() *goroutine { + g := &goroutine{ + ch: make(chan func()), + pool: pool, + } + go func(g *goroutine) { + for work := range g.ch { + work() + g.lastRun = time.Now() + // Put g back to the pool. + // This is the normal usage for a resource pool: + // + // obj := pool.get() + // use(obj) + // pool.put(obj) + // + // But when goroutine is used as a resource, we can't pool.put() immediately, + // because the resource(goroutine) maybe still in use. + // So, put back resource is done here, when the goroutine finish its work. + pool.put(g) + } + }(g) + return g +} + +func (pool *Pool) gcLoop() { + pool.gcWorker.Lock() + if pool.gcWorker.value == true { + pool.gcWorker.Unlock() + return + } + pool.gcWorker.value = true + pool.gcWorker.Unlock() + + for { + finish, more := pool.gcOnce(30) + if finish { + pool.gcWorker.Lock() + pool.gcWorker.value = false + pool.gcWorker.Unlock() + return + } + if more { + time.Sleep(min(pool.idleTimeout/10, 500*time.Millisecond)) + } else { + time.Sleep(min(5*time.Second, pool.idleTimeout/3)) + } + } +} + +// gcOnce runs gc once, recycles at most count goroutines. +// finish indicates there're no more goroutines in the pool after gc, +// more indicates there're still many goroutines to be recycled. +func (pool *Pool) gcOnce(count int) (finish bool, more bool) { + now := time.Now() + i := 0 + pool.Lock() + head := &pool.head + for head.next != nil && i < count { + save := head.next + duration := now.Sub(save.lastRun) + if duration < pool.idleTimeout { + break + } + close(save.ch) + head.next = save.next + pool.count-- + i++ + } + if head.next == nil { + finish = true + pool.tail = head + } + pool.Unlock() + more = (i == count) + return +} + +func min(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} diff --git a/util/goroutine_pool/gp_test.go b/util/goroutine_pool/gp_test.go new file mode 100644 index 0000000000000..4de730ce5ad5b --- /dev/null +++ b/util/goroutine_pool/gp_test.go @@ -0,0 +1,131 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package gp + +import ( + "sync" + "testing" + "time" +) + +func TestBasicAPI(t *testing.T) { + gp := New(time.Second) + var wg sync.WaitGroup + wg.Add(1) + // cover alloc() + gp.Go(func() { wg.Done() }) + // cover put() + wg.Wait() + // cover get() + gp.Go(func() {}) +} + +func TestGC(t *testing.T) { + gp := New(200 * time.Millisecond) + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + idx := i + gp.Go(func() { + time.Sleep(time.Duration(idx+1) * time.Millisecond) + wg.Done() + }) + } + wg.Wait() + time.Sleep(300 * time.Millisecond) + gp.Lock() + count := gp.count + gp.Unlock() + if count != 0 { + t.Error("all goroutines should be recycled") + } +} + +func TestRace(t *testing.T) { + gp := New(200 * time.Millisecond) + var wg sync.WaitGroup + begin := make(chan struct{}) + wg.Add(500) + for i := 0; i < 50; i++ { + go func() { + <-begin + for i := 0; i < 10; i++ { + gp.Go(func() { + wg.Done() + }) + time.Sleep(5 * time.Millisecond) + } + }() + } + close(begin) + wg.Wait() +} + +func BenchmarkGoPool(b *testing.B) { + gp := New(20 * time.Second) + for i := 0; i < b.N/2; i++ { + gp.Go(func() {}) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + gp.Go(dummy) + } +} + +func BenchmarkGo(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + go dummy() + } +} + +func dummy() { +} + +func BenchmarkMorestackPool(b *testing.B) { + gp := New(5 * time.Second) + b.ResetTimer() + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(1) + gp.Go(func() { + morestack(false) + wg.Done() + }) + wg.Wait() + } +} + +func BenchmarkMoreStack(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(1) + go func() { + morestack(false) + wg.Done() + }() + wg.Wait() + } +} + +func morestack(f bool) { + var stack [8 * 1024]byte + if f { + for i := 0; i < len(stack); i++ { + stack[i] = 'a' + } + } +} From 1592c7bc2873346565ab15ecfbc22749a775e014 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 18 Jul 2017 14:37:09 +0800 Subject: [PATCH 2/7] address comment --- util/goroutine_pool/gp.go | 2 +- util/goroutine_pool/gp_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go index 7287f0bb7fc7d..511b25ad7dd7c 100644 --- a/util/goroutine_pool/gp.go +++ b/util/goroutine_pool/gp.go @@ -1,4 +1,4 @@ -// Copyright 2016 PingCAP, Inc. +// Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/util/goroutine_pool/gp_test.go b/util/goroutine_pool/gp_test.go index 4de730ce5ad5b..b3adfffe26586 100644 --- a/util/goroutine_pool/gp_test.go +++ b/util/goroutine_pool/gp_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 PingCAP, Inc. +// Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From f24850f50353648ba8d27f289fab1b991500c029 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 9 Sep 2017 18:38:00 +0800 Subject: [PATCH 3/7] remove gcWorker --- util/goroutine_pool/gp.go | 143 ++++++++++++--------------------- util/goroutine_pool/gp_test.go | 5 +- 2 files changed, 53 insertions(+), 95 deletions(-) diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go index 511b25ad7dd7c..a4ba6999bc059 100644 --- a/util/goroutine_pool/gp.go +++ b/util/goroutine_pool/gp.go @@ -15,6 +15,7 @@ package gp import ( "sync" + "sync/atomic" "time" ) @@ -25,23 +26,23 @@ type Pool struct { count int idleTimeout time.Duration sync.Mutex - - // gcWorker marks whether there is a gcWorker currently. - // only gc worker goroutine can modify it, others just read it. - gcWorker struct { - sync.RWMutex - value bool - } } // goroutine is actually a background goroutine, with a channel binded for communication. type goroutine struct { - ch chan func() - lastRun time.Time - pool *Pool - next *goroutine + ch chan func() + pool *Pool + next *goroutine + status int32 } +const ( + statusIdle int32 = 0 + statusInUse int32 = 1 + statusDying int32 = 2 // Intermediate state used to avoid race: Idle => Dying => Dead + statusDead int32 = 3 +) + // New returns a new *Pool object. func New(idleTimeout time.Duration) *Pool { pool := &Pool{ @@ -54,7 +55,18 @@ func New(idleTimeout time.Duration) *Pool { // Go works like go func(), but goroutines are pooled for reusing. // This strategy can avoid runtime.morestack, because pooled goroutine is already enlarged. func (pool *Pool) Go(f func()) { - g := pool.get() + var g *goroutine + for { + g = pool.get() + if atomic.CompareAndSwapInt32(&g.status, statusIdle, statusInUse) { + break + } + // Status already changed from statusIdle => statusDying, delete this goroutine. + if atomic.LoadInt32(&g.status) == statusDying { + g.status = statusDead + } + } + g.ch <- f // When the goroutine finish f(), it will be put back to pool automatically, // so it doesn't need to call pool.put() here. @@ -85,14 +97,8 @@ func (pool *Pool) put(p *goroutine) { pool.tail.next = p pool.tail = p pool.count++ + p.status = statusIdle pool.Unlock() - - pool.gcWorker.RLock() - gcWorker := pool.gcWorker.value - pool.gcWorker.RUnlock() - if !gcWorker { - go pool.gcLoop() - } } func (pool *Pool) alloc() *goroutine { @@ -101,81 +107,32 @@ func (pool *Pool) alloc() *goroutine { pool: pool, } go func(g *goroutine) { - for work := range g.ch { - work() - g.lastRun = time.Now() - // Put g back to the pool. - // This is the normal usage for a resource pool: - // - // obj := pool.get() - // use(obj) - // pool.put(obj) - // - // But when goroutine is used as a resource, we can't pool.put() immediately, - // because the resource(goroutine) maybe still in use. - // So, put back resource is done here, when the goroutine finish its work. - pool.put(g) + timer := time.NewTimer(pool.idleTimeout) + for { + select { + case <-timer.C: + // Check to avoid a corner case that the goroutine is take out from pool, + // and get this signal at the same time. + succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDying) + if succ { + return + } + case work := <-g.ch: + work() + // Put g back to the pool. + // This is the normal usage for a resource pool: + // + // obj := pool.get() + // use(obj) + // pool.put(obj) + // + // But when goroutine is used as a resource, we can't pool.put() immediately, + // because the resource(goroutine) maybe still in use. + // So, put back resource is done here, when the goroutine finish its work. + pool.put(g) + } + timer.Reset(pool.idleTimeout) } }(g) return g } - -func (pool *Pool) gcLoop() { - pool.gcWorker.Lock() - if pool.gcWorker.value == true { - pool.gcWorker.Unlock() - return - } - pool.gcWorker.value = true - pool.gcWorker.Unlock() - - for { - finish, more := pool.gcOnce(30) - if finish { - pool.gcWorker.Lock() - pool.gcWorker.value = false - pool.gcWorker.Unlock() - return - } - if more { - time.Sleep(min(pool.idleTimeout/10, 500*time.Millisecond)) - } else { - time.Sleep(min(5*time.Second, pool.idleTimeout/3)) - } - } -} - -// gcOnce runs gc once, recycles at most count goroutines. -// finish indicates there're no more goroutines in the pool after gc, -// more indicates there're still many goroutines to be recycled. -func (pool *Pool) gcOnce(count int) (finish bool, more bool) { - now := time.Now() - i := 0 - pool.Lock() - head := &pool.head - for head.next != nil && i < count { - save := head.next - duration := now.Sub(save.lastRun) - if duration < pool.idleTimeout { - break - } - close(save.ch) - head.next = save.next - pool.count-- - i++ - } - if head.next == nil { - finish = true - pool.tail = head - } - pool.Unlock() - more = (i == count) - return -} - -func min(a, b time.Duration) time.Duration { - if a < b { - return a - } - return b -} diff --git a/util/goroutine_pool/gp_test.go b/util/goroutine_pool/gp_test.go index b3adfffe26586..f787fedc56d22 100644 --- a/util/goroutine_pool/gp_test.go +++ b/util/goroutine_pool/gp_test.go @@ -44,11 +44,12 @@ func TestGC(t *testing.T) { } wg.Wait() time.Sleep(300 * time.Millisecond) + gp.Go(func() {}) // To trigger count change. gp.Lock() count := gp.count gp.Unlock() - if count != 0 { - t.Error("all goroutines should be recycled") + if count != 1 { + t.Error("all goroutines should be recycled", count) } } From 25665dcc5964e607bf948b70b93aa23dcb1e63fb Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 9 Sep 2017 19:15:52 +0800 Subject: [PATCH 4/7] address comment --- util/goroutine_pool/gp.go | 76 +++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go index a4ba6999bc059..4e4ebfbc3e51f 100644 --- a/util/goroutine_pool/gp.go +++ b/util/goroutine_pool/gp.go @@ -31,7 +31,6 @@ type Pool struct { // goroutine is actually a background goroutine, with a channel binded for communication. type goroutine struct { ch chan func() - pool *Pool next *goroutine status int32 } @@ -91,48 +90,49 @@ func (pool *Pool) get() *goroutine { return ret } -func (pool *Pool) put(p *goroutine) { - p.next = nil +func (pool *Pool) alloc() *goroutine { + g := &goroutine{ + ch: make(chan func()), + } + go g.workLoop(pool) + return g +} + +func (g *goroutine) put(pool *Pool) { + g.status = statusIdle + g.next = nil pool.Lock() - pool.tail.next = p - pool.tail = p + pool.tail.next = g + pool.tail = g pool.count++ - p.status = statusIdle pool.Unlock() } -func (pool *Pool) alloc() *goroutine { - g := &goroutine{ - ch: make(chan func()), - pool: pool, - } - go func(g *goroutine) { - timer := time.NewTimer(pool.idleTimeout) - for { - select { - case <-timer.C: - // Check to avoid a corner case that the goroutine is take out from pool, - // and get this signal at the same time. - succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDying) - if succ { - return - } - case work := <-g.ch: - work() - // Put g back to the pool. - // This is the normal usage for a resource pool: - // - // obj := pool.get() - // use(obj) - // pool.put(obj) - // - // But when goroutine is used as a resource, we can't pool.put() immediately, - // because the resource(goroutine) maybe still in use. - // So, put back resource is done here, when the goroutine finish its work. - pool.put(g) +func (g *goroutine) workLoop(pool *Pool) { + timer := time.NewTimer(pool.idleTimeout) + for { + select { + case <-timer.C: + // Check to avoid a corner case that the goroutine is take out from pool, + // and get this signal at the same time. + succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDying) + if succ { + return } - timer.Reset(pool.idleTimeout) + case work := <-g.ch: + work() + // Put g back to the pool. + // This is the normal usage for a resource pool: + // + // obj := pool.get() + // use(obj) + // pool.put(obj) + // + // But when goroutine is used as a resource, we can't pool.put() immediately, + // because the resource(goroutine) maybe still in use. + // So, put back resource is done here, when the goroutine finish its work. + g.put(pool) } - }(g) - return g + timer.Reset(pool.idleTimeout) + } } From d62f92fa6592d3b1616a389c76b70e6f87409dad Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 9 Sep 2017 19:50:08 +0800 Subject: [PATCH 5/7] address comment --- util/goroutine_pool/gp.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go index 4e4ebfbc3e51f..0e9f9c12e8c26 100644 --- a/util/goroutine_pool/gp.go +++ b/util/goroutine_pool/gp.go @@ -38,8 +38,7 @@ type goroutine struct { const ( statusIdle int32 = 0 statusInUse int32 = 1 - statusDying int32 = 2 // Intermediate state used to avoid race: Idle => Dying => Dead - statusDead int32 = 3 + statusDead int32 = 2 ) // New returns a new *Pool object. @@ -60,10 +59,7 @@ func (pool *Pool) Go(f func()) { if atomic.CompareAndSwapInt32(&g.status, statusIdle, statusInUse) { break } - // Status already changed from statusIdle => statusDying, delete this goroutine. - if atomic.LoadInt32(&g.status) == statusDying { - g.status = statusDead - } + // Status already changed from statusIdle => statusDead, drop it, find next one. } g.ch <- f @@ -115,7 +111,7 @@ func (g *goroutine) workLoop(pool *Pool) { case <-timer.C: // Check to avoid a corner case that the goroutine is take out from pool, // and get this signal at the same time. - succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDying) + succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDead) if succ { return } From 3fd1c580cb5cc0b039158c0313c60555d25fca79 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 9 Sep 2017 19:58:22 +0800 Subject: [PATCH 6/7] address comment --- util/goroutine_pool/gp.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go index 0e9f9c12e8c26..fa12866a1f158 100644 --- a/util/goroutine_pool/gp.go +++ b/util/goroutine_pool/gp.go @@ -53,18 +53,14 @@ func New(idleTimeout time.Duration) *Pool { // Go works like go func(), but goroutines are pooled for reusing. // This strategy can avoid runtime.morestack, because pooled goroutine is already enlarged. func (pool *Pool) Go(f func()) { - var g *goroutine for { - g = pool.get() + g := pool.get() if atomic.CompareAndSwapInt32(&g.status, statusIdle, statusInUse) { - break + g.ch <- f + return } // Status already changed from statusIdle => statusDead, drop it, find next one. } - - g.ch <- f - // When the goroutine finish f(), it will be put back to pool automatically, - // so it doesn't need to call pool.put() here. } func (pool *Pool) get() *goroutine { From 6edd144d957afbfe3203dd4a9ee0281a882e754e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sat, 9 Sep 2017 20:18:26 +0800 Subject: [PATCH 7/7] address comment --- util/goroutine_pool/gp.go | 1 - util/goroutine_pool/gp_test.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go index fa12866a1f158..ff97093148833 100644 --- a/util/goroutine_pool/gp.go +++ b/util/goroutine_pool/gp.go @@ -92,7 +92,6 @@ func (pool *Pool) alloc() *goroutine { func (g *goroutine) put(pool *Pool) { g.status = statusIdle - g.next = nil pool.Lock() pool.tail.next = g pool.tail = g diff --git a/util/goroutine_pool/gp_test.go b/util/goroutine_pool/gp_test.go index f787fedc56d22..2952f182305a5 100644 --- a/util/goroutine_pool/gp_test.go +++ b/util/goroutine_pool/gp_test.go @@ -48,13 +48,13 @@ func TestGC(t *testing.T) { gp.Lock() count := gp.count gp.Unlock() - if count != 1 { + if count > 1 { t.Error("all goroutines should be recycled", count) } } func TestRace(t *testing.T) { - gp := New(200 * time.Millisecond) + gp := New(8 * time.Millisecond) var wg sync.WaitGroup begin := make(chan struct{}) wg.Add(500)