Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use goroutine pool to avoid runtime.morestack #3753

Merged
merged 5 commits into from
Sep 22, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
Expand All @@ -35,6 +36,8 @@ var (
_ PartialResult = &partialResult{}
)

var selectResultGP = gp.New(2 * time.Minute)

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Next gets the next partial result.
Expand Down Expand Up @@ -73,7 +76,9 @@ type resultWithErr struct {
}

func (r *selectResult) Fetch(ctx goctx.Context) {
go r.fetch(ctx)
selectResultGP.Go(func() {
r.fetch(ctx)
})
}

func (r *selectResult) fetch(ctx goctx.Context) {
Expand Down
11 changes: 9 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
Expand All @@ -43,6 +44,8 @@ const (
minLogDuration = 50 * time.Millisecond
)

var xIndexSelectGP = gp.New(3 * time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use diffrent idleTimeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter whether it 2mins or 3mins or anything.


// LookupTableTaskChannelSize represents the channel size of the index double read taskChan.
var LookupTableTaskChannelSize int32 = 50

Expand Down Expand Up @@ -551,7 +554,9 @@ func (e *XSelectIndexExec) nextForDoubleRead() (Row, error) {
// e.taskChan serves as a pipeline, so fetching index and getting table data can
// run concurrently.
e.taskChan = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
go e.fetchHandles(idxResult, e.taskChan)
xIndexSelectGP.Go(func() {
e.fetchHandles(idxResult, e.taskChan)
})
}

for {
Expand Down Expand Up @@ -587,7 +592,9 @@ func (e *XSelectIndexExec) slowQueryInfo(duration time.Duration) string {

func (e *XSelectIndexExec) addWorker(workCh chan *lookupTableTask, concurrency *int, concurrencyLimit int) {
if *concurrency < concurrencyLimit {
go e.pickAndExecTask(workCh)
xIndexSelectGP.Go(func() {
e.pickAndExecTask(workCh)
})
*concurrency++
}
}
Expand Down
33 changes: 12 additions & 21 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-binlog"
goctx "golang.org/x/net/context"
)
Expand All @@ -41,6 +42,8 @@ const (
actionCleanup twoPhaseCommitAction = 3
)

var twoPhaseCommitGP = gp.New(3 * time.Minute)

func (ca twoPhaseCommitAction) String() string {
switch ca {
case actionPrewrite:
Expand Down Expand Up @@ -218,30 +221,18 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
go func() {
reserveStack(false)
twoPhaseCommitGP.Go(func() {
e := c.doActionOnBatches(bo, action, batches)
if e != nil {
log.Debugf("2PC async doActionOnBatches %s err: %v", action, e)
}
}()
})
} else {
err = c.doActionOnBatches(bo, action, batches)
}
return errors.Trace(err)
}

// reserveStack reserves 4KB memory on the stack to avoid runtime.morestack, call it after new a goroutine if necessary.
func reserveStack(dummy bool) {
var buf [8 << 10]byte
// avoid compiler optimize the buf out.
if dummy {
for i := range buf {
buf[i] = byte(i)
}
}
}

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
if len(batches) == 0 {
Expand Down Expand Up @@ -273,9 +264,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm

// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch := range batches {
go func(batch batchKeys) {
reserveStack(false)
for _, batch1 := range batches {

batch := batch1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use batch1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good habit ! @winkyao
When using closure along with range operation, it's easy to get suck:

	arr := []int{1, 2, 3, 4}
	for _, v := range arr {
		go func() {
			fmt.Println(v)   // guess the result?
		}()
	}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it!

twoPhaseCommitGP.Go(func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
Expand All @@ -291,7 +283,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}(batch)
})
}
var err error
for i := 0; i < len(batches); i++ {
Expand Down Expand Up @@ -539,15 +531,14 @@ func (c *twoPhaseCommitter) execute() error {
undetermined := c.mu.undetermined
c.mu.RUnlock()
if !committed && !undetermined {
go func() {
reserveStack(false)
twoPhaseCommitGP.Go(func() {
err := c.cleanupKeys(NewBackoffer(cleanupMaxBackoff, goctx.Background()), writtenKeys)
if err != nil {
log.Infof("2PC cleanup err: %v, tid: %d", err, c.startTS)
} else {
log.Infof("2PC clean up done, tid: %d", c.startTS)
}
}()
})
}
}()

Expand Down
11 changes: 7 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)

var copIteratorGP = gp.New(time.Minute)

// CopClient is coprocessor client.
type CopClient struct {
store *tikvStore
Expand Down Expand Up @@ -367,14 +370,14 @@ func (it *copIterator) run(ctx goctx.Context) {
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
go func() {
copIteratorGP.Go(func() {
childCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
it.work(childCtx, taskCh)
}()
})
}

go func() {
copIteratorGP.Go(func() {
// Send tasks to feed the worker goroutines.
childCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
Expand All @@ -391,7 +394,7 @@ func (it *copIterator) run(ctx goctx.Context) {
if !it.req.KeepOrder {
close(it.respChan)
}
}()
})
}

func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask, taskCh chan<- *copTask) (finished bool, canceled bool) {
Expand Down
10 changes: 7 additions & 3 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
goctx "golang.org/x/net/context"
)

Expand All @@ -43,6 +44,8 @@ type tikvSnapshot struct {
priority pb.CommandPri
}

var snapshotGP = gp.New(time.Minute)

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
return &tikvSnapshot{
Expand Down Expand Up @@ -101,12 +104,13 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF))
}
ch := make(chan error)
for _, batch := range batches {
go func(batch batchKeys) {
for _, batch1 := range batches {
batch := batch1
snapshotGP.Go(func() {
backoffer, cancel := bo.Fork()
defer cancel()
ch <- s.batchGetSingleRegion(backoffer, batch, collectF)
}(batch)
})
}
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
Expand Down
1 change: 1 addition & 0 deletions tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func (s *testMainSuite) TestSchemaValidity(c *C) {
}

func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
c.Skip("make leak should check it")
// TODO: testleak package should be able to find this leak.
store := newStoreWithBootstrap(c, s.dbName+"goroutine_leak")
defer store.Close()
Expand Down