Skip to content

Commit

Permalink
stats: fix the index cm sketch for fast analyze (#10839)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and zz-jason committed Jun 22, 2019
1 parent c8d1ff7 commit b26cb0d
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 33 deletions.
85 changes: 57 additions & 28 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
if needCMS {
if resp.Cms == nil {
logutil.Logger(context.TODO()).Warn("nil CMS in response", zap.String("table", e.idxInfo.Table.O), zap.String("index", e.idxInfo.Name.O))
} else if err := cms.MergeCMSketch(statistics.CMSketchFromProto(resp.Cms)); err != nil {
} else if err := cms.MergeCMSketch(statistics.CMSketchFromProto(resp.Cms), 0); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -972,11 +972,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
}
}

func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) {
// build collector properties.
collector.Samples = collector.Samples[:e.sampCursor]
sort.Slice(collector.Samples, func(i, j int) bool { return collector.Samples[i].RowID < collector.Samples[j].RowID })
collector.CalcTotalSize()
func (e *AnalyzeFastExec) buildColumnStats(ID int64, collector *statistics.SampleCollector, tp *types.FieldType, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, error) {
data := make([][]byte, 0, len(collector.Samples))
for i, sample := range collector.Samples {
sample.Ordinal = i
Expand All @@ -986,29 +982,49 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec
}
bytes, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, sample.Value)
if err != nil {
return nil, err
return nil, nil, err
}
data = append(data, bytes)
}
handle := domain.GetDomain(e.ctx).StatsHandle()
tblStats := handle.GetTableStats(e.tblInfo)
rowCount := int64(e.rowCount)
if handle.Lease() > 0 && !tblStats.Pseudo {
rowCount = mathutil.MinInt64(tblStats.Count, rowCount)
}
// Adjust the row count in case the count of `tblStats` is not accurate and too small.
rowCount = mathutil.MaxInt64(rowCount, int64(len(collector.Samples)))
// build CMSketch
var ndv, scaleRatio uint64
collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount))
// Scale the total column size.
collector.TotalSize *= rowCount / int64(len(collector.Samples))
// build Histogram
// Build CMSketch.
cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount))
// Build Histogram.
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
if err != nil {
return nil, err
return hist, cmSketch, err
}

// buildIndexStats builds the histogram and the CM Sketch of the index described
// by idxInfo from the sampled index values held in collector.
//
// Every sample value is cut column by column with codec.CutOne, and each column
// prefix of the encoded value is collected so that one CM Sketch is built per
// prefix length. The per-prefix sketches are then merged into a single sketch.
// The ndv and scale ratio handed to the histogram builder are the ones returned
// for the longest prefix, i.e. the full index.
//
// It returns a nil histogram and a nil sketch together with the error when a
// sample value cannot be cut or when the sketches cannot be merged.
func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *statistics.SampleCollector, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, error) {
	// data[i] holds, for every sample, the encoded prefix made of the first i+1 columns.
	data := make([][][]byte, len(idxInfo.Columns))
	for _, sample := range collector.Samples {
		encoded := sample.Value.GetBytes()
		remained := encoded
		var preLen int
		// We need to insert each prefix value into the CM Sketch.
		for i := 0; i < len(idxInfo.Columns); i++ {
			var err error
			var value []byte
			value, remained, err = codec.CutOne(remained)
			if err != nil {
				return nil, nil, err
			}
			preLen += len(value)
			data[i] = append(data[i], encoded[:preLen])
		}
	}
	numTop := uint32(20)
	cmSketch, ndv, scaleRatio := statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[0], numTop, uint64(rowCount))
	// Build a CM Sketch for each longer prefix and merge them into one.
	for i := 1; i < len(idxInfo.Columns); i++ {
		var curCMSketch *statistics.CMSketch
		// `ndv` should be the ndv of the full index, so just overwrite it here.
		curCMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data[i], numTop, uint64(rowCount))
		if err := cmSketch.MergeCMSketch(curCMSketch, numTop); err != nil {
			return nil, nil, err
		}
	}
	// Build Histogram. Index values are handled as opaque blobs.
	hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
	return hist, cmSketch, err
}

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) {
Expand Down Expand Up @@ -1044,19 +1060,32 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
return nil, nil, err
}

stats := domain.GetDomain(e.ctx).StatsHandle()
rowCount := int64(e.rowCount)
if stats.Lease() > 0 {
rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
}
hists, cms := make([]*statistics.Histogram, length), make([]*statistics.CMSketch, length)
for i := 0; i < length; i++ {
// Build collector properties.
collector := e.collectors[i]
collector.Samples = collector.Samples[:e.sampCursor]
sort.Slice(collector.Samples, func(i, j int) bool { return collector.Samples[i].RowID < collector.Samples[j].RowID })
collector.CalcTotalSize()
// Adjust the row count in case the count of `tblStats` is not accurate and too small.
rowCount = mathutil.MaxInt64(rowCount, int64(len(collector.Samples)))
// Scale the total column size.
collector.TotalSize *= rowCount / int64(len(collector.Samples))
if i < hasPKInfo {
hists[i], err = e.buildHist(e.pkInfo.ID, e.collectors[i], &e.pkInfo.FieldType)
hists[i], cms[i], err = e.buildColumnStats(e.pkInfo.ID, e.collectors[i], &e.pkInfo.FieldType, rowCount)
} else if i < hasPKInfo+len(e.colsInfo) {
hists[i], err = e.buildHist(e.colsInfo[i-hasPKInfo].ID, e.collectors[i], &e.colsInfo[i-hasPKInfo].FieldType)
hists[i], cms[i], err = e.buildColumnStats(e.colsInfo[i-hasPKInfo].ID, e.collectors[i], &e.colsInfo[i-hasPKInfo].FieldType, rowCount)
} else {
hists[i], err = e.buildHist(e.idxsInfo[i-hasPKInfo-len(e.colsInfo)].ID, e.collectors[i], types.NewFieldType(mysql.TypeBlob))
hists[i], cms[i], err = e.buildIndexStats(e.idxsInfo[i-hasPKInfo-len(e.colsInfo)], e.collectors[i], rowCount)
}
if err != nil {
return nil, nil, err
}
cms[i] = e.collectors[i].CMSketch
}
return hists, cms, nil
}
Expand Down
14 changes: 14 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,20 @@ func (s *testSuite1) TestFastAnalyze(c *C) {
"num: 603 lower_bound: 1250 upper_bound: 1823 repeats: 1\n"+
"num: 603 lower_bound: 1830 upper_bound: 2379 repeats: 1\n"+
"num: 588 lower_bound: 2380 upper_bound: 2998 repeats: 1")

// Test CM Sketch built from fast analyze.
tk.MustExec("create table t1(a int, b int, index idx(a, b))")
tk.MustExec("insert into t1 values (1,1),(1,1),(1,2),(1,2)")
tk.MustExec("analyze table t1")
tk.MustQuery("explain select a from t1 where a = 1").Check(testkit.Rows(
"IndexReader_6 4.00 root index:IndexScan_5",
"└─IndexScan_5 4.00 cop table:t1, index:a, b, range:[1,1], keep order:false"))
tk.MustQuery("explain select a, b from t1 where a = 1 and b = 1").Check(testkit.Rows(
"IndexReader_6 2.00 root index:IndexScan_5",
"└─IndexScan_5 2.00 cop table:t1, index:a, b, range:[1 1,1 1], keep order:false"))
tk.MustQuery("explain select a, b from t1 where a = 1 and b = 2").Check(testkit.Rows(
"IndexReader_6 2.00 root index:IndexScan_5",
"└─IndexScan_5 2.00 cop table:t1, index:a, b, range:[1 2,1 2], keep order:false"))
}

func (s *testSuite1) TestAnalyzeIncremental(c *C) {
Expand Down
38 changes: 35 additions & 3 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,46 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

// mergeTopN merges the Top-N entries of lTopN and rTopN and stores the result
// in c.topN.
//
// Counts of identical values found on both sides are summed. A value is kept in
// c.topN when its merged count is at least the numTop-th largest merged count;
// values tied with that threshold are all kept, so c.topN may end up holding
// more than numTop values. Every other value is handed to insertBytesByCount.
//
// numTop is capped at the number of distinct merged values. When the capped
// value is zero (numTop is 0, or neither side has any Top-N entry), c.topN is
// reset to an empty map and all merged values, if any, go through
// insertBytesByCount.
func (c *CMSketch) mergeTopN(lTopN map[uint64][]*TopNMeta, rTopN map[uint64][]*TopNMeta, numTop uint32) {
	// Sum up the counts per distinct value across both sides.
	counter := make(map[hack.MutableString]uint64)
	for _, metas := range lTopN {
		for _, meta := range metas {
			counter[hack.String(meta.Data)] += meta.Count
		}
	}
	for _, metas := range rTopN {
		for _, meta := range metas {
			counter[hack.String(meta.Data)] += meta.Count
		}
	}

	numTop = mathutil.MinUint32(uint32(len(counter)), numTop)
	// keepTop guards the threshold lookup: with numTop == 0 the unsigned index
	// numTop-1 would wrap around and panic.
	keepTop := numTop > 0
	var lastTopCnt uint64
	if keepTop {
		// Allocate with zero length: appending to a slice that already has
		// len(counter) elements would leave that many spurious zero counts in it.
		sorted := make([]uint64, 0, len(counter))
		for _, cnt := range counter {
			sorted = append(sorted, cnt)
		}
		sort.Slice(sorted, func(i, j int) bool {
			return sorted[i] > sorted[j]
		})
		lastTopCnt = sorted[numTop-1]
	}

	c.topN = make(map[uint64][]*TopNMeta)
	for value, cnt := range counter {
		data := hack.Slice(string(value))
		if keepTop && cnt >= lastTopCnt {
			h1, h2 := murmur3.Sum128(data)
			c.topN[h1] = append(c.topN[h1], &TopNMeta{h2, data, cnt})
		} else {
			c.insertBytesByCount(data, cnt)
		}
	}
}

// MergeCMSketch merges two CM Sketches.
// Calling it on a CMSketch with Top-N initialized may degrade the result.
func (c *CMSketch) MergeCMSketch(rc *CMSketch) error {
func (c *CMSketch) MergeCMSketch(rc *CMSketch, numTopN uint32) error {
if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
c.mergeTopN(c.topN, rc.topN, numTopN)
}
c.count += rc.count
for i := range c.table {
Expand Down
2 changes: 1 addition & 1 deletion statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *testStatisticsSuite) TestCMSketch(c *C) {
c.Assert(err, IsNil)
c.Check(avg, LessEqual, t.avgError)

err = lSketch.MergeCMSketch(rSketch)
err = lSketch.MergeCMSketch(rSketch, 0)
c.Assert(err, IsNil)
for val, count := range rMap {
lMap[val] += count
Expand Down
2 changes: 1 addition & 1 deletion statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *SampleCollector) MergeSampleCollector(sc *stmtctx.StatementContext, rc
c.TotalSize += rc.TotalSize
c.FMSketch.mergeFMSketch(rc.FMSketch)
if rc.CMSketch != nil {
err := c.CMSketch.MergeCMSketch(rc.CMSketch)
err := c.CMSketch.MergeCMSketch(rc.CMSketch, 0)
terror.Log(errors.Trace(err))
}
for _, item := range rc.Samples {
Expand Down

0 comments on commit b26cb0d

Please sign in to comment.