Skip to content

Commit

Permalink
: use goroutine pool to avoid runtime.morestack
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Jul 18, 2017
1 parent 4f3292a commit a636621
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 31 deletions.
7 changes: 6 additions & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
Expand All @@ -35,6 +36,8 @@ var (
_ PartialResult = &partialResult{}
)

var selectResultGP = gp.New(2 * time.Minute)

// SelectResult is an iterator of coprocessor partial results.
type SelectResult interface {
// Next gets the next partial result.
Expand Down Expand Up @@ -71,7 +74,9 @@ type resultWithErr struct {
}

func (r *selectResult) Fetch(ctx goctx.Context) {
go r.fetch(ctx)
selectResultGP.Go(func() {
r.fetch(ctx)
})
}

func (r *selectResult) fetch(ctx goctx.Context) {
Expand Down
11 changes: 9 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
Expand All @@ -41,6 +42,8 @@ const (
minLogDuration = 50 * time.Millisecond
)

var xIndexSelectGP = gp.New(3 * time.Minute)

func resultRowToRow(t table.Table, h int64, data []types.Datum, tableAsName *model.CIStr) *Row {
entry := &RowKeyEntry{
Handle: h,
Expand Down Expand Up @@ -550,7 +553,9 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {
// e.taskChan serves as a pipeline, so fetching index and getting table data can
// run concurrently.
e.taskChan = make(chan *lookupTableTask, LookupTableTaskChannelSize)
go e.fetchHandles(idxResult, e.taskChan)
xIndexSelectGP.Go(func() {
e.fetchHandles(idxResult, e.taskChan)
})
}

for {
Expand Down Expand Up @@ -586,7 +591,9 @@ func (e *XSelectIndexExec) slowQueryInfo(duration time.Duration) string {

func (e *XSelectIndexExec) addWorker(workCh chan *lookupTableTask, concurrency *int, concurrencyLimit int) {
if *concurrency < concurrencyLimit {
go e.pickAndExecTask(workCh)
xIndexSelectGP.Go(func() {
e.pickAndExecTask(workCh)
})
*concurrency++
}
}
Expand Down
33 changes: 12 additions & 21 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-binlog"
goctx "golang.org/x/net/context"
)
Expand All @@ -41,6 +42,8 @@ const (
actionCleanup twoPhaseCommitAction = 3
)

var twoPhaseCommitGP = gp.New(3 * time.Minute)

func (ca twoPhaseCommitAction) String() string {
switch ca {
case actionPrewrite:
Expand Down Expand Up @@ -218,30 +221,18 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
go func() {
reserveStack(false)
twoPhaseCommitGP.Go(func() {
e := c.doActionOnBatches(bo, action, batches)
if e != nil {
log.Debugf("2PC async doActionOnBatches %s err: %v", action, e)
}
}()
})
} else {
err = c.doActionOnBatches(bo, action, batches)
}
return errors.Trace(err)
}

// reserveStack reserves 4KB memory on the stack to avoid runtime.morestack, call it after new a goroutine if necessary.
func reserveStack(dummy bool) {
var buf [8 << 10]byte
// avoid compiler optimize the buf out.
if dummy {
for i := range buf {
buf[i] = byte(i)
}
}
}

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
if len(batches) == 0 {
Expand Down Expand Up @@ -273,9 +264,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm

// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch := range batches {
go func(batch batchKeys) {
reserveStack(false)
for _, batch1 := range batches {

batch := batch1
twoPhaseCommitGP.Go(func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
Expand All @@ -291,7 +283,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}(batch)
})
}
var err error
for i := 0; i < len(batches); i++ {
Expand Down Expand Up @@ -539,15 +531,14 @@ func (c *twoPhaseCommitter) execute() error {
undetermined := c.mu.undetermined
c.mu.RUnlock()
if !committed && !undetermined {
go func() {
reserveStack(false)
twoPhaseCommitGP.Go(func() {
err := c.cleanupKeys(NewBackoffer(cleanupMaxBackoff, goctx.Background()), writtenKeys)
if err != nil {
log.Infof("2PC cleanup err: %v, tid: %d", err, c.startTS)
} else {
log.Infof("2PC clean up done, tid: %d", c.startTS)
}
}()
})
}
}()

Expand Down
11 changes: 7 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
goctx "golang.org/x/net/context"
)

var copIteratorGP = gp.New(time.Minute)

// CopClient is coprocessor client.
type CopClient struct {
store *tikvStore
Expand Down Expand Up @@ -351,14 +354,14 @@ func (it *copIterator) run(ctx goctx.Context) {
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
go func() {
copIteratorGP.Go(func() {
childCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
it.work(childCtx, it.taskCh)
}()
})
}

go func() {
copIteratorGP.Go(func() {
// Send tasks to feed the worker goroutines.
childCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
Expand All @@ -375,7 +378,7 @@ func (it *copIterator) run(ctx goctx.Context) {
if !it.req.KeepOrder {
close(it.respChan)
}
}()
})
}

func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask) (finished bool, canceled bool) {
Expand Down
10 changes: 7 additions & 3 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/goroutine_pool"
goctx "golang.org/x/net/context"
)

Expand All @@ -43,6 +44,8 @@ type tikvSnapshot struct {
priority pb.CommandPri
}

var snapshotGP = gp.New(time.Minute)

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
return &tikvSnapshot{
Expand Down Expand Up @@ -101,12 +104,13 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF))
}
ch := make(chan error)
for _, batch := range batches {
go func(batch batchKeys) {
for _, batch1 := range batches {
batch := batch1
snapshotGP.Go(func() {
backoffer, cancel := bo.Fork()
defer cancel()
ch <- s.batchGetSingleRegion(backoffer, batch, collectF)
}(batch)
})
}
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
Expand Down

0 comments on commit a636621

Please sign in to comment.