Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: print an expensive query log when a query is out of memQuota (#10799) #10849

Merged
merged 4 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,9 @@ func (c *Config) Valid() error {
return fmt.Errorf("invalid max log file size=%v which is larger than max=%v", c.Log.File.MaxSize, MaxLogFileSize)
}
c.OOMAction = strings.ToLower(c.OOMAction)
if c.OOMAction != OOMActionLog && c.OOMAction != OOMActionCancel {
return fmt.Errorf("unsupported OOMAction %v, TiDB only supports [%v, %v]", c.OOMAction, OOMActionLog, OOMActionCancel)
}

// lower_case_table_names is allowed to be 0, 1, 2
if c.LowerCaseTableNames < 0 || c.LowerCaseTableNames > 2 {
Expand Down
18 changes: 18 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,21 @@ func (s *testConfigSuite) TestValid(c *C) {
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}

func (s *testConfigSuite) TestOOMActionValid(c *C) {
c1 := NewConfig()
tests := []struct {
oomAction string
valid bool
}{
{"log", true},
{"Log", true},
{"Cancel", true},
{"cANceL", true},
{"quit", false},
}
for _, tt := range tests {
c1.OOMAction = tt.oomAction
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}
10 changes: 7 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,11 +1303,15 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
sc.MemTracker.SetActionOnExceed(&memory.PanicOnExceed{})
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
case config.OOMActionLog:
sc.MemTracker.SetActionOnExceed(&memory.LogOnExceed{})
fallthrough
default:
sc.MemTracker.SetActionOnExceed(&memory.LogOnExceed{})
action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
}

if execStmt, ok := s.(*ast.ExecuteStmt); ok {
Expand Down
5 changes: 3 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,9 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
continue
}
pi := client.ctx.ShowProcess()
rs[pi.ID] = pi
if pi := client.ctx.ShowProcess(); pi != nil {
rs[pi.ID] = pi
}
}
s.rwlock.RUnlock()
return rs
Expand Down
2 changes: 1 addition & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func createServer() {
svr, err = server.NewServer(cfg, driver)
// Both domain and storage have started, so we have to clean them before exiting.
terror.MustNil(err, closeDomainAndStorage)
go dom.ExpensiveQueryHandle().Run(svr)
go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run()
}

func serverShutdown(isgraceful bool) {
Expand Down
26 changes: 23 additions & 3 deletions util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ import (
// Handle is the handler for expensive query.
type Handle struct {
exitCh chan struct{}
sm util.SessionManager
}

// NewExpensiveQueryHandle builds a new expensive query handler.
func NewExpensiveQueryHandle(exitCh chan struct{}) *Handle {
return &Handle{exitCh}
return &Handle{exitCh: exitCh}
}

// SetSessionManager sets the SessionManager which is used to fetching the info
// of all active sessions.
func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle {
eqh.sm = sm
return eqh
}

// Run starts a expensive query checker goroutine at the start time of the server.
func (eqh *Handle) Run(sm util.SessionManager) {
func (eqh *Handle) Run() {
threshold := atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold)
curInterval := time.Second * time.Duration(threshold)
ticker := time.NewTicker(curInterval / 2)
Expand All @@ -50,7 +58,7 @@ func (eqh *Handle) Run(sm util.SessionManager) {
if log.GetLevel() > zapcore.WarnLevel {
continue
}
processInfo := sm.ShowProcessList()
processInfo := eqh.sm.ShowProcessList()
for _, info := range processInfo {
if len(info.Info) == 0 || info.ExceedExpensiveTimeThresh {
continue
Expand All @@ -72,6 +80,18 @@ func (eqh *Handle) Run(sm util.SessionManager) {
}
}

// LogOnQueryExceedMemQuota prints a log when memory usage of connID is out of memory quota.
func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) {
if log.GetLevel() > zapcore.WarnLevel {
return
}
info, ok := eqh.sm.GetProcessInfo(connID)
if !ok {
return
}
logExpensiveQuery(time.Since(info.Time), info)
}

// logExpensiveQuery logs the queries which exceed the time threshold or memory threshold.
func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) {
logFields := make([]zap.Field, 0, 20)
Expand Down
40 changes: 32 additions & 8 deletions util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package memory

import (
"context"
"fmt"
"sync"

"github.com/pingcap/parser/mysql"
Expand All @@ -29,12 +30,22 @@ type ActionOnExceed interface {
// Action will be called when memory usage exceeds memory quota by the
// corresponding Tracker.
Action(t *Tracker)
// SetLogHook binds a log hook which will be triggered and log an detailed
// message for the out-of-memory sql.
SetLogHook(hook func(uint64))
}

// LogOnExceed logs a warning only once when memory usage exceeds memory quota.
type LogOnExceed struct {
mutex sync.Mutex // For synchronization.
acted bool
mutex sync.Mutex // For synchronization.
acted bool
ConnID uint64
logHook func(uint64)
}

// SetLogHook sets a hook for LogOnExceed.
func (a *LogOnExceed) SetLogHook(hook func(uint64)) {
a.logHook = hook
}

// Action logs a warning only once when memory usage exceeds memory quota.
Expand All @@ -43,16 +54,26 @@ func (a *LogOnExceed) Action(t *Tracker) {
defer a.mutex.Unlock()
if !a.acted {
a.acted = true
logutil.Logger(context.Background()).Warn("memory exceeds quota",
zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.bytesLimit, t.String())))
return
if a.logHook == nil {
logutil.Logger(context.Background()).Warn("memory exceeds quota",
zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.bytesLimit, t.String())))
return
}
a.logHook(a.ConnID)
}
}

// PanicOnExceed panics when memory usage exceeds memory quota.
type PanicOnExceed struct {
mutex sync.Mutex // For synchronization.
acted bool
mutex sync.Mutex // For synchronization.
acted bool
ConnID uint64
logHook func(uint64)
}

// SetLogHook sets a hook for PanicOnExceed.
func (a *PanicOnExceed) SetLogHook(hook func(uint64)) {
a.logHook = hook
}

// Action panics when memory usage exceeds memory quota.
Expand All @@ -64,7 +85,10 @@ func (a *PanicOnExceed) Action(t *Tracker) {
}
a.acted = true
a.mutex.Unlock()
panic(PanicMemoryExceed + t.String())
if a.logHook != nil {
a.logHook(a.ConnID)
}
panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID))
}

var (
Expand Down
3 changes: 3 additions & 0 deletions util/memory/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type mockAction struct {
called bool
}

func (a *mockAction) SetLogHook(hook func(uint64)) {
}

func (a *mockAction) Action(t *Tracker) {
a.called = true
}
Expand Down