cherry pick pingcap#33953 to release-6.0
Signed-off-by: ti-srebot <[email protected]>
crazycs520 authored and ti-srebot committed Apr 14, 2022
1 parent 36a9810 commit 8e6972c
Showing 4 changed files with 164 additions and 10 deletions.
3 changes: 3 additions & 0 deletions executor/builder.go
@@ -1778,13 +1778,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
}

case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
memTracker := memory.NewTracker(v.ID(), -1)
memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker)
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &slowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
memTracker: memTracker,
},
}
case strings.ToLower(infoschema.TableStorageStats):
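The hunk above gives the slow-query retriever its own memory tracker and attaches it to the statement-level tracker, so whatever the retriever consumes is counted against the per-query quota. Below is a minimal, self-contained sketch of that parent/child accounting pattern; the Tracker type here is a simplified stand-in for TiDB's util/memory.Tracker, not the real implementation.

package main

import "fmt"

// Tracker is a simplified stand-in for util/memory.Tracker: it counts bytes
// and forwards every Consume call to its parent so usage rolls up.
type Tracker struct {
	label    string
	parent   *Tracker
	consumed int64
}

func NewTracker(label string) *Tracker { return &Tracker{label: label} }

// AttachTo links this tracker under a parent tracker.
func (t *Tracker) AttachTo(parent *Tracker) { t.parent = parent }

// Consume adds bytes (possibly negative, to release) and propagates upward.
func (t *Tracker) Consume(bytes int64) {
	for cur := t; cur != nil; cur = cur.parent {
		cur.consumed += bytes
	}
}

func (t *Tracker) BytesConsumed() int64 { return t.consumed }

func main() {
	stmtTracker := NewTracker("statement")    // per-statement tracker (StmtCtx.MemTracker)
	readerTracker := NewTracker("slow_query") // tracker owned by the retriever
	readerTracker.AttachTo(stmtTracker)

	readerTracker.Consume(4096)              // charge a parsed batch of slow-log lines
	readerTracker.Consume(-4096)             // release it once the rows are handed back
	fmt.Println(stmtTracker.BytesConsumed()) // 0: usage rolled up and was released
}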
48 changes: 42 additions & 6 deletions executor/slow_query.go
@@ -66,8 +66,11 @@ type slowQueryRetriever struct {
checker *slowLogChecker
columnValueFactoryMap map[string]slowQueryColumnValueFactory

taskList chan slowLogTask
stats *slowQueryRuntimeStats
taskList chan slowLogTask
stats *slowQueryRuntimeStats
memTracker *memory.Tracker
lastFetchSize int64
cancel context.CancelFunc
}

func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
@@ -76,6 +79,7 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte
if err != nil {
return nil, err
}
ctx, e.cancel = context.WithCancel(ctx)
e.initializeAsyncParsing(ctx, sctx)
}
return e.dataForSlowLog(ctx, sctx)
@@ -141,6 +145,9 @@ func (e *slowQueryRetriever) close() error {
logutil.BgLogger().Error("close slow log file failed.", zap.Error(err))
}
}
if e.cancel != nil {
e.cancel()
}
return nil
}
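close() now also calls the cancel function captured in retrieve(), which is the usual context.WithCancel pairing: cancelling the context tells the background parsing goroutine to stop. A minimal sketch of that pattern follows, using hypothetical names rather than the retriever's real fields.

package main

import (
	"context"
	"fmt"
	"time"
)

// retriever is a hypothetical stand-in for slowQueryRetriever, reduced to the
// cancellation bookkeeping shown in the hunk above.
type retriever struct {
	cancel context.CancelFunc
}

func (r *retriever) start(ctx context.Context) {
	ctx, r.cancel = context.WithCancel(ctx)
	go func() {
		for {
			select {
			case <-ctx.Done():
				fmt.Println("background parser stopped")
				return
			case <-time.After(10 * time.Millisecond):
				// parse the next batch of slow-log lines ...
			}
		}
	}()
}

func (r *retriever) close() {
	if r.cancel != nil {
		r.cancel() // stop the parser goroutine started in start()
	}
}

func main() {
	r := &retriever{}
	r.start(context.Background())
	r.close()
	time.Sleep(50 * time.Millisecond) // give the goroutine time to observe cancellation
}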

@@ -197,6 +204,8 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context, sctx sessionctx
task slowLogTask
ok bool
)
e.memConsume(-e.lastFetchSize)
e.lastFetchSize = 0
for {
select {
case task, ok = <-e.taskList:
@@ -214,6 +223,7 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context, sctx sessionctx
if len(rows) == 0 {
continue
}
e.lastFetchSize = calculateDatumsSize(rows)
return rows, nil
}
}
@@ -438,7 +448,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
return
case e.taskList <- t:
}
e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{nil, err})
e.sendParsedSlowLogCh(t, parsedSlowLog{nil, err})
}
if len(logs) == 0 || len(logs[0]) == 0 {
break
@@ -466,7 +476,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
}
wg.Run(func() {
result, err := e.parseLog(ctx, sctx, log, start)
e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{result, err})
e.sendParsedSlowLogCh(t, parsedSlowLog{result, err})
<-ch
})
offset.offset = e.fileLine
@@ -481,10 +491,10 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
wg.Wait()
}

func (e *slowQueryRetriever) sendParsedSlowLogCh(ctx context.Context, t slowLogTask, re parsedSlowLog) {
func (e *slowQueryRetriever) sendParsedSlowLogCh(t slowLogTask, re parsedSlowLog) {
select {
case t.resultCh <- re:
case <-ctx.Done():
default:
return
}
}
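Replacing the ctx.Done() branch with default makes the result send non-blocking: if the consumer has already stopped reading (for example because the query was cancelled after exceeding its memory quota), the parser simply drops the result instead of blocking. A small, self-contained illustration of a non-blocking channel send:

package main

import "fmt"

// nonBlockingSend drops the value when nobody is ready to receive, which is
// the behaviour the default branch gives sendParsedSlowLogCh: a parser
// goroutine never blocks on a result channel whose reader has gone away.
func nonBlockingSend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(nonBlockingSend(ch, "first"))  // true: buffer has room
	fmt.Println(nonBlockingSend(ch, "second")) // false: buffer full, value dropped
}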
@@ -531,6 +541,8 @@ func splitByColon(line string) (fields []string, values []string) {

func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) {
start := time.Now()
logSize := calculateLogSize(log)
defer e.memConsume(-logSize)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%s", r)
@@ -543,6 +555,7 @@ func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Conte
atomic.AddInt64(&e.stats.parseLog, int64(time.Since(start)))
}
}()
e.memConsume(logSize)
failpoint.Inject("errorMockParseSlowLogPanic", func(val failpoint.Value) {
if val.(bool) {
panic("panic test")
@@ -619,6 +632,7 @@ func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Conte
// Get the sql string, and mark the start flag to false.
_ = e.setColumnValue(sctx, row, tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.checker, fileLine)
e.setDefaultValue(row)
e.memConsume(types.EstimatedMemUsage(row, 1))
data = append(data, row)
startFlag = false
} else {
@@ -1087,3 +1101,25 @@ func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx se
e.taskList = make(chan slowLogTask, 1)
go e.parseDataForSlowLog(ctx, sctx)
}

func calculateLogSize(log []string) int64 {
size := 0
for _, line := range log {
size += len(line)
}
return int64(size)
}

func calculateDatumsSize(rows [][]types.Datum) int64 {
size := int64(0)
for _, row := range rows {
size += types.EstimatedMemUsage(row, 1)
}
return size
}

func (e *slowQueryRetriever) memConsume(bytes int64) {
if e.memTracker != nil {
e.memTracker.Consume(bytes)
}
}
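The helpers above implement a charge/release discipline: parseLog charges the raw size of a log batch while it is parsed (plus the estimated size of each produced row), and dataForSlowLog remembers the size of the rows it returned in lastFetchSize so the next fetch can release them. The sketch below shows the same discipline in a self-contained form; the tracker and fetcher types are simplified stand-ins, it charges rows at fetch time rather than inside parseLog, and sizes are plain byte lengths rather than types.EstimatedMemUsage.

package main

import "fmt"

// tracker is a minimal stand-in for util/memory.Tracker.
type tracker struct{ consumed int64 }

func (t *tracker) Consume(b int64) { t.consumed += b }

type fetcher struct {
	mem           *tracker
	lastFetchSize int64
}

// fetch mirrors the dataForSlowLog pattern: release the bytes charged for the
// previous batch, build the new batch, then keep its size charged until the
// next call releases it.
func (f *fetcher) fetch(rows []string) []string {
	f.mem.Consume(-f.lastFetchSize)
	f.lastFetchSize = 0

	var size int64
	for _, r := range rows {
		size += int64(len(r))
	}
	f.mem.Consume(size)
	f.lastFetchSize = size
	return rows
}

func main() {
	f := &fetcher{mem: &tracker{}}
	f.fetch([]string{"row-1", "row-2"})
	fmt.Println(f.mem.consumed) // bytes of the current batch stay charged
	f.fetch(nil)                // the next fetch releases the previous batch
	fmt.Println(f.mem.consumed) // 0
}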
106 changes: 106 additions & 0 deletions infoschema/cluster_tables_test.go
@@ -16,10 +16,12 @@ package infoschema_test

import (
"fmt"
"math/rand"
"net"
"net/http/httptest"
"os"
"runtime"
"strconv"
"strings"
"testing"
"time"
@@ -30,6 +32,7 @@ import (
"github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/auth"
@@ -39,6 +42,7 @@ import (
"github.com/pingcap/tidb/store/mockstore/mockstorage"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/set"
@@ -604,6 +608,108 @@ func TestStmtSummaryResultRows(t *testing.T) {
Check(testkit.Rows("10 30 20"))
}

func TestSlowQueryOOM(t *testing.T) {
var clean func()
s := new(clusterTablesSuite)
s.store, s.dom, clean = testkit.CreateMockStoreAndDomain(t)
defer clean()
s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0")
s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer()
s.startTime = time.Now()
defer s.httpServer.Close()
defer s.rpcserver.Stop()
tk := s.newTestKitWithRoot(t)

f, err := os.CreateTemp("", "tidb-slow-*.log")
require.NoError(t, err)
_, err = f.WriteString(`
# Time: 2022-04-14T10:50:28.185954+08:00
# Txn_start_ts: 432512598850928660
# User@Host: root[root] @ 127.0.0.1 [127.0.0.1]
# Conn_ID: 465
# Query_time: 0.000955269
# Parse_time: 0
# Compile_time: 0.000486719
# Rewrite_time: 0.000142467
# Optimize_time: 0.000312527
# Wait_TS: 0.000004489
# Cop_time: 0.000169235 Request_count: 2
# DB: test
# Index_names: [t_normal_oltp:idx0]
# Is_internal: false
# Digest: dcb13f841a568ec94baec50c88d0679c533bbd65539ba8fee6deb2e39881acdd
# Stats: t_normal_oltp:432512598027796484
# Num_cop_tasks: 2
# Cop_proc_avg: 0 Cop_proc_p90: 0 Cop_proc_max: 0 Cop_proc_addr: store1
# Cop_wait_avg: 0 Cop_wait_p90: 0 Cop_wait_max: 0 Cop_wait_addr: store1
# Mem_max: 11372
# Prepared: true
# Plan_from_cache: false
# Plan_from_binding: false
# Has_more_results: false
# KV_total: 0
# PD_total: 0.000000671
# Backoff_total: 0
# Write_sql_response_total: 0.000000606
# Result_rows: 1
# Succ: true
# IsExplicitTxn: false
# Plan: tidb_decode_plan('lQeAMAk1XzEwCTAJMQlmdW5jczpzdW0oQ29sdW1uIzEyKS0+DQzwUjYJMQl0aW1lOjQyMS45wrVzLCBsb29wczoyCTEuNDUgS0IJTi9BCjEJM18yNwkwCTAJY2FzdChwbHVzKHRlc3QudF9ub3JtYWxfb2x0cC5hLCB0RhYAXGIpLCBkZWNpbWFsKDIwLDApIEJJTkFSWRmHDDEyCTERiQQxOC6HAGwsIENvbmN1cnJlbmN5Ok9GRgk3NjAgQnl0ZXMJAZoYMgkzMF8yNAWbGUQMMDkuNzZGAEhpbmRleF90YXNrOiB7dG90YWxfBfgUIDEwMS4yBSwsZmV0Y2hfaGFuZGxlARgIMC4xBRiAYnVpbGQ6IDQ2OW5zLCB3YWl0OiA1OTVuc30sIHRhYmxlTlcADDI1Ny4pUCBudW06IDEsIGMdyBwgNX0JOC45MTFgODMJNDdfMjIJMV8wCTAJdAFVADoyWgEALAnBwDppZHgwKGEpLCByYW5nZTpbNjY4Mzk4LDY2ODQwOF0sIGtlZXAgb3JkZXI6ZmFsc2U1Ewg5Ni4uWAEAMwGQAHARuRGjGG1heDogNzQF9kRwcm9jX2tleXM6IDAsIHJwY18RJgEMKTwENjMN5GRjb3ByX2NhY2hlX2hpdF9yYXRpbzogMC4wMCEkCGlrdglqAHsFNwA1LokAFDB9CU4vQQEEIQUMNV8yM24FAWbfAAA1LcVNvmrfAAAwot8ADDU5LjYFLbbfAAQzLlrhAA==')
# Plan_digest: e7b1a5789200cb6d91aaac8af3f5560af51870369bac2e247b84fe9b5e754cbe
select sum(a+b) from test.t_normal_oltp where a >= ? and a <= ? [arguments: (668398, 668408)];
# Time: 2022-04-14T10:50:28.185987+08:00
select * from t;
# Time: 2022-04-14T10:50:28.186028+08:00
select * from t1;
`)
require.NoError(t, err)
require.NoError(t, f.Close())
executor.ParseSlowLogBatchSize = 1
originCfg := config.GetGlobalConfig()
newCfg := *originCfg
newCfg.Log.SlowQueryFile = f.Name()
newCfg.OOMAction = config.OOMActionCancel
config.StoreGlobalConfig(&newCfg)
defer func() {
executor.ParseSlowLogBatchSize = 64
config.StoreGlobalConfig(originCfg)
require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile))
}()
require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig()))

tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name()))
checkFn := func(quota int) {
originCfg := config.GetGlobalConfig()
newCfg := *originCfg
newCfg.MemQuotaQuery = int64(quota)
config.StoreGlobalConfig(&newCfg)
tk.MustExec("set @@tidb_mem_quota_query=" + strconv.Itoa(quota))

err = tk.QueryToErr("select * from `information_schema`.`slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'")
require.Error(t, err, quota)
require.Contains(t, err.Error(), "Out Of Memory Quota!", quota)
err = tk.QueryToErr("select * from `information_schema`.`cluster_slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'")
require.Error(t, err, quota)
require.Contains(t, err.Error(), "Out Of Memory Quota!", quota)
}
memQuotas := []int{128, 512, 1024, 2048, 4096}
for _, quota := range memQuotas {
checkFn(quota)
}
for i := 0; i < 100; i++ {
quota := rand.Int()%10240 + 1
checkFn(quota)
}

newCfg.MemQuotaQuery = 1024 * 1024 * 1024
config.StoreGlobalConfig(&newCfg)
tk.MustExec("set @@tidb_mem_quota_query=" + strconv.Itoa(int(newCfg.MemQuotaQuery)))
tk.MustQuery("select * from `information_schema`.`slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'")
mem := tk.Session().GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
require.Equal(t, mem, int64(0))
tk.MustQuery("select * from `information_schema`.`cluster_slow_query` where time > '2022-04-14 00:00:00' and time < '2022-04-15 00:00:00'")
}

func (s *clusterTablesSuite) setUpRPCService(t *testing.T, addr string) (*grpc.Server, string) {
lis, err := net.Listen("tcp", addr)
require.NoError(t, err)
17 changes: 13 additions & 4 deletions server/rpc_server.go
@@ -209,12 +209,21 @@ func (s *rpcServer) createSession() (session.Session, error) {
Handle: do.PrivilegeHandle(),
}
privilege.BindPrivilegeManager(se, pm)
se.GetSessionVars().TxnCtx.InfoSchema = is
vars := se.GetSessionVars()
vars.TxnCtx.InfoSchema = is
// This is to disable parallel hash agg.
// TODO: remove this.
se.GetSessionVars().SetHashAggPartialConcurrency(1)
se.GetSessionVars().SetHashAggFinalConcurrency(1)
se.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForCoprocessor, -1)
vars.SetHashAggPartialConcurrency(1)
vars.SetHashAggFinalConcurrency(1)
vars.StmtCtx.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery)
vars.StmtCtx.MemTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker)
globalConfig := config.GetGlobalConfig()
switch globalConfig.OOMAction {
case config.OOMActionCancel:
action := &memory.PanicOnExceed{}
action.SetLogHook(domain.GetDomain(se).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
vars.StmtCtx.MemTracker.SetActionOnExceed(action)
}
se.SetSessionManager(s.sm)
return se, nil
}
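The createSession() change wires the RPC (coprocessor) session through the same quota machinery as ordinary sessions: the statement tracker is initialized with the per-query quota, attached to the global tracker, and, when oom-action is "cancel", a memory.PanicOnExceed action is installed; in TiDB the resulting panic is recovered higher up and surfaced as the "Out Of Memory Quota!" error checked by the test above. Below is a simplified, self-contained sketch of the action-on-exceed idea; the types are hypothetical stand-ins, not the util/memory API.

package main

import "fmt"

// actionOnExceed is a simplified stand-in for the action hook a tracker fires
// once tracked memory goes over the per-query quota.
type actionOnExceed interface {
	Action(consumed, quota int64)
}

// cancelAction mimics the cancel behaviour: abort the statement by panicking
// with a recognizable message that the caller recovers.
type cancelAction struct{}

func (cancelAction) Action(consumed, quota int64) {
	panic(fmt.Sprintf("Out Of Memory Quota! consumed=%d quota=%d", consumed, quota))
}

// quotaTracker is a simplified tracker that fires its action when Consume
// pushes usage past the quota.
type quotaTracker struct {
	quota    int64
	consumed int64
	action   actionOnExceed
}

func (t *quotaTracker) Consume(b int64) {
	t.consumed += b
	if t.action != nil && t.consumed > t.quota {
		t.action.Action(t.consumed, t.quota)
	}
}

func main() {
	t := &quotaTracker{quota: 1024, action: cancelAction{}}
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("query cancelled:", r) // recovered and turned into an error upstream
		}
	}()
	t.Consume(4096) // exceeds the quota and triggers the action
}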
