Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove goroutine pool #7564

Merged
merged 2 commits into from
Aug 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
Expand All @@ -35,10 +34,6 @@ var (
_ SelectResult = (*streamResult)(nil)
)

var (
selectResultGP = gp.New(time.Minute * 2)
)

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Fetch fetches partial results from client.
Expand Down Expand Up @@ -75,9 +70,7 @@ type selectResult struct {
}

func (r *selectResult) Fetch(ctx context.Context) {
selectResultGP.Go(func() {
r.fetch(ctx)
})
go r.fetch(ctx)
}

func (r *selectResult) fetch(ctx context.Context) {
Expand Down
15 changes: 6 additions & 9 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
Expand All @@ -42,8 +41,6 @@ const (
actionCleanup twoPhaseCommitAction = 3
)

var twoPhaseCommitGP = gp.New(3 * time.Minute)

func (ca twoPhaseCommitAction) String() string {
switch ca {
case actionPrewrite:
Expand Down Expand Up @@ -225,13 +222,13 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
twoPhaseCommitGP.Go(func() {
go func() {
e := c.doActionOnBatches(bo, action, batches)
if e != nil {
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
}
})
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
}
Expand Down Expand Up @@ -272,7 +269,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
for _, batch1 := range batches {

batch := batch1
twoPhaseCommitGP.Go(func() {
go func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
Expand All @@ -288,7 +285,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
})
}()
}
var err error
for i := 0; i < len(batches); i++ {
Expand Down Expand Up @@ -571,7 +568,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.mu.RUnlock()
if !committed && !undetermined {
c.cleanWg.Add(1)
twoPhaseCommitGP.Go(func() {
go func() {
err := c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
if err != nil {
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
Expand All @@ -580,7 +577,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
}
c.cleanWg.Done()
})
}()
}
}()

Expand Down
9 changes: 2 additions & 7 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var copIteratorGP = gp.New(time.Minute)

// CopClient is coprocessor client.
type CopClient struct {
store *tikvStore
Expand Down Expand Up @@ -452,9 +449,7 @@ func (it *copIterator) open(ctx context.Context) {
finishCh: it.finishCh,
vars: it.vars,
}
copIteratorGP.Go(func() {
worker.run(ctx)
})
go worker.run(ctx)
}
taskSender := &copIteratorTaskSender{
taskCh: taskCh,
Expand All @@ -463,7 +458,7 @@ func (it *copIterator) open(ctx context.Context) {
finishCh: it.finishCh,
}
taskSender.respChan = it.respChan
copIteratorGP.Go(taskSender.run)
go taskSender.run()
}

func (sender *copIteratorTaskSender) run() {
Expand Down
10 changes: 4 additions & 6 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
"golang.org/x/net/context"
)

var (
rawKVClientGP = gp.New(3 * time.Minute)
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large.
Expand Down Expand Up @@ -349,11 +347,11 @@ func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc
ches := make(chan singleBatchResp, len(batches))
for _, batch := range batches {
batch1 := batch
rawKVClientGP.Go(func() {
go func() {
singleBatchBackoffer, singleBatchCancel := bo.Fork()
defer singleBatchCancel()
ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType)
})
}()
}

var firstError error
Expand Down Expand Up @@ -507,11 +505,11 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
ch := make(chan error, len(batches))
for _, batch := range batches {
batch1 := batch
rawKVClientGP.Go(func() {
go func() {
singleBatchBackoffer, singleBatchCancel := bo.Fork()
defer singleBatchCancel()
ch <- c.doBatchPut(singleBatchBackoffer, batch1)
})
}()
}

for i := 0; i < len(batches); i++ {
Expand Down
7 changes: 2 additions & 5 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/goroutine_pool"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand All @@ -51,8 +50,6 @@ type tikvSnapshot struct {
vars *kv.Variables
}

var snapshotGP = gp.New(time.Minute)

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
return &tikvSnapshot{
Expand Down Expand Up @@ -123,11 +120,11 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
ch := make(chan error)
for _, batch1 := range batches {
batch := batch1
snapshotGP.Go(func() {
go func() {
backoffer, cancel := bo.Fork()
defer cancel()
ch <- s.batchGetSingleRegion(backoffer, batch, collectF)
})
}()
}
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
Expand Down
30 changes: 0 additions & 30 deletions util/goroutine_pool/fake.go

This file was deleted.

124 changes: 0 additions & 124 deletions util/goroutine_pool/gp.go

This file was deleted.

Loading