topsql: use new cache policy for top-n SQL (#25744)
crazycs520 committed Jun 29, 2021
1 parent a0b97b0 commit fac17a2
Showing 4 changed files with 149 additions and 48 deletions.
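At a glance, the new policy trims to the top MaxStatementCount statements by CPU time in two places: each round's incoming records inside doCollect, and the aggregated dataPoints when a report is assembled. The stand-alone Go sketch below illustrates only that selection step; the record type and topN helper are hypothetical stand-ins, and sort.Slice is used instead of the quickselect package the commit relies on, so read it as an illustration rather than the committed code.

package main

import (
    "fmt"
    "sort"
)

// record is a stand-in for tracecpu.SQLCPUTimeRecord, reduced to the one
// field the selection cares about.
type record struct {
    SQLDigest string
    CPUTimeMs uint32
}

// topN returns the n records with the largest CPUTimeMs plus the remainder,
// which the reporter treats as eviction candidates. The commit does this with
// quickselect (expected O(len(records))); a full descending sort is used here
// only to keep the sketch dependency-free.
func topN(records []record, n int) (kept, evicted []record) {
    if len(records) <= n {
        return records, nil
    }
    sort.Slice(records, func(i, j int) bool {
        return records[i].CPUTimeMs > records[j].CPUTimeMs // descending: largest first
    })
    return records[:n], records[n:]
}

func main() {
    recs := []record{{"a", 5}, {"b", 30}, {"c", 10}, {"d", 1}}
    kept, evicted := topN(recs, 2)
    fmt.Println(kept)    // [{b 30} {c 10}]
    fmt.Println(evicted) // [{a 5} {d 1}]
}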
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
@@ -712,7 +712,7 @@ const (
     DefTiDBTopSQLEnable = false
     DefTiDBTopSQLAgentAddress = ""
     DefTiDBTopSQLPrecisionSeconds = 1
-    DefTiDBTopSQLMaxStatementCount = 2000
+    DefTiDBTopSQLMaxStatementCount = 200
     DefTiDBTopSQLMaxCollect = 10000
     DefTiDBTopSQLReportIntervalSeconds = 60
     DefTiDBEnableGlobalTemporaryTable = false
2 changes: 1 addition & 1 deletion util/topsql/reporter/client.go
@@ -99,7 +99,7 @@ func (r *GRPCReportClient) Close() {
 }
 
 // sendBatchCPUTimeRecord sends a batch of TopSQL records by stream.
-func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records map[string]*dataPoints) error {
+func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records []*dataPoints) error {
     if len(records) == 0 {
         return nil
     }
121 changes: 75 additions & 46 deletions util/topsql/reporter/reporter.go
@@ -61,25 +61,31 @@ type dataPoints struct {
     CPUTimeMsTotal uint64
 }
 
-// cpuTimeSort is used to sort TopSQL records by total CPU time
-type cpuTimeSort struct {
-    Key string
-    SQLDigest []byte
-    PlanDigest []byte
-    CPUTimeMsTotal uint64 // The sorting field
+type dataPointsOrderByCPUTime []*dataPoints
+
+func (t dataPointsOrderByCPUTime) Len() int {
+    return len(t)
 }
 
-type cpuTimeSortSlice []cpuTimeSort
+func (t dataPointsOrderByCPUTime) Less(i, j int) bool {
+    // We need find the kth largest value, so here should use >
+    return t[i].CPUTimeMsTotal > t[j].CPUTimeMsTotal
+}
+func (t dataPointsOrderByCPUTime) Swap(i, j int) {
+    t[i], t[j] = t[j], t[i]
+}
 
-func (t cpuTimeSortSlice) Len() int {
+type sqlCPUTimeRecordSlice []tracecpu.SQLCPUTimeRecord
+
+func (t sqlCPUTimeRecordSlice) Len() int {
     return len(t)
 }
 
-func (t cpuTimeSortSlice) Less(i, j int) bool {
+func (t sqlCPUTimeRecordSlice) Less(i, j int) bool {
     // We need find the kth largest value, so here should use >
-    return t[i].CPUTimeMsTotal > t[j].CPUTimeMsTotal
+    return t[i].CPUTimeMs > t[j].CPUTimeMs
 }
-func (t cpuTimeSortSlice) Swap(i, j int) {
+func (t sqlCPUTimeRecordSlice) Swap(i, j int) {
     t[i], t[j] = t[j], t[i]
 }

@@ -215,16 +221,45 @@ func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string {
     return buf.String()
 }
 
-// doCollect uses a hashmap to store records in every second, and evict when necessary.
+func getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) {
+    maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load())
+    if len(records) <= maxStmt {
+        return records, nil
+    }
+    if err := quickselect.QuickSelect(sqlCPUTimeRecordSlice(records), maxStmt); err != nil {
+        // skip eviction
+        return records, nil
+    }
+    return records[:maxStmt], records[maxStmt:]
+}
+
+func getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) {
+    maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load())
+    if len(records) <= maxStmt {
+        return records, nil
+    }
+    if err := quickselect.QuickSelect(dataPointsOrderByCPUTime(records), maxStmt); err != nil {
+        // skip eviction
+        return records, nil
+    }
+    return records[:maxStmt], records[maxStmt:]
+}
+
+// doCollect collects top N records of each round into collectTarget, and evict the data that is not in top N.
 func (tsr *RemoteTopSQLReporter) doCollect(
     collectTarget map[string]*dataPoints, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) {
     defer util.Recover("top-sql", "doCollect", nil, false)
 
+    // Get top N records of each round records.
+    var evicted []tracecpu.SQLCPUTimeRecord
+    records, evicted = getTopNRecords(records)
+
     keyBuf := bytes.NewBuffer(make([]byte, 0, 64))
     listCapacity := int(variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1)
     if listCapacity < 1 {
         listCapacity = 1
     }
+    // Collect the top N records to collectTarget for each round.
     for _, record := range records {
         key := encodeKey(keyBuf, record.SQLDigest, record.PlanDigest)
         entry, exist := collectTarget[key]
@@ -245,37 +280,15 @@ func (tsr *RemoteTopSQLReporter) doCollect(
         entry.CPUTimeMsTotal += uint64(record.CPUTimeMs)
     }
 
-    // evict records according to `MaxStatementCount` variable.
-    // TODO: Better to pass in the variable in the constructor, instead of referencing directly.
-    maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load())
-    if len(collectTarget) <= maxStmt {
-        return
-    }
-
-    // find the max CPUTimeMsTotal that should be evicted
-    digestCPUTimeList := make([]cpuTimeSort, len(collectTarget))
-    idx := 0
-    for key, value := range collectTarget {
-        digestCPUTimeList[idx] = cpuTimeSort{
-            Key: key,
-            SQLDigest: value.SQLDigest,
-            PlanDigest: value.PlanDigest,
-            CPUTimeMsTotal: value.CPUTimeMsTotal,
-        }
-        idx++
-    }
-
-    // QuickSelect will only return error when the second parameter is out of range
-    if err := quickselect.QuickSelect(cpuTimeSortSlice(digestCPUTimeList), maxStmt); err != nil {
-        // skip eviction
-        return
-    }
-
-    itemsToEvict := digestCPUTimeList[maxStmt:]
+    // Evict redundant data.
     normalizedSQLMap := tsr.normalizedSQLMap.Load().(*sync.Map)
     normalizedPlanMap := tsr.normalizedPlanMap.Load().(*sync.Map)
-    for _, evict := range itemsToEvict {
-        delete(collectTarget, evict.Key)
+    for _, evict := range evicted {
+        key := encodeKey(keyBuf, evict.SQLDigest, evict.PlanDigest)
+        _, ok := collectTarget[key]
+        if ok {
+            continue
+        }
         _, loaded := normalizedSQLMap.LoadAndDelete(string(evict.SQLDigest))
         if loaded {
             tsr.sqlMapLength.Add(-1)
@@ -290,11 +303,13 @@ func (tsr *RemoteTopSQLReporter) doCollect(
 // takeDataAndSendToReportChan takes out (resets) collected data. These data will be send to a report channel
 // for reporting later.
 func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *map[string]*dataPoints) {
-    data := reportData{
-        collectedData: *collectedDataPtr,
-        normalizedSQLMap: tsr.normalizedSQLMap.Load().(*sync.Map),
-        normalizedPlanMap: tsr.normalizedPlanMap.Load().(*sync.Map),
+    // Fetch TopN dataPoints.
+    records := make([]*dataPoints, 0, len(*collectedDataPtr))
+    for _, v := range *collectedDataPtr {
+        records = append(records, v)
     }
+    normalizedSQLMap := tsr.normalizedSQLMap.Load().(*sync.Map)
+    normalizedPlanMap := tsr.normalizedPlanMap.Load().(*sync.Map)
 
     // Reset data for next report.
     *collectedDataPtr = make(map[string]*dataPoints)
@@ -303,6 +318,20 @@ func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *m
     tsr.sqlMapLength.Store(0)
     tsr.planMapLength.Store(0)
 
+    // Evict redundant data.
+    var evicted []*dataPoints
+    records, evicted = getTopNDataPoints(records)
+    for _, evict := range evicted {
+        normalizedSQLMap.LoadAndDelete(string(evict.SQLDigest))
+        normalizedPlanMap.LoadAndDelete(string(evict.PlanDigest))
+    }
+
+    data := reportData{
+        collectedData: records,
+        normalizedSQLMap: normalizedSQLMap,
+        normalizedPlanMap: normalizedPlanMap,
+    }
+
     // Send to report channel. When channel is full, data will be dropped.
     select {
     case tsr.reportDataChan <- data:
@@ -312,7 +341,7 @@ func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *m
 
 // reportData contains data that reporter sends to the agent
 type reportData struct {
-    collectedData map[string]*dataPoints
+    collectedData []*dataPoints
     normalizedSQLMap *sync.Map
     normalizedPlanMap *sync.Map
 }
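To summarize the report-time path added above in takeDataAndSendToReportChan: the accumulated map is flattened into a slice, the top-N dataPoints by total CPU time are kept, and the normalized SQL/plan text of everything evicted is dropped before the reportData is sent. A minimal sketch under simplifying assumptions (plain maps instead of sync.Map and atomic counters, sort.Slice instead of quickselect, hypothetical point/takeTopN names):

package main

import (
    "fmt"
    "sort"
)

// point is a simplified stand-in for the reporter's dataPoints aggregate.
type point struct {
    SQLDigest      string
    PlanDigest     string
    CPUTimeMsTotal uint64
}

// takeTopN flattens the per-key aggregates, keeps the n entries with the
// largest CPUTimeMsTotal, and deletes the normalized SQL/plan text of the
// rest, mirroring getTopNDataPoints plus the eviction loop.
func takeTopN(collected map[string]*point, sqlText, planText map[string]string, n int) []*point {
    records := make([]*point, 0, len(collected))
    for _, p := range collected {
        records = append(records, p)
    }
    if len(records) <= n {
        return records
    }
    sort.Slice(records, func(i, j int) bool {
        return records[i].CPUTimeMsTotal > records[j].CPUTimeMsTotal
    })
    for _, evict := range records[n:] {
        delete(sqlText, evict.SQLDigest)
        delete(planText, evict.PlanDigest)
    }
    return records[:n]
}

func main() {
    collected := map[string]*point{
        "k1": {SQLDigest: "sql1", PlanDigest: "plan1", CPUTimeMsTotal: 300},
        "k2": {SQLDigest: "sql2", PlanDigest: "plan2", CPUTimeMsTotal: 10},
        "k3": {SQLDigest: "sql3", PlanDigest: "plan3", CPUTimeMsTotal: 200},
    }
    sqlText := map[string]string{"sql1": "...", "sql2": "...", "sql3": "..."}
    planText := map[string]string{"plan1": "...", "plan2": "...", "plan3": "..."}

    top := takeTopN(collected, sqlText, planText, 2)
    fmt.Println(len(top), len(sqlText)) // 2 2: only the evicted digest's text was removed
}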
72 changes: 72 additions & 0 deletions util/topsql/reporter/reporter_test.go
@@ -169,6 +169,78 @@ func (s *testTopSQLReporter) TestCollectAndEvicted(c *C) {
}
}

func (s *testTopSQLReporter) newSQLCPUTimeRecord(tsr *RemoteTopSQLReporter, sqlID int, cpuTimeMs uint32) tracecpu.SQLCPUTimeRecord {
key := []byte("sqlDigest" + strconv.Itoa(sqlID))
value := "sqlNormalized" + strconv.Itoa(sqlID)
tsr.RegisterSQL(key, value)

key = []byte("planDigest" + strconv.Itoa(sqlID))
value = "planNormalized" + strconv.Itoa(sqlID)
tsr.RegisterPlan(key, value)

return tracecpu.SQLCPUTimeRecord{
SQLDigest: []byte("sqlDigest" + strconv.Itoa(sqlID)),
PlanDigest: []byte("planDigest" + strconv.Itoa(sqlID)),
CPUTimeMs: cpuTimeMs,
}
}

func (s *testTopSQLReporter) collectAndWait(tsr *RemoteTopSQLReporter, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) {
tsr.Collect(timestamp, records)
time.Sleep(time.Millisecond * 100)
}

func (s *testTopSQLReporter) TestCollectAndTopN(c *C) {
agentServer, err := mock.StartMockAgentServer()
c.Assert(err, IsNil)
defer agentServer.Stop()

tsr := setupRemoteTopSQLReporter(2, 1, agentServer.Address())
defer tsr.Close()

records := []tracecpu.SQLCPUTimeRecord{
s.newSQLCPUTimeRecord(tsr, 1, 1),
s.newSQLCPUTimeRecord(tsr, 2, 2),
}
s.collectAndWait(tsr, 1, records)

records = []tracecpu.SQLCPUTimeRecord{
s.newSQLCPUTimeRecord(tsr, 3, 3),
s.newSQLCPUTimeRecord(tsr, 1, 1),
}
s.collectAndWait(tsr, 2, records)

records = []tracecpu.SQLCPUTimeRecord{
s.newSQLCPUTimeRecord(tsr, 4, 1),
s.newSQLCPUTimeRecord(tsr, 1, 1),
}
s.collectAndWait(tsr, 3, records)

// Wait agent server collect finish.
agentServer.WaitCollectCnt(1, time.Second*10)

// check for equality of server received batch and the original data
results := agentServer.GetLatestRecords()
c.Assert(results, HasLen, 2)
for _, req := range results {
id := 0
prefix := "sqlDigest"
if strings.HasPrefix(string(req.SqlDigest), prefix) {
n, err := strconv.Atoi(string(req.SqlDigest)[len(prefix):])
c.Assert(err, IsNil)
id = n
}
if id != 1 && id != 3 {
c.Fatalf("the id should be 1 or 3, got: %v", id)
}
total := uint32(0)
for _, v := range req.RecordListCpuTimeMs {
total += v
}
c.Assert(total, Equals, uint32(3))
}
}

func (s *testTopSQLReporter) TestCollectCapacity(c *C) {
tsr := setupRemoteTopSQLReporter(maxSQLNum, 60, "")
defer tsr.Close()
Expand Down
