Skip to content

Commit

Permalink
executor: make prepare analyze killed globally instead of query quota (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 10, 2023
1 parent 71d2cfd commit b96ce5c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 14 deletions.
43 changes: 43 additions & 0 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3097,6 +3097,49 @@ func TestGlobalMemoryControlForAnalyze(t *testing.T) {
tk0.MustExec(sql)
}

func TestGlobalMemoryControlForPrepareAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_mem_quota_query = 209715200 ") // 200MB
tk0.MustExec("set global tidb_server_memory_limit = 5GB")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk0.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk0.MustExec("use test")
tk0.MustExec("create table t(a int)")
tk0.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk0.MustExec("insert into t select * from t") // 256 Lines
}
sqlPrepare := "prepare stmt from 'analyze table t with 1.0 samplerate';"
sqlExecute := "execute stmt;" // Need about 100MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) // 512MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
// won't be killed by tidb_mem_quota_query
tk0.MustExec(sqlPrepare)
tk0.MustExec(sqlExecute)
runtime.GC()
// killed by tidb_server_memory_limit
tk0.MustExec("set global tidb_server_memory_limit = 512MB")
_, err0 := tk0.Exec(sqlPrepare)
require.NoError(t, err0)
_, err1 := tk0.Exec(sqlExecute)
// Killed and the WarnMsg is WarnMsgSuffixForInstance instead of WarnMsgSuffixForSingleQuery
require.True(t, strings.Contains(err1.Error(), memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance))
runtime.GC()
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
tk0.MustExec(sqlPrepare)
tk0.MustExec(sqlExecute)
}

func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
16 changes: 13 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2077,10 +2077,20 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
vars.MemTracker.ResetMaxConsumed()
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID = vars.ConnectionID
vars.MemTracker.SessionID.Store(vars.ConnectionID)
vars.StmtCtx.TableStats = make(map[int64]interface{})

if _, ok := s.(*ast.AnalyzeTableStmt); ok {
isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return err
}
_, isAnalyze = prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt)
} else if _, ok := s.(*ast.AnalyzeTableStmt); ok {
isAnalyze = true
}
if isAnalyze {
sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1)
vars.MemTracker.SetBytesLimit(-1)
vars.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
Expand All @@ -2100,7 +2110,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
}
sc.MemTracker.SessionID = vars.ConnectionID
sc.MemTracker.SessionID.Store(vars.ConnectionID)
sc.MemTracker.AttachTo(vars.MemTracker)
sc.InitDiskTracker(memory.LabelForSQLText, -1)
globalConfig := config.GetGlobalConfig()
Expand Down
2 changes: 1 addition & 1 deletion executor/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func TestIssue42662(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.Session().GetSessionVars().ConnectionID = 12345
tk.Session().GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, -1)
tk.Session().GetSessionVars().MemTracker.SessionID = 12345
tk.Session().GetSessionVars().MemTracker.SessionID.Store(12345)
tk.Session().GetSessionVars().MemTracker.IsRootTrackerOfSess = true

sm := &testkit.MockSessionManager{
Expand Down
2 changes: 1 addition & 1 deletion util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (a *PanicOnExceed) Action(t *Tracker) {
if !a.acted {
if a.logHook == nil {
logutil.BgLogger().Warn("memory exceeds quota",
zap.Uint64("conn", t.SessionID), zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String())))
zap.Uint64("conn", t.SessionID.Load()), zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String())))
} else {
a.logHook(a.ConnID)
}
Expand Down
2 changes: 1 addition & 1 deletion util/memory/memstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ReadMemStats() (memStats *runtime.MemStats) {
}
failpoint.Inject("ReadMemStats", func(val failpoint.Value) {
injectedSize := val.(int)
memStats.HeapInuse += uint64(injectedSize)
memStats = &runtime.MemStats{HeapInuse: memStats.HeapInuse + uint64(injectedSize)}
})
return
}
Expand Down
12 changes: 6 additions & 6 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ type Tracker struct {
}
label int // Label of this "Tracker".
// following fields are used with atomic operations, so make them 64-byte aligned.
bytesConsumed int64 // Consumed bytes.
bytesReleased int64 // Released bytes.
maxConsumed atomicutil.Int64 // max number of bytes consumed during execution.
SessionID uint64 // SessionID indicates the sessionID the tracker is bound.
NeedKill atomic.Bool // NeedKill indicates whether this session need kill because OOM
bytesConsumed int64 // Consumed bytes.
bytesReleased int64 // Released bytes.
maxConsumed atomicutil.Int64 // max number of bytes consumed during execution.
SessionID atomicutil.Uint64 // SessionID indicates the sessionID the tracker is bound.
NeedKill atomic.Bool // NeedKill indicates whether this session need kill because OOM
NeedKillReceived sync.Once
IsRootTrackerOfSess bool // IsRootTrackerOfSess indicates whether this tracker is bound for session
isGlobal bool // isGlobal indicates whether this tracker is global tracker
Expand Down Expand Up @@ -462,7 +462,7 @@ func (t *Tracker) Consume(bs int64) {
sessionRootTracker.NeedKillReceived.Do(
func() {
logutil.BgLogger().Warn("global memory controller, NeedKill signal is received successfully",
zap.Uint64("conn", sessionRootTracker.SessionID))
zap.Uint64("conn", sessionRootTracker.SessionID.Load()))
})
tryActionLastOne(&sessionRootTracker.actionMuForHardLimit, sessionRootTracker)
}
Expand Down
5 changes: 3 additions & 2 deletions util/servermemorylimit/servermemorylimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) {
if instanceStats.HeapInuse > bt {
t := memory.MemUsageTop1Tracker.Load()
if t != nil {
sessionID := t.SessionID.Load()
memUsage := t.BytesConsumed()
// If the memory usage of the top1 session is less than tidb_server_memory_limit_sess_min_size, we do not need to kill it.
if uint64(memUsage) < limitSessMinSize {
memory.MemUsageTop1Tracker.CompareAndSwap(t, nil)
t = nil
} else if info, ok := sm.GetProcessInfo(t.SessionID); ok {
} else if info, ok := sm.GetProcessInfo(sessionID); ok {
logutil.BgLogger().Warn("global memory controller tries to kill the top1 memory consumer",
zap.Uint64("conn", info.ID),
zap.String("sql digest", info.Digest),
Expand All @@ -152,7 +153,7 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) {
zap.Uint64("heap inuse", instanceStats.HeapInuse),
zap.Int64("sql memory usage", info.MemTracker.BytesConsumed()),
)
s.sessionID = t.SessionID
s.sessionID = sessionID
s.sqlStartTime = info.Time
s.isKilling = true
s.sessionTracker = t
Expand Down

0 comments on commit b96ce5c

Please sign in to comment.