runaway: add runaway tidb-side time checker #54987

Merged
2 commits merged on Aug 12, 2024
8 changes: 4 additions & 4 deletions docs/design/2023-08-24-background-tasks-control.md
@@ -8,7 +8,7 @@

Resource control is used to solve some problems of resource usage under data consolidation. We can currently control some normal query tasks by means of RU limiter and scheduling. But it's not an adaptation for some background or bulk import/export tasks very well.

Due to the implementation restriction, resource control can't be applied for some tasks such as BR and TiDB Lightning. And for some long-running tasks such as DDL or background auto-analyze, it's also hard to control the resource usage becase it's not easy to select a proper RU settrings for these kind of jobs.
Due to the implementation restriction, resource control can't be applied for some tasks such as BR and TiDB Lightning. And for some long-running tasks such as DDL or background auto-analyze, it's also hard to control the resource usage because it's not easy to select a proper RU settings for these kind of jobs.

## Design Goals

@@ -35,7 +35,7 @@ CREATE/ALTER RESOURCE GROUP rg1
[ BACKGROUND = ( TASK_TYPES = "br,analyze" ) ];
```

Currently, we only support set the task types that should be controlled in the background manner. We may extend this interface to include more setttings such as task priority in the future.
Currently, we only support set the task types that should be controlled in the background manner. We may extend this interface to include more settings such as task priority in the future.

If a resource group's background setting is not set, we automatically apply the `default` resource group's settings to this group.

@@ -55,7 +55,7 @@ In order to control the background tasks' resource usage, we plan to add an extr

![background-control.png](imgs/background-control.png)

- Control the resource usage of all background tasks by the Resource Limiter: The rate limit is dynamically adjusted to the value via the formula TiKVTotalRUCapcity - sum(RUCostRateOfForgroundTasks), with a fine-grained adjusting duration, we can ensure the foreground tasks' RU is always enough(or near the system's maximum if the foreground requirement reaches the maximum quota), so the background tasks' impact on foreground tasks should be very low; on the other hand, when the foreground resource consumption is low, the controller should increase the limit threshold, so background jobs can take advantage of the remaining resources.
- Control the resource usage of all background tasks by the Resource Limiter: The rate limit is dynamically adjusted to the value via the formula TiKVTotalRUCapacity - sum(RUCostRateOfForegroundTasks), with a fine-grained adjusting duration, we can ensure the foreground tasks' RU is always enough(or near the system's maximum if the foreground requirement reaches the maximum quota), so the background tasks' impact on foreground tasks should be very low; on the other hand, when the foreground resource consumption is low, the controller should increase the limit threshold, so background jobs can take advantage of the remaining resources.
- The local resource manager will statics RU consumption of background jobs via the Resource Limiter: We will do statistics and report the resource consumption to the global resource manager. In the first stage, we only do statistics globally but control it locally.
- Feedback mechanism: It's better to give feedback on how fast the limiter layer executes tasks on tikv to the upper layer like tidb, so that the upper layer task framework can adjust the number of tasks.
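
As a rough, illustrative sketch of the adjustment described in the first bullet above (not the actual TiKV limiter, which is implemented in Rust; the function names, the floor value, and the refresh interval are assumptions):

```go
// adjustBackgroundLimit recomputes the RU/s budget left for background tasks:
// background limit = TiKVTotalRUCapacity - sum(RU cost rate of foreground tasks),
// with a small floor so background jobs are never fully starved.
package main

import "fmt"

func adjustBackgroundLimit(totalRUCapacity, foregroundRURate float64) float64 {
	const minBackgroundRU = 10.0 // assumed floor, not specified in the design doc
	limit := totalRUCapacity - foregroundRURate
	if limit < minBackgroundRU {
		limit = minBackgroundRU
	}
	return limit
}

func main() {
	// A real controller would re-run this on a fine-grained interval.
	for _, fg := range []float64{2000, 8000, 9995} {
		fmt.Printf("foreground=%.0f RU/s -> background limit=%.0f RU/s\n",
			fg, adjustBackgroundLimit(10000, fg))
	}
}
```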

@@ -134,7 +134,7 @@ impl Future for LimitedFuture {

In our implementation, we integrate this rate limiter in the following components so it can cover most use cases:

- Coprocessor. All SQL read requests are handled via the coprocessor component, this can ensure all read reuqests are covered.
- Coprocessor. All SQL read requests are handled via the coprocessor component, this can ensure all read requests are covered.
- Txn Scheduler. The write requests in tikv are handled via multiple threadpools via a pipeline manner, to make things simple, we only apply the rate limiter in the first phase, that is, the txn scheduler worker pool. Though this is not ideal, the result is acceptable in our benchmark. We may enhance this mechanism in the future.
- Backup. We apply the rate limiter in backup kv scan and sst upload procedure.
- SST Service. Most sst relate operations are handled via the sst service. This ensure BR, TiDB Lightning and DDL(fast mode) can be controlled.
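
The components above all funnel their work through the same wrap-and-throttle idea as the Rust `LimitedFuture` shown in this hunk. Below is a minimal Go analogue of that idea, assuming a shared token-bucket limiter and hypothetical request/cost names; it is a sketch, not the TiKV implementation:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// request is a stand-in for a coprocessor read, txn-scheduler write, or SST upload.
type request struct {
	name   string
	ruCost int // estimated RU cost of serving this request
}

// handle blocks until the shared background budget can cover the request's cost,
// then does the work; this mirrors wrapping each task in a rate-limited future.
func handle(ctx context.Context, limiter *rate.Limiter, req request) error {
	if err := limiter.WaitN(ctx, req.ruCost); err != nil {
		return err
	}
	fmt.Printf("served %s (%d RU)\n", req.name, req.ruCost)
	return nil
}

func main() {
	// 100 RU/s budget for background work (assumed value), burst of 100 RU.
	limiter := rate.NewLimiter(rate.Limit(100), 100)
	for i := 0; i < 3; i++ {
		_ = handle(context.Background(), limiter, request{name: fmt.Sprintf("scan-%d", i), ruCost: 50})
	}
}
```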
3 changes: 2 additions & 1 deletion pkg/ddl/tests/resourcegroup/BUILD.bazel
@@ -6,7 +6,7 @@ go_test(
srcs = ["resource_group_test.go"],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
"//pkg/ddl/resourcegroup",
"//pkg/ddl/util/callback",
@@ -15,6 +15,7 @@ go_test(
"//pkg/errno",
"//pkg/parser/auth",
"//pkg/parser/model",
"//pkg/server",
"//pkg/sessionctx",
"//pkg/testkit",
"@com_github_pingcap_failpoint//:failpoint",
39 changes: 38 additions & 1 deletion pkg/ddl/tests/resourcegroup/resource_group_test.go
@@ -30,6 +30,7 @@ import (
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
@@ -310,7 +311,6 @@ func TestResourceGroupRunaway(t *testing.T) {
maxWaitDuration := time.Second * 5
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg1 select /*+ resource_group(rg1) */ * from t identify"), maxWaitDuration, tryInterval)
// require.Len(t, tk.MustQuery("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries").Rows(), 0)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=COOLDOWN)")
@@ -363,6 +363,43 @@ func TestResourceGroupRunaway(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq"))
}

func TestResourceGroupRunawayExceedTiDBSide(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/FastRunawayGC"))
}()
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
sv.SetDomain(dom)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)

go dom.ExpensiveQueryHandle().SetSessionManager(sv).Run()
tk.MustExec("set global tidb_enable_resource_control='on'")
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil))

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)")

require.Eventually(t, func() bool {
return dom.RunawayManager().IsSyncerInitialized()
}, 20*time.Second, 300*time.Millisecond)

err := tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.5) from t")
require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query")

tryInterval := time.Millisecond * 100
maxWaitDuration := time.Second * 5
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg1 select /*+ resource_group(rg1) */ sleep(0.5) from t identify"), maxWaitDuration, tryInterval)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil,
nil, maxWaitDuration, tryInterval)
}

func TestAlreadyExistsDefaultResourceGroup(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/infosync/managerAlreadyCreateSomeGroups", `return(true)`))
defer func() {
29 changes: 29 additions & 0 deletions pkg/domain/resourcegroup/runaway.go
@@ -16,6 +16,7 @@ package resourcegroup

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
@@ -525,6 +526,34 @@ func (r *RunawayChecker) BeforeExecutor() error {
return nil
}

// CheckKillAction checks whether the query should be killed.
func (r *RunawayChecker) CheckKillAction() bool {
if r.setting == nil && !r.markedByWatch {
return false
}
// mark by rule
marked := r.markedByRule.Load()
if !marked {
now := time.Now()
until := r.deadline.Sub(now)
if until > 0 {
return false
}
if r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
if !r.markedByWatch {
r.markQuarantine(&now)
}
}
}
return r.setting.Action == rmpb.RunawayAction_Kill
}

// Rule returns the rule of the runaway checker.
func (r *RunawayChecker) Rule() string {
return fmt.Sprintf("execElapsedTimeMs:%s", time.Duration(r.setting.Rule.ExecElapsedTimeMs)*time.Millisecond)
}

// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request.
func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
if r.setting == nil && !r.markedByWatch {
8 changes: 4 additions & 4 deletions pkg/executor/simple.go
@@ -2592,7 +2592,7 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
if x, ok := s.Expr.(*ast.FuncCallExpr); ok {
if x.FnName.L == ast.ConnectionID {
sm := e.Ctx().GetSessionManager()
sm.Kill(e.Ctx().GetSessionVars().ConnectionID, s.Query, false)
sm.Kill(e.Ctx().GetSessionVars().ConnectionID, s.Query, false, false)
return nil
}
return errors.New("Invalid operation. Please use 'KILL TIDB [CONNECTION | QUERY] [connectionID | CONNECTION_ID()]' instead")
@@ -2604,7 +2604,7 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
if sm == nil {
return nil
}
sm.Kill(s.ConnectionID, s.Query, false)
sm.Kill(s.ConnectionID, s.Query, false, false)
} else {
err := errors.NewNoStackError("Invalid operation. Please use 'KILL TIDB [CONNECTION | QUERY] [connectionID | CONNECTION_ID()]' instead")
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err)
@@ -2619,7 +2619,7 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
if e.IsFromRemote {
logutil.BgLogger().Info("Killing connection in current instance redirected from remote TiDB", zap.Uint64("conn", s.ConnectionID), zap.Bool("query", s.Query),
zap.String("sourceAddr", e.Ctx().GetSessionVars().SourceAddr.IP.String()))
sm.Kill(s.ConnectionID, s.Query, false)
sm.Kill(s.ConnectionID, s.Query, false, false)
return nil
}

@@ -2645,7 +2645,7 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err1)
}
} else {
sm.Kill(s.ConnectionID, s.Query, false)
sm.Kill(s.ConnectionID, s.Query, false, false)
}

return nil
17 changes: 10 additions & 7 deletions pkg/server/server.go
@@ -892,8 +892,9 @@ func (s *Server) GetConAttrs(user *auth.UserIdentity) map[uint64]map[string]stri
}

// Kill implements the SessionManager interface.
func (s *Server) Kill(connectionID uint64, query bool, maxExecutionTime bool) {
logutil.BgLogger().Info("kill", zap.Uint64("conn", connectionID), zap.Bool("query", query))
func (s *Server) Kill(connectionID uint64, query bool, maxExecutionTime bool, runaway bool) {
logutil.BgLogger().Info("kill", zap.Uint64("conn", connectionID),
zap.Bool("query", query), zap.Bool("maxExecutionTime", maxExecutionTime), zap.Bool("runawayExceed", runaway))
metrics.ServerEventCounter.WithLabelValues(metrics.EventKill).Inc()

s.rwlock.RLock()
@@ -916,7 +917,7 @@ func (s *Server) Kill(connectionID uint64, query bool, maxExecutionTime bool) {
}
}
}
killQuery(conn, maxExecutionTime)
killQuery(conn, maxExecutionTime, runaway)
}

// UpdateTLSConfig implements the SessionManager interface.
@@ -929,9 +930,11 @@ func (s *Server) GetTLSConfig() *tls.Config {
return (*tls.Config)(atomic.LoadPointer(&s.tlsConfig))
}

func killQuery(conn *clientConn, maxExecutionTime bool) {
func killQuery(conn *clientConn, maxExecutionTime, runaway bool) {
sessVars := conn.ctx.GetSessionVars()
if maxExecutionTime {
if runaway {
sessVars.SQLKiller.SendKillSignal(sqlkiller.RunawayQueryExceeded)
} else if maxExecutionTime {
sessVars.SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
} else {
sessVars.SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
@@ -974,7 +977,7 @@ func (s *Server) KillAllConnections() {
if err := conn.closeWithoutLock(); err != nil {
terror.Log(err)
}
killQuery(conn, false)
killQuery(conn, false, false)
}

s.KillSysProcesses()
@@ -1134,6 +1137,6 @@ func (s *Server) KillNonFlashbackClusterConn() {
}
s.rwlock.RUnlock()
for _, id := range connIDs {
s.Kill(id, false, false)
s.Kill(id, false, false, false)
}
}
1 change: 1 addition & 0 deletions pkg/session/session.go
@@ -1438,6 +1438,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx,
MemTracker: s.sessionVars.MemTracker,
DiskTracker: s.sessionVars.DiskTracker,
RunawayChecker: s.sessionVars.StmtCtx.RunawayChecker,
StatsInfo: plannercore.GetStatsInfo,
OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(),
TableIDs: s.sessionVars.StmtCtx.TableIDs,
3 changes: 3 additions & 0 deletions pkg/store/driver/error/error.go
@@ -133,6 +133,9 @@ func ToTiDBErr(err error) error {
// connection id is unknown in client, which should be logged or filled by upper layers
return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(-1)
}
if stderrs.Is(err, tikverr.ErrQueryInterruptedWithSignal{Signal: sqlkiller.RunawayQueryExceeded}) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.GenWithStackByArgs()
}

if stderrs.Is(err, tikverr.ErrTiKVServerBusy) {
return ErrTiKVServerBusy
6 changes: 3 additions & 3 deletions pkg/testkit/mocksessionmanager.go
@@ -112,7 +112,7 @@ func (msm *MockSessionManager) GetConAttrs(user *auth.UserIdentity) map[uint64]m
}

// Kill implements the SessionManager.Kill interface.
func (*MockSessionManager) Kill(uint64, bool, bool) {
func (*MockSessionManager) Kill(uint64, bool, bool, bool) {
}

// KillAllConnections implements the SessionManager.KillAllConnections interface.
@@ -183,12 +183,12 @@ func (msm *MockSessionManager) KillNonFlashbackClusterConn() {
processInfo := se.ShowProcess()
ddl, ok := processInfo.StmtCtx.GetPlan().(*core.DDL)
if !ok {
msm.Kill(se.GetSessionVars().ConnectionID, false, false)
msm.Kill(se.GetSessionVars().ConnectionID, false, false, false)
continue
}
_, ok = ddl.Statement.(*ast.FlashBackToTimestampStmt)
if !ok {
msm.Kill(se.GetSessionVars().ConnectionID, false, false)
msm.Kill(se.GetSessionVars().ConnectionID, false, false, false)
continue
}
}
2 changes: 1 addition & 1 deletion pkg/ttl/cache/table.go
@@ -111,7 +111,7 @@ type PhysicalTable struct {
TimeColumn *model.ColumnInfo
}

// NewBasePhysicalTable create a new PhysicalTable with specific timeColunm.
// NewBasePhysicalTable create a new PhysicalTable with specific timeColumn.
func NewBasePhysicalTable(schema model.CIStr,
tbl *model.TableInfo,
partition model.CIStr,
1 change: 1 addition & 0 deletions pkg/util/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/domain/resourcegroup",
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/metrics",
9 changes: 7 additions & 2 deletions pkg/util/expensivequery/expensivequery.go
@@ -95,16 +95,21 @@ func (eqh *Handle) Run() {
if info.MaxExecutionTime > 0 && costTime > time.Duration(info.MaxExecutionTime)*time.Millisecond {
logutil.BgLogger().Warn("execution timeout, kill it", zap.Duration("costTime", costTime),
zap.Duration("maxExecutionTime", time.Duration(info.MaxExecutionTime)*time.Millisecond), zap.String("processInfo", info.String()))
sm.Kill(info.ID, true, true)
sm.Kill(info.ID, true, true, false)
}
if info.ID == sm.GetAutoAnalyzeProcID() {
maxAutoAnalyzeTime := variable.MaxAutoAnalyzeTime.Load()
if maxAutoAnalyzeTime > 0 && costTime > time.Duration(maxAutoAnalyzeTime)*time.Second {
logutil.BgLogger().Warn("auto analyze timeout, kill it", zap.Duration("costTime", costTime),
zap.Duration("maxAutoAnalyzeTime", time.Duration(maxAutoAnalyzeTime)*time.Second), zap.String("processInfo", info.String()))
sm.Kill(info.ID, true, false)
sm.Kill(info.ID, true, false, false)
}
}
if info.RunawayChecker != nil && info.RunawayChecker.CheckKillAction() {
logutil.BgLogger().Warn("runaway query timeout", zap.Duration("costTime", costTime), zap.String("groupName", info.ResourceGroupName),
zap.String("rule", info.RunawayChecker.Rule()), zap.String("processInfo", info.String()))
sm.Kill(info.ID, true, false, true)
}
}
threshold = atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold)
txnThreshold = atomic.LoadUint64(&variable.ExpensiveTxnTimeThreshold)
4 changes: 3 additions & 1 deletion pkg/util/processinfo.go
@@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/session/cursor"
@@ -51,6 +52,7 @@ type ProcessInfo struct {
RefCountOfStmtCtx *stmtctx.ReferenceCount
MemTracker *memory.Tracker
DiskTracker *disk.Tracker
RunawayChecker *resourcegroup.RunawayChecker
StatsInfo func(any) map[string]uint64
RuntimeStatsColl *execdetails.RuntimeStatsColl
User string
@@ -203,7 +205,7 @@ type SessionManager interface {
ShowProcessList() map[uint64]*ProcessInfo
ShowTxnList() []*txninfo.TxnInfo
GetProcessInfo(id uint64) (*ProcessInfo, bool)
Kill(connectionID uint64, query bool, maxExecutionTime bool)
Kill(connectionID uint64, query bool, maxExecutionTime bool, runaway bool)
KillAllConnections()
UpdateTLSConfig(cfg *tls.Config)
ServerID() uint64
4 changes: 4 additions & 0 deletions pkg/util/sqlkiller/sqlkiller.go
@@ -34,6 +34,7 @@ const (
MaxExecTimeExceeded
QueryMemoryExceeded
ServerMemoryExceeded
RunawayQueryExceeded
// When you add a new signal, you should also modify store/driver/error/ToTidbErr,
// so that errors in client can be correctly converted to tidb errors.
)
@@ -77,6 +78,9 @@ func (killer *SQLKiller) getKillError(status killSignal) error {
return exeerrors.ErrMemoryExceedForQuery.GenWithStackByArgs(killer.ConnID)
case ServerMemoryExceeded:
return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(killer.ConnID)
case RunawayQueryExceeded:
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.GenWithStackByArgs()
default:
}
return nil
}