From 84a034c5dc95f8870a37502d0283f1ed2cd88467 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 23 Jun 2021 23:22:46 +0800 Subject: [PATCH 01/12] init Signed-off-by: crazycs --- util/topsql/reporter/client.go | 2 +- util/topsql/reporter/reporter.go | 117 +++++++++++++++++++------------ 2 files changed, 74 insertions(+), 45 deletions(-) diff --git a/util/topsql/reporter/client.go b/util/topsql/reporter/client.go index 87504d646902b..68181f4247ce7 100644 --- a/util/topsql/reporter/client.go +++ b/util/topsql/reporter/client.go @@ -99,7 +99,7 @@ func (r *GRPCReportClient) Close() { } // sendBatchCPUTimeRecord sends a batch of TopSQL records by stream. -func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records map[string]*dataPoints) error { +func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records []*dataPoints) error { if len(records) == 0 { return nil } diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 34163383f160e..e3c4c030b89c8 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -61,25 +61,31 @@ type dataPoints struct { CPUTimeMsTotal uint64 } -// cpuTimeSort is used to sort TopSQL records by total CPU time -type cpuTimeSort struct { - Key string - SQLDigest []byte - PlanDigest []byte - CPUTimeMsTotal uint64 // The sorting field +type dataPointsSlice []*dataPoints + +func (t dataPointsSlice) Len() int { + return len(t) +} + +func (t dataPointsSlice) Less(i, j int) bool { + // We need find the kth largest value, so here should use > + return t[i].CPUTimeMsTotal > t[j].CPUTimeMsTotal +} +func (t dataPointsSlice) Swap(i, j int) { + t[i], t[j] = t[j], t[i] } -type cpuTimeSortSlice []cpuTimeSort +type sqlCPUTimeRecordSlice []tracecpu.SQLCPUTimeRecord -func (t cpuTimeSortSlice) Len() int { +func (t sqlCPUTimeRecordSlice) Len() int { return len(t) } -func (t cpuTimeSortSlice) Less(i, j int) bool { +func (t sqlCPUTimeRecordSlice) Less(i, j int) bool { // We 
need find the kth largest value, so here should use > - return t[i].CPUTimeMsTotal > t[j].CPUTimeMsTotal + return t[i].CPUTimeMs > t[j].CPUTimeMs } -func (t cpuTimeSortSlice) Swap(i, j int) { +func (t sqlCPUTimeRecordSlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] } @@ -215,11 +221,37 @@ func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { return buf.String() } +func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topn, evicted []tracecpu.SQLCPUTimeRecord) { + maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) + if len(records) <= maxStmt { + return records, nil + } + if err := quickselect.QuickSelect(sqlCPUTimeRecordSlice(records), maxStmt); err != nil { + // skip eviction + return records, nil + } + return records[:maxStmt], records[maxStmt:] +} + +func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topn, evicted []*dataPoints) { + maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) + if len(records) <= maxStmt { + return records, nil + } + if err := quickselect.QuickSelect(dataPointsSlice(records), maxStmt); err != nil { + // skip eviction + return records, nil + } + return records[:maxStmt], records[maxStmt:] +} + // doCollect uses a hashmap to store records in every second, and evict when necessary. func (tsr *RemoteTopSQLReporter) doCollect( collectTarget map[string]*dataPoints, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) { defer util.Recover("top-sql", "doCollect", nil, false) + var evicted []tracecpu.SQLCPUTimeRecord + records, evicted = tsr.getTopNRecords(records) keyBuf := bytes.NewBuffer(make([]byte, 0, 64)) listCapacity := int(variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1) if listCapacity < 1 { @@ -245,37 +277,14 @@ func (tsr *RemoteTopSQLReporter) doCollect( entry.CPUTimeMsTotal += uint64(record.CPUTimeMs) } - // evict records according to `MaxStatementCount` variable. 
- // TODO: Better to pass in the variable in the constructor, instead of referencing directly. - maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) - if len(collectTarget) <= maxStmt { - return - } - - // find the max CPUTimeMsTotal that should be evicted - digestCPUTimeList := make([]cpuTimeSort, len(collectTarget)) - idx := 0 - for key, value := range collectTarget { - digestCPUTimeList[idx] = cpuTimeSort{ - Key: key, - SQLDigest: value.SQLDigest, - PlanDigest: value.PlanDigest, - CPUTimeMsTotal: value.CPUTimeMsTotal, - } - idx++ - } - - // QuickSelect will only return error when the second parameter is out of range - if err := quickselect.QuickSelect(cpuTimeSortSlice(digestCPUTimeList), maxStmt); err != nil { - // skip eviction - return - } - - itemsToEvict := digestCPUTimeList[maxStmt:] normalizedSQLMap := tsr.normalizedSQLMap.Load().(*sync.Map) normalizedPlanMap := tsr.normalizedPlanMap.Load().(*sync.Map) - for _, evict := range itemsToEvict { - delete(collectTarget, evict.Key) + for _, evict := range evicted { + key := encodeKey(keyBuf, evict.SQLDigest, evict.PlanDigest) + _, ok := collectTarget[key] + if ok { + continue + } _, loaded := normalizedSQLMap.LoadAndDelete(string(evict.SQLDigest)) if loaded { tsr.sqlMapLength.Add(-1) @@ -290,10 +299,30 @@ func (tsr *RemoteTopSQLReporter) doCollect( // takeDataAndSendToReportChan takes out (resets) collected data. These data will be send to a report channel // for reporting later. func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *map[string]*dataPoints) { + // Fetch TopN dataPoints. 
+ records := make([]*dataPoints, 0, len(*collectedDataPtr)) + for _, v := range *collectedDataPtr { + records = append(records, v) + } + var evicted []*dataPoints + records, evicted = tsr.getTopNDataPoints(records) + normalizedSQLMap := tsr.normalizedSQLMap.Load().(*sync.Map) + normalizedPlanMap := tsr.normalizedPlanMap.Load().(*sync.Map) + for _, evict := range evicted { + _, loaded := normalizedSQLMap.LoadAndDelete(string(evict.SQLDigest)) + if loaded { + tsr.sqlMapLength.Add(-1) + } + _, loaded = normalizedPlanMap.LoadAndDelete(string(evict.PlanDigest)) + if loaded { + tsr.planMapLength.Add(-1) + } + } + data := reportData{ - collectedData: *collectedDataPtr, - normalizedSQLMap: tsr.normalizedSQLMap.Load().(*sync.Map), - normalizedPlanMap: tsr.normalizedPlanMap.Load().(*sync.Map), + collectedData: records, + normalizedSQLMap: normalizedSQLMap, + normalizedPlanMap: normalizedPlanMap, } // Reset data for next report. @@ -312,7 +341,7 @@ func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *m // reportData contains data that reporter sends to the agent type reportData struct { - collectedData map[string]*dataPoints + collectedData []*dataPoints normalizedSQLMap *sync.Map normalizedPlanMap *sync.Map } From 20d3ac6ac153ce9cdfdd3961666b3cce362de43c Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 24 Jun 2021 16:10:10 +0800 Subject: [PATCH 02/12] add test Signed-off-by: crazycs --- util/topsql/reporter/reporter_test.go | 72 +++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index 7e4d2b2ddc9b5..8a25fe2a84235 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -169,6 +169,78 @@ func (s *testTopSQLReporter) TestCollectAndEvicted(c *C) { } } +func (s *testTopSQLReporter) newSQLCPUTimeRecord(tsr *RemoteTopSQLReporter, sqlID int, cpuTimeMs uint32) tracecpu.SQLCPUTimeRecord { + key := []byte("sqlDigest" + 
strconv.Itoa(sqlID)) + value := "sqlNormalized" + strconv.Itoa(sqlID) + tsr.RegisterSQL(key, value) + + key = []byte("planDigest" + strconv.Itoa(sqlID)) + value = "planNormalized" + strconv.Itoa(sqlID) + tsr.RegisterPlan(key, value) + + return tracecpu.SQLCPUTimeRecord{ + SQLDigest: []byte("sqlDigest" + strconv.Itoa(sqlID)), + PlanDigest: []byte("planDigest" + strconv.Itoa(sqlID)), + CPUTimeMs: cpuTimeMs, + } +} + +func (s *testTopSQLReporter) collectAndWait(tsr *RemoteTopSQLReporter, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) { + tsr.Collect(timestamp, records) + time.Sleep(time.Millisecond * 100) +} + +func (s *testTopSQLReporter) TestCollectAndTopN(c *C) { + agentServer, err := mock.StartMockAgentServer() + c.Assert(err, IsNil) + defer agentServer.Stop() + + tsr := setupRemoteTopSQLReporter(2, 1, agentServer.Address()) + defer tsr.Close() + + records := []tracecpu.SQLCPUTimeRecord{ + s.newSQLCPUTimeRecord(tsr, 1, 1), + s.newSQLCPUTimeRecord(tsr, 2, 2), + } + s.collectAndWait(tsr, 1, records) + + records = []tracecpu.SQLCPUTimeRecord{ + s.newSQLCPUTimeRecord(tsr, 3, 3), + s.newSQLCPUTimeRecord(tsr, 1, 1), + } + s.collectAndWait(tsr, 2, records) + + records = []tracecpu.SQLCPUTimeRecord{ + s.newSQLCPUTimeRecord(tsr, 4, 1), + s.newSQLCPUTimeRecord(tsr, 1, 1), + } + s.collectAndWait(tsr, 3, records) + + // Wait agent server collect finish. 
+ agentServer.WaitCollectCnt(1, time.Second*10) + + // check for equality of server received batch and the original data + results := agentServer.GetLatestRecords() + c.Assert(results, HasLen, 2) + for _, req := range results { + id := 0 + prefix := "sqlDigest" + if strings.HasPrefix(string(req.SqlDigest), prefix) { + n, err := strconv.Atoi(string(req.SqlDigest)[len(prefix):]) + c.Assert(err, IsNil) + id = n + } + if id != 1 && id != 3 { + c.Fatalf("the id should be 1 or 3, got: %v", id) + } + total := uint32(0) + for _, v := range req.CpuTimeMsList { + total += v + } + c.Assert(total, Equals, uint32(3)) + } +} + func (s *testTopSQLReporter) TestCollectCapacity(c *C) { tsr := setupRemoteTopSQLReporter(maxSQLNum, 60, "") defer tsr.Close() From 0c3a76588d08231a6728b22e7733bcf5796b7e1d Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 28 Jun 2021 15:22:25 +0800 Subject: [PATCH 03/12] update tipb and address comment Signed-off-by: crazycs --- go.mod | 3 +-- go.sum | 4 ++-- sessionctx/variable/tidb_vars.go | 2 +- util/topsql/reporter/client.go | 8 ++++---- util/topsql/reporter/reporter.go | 4 ++-- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 8c899a662712b..4e25817d439bf 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/parser v0.0.0-20210618053735-57843e8185c4 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible - github.com/pingcap/tipb v0.0.0-20210603161937-cfb5a9225f95 + github.com/pingcap/tipb v0.0.0-20210628060001-1793e022b962 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 @@ -76,7 +76,6 @@ require ( golang.org/x/tools v0.1.0 google.golang.org/grpc v1.27.1 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect - honnef.co/go/tools v0.2.0 // indirect modernc.org/mathutil v1.2.2 // indirect sourcegraph.com/sourcegraph/appdash 
v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 diff --git a/go.sum b/go.sum index d52e75c750529..644b8299e9f52 100644 --- a/go.sum +++ b/go.sum @@ -451,8 +451,8 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041 github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20210603161937-cfb5a9225f95 h1:Cj7FhGvYn8hrXDNcaHi0aTl0KdV67KTL+P5gBp3vqT4= -github.com/pingcap/tipb v0.0.0-20210603161937-cfb5a9225f95/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20210628060001-1793e022b962 h1:9Y9Eci9LwAEhyXAlAU0bSix7Nemm3G267oyN3GVK+j0= +github.com/pingcap/tipb v0.0.0-20210628060001-1793e022b962/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3a6ae0a067576..66c7ae9c18380 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -712,7 +712,7 @@ const ( DefTiDBTopSQLEnable = false DefTiDBTopSQLAgentAddress = "" DefTiDBTopSQLPrecisionSeconds = 1 - DefTiDBTopSQLMaxStatementCount = 2000 + DefTiDBTopSQLMaxStatementCount = 200 DefTiDBTopSQLMaxCollect = 10000 DefTiDBTopSQLReportIntervalSeconds = 60 DefTiDBEnableGlobalTemporaryTable = false diff --git a/util/topsql/reporter/client.go b/util/topsql/reporter/client.go index 
68181f4247ce7..300c055dd13c5 100644 --- a/util/topsql/reporter/client.go +++ b/util/topsql/reporter/client.go @@ -110,10 +110,10 @@ func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records [ } for _, record := range records { record := &tipb.CPUTimeRecord{ - TimestampList: record.TimestampList, - CpuTimeMsList: record.CPUTimeMsList, - SqlDigest: record.SQLDigest, - PlanDigest: record.PlanDigest, + RecordListTimestampSec: record.TimestampList, + RecordListCpuTimeMs: record.CPUTimeMsList, + SqlDigest: record.SQLDigest, + PlanDigest: record.PlanDigest, } if err := stream.Send(record); err != nil { return err diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index e3c4c030b89c8..fb3417c149373 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -221,7 +221,7 @@ func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { return buf.String() } -func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topn, evicted []tracecpu.SQLCPUTimeRecord) { +func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil @@ -233,7 +233,7 @@ func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRec return records[:maxStmt], records[maxStmt:] } -func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topn, evicted []*dataPoints) { +func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil From 18b57b77a5d65d1eec27fbce674ae5dc9e233b0c Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 28 Jun 2021 15:29:03 +0800 Subject: [PATCH 04/12] fix test Signed-off-by: crazycs --- 
util/topsql/reporter/reporter_test.go | 26 +++++++++++++------------- util/topsql/topsql_test.go | 6 +++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index 8a25fe2a84235..8728f992d80ae 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -111,13 +111,13 @@ func (s *testTopSQLReporter) TestCollectAndSendBatch(c *C) { c.Assert(err, IsNil) id = n } - c.Assert(req.CpuTimeMsList, HasLen, 1) - for i := range req.CpuTimeMsList { - c.Assert(req.CpuTimeMsList[i], Equals, uint32(id)) + c.Assert(req.RecordListCpuTimeMs, HasLen, 1) + for i := range req.RecordListCpuTimeMs { + c.Assert(req.RecordListCpuTimeMs[i], Equals, uint32(id)) } - c.Assert(req.TimestampList, HasLen, 1) - for i := range req.TimestampList { - c.Assert(req.TimestampList[i], Equals, uint64(1)) + c.Assert(req.RecordListTimestampSec, HasLen, 1) + for i := range req.RecordListTimestampSec { + c.Assert(req.RecordListTimestampSec[i], Equals, uint64(1)) } normalizedSQL, exist := agentServer.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) c.Assert(exist, IsTrue) @@ -152,13 +152,13 @@ func (s *testTopSQLReporter) TestCollectAndEvicted(c *C) { id = n } c.Assert(id >= maxSQLNum, IsTrue) - c.Assert(req.CpuTimeMsList, HasLen, 1) - for i := range req.CpuTimeMsList { - c.Assert(req.CpuTimeMsList[i], Equals, uint32(id)) + c.Assert(req.RecordListCpuTimeMs, HasLen, 1) + for i := range req.RecordListCpuTimeMs { + c.Assert(req.RecordListCpuTimeMs[i], Equals, uint32(id)) } - c.Assert(req.TimestampList, HasLen, 1) - for i := range req.TimestampList { - c.Assert(req.TimestampList[i], Equals, uint64(2)) + c.Assert(req.RecordListTimestampSec, HasLen, 1) + for i := range req.RecordListTimestampSec { + c.Assert(req.RecordListTimestampSec[i], Equals, uint64(2)) } normalizedSQL, exist := agentServer.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) c.Assert(exist, IsTrue) @@ -234,7 
+234,7 @@ func (s *testTopSQLReporter) TestCollectAndTopN(c *C) { c.Fatalf("the id should be 1 or 3, got: %v", id) } total := uint32(0) - for _, v := range req.CpuTimeMsList { + for _, v := range req.RecordListCpuTimeMs { total += v } c.Assert(total, Equals, uint32(3)) diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 0663a4f307b95..88748e110cff9 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -167,9 +167,9 @@ func (s *testSuite) TestTopSQLReporter(c *C) { records := server.GetLatestRecords() checkSQLPlanMap := map[string]struct{}{} for _, req := range records { - c.Assert(len(req.CpuTimeMsList) > 0, IsTrue) - c.Assert(req.CpuTimeMsList[0] > 0, IsTrue) - c.Assert(req.TimestampList[0] > 0, IsTrue) + c.Assert(len(req.RecordListCpuTimeMs) > 0, IsTrue) + c.Assert(req.RecordListCpuTimeMs[0] > 0, IsTrue) + c.Assert(req.RecordListCpuTimeMs[0] > 0, IsTrue) normalizedSQL, exist := server.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) c.Assert(exist, IsTrue) expectedNormalizedSQL, exist := sqlMap[string(req.SqlDigest)] From f2bfedf7e7a597f70152cb4a2cedcfcc4db47020 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 28 Jun 2021 18:05:27 +0800 Subject: [PATCH 05/12] Revert "update tipb and address comment" This reverts commit 0c3a76588d08231a6728b22e7733bcf5796b7e1d. 
--- go.mod | 3 ++- go.sum | 4 ++-- sessionctx/variable/tidb_vars.go | 2 +- util/topsql/reporter/client.go | 8 ++++---- util/topsql/reporter/reporter.go | 4 ++-- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 4e25817d439bf..8c899a662712b 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/parser v0.0.0-20210618053735-57843e8185c4 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible - github.com/pingcap/tipb v0.0.0-20210628060001-1793e022b962 + github.com/pingcap/tipb v0.0.0-20210603161937-cfb5a9225f95 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 @@ -76,6 +76,7 @@ require ( golang.org/x/tools v0.1.0 google.golang.org/grpc v1.27.1 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + honnef.co/go/tools v0.2.0 // indirect modernc.org/mathutil v1.2.2 // indirect sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 diff --git a/go.sum b/go.sum index 644b8299e9f52..d52e75c750529 100644 --- a/go.sum +++ b/go.sum @@ -451,8 +451,8 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041 github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb v0.0.0-20210628060001-1793e022b962 h1:9Y9Eci9LwAEhyXAlAU0bSix7Nemm3G267oyN3GVK+j0= -github.com/pingcap/tipb v0.0.0-20210628060001-1793e022b962/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb 
v0.0.0-20210603161937-cfb5a9225f95 h1:Cj7FhGvYn8hrXDNcaHi0aTl0KdV67KTL+P5gBp3vqT4= +github.com/pingcap/tipb v0.0.0-20210603161937-cfb5a9225f95/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 66c7ae9c18380..3a6ae0a067576 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -712,7 +712,7 @@ const ( DefTiDBTopSQLEnable = false DefTiDBTopSQLAgentAddress = "" DefTiDBTopSQLPrecisionSeconds = 1 - DefTiDBTopSQLMaxStatementCount = 200 + DefTiDBTopSQLMaxStatementCount = 2000 DefTiDBTopSQLMaxCollect = 10000 DefTiDBTopSQLReportIntervalSeconds = 60 DefTiDBEnableGlobalTemporaryTable = false diff --git a/util/topsql/reporter/client.go b/util/topsql/reporter/client.go index 300c055dd13c5..68181f4247ce7 100644 --- a/util/topsql/reporter/client.go +++ b/util/topsql/reporter/client.go @@ -110,10 +110,10 @@ func (r *GRPCReportClient) sendBatchCPUTimeRecord(ctx context.Context, records [ } for _, record := range records { record := &tipb.CPUTimeRecord{ - RecordListTimestampSec: record.TimestampList, - RecordListCpuTimeMs: record.CPUTimeMsList, - SqlDigest: record.SQLDigest, - PlanDigest: record.PlanDigest, + TimestampList: record.TimestampList, + CpuTimeMsList: record.CPUTimeMsList, + SqlDigest: record.SQLDigest, + PlanDigest: record.PlanDigest, } if err := stream.Send(record); err != nil { return err diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index fb3417c149373..e3c4c030b89c8 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -221,7 +221,7 @@ func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { return buf.String() } -func 
(tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { +func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topn, evicted []tracecpu.SQLCPUTimeRecord) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil @@ -233,7 +233,7 @@ func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRec return records[:maxStmt], records[maxStmt:] } -func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) { +func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topn, evicted []*dataPoints) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil From c83e125867cae4123a8fc429988128babbd38ae1 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 28 Jun 2021 18:05:36 +0800 Subject: [PATCH 06/12] Revert "fix test" This reverts commit 18b57b77a5d65d1eec27fbce674ae5dc9e233b0c. 
--- util/topsql/reporter/reporter_test.go | 26 +++++++++++++------------- util/topsql/topsql_test.go | 6 +++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index 8728f992d80ae..8a25fe2a84235 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -111,13 +111,13 @@ func (s *testTopSQLReporter) TestCollectAndSendBatch(c *C) { c.Assert(err, IsNil) id = n } - c.Assert(req.RecordListCpuTimeMs, HasLen, 1) - for i := range req.RecordListCpuTimeMs { - c.Assert(req.RecordListCpuTimeMs[i], Equals, uint32(id)) + c.Assert(req.CpuTimeMsList, HasLen, 1) + for i := range req.CpuTimeMsList { + c.Assert(req.CpuTimeMsList[i], Equals, uint32(id)) } - c.Assert(req.RecordListTimestampSec, HasLen, 1) - for i := range req.RecordListTimestampSec { - c.Assert(req.RecordListTimestampSec[i], Equals, uint64(1)) + c.Assert(req.TimestampList, HasLen, 1) + for i := range req.TimestampList { + c.Assert(req.TimestampList[i], Equals, uint64(1)) } normalizedSQL, exist := agentServer.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) c.Assert(exist, IsTrue) @@ -152,13 +152,13 @@ func (s *testTopSQLReporter) TestCollectAndEvicted(c *C) { id = n } c.Assert(id >= maxSQLNum, IsTrue) - c.Assert(req.RecordListCpuTimeMs, HasLen, 1) - for i := range req.RecordListCpuTimeMs { - c.Assert(req.RecordListCpuTimeMs[i], Equals, uint32(id)) + c.Assert(req.CpuTimeMsList, HasLen, 1) + for i := range req.CpuTimeMsList { + c.Assert(req.CpuTimeMsList[i], Equals, uint32(id)) } - c.Assert(req.RecordListTimestampSec, HasLen, 1) - for i := range req.RecordListTimestampSec { - c.Assert(req.RecordListTimestampSec[i], Equals, uint64(2)) + c.Assert(req.TimestampList, HasLen, 1) + for i := range req.TimestampList { + c.Assert(req.TimestampList[i], Equals, uint64(2)) } normalizedSQL, exist := agentServer.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) c.Assert(exist, IsTrue) @@ 
-234,7 +234,7 @@ func (s *testTopSQLReporter) TestCollectAndTopN(c *C) { c.Fatalf("the id should be 1 or 3, got: %v", id) } total := uint32(0) - for _, v := range req.RecordListCpuTimeMs { + for _, v := range req.CpuTimeMsList { total += v } c.Assert(total, Equals, uint32(3)) diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index 88748e110cff9..0663a4f307b95 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -167,9 +167,9 @@ func (s *testSuite) TestTopSQLReporter(c *C) { records := server.GetLatestRecords() checkSQLPlanMap := map[string]struct{}{} for _, req := range records { - c.Assert(len(req.RecordListCpuTimeMs) > 0, IsTrue) - c.Assert(req.RecordListCpuTimeMs[0] > 0, IsTrue) - c.Assert(req.RecordListCpuTimeMs[0] > 0, IsTrue) + c.Assert(len(req.CpuTimeMsList) > 0, IsTrue) + c.Assert(req.CpuTimeMsList[0] > 0, IsTrue) + c.Assert(req.TimestampList[0] > 0, IsTrue) normalizedSQL, exist := server.GetSQLMetaByDigestBlocking(req.SqlDigest, time.Second) c.Assert(exist, IsTrue) expectedNormalizedSQL, exist := sqlMap[string(req.SqlDigest)] From e4d9fbcd60a9d606f7f7a19db725de4d8202476a Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 28 Jun 2021 18:07:01 +0800 Subject: [PATCH 07/12] address comment Signed-off-by: crazycs --- sessionctx/variable/tidb_vars.go | 2 +- util/topsql/reporter/reporter.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3a6ae0a067576..66c7ae9c18380 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -712,7 +712,7 @@ const ( DefTiDBTopSQLEnable = false DefTiDBTopSQLAgentAddress = "" DefTiDBTopSQLPrecisionSeconds = 1 - DefTiDBTopSQLMaxStatementCount = 2000 + DefTiDBTopSQLMaxStatementCount = 200 DefTiDBTopSQLMaxCollect = 10000 DefTiDBTopSQLReportIntervalSeconds = 60 DefTiDBEnableGlobalTemporaryTable = false diff --git a/util/topsql/reporter/reporter.go 
b/util/topsql/reporter/reporter.go index e3c4c030b89c8..fb3417c149373 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -221,7 +221,7 @@ func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { return buf.String() } -func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topn, evicted []tracecpu.SQLCPUTimeRecord) { +func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil @@ -233,7 +233,7 @@ func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRec return records[:maxStmt], records[maxStmt:] } -func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topn, evicted []*dataPoints) { +func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil From 0f8b0e220e1401f2c8a952f9927d7cc59c98e86e Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 28 Jun 2021 18:59:58 +0800 Subject: [PATCH 08/12] remove useless sleep Signed-off-by: crazycs --- util/topsql/reporter/reporter.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index fb3417c149373..1e49533341cfc 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -372,10 +372,6 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { for { select { case data := <-tsr.reportDataChan: - // When `reportDataChan` receives something, there could be ongoing `RegisterSQL` and `RegisterPlan` running, - // who writes to the data structure that `data` contains. So we wait for a little while to ensure that - // these writes are finished. 
- time.Sleep(time.Millisecond * 100) tsr.doReport(data) case <-tsr.ctx.Done(): return From dee77431a91e02a7d56be6cb2258c7e572e30c7f Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 29 Jun 2021 11:09:45 +0800 Subject: [PATCH 09/12] address comment Signed-off-by: crazycs --- util/topsql/reporter/reporter.go | 46 +++++++++++++++----------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 1e49533341cfc..a9647aa1b58f0 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -61,17 +61,17 @@ type dataPoints struct { CPUTimeMsTotal uint64 } -type dataPointsSlice []*dataPoints +type dataPointsOrderByCPUTime []*dataPoints -func (t dataPointsSlice) Len() int { +func (t dataPointsOrderByCPUTime) Len() int { return len(t) } -func (t dataPointsSlice) Less(i, j int) bool { +func (t dataPointsOrderByCPUTime) Less(i, j int) bool { // We need find the kth largest value, so here should use > return t[i].CPUTimeMsTotal > t[j].CPUTimeMsTotal } -func (t dataPointsSlice) Swap(i, j int) { +func (t dataPointsOrderByCPUTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] } @@ -221,7 +221,7 @@ func encodeKey(buf *bytes.Buffer, sqlDigest, planDigest []byte) string { return buf.String() } -func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { +func getTopNRecords(records []tracecpu.SQLCPUTimeRecord) (topN, shouldEvict []tracecpu.SQLCPUTimeRecord) { maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil @@ -233,12 +233,12 @@ func (tsr *RemoteTopSQLReporter) getTopNRecords(records []tracecpu.SQLCPUTimeRec return records[:maxStmt], records[maxStmt:] } -func (tsr *RemoteTopSQLReporter) getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) { +func getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) 
{ maxStmt := int(variable.TopSQLVariable.MaxStatementCount.Load()) if len(records) <= maxStmt { return records, nil } - if err := quickselect.QuickSelect(dataPointsSlice(records), maxStmt); err != nil { + if err := quickselect.QuickSelect(dataPointsOrderByCPUTime(records), maxStmt); err != nil { // skip eviction return records, nil } @@ -251,7 +251,7 @@ func (tsr *RemoteTopSQLReporter) doCollect( defer util.Recover("top-sql", "doCollect", nil, false) var evicted []tracecpu.SQLCPUTimeRecord - records, evicted = tsr.getTopNRecords(records) + records, evicted = getTopNRecords(records) keyBuf := bytes.NewBuffer(make([]byte, 0, 64)) listCapacity := int(variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1) if listCapacity < 1 { @@ -304,19 +304,22 @@ func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *m for _, v := range *collectedDataPtr { records = append(records, v) } - var evicted []*dataPoints - records, evicted = tsr.getTopNDataPoints(records) normalizedSQLMap := tsr.normalizedSQLMap.Load().(*sync.Map) normalizedPlanMap := tsr.normalizedPlanMap.Load().(*sync.Map) + + // Reset data for next report. + *collectedDataPtr = make(map[string]*dataPoints) + tsr.normalizedSQLMap.Store(&sync.Map{}) + tsr.normalizedPlanMap.Store(&sync.Map{}) + tsr.sqlMapLength.Store(0) + tsr.planMapLength.Store(0) + + // Evict redundant data. 
+ var evicted []*dataPoints + records, evicted = getTopNDataPoints(records) for _, evict := range evicted { - _, loaded := normalizedSQLMap.LoadAndDelete(string(evict.SQLDigest)) - if loaded { - tsr.sqlMapLength.Add(-1) - } - _, loaded = normalizedPlanMap.LoadAndDelete(string(evict.PlanDigest)) - if loaded { - tsr.planMapLength.Add(-1) - } + normalizedSQLMap.LoadAndDelete(string(evict.SQLDigest)) + normalizedPlanMap.LoadAndDelete(string(evict.PlanDigest)) } data := reportData{ @@ -325,13 +328,6 @@ func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan(collectedDataPtr *m normalizedPlanMap: normalizedPlanMap, } - // Reset data for next report. - *collectedDataPtr = make(map[string]*dataPoints) - tsr.normalizedSQLMap.Store(&sync.Map{}) - tsr.normalizedPlanMap.Store(&sync.Map{}) - tsr.sqlMapLength.Store(0) - tsr.planMapLength.Store(0) - // Send to report channel. When channel is full, data will be dropped. select { case tsr.reportDataChan <- data: From 179a90f0d120a9bc13842dcb184ea0a69f10da31 Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 29 Jun 2021 11:43:03 +0800 Subject: [PATCH 10/12] add comment Signed-off-by: crazycs --- util/topsql/reporter/reporter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index a9647aa1b58f0..3b938c978455a 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -245,18 +245,21 @@ func getTopNDataPoints(records []*dataPoints) (topN, shouldEvict []*dataPoints) return records[:maxStmt], records[maxStmt:] } -// doCollect uses a hashmap to store records in every second, and evict when necessary. +// doCollect collects top N records of each round into collectTarget, and evict the data that is not in top N. 
func (tsr *RemoteTopSQLReporter) doCollect( collectTarget map[string]*dataPoints, timestamp uint64, records []tracecpu.SQLCPUTimeRecord) { defer util.Recover("top-sql", "doCollect", nil, false) + // Get the top N records of each round. var evicted []tracecpu.SQLCPUTimeRecord records, evicted = getTopNRecords(records) + keyBuf := bytes.NewBuffer(make([]byte, 0, 64)) listCapacity := int(variable.TopSQLVariable.ReportIntervalSeconds.Load()/variable.TopSQLVariable.PrecisionSeconds.Load() + 1) if listCapacity < 1 { listCapacity = 1 } + // Collect the top N records to collectTarget for each round. for _, record := range records { key := encodeKey(keyBuf, record.SQLDigest, record.PlanDigest) entry, exist := collectTarget[key] @@ -277,6 +280,7 @@ func (tsr *RemoteTopSQLReporter) doCollect( entry.CPUTimeMsTotal += uint64(record.CPUTimeMs) } + // Evict redundant data. normalizedSQLMap := tsr.normalizedSQLMap.Load().(*sync.Map) normalizedPlanMap := tsr.normalizedPlanMap.Load().(*sync.Map) for _, evict := range evicted { From 1520b50a932d83e814a8e4f59f1600f35f91bdee Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 29 Jun 2021 13:06:44 +0800 Subject: [PATCH 11/12] add sleep back Signed-off-by: crazycs --- util/topsql/reporter/reporter.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 3b938c978455a..97f79c8395221 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -372,6 +372,10 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { for { select { case data := <-tsr.reportDataChan: + // When `reportDataChan` receives something, there could be ongoing `RegisterSQL` and `RegisterPlan` calls running, + // which write to the data structure that `data` contains. So we wait for a little while to ensure that + // these writes are finished. 
+ time.Sleep(time.Millisecond * 100) tsr.doReport(data) case <-tsr.ctx.Done(): return From 0d1242098f0f8fabd6ecb66a932646cbb4211a8c Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 29 Jun 2021 20:16:23 +0800 Subject: [PATCH 12/12] fix test Signed-off-by: crazycs --- util/topsql/reporter/reporter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index 7c9f70d60db64..8728f992d80ae 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -234,7 +234,7 @@ func (s *testTopSQLReporter) TestCollectAndTopN(c *C) { c.Fatalf("the id should be 1 or 3, got: %v", id) } total := uint32(0) - for _, v := range req.CpuTimeMsList { + for _, v := range req.RecordListCpuTimeMs { total += v } c.Assert(total, Equals, uint32(3))