From 60c9b25690498cfbe1580a7526b2c1e7c781840b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 31 Aug 2018 10:32:50 +0800 Subject: [PATCH] *: remove goroutine pool goroutine pool was introduced to handle stack copy cost, Go1.11 many optimizations for stack copy, after upgrading to Go1.1, goroutine pool is not necessary any more. --- distsql/select_result.go | 9 +-- store/tikv/2pc.go | 15 ++-- store/tikv/coprocessor.go | 9 +-- store/tikv/rawkv.go | 10 +-- store/tikv/snapshot.go | 7 +- util/goroutine_pool/fake.go | 30 -------- util/goroutine_pool/gp.go | 124 ------------------------------ util/goroutine_pool/gp_test.go | 135 --------------------------------- 8 files changed, 15 insertions(+), 324 deletions(-) delete mode 100644 util/goroutine_pool/fake.go delete mode 100644 util/goroutine_pool/gp.go delete mode 100644 util/goroutine_pool/gp_test.go diff --git a/distsql/select_result.go b/distsql/select_result.go index 215b114e7b743..ecdf76a79a3ad 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/goroutine_pool" "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" ) @@ -35,10 +34,6 @@ var ( _ SelectResult = (*streamResult)(nil) ) -var ( - selectResultGP = gp.New(time.Minute * 2) -) - // SelectResult is an iterator of coprocessor partial results. type SelectResult interface { // Fetch fetches partial results from client. @@ -75,9 +70,7 @@ type selectResult struct { } func (r *selectResult) Fetch(ctx context.Context) { - selectResultGP.Go(func() { - r.fetch(ctx) - }) + go r.fetch(ctx) } func (r *selectResult) fetch(ctx context.Context) { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 3fa61f1b405aa..086acb577b0fa 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/terror" - "github.com/pingcap/tidb/util/goroutine_pool" binlog "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -42,8 +41,6 @@ const ( actionCleanup twoPhaseCommitAction = 3 ) -var twoPhaseCommitGP = gp.New(3 * time.Minute) - func (ca twoPhaseCommitAction) String() string { switch ca { case actionPrewrite: @@ -225,13 +222,13 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } if action == actionCommit { // Commit secondary batches in background goroutine to reduce latency. - twoPhaseCommitGP.Go(func() { + go func() { e := c.doActionOnBatches(bo, action, batches) if e != nil { log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e) metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc() } - }) + }() } else { err = c.doActionOnBatches(bo, action, batches) } @@ -272,7 +269,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm for _, batch1 := range batches { batch := batch1 - twoPhaseCommitGP.Go(func() { + go func() { if action == actionCommit { // Because the secondary batches of the commit actions are implemented to be // committed asynchronously in background goroutines, we should not @@ -288,7 +285,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm defer singleBatchCancel() ch <- singleBatchActionFunc(singleBatchBackoffer, batch) } - }) + }() } var err error for i := 0; i < len(batches); i++ { @@ -571,7 +568,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { c.mu.RUnlock() if !committed && !undetermined { c.cleanWg.Add(1) - twoPhaseCommitGP.Go(func() { + go func() { err := c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff).WithVars(c.txn.vars), c.keys) if err != nil { metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc() @@ -580,7 +577,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS) } c.cleanWg.Done() - }) + }() } }() diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 34153fb9b60f1..1e9a6b8acf666 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -31,14 +31,11 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/execdetails" - "github.com/pingcap/tidb/util/goroutine_pool" "github.com/pingcap/tipb/go-tipb" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) -var copIteratorGP = gp.New(time.Minute) - // CopClient is coprocessor client. type CopClient struct { store *tikvStore @@ -452,9 +449,7 @@ func (it *copIterator) open(ctx context.Context) { finishCh: it.finishCh, vars: it.vars, } - copIteratorGP.Go(func() { - worker.run(ctx) - }) + go worker.run(ctx) } taskSender := &copIteratorTaskSender{ taskCh: taskCh, @@ -463,7 +458,7 @@ func (it *copIterator) open(ctx context.Context) { finishCh: it.finishCh, } taskSender.respChan = it.respChan - copIteratorGP.Go(taskSender.run) + go taskSender.run() } func (sender *copIteratorTaskSender) run() { diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index 41b72a1986859..cdaf29cc35a15 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -23,12 +23,10 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/pingcap/tidb/util/goroutine_pool" "golang.org/x/net/context" ) var ( - rawKVClientGP = gp.New(3 * time.Minute) // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. MaxRawKVScanLimit = 10240 // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. @@ -349,11 +347,11 @@ func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc ches := make(chan singleBatchResp, len(batches)) for _, batch := range batches { batch1 := batch - rawKVClientGP.Go(func() { + go func() { singleBatchBackoffer, singleBatchCancel := bo.Fork() defer singleBatchCancel() ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType) - }) + }() } var firstError error @@ -507,11 +505,11 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error { ch := make(chan error, len(batches)) for _, batch := range batches { batch1 := batch - rawKVClientGP.Go(func() { + go func() { singleBatchBackoffer, singleBatchCancel := bo.Fork() defer singleBatchCancel() ch <- c.doBatchPut(singleBatchBackoffer, batch1) - }) + }() } for i := 0; i < len(batches); i++ { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index c51983495cde3..e4f55d1600a15 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/util/goroutine_pool" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -51,8 +50,6 @@ type tikvSnapshot struct { vars *kv.Variables } -var snapshotGP = gp.New(time.Minute) - // newTiKVSnapshot creates a snapshot of an TiKV store. func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot { return &tikvSnapshot{ @@ -123,11 +120,11 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle ch := make(chan error) for _, batch1 := range batches { batch := batch1 - snapshotGP.Go(func() { + go func() { backoffer, cancel := bo.Fork() defer cancel() ch <- s.batchGetSingleRegion(backoffer, batch, collectF) - }) + }() } for i := 0; i < len(batches); i++ { if e := <-ch; e != nil { diff --git a/util/goroutine_pool/fake.go b/util/goroutine_pool/fake.go deleted file mode 100644 index 2465572c142a8..0000000000000 --- a/util/goroutine_pool/fake.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -// +build leak - -package gp - -import "time" - -type Pool struct{} - -// New returns a new *Pool object. -// When compile with leak flag, goroutine will not be reusing. -func New(idleTimeout time.Duration) *Pool { - return &Pool{} -} - -// Go run f() in a new goroutine. -func (pool *Pool) Go(f func()) { - go f() -} diff --git a/util/goroutine_pool/gp.go b/util/goroutine_pool/gp.go deleted file mode 100644 index 9d29bb5cc7bc3..0000000000000 --- a/util/goroutine_pool/gp.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -// +build !leak - -package gp - -import ( - "sync" - "sync/atomic" - "time" -) - -// Pool is a struct to represent goroutine pool. -type Pool struct { - stack []*goroutine - idleTimeout time.Duration - sync.Mutex -} - -// goroutine is actually a background goroutine, with a channel binded for communication. -type goroutine struct { - ch chan func() - status int32 -} - -const ( - statusIdle int32 = 0 - statusInUse int32 = 1 - statusDead int32 = 2 -) - -// New returns a new *Pool object. -func New(idleTimeout time.Duration) *Pool { - pool := &Pool{ - idleTimeout: idleTimeout, - stack: make([]*goroutine, 0, 64), - } - return pool -} - -// Go works like go func(), but goroutines are pooled for reusing. -// This strategy can avoid runtime.morestack, because pooled goroutine is already enlarged. -func (pool *Pool) Go(f func()) { - for { - g := pool.get() - if atomic.CompareAndSwapInt32(&g.status, statusIdle, statusInUse) { - g.ch <- f - return - } - // Status already changed from statusIdle => statusDead, drop it, find next one. - } -} - -func (pool *Pool) get() *goroutine { - pool.Lock() - if len(pool.stack) == 0 { - pool.Unlock() - return pool.alloc() - } - - ret := pool.stack[len(pool.stack)-1] - pool.stack = pool.stack[:len(pool.stack)-1] - pool.Unlock() - return ret -} - -func (pool *Pool) alloc() *goroutine { - g := &goroutine{ - ch: make(chan func()), - } - go g.workLoop(pool) - return g -} - -func (g *goroutine) put(pool *Pool) { - g.status = statusIdle - pool.Lock() - - // Recycle dead goroutine space. - i := 0 - for ; i < len(pool.stack) && atomic.LoadInt32(&pool.stack[i].status) == statusDead; i++ { - } - pool.stack = append(pool.stack[i:], g) - pool.Unlock() -} - -func (g *goroutine) workLoop(pool *Pool) { - timer := time.NewTimer(pool.idleTimeout) - for { - select { - case <-timer.C: - // Check to avoid a corner case that the goroutine is take out from pool, - // and get this signal at the same time. - succ := atomic.CompareAndSwapInt32(&g.status, statusIdle, statusDead) - if succ { - return - } - case work := <-g.ch: - work() - // Put g back to the pool. - // This is the normal usage for a resource pool: - // - // obj := pool.get() - // use(obj) - // pool.put(obj) - // - // But when goroutine is used as a resource, we can't pool.put() immediately, - // because the resource(goroutine) maybe still in use. - // So, put back resource is done here, when the goroutine finish its work. - g.put(pool) - } - timer.Reset(pool.idleTimeout) - } -} diff --git a/util/goroutine_pool/gp_test.go b/util/goroutine_pool/gp_test.go deleted file mode 100644 index 46cd5f438fa98..0000000000000 --- a/util/goroutine_pool/gp_test.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -// +build !leak - -package gp - -import ( - "sync" - "testing" - "time" -) - -func TestBasicAPI(t *testing.T) { - gp := New(time.Second) - var wg sync.WaitGroup - wg.Add(1) - // cover alloc() - gp.Go(func() { wg.Done() }) - // cover put() - wg.Wait() - // cover get() - gp.Go(func() {}) -} - -func TestGC(t *testing.T) { - gp := New(200 * time.Millisecond) - var wg sync.WaitGroup - wg.Add(100) - for i := 0; i < 100; i++ { - idx := i - gp.Go(func() { - time.Sleep(time.Duration(idx+1) * time.Millisecond) - wg.Done() - }) - } - wg.Wait() - - time.Sleep(300 * time.Millisecond) - gp.Go(func() {}) // To trigger count change. - - gp.Lock() - count := len(gp.stack) - gp.Unlock() - if count > 1 { - t.Error("all goroutines should be recycled", count) - } -} - -func TestRace(t *testing.T) { - gp := New(8 * time.Millisecond) - var wg sync.WaitGroup - begin := make(chan struct{}) - wg.Add(500) - for i := 0; i < 50; i++ { - go func() { - <-begin - for i := 0; i < 10; i++ { - gp.Go(func() { - wg.Done() - }) - time.Sleep(5 * time.Millisecond) - } - }() - } - close(begin) - wg.Wait() -} - -func BenchmarkGoPool(b *testing.B) { - gp := New(20 * time.Second) - for i := 0; i < b.N/2; i++ { - gp.Go(func() {}) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - gp.Go(dummy) - } -} - -func BenchmarkGo(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - go dummy() - } -} - -func dummy() { -} - -func BenchmarkMorestackPool(b *testing.B) { - gp := New(5 * time.Second) - b.ResetTimer() - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - wg.Add(1) - gp.Go(func() { - morestack(false) - wg.Done() - }) - wg.Wait() - } -} - -func BenchmarkMoreStack(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - var wg sync.WaitGroup - wg.Add(1) - go func() { - morestack(false) - wg.Done() - }() - wg.Wait() - } -} - -func morestack(f bool) { - var stack [8 * 1024]byte - if f { - for i := 0; i < len(stack); i++ { - stack[i] = 'a' - } - } -}