-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: use goroutine pool to avoid runtime.morestack #3753
Changes from all commits
4f3292a
a636621
47475c9
7ab8420
bf21e85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ import ( | |
"github.com/pingcap/tidb/store/tikv/tikvrpc" | ||
"github.com/pingcap/tidb/tablecodec" | ||
"github.com/pingcap/tidb/terror" | ||
"github.com/pingcap/tidb/util/goroutine_pool" | ||
"github.com/pingcap/tipb/go-binlog" | ||
goctx "golang.org/x/net/context" | ||
) | ||
|
@@ -41,6 +42,8 @@ const ( | |
actionCleanup twoPhaseCommitAction = 3 | ||
) | ||
|
||
var twoPhaseCommitGP = gp.New(3 * time.Minute) | ||
|
||
func (ca twoPhaseCommitAction) String() string { | ||
switch ca { | ||
case actionPrewrite: | ||
|
@@ -218,30 +221,18 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA | |
} | ||
if action == actionCommit { | ||
// Commit secondary batches in background goroutine to reduce latency. | ||
go func() { | ||
reserveStack(false) | ||
twoPhaseCommitGP.Go(func() { | ||
e := c.doActionOnBatches(bo, action, batches) | ||
if e != nil { | ||
log.Debugf("2PC async doActionOnBatches %s err: %v", action, e) | ||
} | ||
}() | ||
}) | ||
} else { | ||
err = c.doActionOnBatches(bo, action, batches) | ||
} | ||
return errors.Trace(err) | ||
} | ||
|
||
// reserveStack reserves 4KB memory on the stack to avoid runtime.morestack, call it after new a goroutine if necessary. | ||
func reserveStack(dummy bool) { | ||
var buf [8 << 10]byte | ||
// avoid compiler optimize the buf out. | ||
if dummy { | ||
for i := range buf { | ||
buf[i] = byte(i) | ||
} | ||
} | ||
} | ||
|
||
// doActionOnBatches does action to batches in parallel. | ||
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error { | ||
if len(batches) == 0 { | ||
|
@@ -273,9 +264,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm | |
|
||
// Concurrently do the work for each batch. | ||
ch := make(chan error, len(batches)) | ||
for _, batch := range batches { | ||
go func(batch batchKeys) { | ||
reserveStack(false) | ||
for _, batch1 := range batches { | ||
|
||
batch := batch1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just use batch1? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good habit ! @winkyao
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it! |
||
twoPhaseCommitGP.Go(func() { | ||
if action == actionCommit { | ||
// Because the secondary batches of the commit actions are implemented to be | ||
// committed asynchronously in background goroutines, we should not | ||
|
@@ -291,7 +283,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm | |
defer singleBatchCancel() | ||
ch <- singleBatchActionFunc(singleBatchBackoffer, batch) | ||
} | ||
}(batch) | ||
}) | ||
} | ||
var err error | ||
for i := 0; i < len(batches); i++ { | ||
|
@@ -539,15 +531,14 @@ func (c *twoPhaseCommitter) execute() error { | |
undetermined := c.mu.undetermined | ||
c.mu.RUnlock() | ||
if !committed && !undetermined { | ||
go func() { | ||
reserveStack(false) | ||
twoPhaseCommitGP.Go(func() { | ||
err := c.cleanupKeys(NewBackoffer(cleanupMaxBackoff, goctx.Background()), writtenKeys) | ||
if err != nil { | ||
log.Infof("2PC cleanup err: %v, tid: %d", err, c.startTS) | ||
} else { | ||
log.Infof("2PC clean up done, tid: %d", c.startTS) | ||
} | ||
}() | ||
}) | ||
} | ||
}() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why use diffrent idleTimeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't matter whether it 2mins or 3mins or anything.