diff --git a/executor/analyze.go b/executor/analyze.go index 06a437ce60013..8c84913ed2e6d 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -331,7 +331,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee if needCMS { if resp.Cms == nil { logutil.Logger(context.TODO()).Warn("nil CMS in response", zap.String("table", e.idxInfo.Table.O), zap.String("index", e.idxInfo.Name.O)) - } else if err := cms.MergeCMSketch(statistics.CMSketchFromProto(resp.Cms)); err != nil { + } else if err := cms.MergeCMSketch(statistics.CMSketchFromProto(resp.Cms), 0); err != nil { return nil, nil, err } } @@ -972,11 +972,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e } } -func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) { - // build collector properties. - collector.Samples = collector.Samples[:e.sampCursor] - sort.Slice(collector.Samples, func(i, j int) bool { return collector.Samples[i].RowID < collector.Samples[j].RowID }) - collector.CalcTotalSize() +func (e *AnalyzeFastExec) buildColumnStats(ID int64, collector *statistics.SampleCollector, tp *types.FieldType, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, error) { data := make([][]byte, 0, len(collector.Samples)) for i, sample := range collector.Samples { sample.Ordinal = i @@ -986,29 +982,49 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec } bytes, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, sample.Value) if err != nil { - return nil, err + return nil, nil, err } data = append(data, bytes) } - handle := domain.GetDomain(e.ctx).StatsHandle() - tblStats := handle.GetTableStats(e.tblInfo) - rowCount := int64(e.rowCount) - if handle.Lease() > 0 && !tblStats.Pseudo { - rowCount = mathutil.MinInt64(tblStats.Count, rowCount) - } - // Adjust the row count in case the count of `tblStats` is not accurate and too small. - rowCount = mathutil.MaxInt64(rowCount, int64(len(collector.Samples))) - // build CMSketch - var ndv, scaleRatio uint64 - collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount)) - // Scale the total column size. - collector.TotalSize *= rowCount / int64(len(collector.Samples)) - // build Histogram + // Build CMSketch. + cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount)) + // Build Histogram. hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount, int64(ndv), collector.NullCount*int64(scaleRatio)) - if err != nil { - return nil, err + return hist, cmSketch, err +} + +func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *statistics.SampleCollector, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, error) { + data := make([][][]byte, len(idxInfo.Columns), len(idxInfo.Columns)) + for _, sample := range collector.Samples { + var preLen int + remained := sample.Value.GetBytes() + // We need to insert each prefix values into CM Sketch. + for i := 0; i < len(idxInfo.Columns); i++ { + var err error + var value []byte + value, remained, err = codec.CutOne(remained) + if err != nil { + return nil, nil, err + } + preLen += len(value) + data[i] = append(data[i], sample.Value.GetBytes()[:preLen]) + } + } + numTop := uint32(20) + cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[0], numTop, uint64(rowCount)) + // Build CM Sketch for each prefix and merge them into one. + for i := 1; i < len(idxInfo.Columns); i++ { + var curCMSketch *statistics.CMSketch + // `ndv` should be the ndv of full index, so just rewrite it here. + curCMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[i], numTop, uint64(rowCount)) + err := cmSketch.MergeCMSketch(curCMSketch, numTop) + if err != nil { + return nil, nil, err + } } - return hist, nil + // Build Histogram. + hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio)) + return hist, cmSketch, err } func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) { @@ -1044,19 +1060,32 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS return nil, nil, err } + stats := domain.GetDomain(e.ctx).StatsHandle() + rowCount := int64(e.rowCount) + if stats.Lease() > 0 { + rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount) + } hists, cms := make([]*statistics.Histogram, length), make([]*statistics.CMSketch, length) for i := 0; i < length; i++ { + // Build collector properties. + collector := e.collectors[i] + collector.Samples = collector.Samples[:e.sampCursor] + sort.Slice(collector.Samples, func(i, j int) bool { return collector.Samples[i].RowID < collector.Samples[j].RowID }) + collector.CalcTotalSize() + // Adjust the row count in case the count of `tblStats` is not accurate and too small. + rowCount = mathutil.MaxInt64(rowCount, int64(len(collector.Samples))) + // Scale the total column size. + collector.TotalSize *= rowCount / int64(len(collector.Samples)) if i < hasPKInfo { - hists[i], err = e.buildHist(e.pkInfo.ID, e.collectors[i], &e.pkInfo.FieldType) + hists[i], cms[i], err = e.buildColumnStats(e.pkInfo.ID, e.collectors[i], &e.pkInfo.FieldType, rowCount) } else if i < hasPKInfo+len(e.colsInfo) { - hists[i], err = e.buildHist(e.colsInfo[i-hasPKInfo].ID, e.collectors[i], &e.colsInfo[i-hasPKInfo].FieldType) + hists[i], cms[i], err = e.buildColumnStats(e.colsInfo[i-hasPKInfo].ID, e.collectors[i], &e.colsInfo[i-hasPKInfo].FieldType, rowCount) } else { - hists[i], err = e.buildHist(e.idxsInfo[i-hasPKInfo-len(e.colsInfo)].ID, e.collectors[i], types.NewFieldType(mysql.TypeBlob)) + hists[i], cms[i], err = e.buildIndexStats(e.idxsInfo[i-hasPKInfo-len(e.colsInfo)], e.collectors[i], rowCount) } if err != nil { return nil, nil, err } - cms[i] = e.collectors[i].CMSketch } return hists, cms, nil } diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 5c48ccde9a7dd..d07debd9b9f87 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -286,6 +286,20 @@ func (s *testSuite1) TestFastAnalyze(c *C) { "num: 603 lower_bound: 1250 upper_bound: 1823 repeats: 1\n"+ "num: 603 lower_bound: 1830 upper_bound: 2379 repeats: 1\n"+ "num: 588 lower_bound: 2380 upper_bound: 2998 repeats: 1") + + // Test CM Sketch built from fast analyze. + tk.MustExec("create table t1(a int, b int, index idx(a, b))") + tk.MustExec("insert into t1 values (1,1),(1,1),(1,2),(1,2)") + tk.MustExec("analyze table t1") + tk.MustQuery("explain select a from t1 where a = 1").Check(testkit.Rows( + "IndexReader_6 4.00 root index:IndexScan_5", + "└─IndexScan_5 4.00 cop table:t1, index:a, b, range:[1,1], keep order:false")) + tk.MustQuery("explain select a, b from t1 where a = 1 and b = 1").Check(testkit.Rows( + "IndexReader_6 2.00 root index:IndexScan_5", + "└─IndexScan_5 2.00 cop table:t1, index:a, b, range:[1 1,1 1], keep order:false")) + tk.MustQuery("explain select a, b from t1 where a = 1 and b = 2").Check(testkit.Rows( + "IndexReader_6 2.00 root index:IndexScan_5", + "└─IndexScan_5 2.00 cop table:t1, index:a, b, range:[1 2,1 2], keep order:false")) } func (s *testSuite1) TestAnalyzeIncremental(c *C) { diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go index ac717bd930835..86ea0338a4530 100644 --- a/statistics/cmsketch.go +++ b/statistics/cmsketch.go @@ -287,14 +287,46 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 { return uint64(res) } +func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32) { + counter := make(map[hack.MutableString]uint64) + for _, metas := range lTopN { + for _, meta := range metas { + counter[hack.String(meta.Data)] += meta.Count + } + } + for _, metas := range rTopN { + for _, meta := range metas { + counter[hack.String(meta.Data)] += meta.Count + } + } + sorted := make([]uint64, len(counter)) + for _, cnt := range counter { + sorted = append(sorted, cnt) + } + sort.Slice(sorted, func(i, j int) bool { + return sorted[i] > sorted[j] + }) + numTop = mathutil.MinUint32(uint32(len(counter)), numTop) + lastTopCnt := sorted[numTop-1] + c.topN = make(map[uint64][]*TopNMeta) + for value, cnt := range counter { + data := hack.Slice(string(value)) + if cnt >= lastTopCnt { + h1, h2 := murmur3.Sum128(data) + c.topN[h1] = append(c.topN[h1], &TopNMeta{h2, data, cnt}) + } else { + c.insertBytesByCount(data, cnt) + } + } +} + // MergeCMSketch merges two CM Sketch. -// Call with CMSketch with Top-N initialized may downgrade the result -func (c *CMSketch) MergeCMSketch(rc *CMSketch) error { +func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error { if c.depth != rc.depth || c.width != rc.width { return errors.New("Dimensions of Count-Min Sketch should be the same") } if c.topN != nil || rc.topN != nil { - return errors.New("CMSketch with Top-N does not support merge") + c.mergeTopN(c.topN, rc.topN, numTopN) } c.count += rc.count for i := range c.table { diff --git a/statistics/cmsketch_test.go b/statistics/cmsketch_test.go index 0c038fb4bef5b..ac04e01429b6b 100644 --- a/statistics/cmsketch_test.go +++ b/statistics/cmsketch_test.go @@ -136,7 +136,7 @@ func (s *testStatisticsSuite) TestCMSketch(c *C) { c.Assert(err, IsNil) c.Check(avg, LessEqual, t.avgError) - err = lSketch.MergeCMSketch(rSketch) + err = lSketch.MergeCMSketch(rSketch, 0) c.Assert(err, IsNil) for val, count := range rMap { lMap[val] += count diff --git a/statistics/sample.go b/statistics/sample.go index 41d139ebdef2b..64872746fdf2c 100644 --- a/statistics/sample.go +++ b/statistics/sample.go @@ -91,7 +91,7 @@ func (c *SampleCollector) MergeSampleCollector(sc *stmtctx.StatementContext, rc c.TotalSize += rc.TotalSize c.FMSketch.mergeFMSketch(rc.FMSketch) if rc.CMSketch != nil { - err := c.CMSketch.MergeCMSketch(rc.CMSketch) + err := c.CMSketch.MergeCMSketch(rc.CMSketch, 0) terror.Log(errors.Trace(err)) } for _, item := range rc.Samples {