Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coprocessor: Exceed action for copiterator (#17324) #18392

Merged
merged 6 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -139,6 +140,14 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
e.feedback.Invalidate()
return err
}

actionExceed := e.memTracker.GetActionOnExceed()
if actionExceed != nil {
e.ctx.GetSessionVars().StmtCtx.MemTracker.FallbackOldAndSetNewAction(actionExceed)
} else {
return errors.Trace(fmt.Errorf("failed to find actionExceed in TableReaderExecutor Open phase"))
}

if len(secondPartRanges) == 0 {
e.resultHandler.open(nil, firstResult)
return nil
Expand Down
84 changes: 84 additions & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
vars: vars,
memTracker: req.MemTracker,
replicaReadSeed: c.replicaReadSeed,
actionOnExceed: &EndCopWorkerAction{},
}
if it.memTracker != nil {
it.memTracker.FallbackOldAndSetNewAction(it.actionOnExceed)
}

it.minCommitTSPushed.data = make(map[uint64]struct{}, 5)
it.tasks = tasks
if it.concurrency > len(tasks) {
Expand All @@ -90,6 +95,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
} else {
it.respChan = make(chan *copResponse, it.concurrency)
}
it.actionOnExceed.mu.aliveWorker = it.concurrency
it.open(ctx)
return it
}
Expand Down Expand Up @@ -397,10 +403,13 @@ type copIterator struct {
closed uint32

minCommitTSPushed

actionOnExceed *EndCopWorkerAction
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
type copIteratorWorker struct {
id string
taskCh <-chan *copTask
wg *sync.WaitGroup
store *tikvStore
Expand All @@ -413,6 +422,8 @@ type copIteratorWorker struct {
memTracker *memory.Tracker

replicaReadSeed uint32

actionOnExceed *EndCopWorkerAction
}

// copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
Expand Down Expand Up @@ -484,7 +495,14 @@ const minLogCopTaskTime = 300 * time.Millisecond
// send the result back.
func (worker *copIteratorWorker) run(ctx context.Context) {
defer worker.wg.Done()

for task := range worker.taskCh {
endWorker, remainWorkers := worker.checkWorkerOOM()
if endWorker {
logutil.BgLogger().Info("end one copIterator worker.",
zap.String("copIteratorWorker id", worker.id), zap.Int("remain alive worker", remainWorkers))
return
}
respCh := worker.respChan
if respCh == nil {
respCh = task.respChan
Expand All @@ -497,19 +515,39 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
}
select {
case <-worker.finishCh:
worker.actionOnExceed.mu.Lock()
worker.actionOnExceed.mu.aliveWorker--
worker.actionOnExceed.mu.Unlock()
return
default:
}
}
}

func (worker *copIteratorWorker) checkWorkerOOM() (bool, int) {
endWorker := false
remainWorkers := 0
worker.actionOnExceed.mu.Lock()
defer worker.actionOnExceed.mu.Unlock()
if worker.actionOnExceed.mu.exceeded != 0 {
endWorker = true
worker.actionOnExceed.mu.aliveWorker--
remainWorkers = worker.actionOnExceed.mu.aliveWorker
// reset action
worker.actionOnExceed.mu.exceeded = 0
worker.actionOnExceed.once = sync.Once{}
}
return endWorker, remainWorkers
}

// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context) {
taskCh := make(chan *copTask, 1)
it.wg.Add(it.concurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency; i++ {
worker := &copIteratorWorker{
id: fmt.Sprintf("copIteratorWorker-%d", i),
taskCh: taskCh,
wg: &it.wg,
store: it.store,
Expand All @@ -527,6 +565,7 @@ func (it *copIterator) open(ctx context.Context) {
memTracker: it.memTracker,

replicaReadSeed: it.replicaReadSeed,
actionOnExceed: it.actionOnExceed,
}
go worker.run(ctx)
}
Expand Down Expand Up @@ -1150,3 +1189,48 @@ func (it copErrorResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
func (it copErrorResponse) Close() error {
return nil
}

// EndCopWorkerAction implements memory.ActionOnExceed for copIteratorWorker. If
// the memory quota of a query is exceeded, EndCopWorkAction.Action would end one copIteratorWorker.
// If there is only one or zero worker is running, delegate to the fallback action.
type EndCopWorkerAction struct {
once sync.Once
fallbackAction memory.ActionOnExceed
mu struct {
sync.Mutex
// exceeded indicates that datasource have exceeded memQuota.
exceeded uint32

// alive worker indicates how many copIteratorWorker are running
aliveWorker int
}
}

// Action sends a signal to trigger end one copIterator worker.
func (e *EndCopWorkerAction) Action(t *memory.Tracker) {
e.mu.Lock()
defer e.mu.Unlock()
// only one or zero worker is running, delegate to the fallback action
if e.mu.aliveWorker < 2 {
if e.fallbackAction != nil {
e.fallbackAction.Action(t)
}
return
}
// set exceeded as 1
e.once.Do(func() {
e.mu.exceeded = 1
logutil.BgLogger().Info("memory exceeds quota, mark EndCopWorkerAction exceed signal.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()), zap.Int64("maxConsumed", t.MaxConsumed()))
})
}

// SetLogHook implements ActionOnExceed.SetLogHook
func (e *EndCopWorkerAction) SetLogHook(hook func(uint64)) {

}

// SetFallback implements ActionOnExceed.SetFallback
func (e *EndCopWorkerAction) SetFallback(a memory.ActionOnExceed) {
e.fallbackAction = a
}
7 changes: 7 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
t.actionMu.Unlock()
}

// GetActionOnExceed return the actionOnExceed
func (t *Tracker) GetActionOnExceed() ActionOnExceed {
t.actionMu.Lock()
defer t.actionMu.Unlock()
return t.actionMu.actionOnExceed
}

// FallbackOldAndSetNewAction sets the action when memory usage exceeds bytesLimit
// and set the original action as its fallback.
func (t *Tracker) FallbackOldAndSetNewAction(a ActionOnExceed) {
Expand Down