Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support building stats for fast analyze. #10258

Merged
merged 16 commits into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/cznic/mathutil"

lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -190,6 +192,8 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
task.job.Start()
resultCh <- analyzeIndexPushdown(task.idxExec)
case fastTask:
task.fastExec.job = task.job
task.job.Start()
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
for _, result := range analyzeFastExec(task.fastExec) {
resultCh <- result
}
Expand Down Expand Up @@ -507,7 +511,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
hists, cms, err := exec.buildStats()
if err != nil {
return []analyzeResult{{Err: err}}
return []analyzeResult{{Err: err, job: exec.job}}
}
var results []analyzeResult
hasPKInfo := 0
Expand All @@ -522,6 +526,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Cms: []*statistics.CMSketch{cms[i]},
IsIndex: 1,
Count: hists[i].NullCount,
job: exec.job,
}
if hists[i].Len() > 0 {
idxResult.Count += hists[i].Buckets[hists[i].Len()-1].Count
Expand All @@ -535,6 +540,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Hist: hists[:hasPKInfo+len(exec.colsInfo)],
Cms: cms[:hasPKInfo+len(exec.colsInfo)],
Count: hist.NullCount,
job: exec.job,
}
if hist.Len() > 0 {
colResult.Count += hist.Buckets[hist.Len()-1].Count
Expand All @@ -560,6 +566,7 @@ type AnalyzeFastExec struct {
idxsInfo []*model.IndexInfo
concurrency int
maxNumBuckets uint64
tblInfo *model.TableInfo
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
Expand All @@ -569,6 +576,7 @@ type AnalyzeFastExec struct {
scanTasks []*tikv.KeyLocation
collectors []*statistics.SampleCollector
randSeed int64
job *statistics.AnalyzeJob
}

func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild *bool, err *error, sampTasks *[]*AnalyzeFastTask) {
Expand Down Expand Up @@ -897,8 +905,33 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
}

func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) {
// TODO: build histogram and cmsketch here for one collector.
return nil, nil
// build collector properties.
collector.Samples = collector.Samples[:e.sampCursor]
err := collector.UpdateTotalSize()
if err != nil {
return nil, err
}
collector.Count = int64(e.sampCursor)
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
data := make([][]byte, 0, len(collector.Samples))
for _, sample := range collector.Samples {
bytes, err := sample.Value.ToString()
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
data = append(data, []byte(bytes))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if sample.Value.IsNull() {
collector.NullCount++
}
}
rowCount := mathutil.MinInt64(domain.GetDomain(e.ctx).StatsHandle().GetTableStats(e.tblInfo).Count, int64(e.rowCount))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
// build CMSketch
collector.CMSketch = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, uint32(e.sampCursor), uint64(rowCount))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
// build Histogram
hist, err := statistics.BuildColumnWithSamples(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount)
if err != nil {
return nil, errors.Trace(err)
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}
return hist, nil
}

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) {
Expand Down Expand Up @@ -1009,6 +1042,7 @@ type AnalyzeTestFastExec struct {
IdxsInfo []*model.IndexInfo
Concurrency int
Collectors []*statistics.SampleCollector
TblInfo *model.TableInfo
}

// TestFastSample only test the fast sample in unit test.
Expand All @@ -1020,6 +1054,7 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {
e.concurrency = e.Concurrency
e.physicalTableID = e.PhysicalTableID
e.wg = &sync.WaitGroup{}
e.tblInfo = e.TblInfo
_, _, err := e.buildStats()
e.Collectors = e.collectors
return err
Expand Down
85 changes: 84 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
IdxsInfo: indicesInfo,
Concurrency: 1,
PhysicalTableID: tbl.(table.PhysicalTable).GetPhysicalID(),
TblInfo: tblInfo,
}
err = mockExec.TestFastSample()
c.Assert(err, IsNil)
Expand All @@ -197,5 +198,87 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
vals[i] = append(vals[i], s)
}
}
c.Assert(fmt.Sprintln(vals), Equals, "[[0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54] [0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54]]\n")
c.Assert(fmt.Sprintln(vals), Equals, "[[0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58] [0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58]]\n")
}

func (s *testSuite1) TestFastAnalyze(c *C) {
cluster := mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(cluster)
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(cluster),
)
c.Assert(err, IsNil)
var dom *domain.Domain
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
executor.MaxSampleSize = 1000
executor.RandSeed = 123

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
for i := 0; i < 3000; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID

// construct 5 regions split by {600, 1200, 1800, 2400}
splitKeys := generateTableSplitKeyForInt(tid, []int{600, 1200, 1800, 2400})
manipulateCluster(cluster, splitKeys)

tk.MustExec("analyze table t with 5 buckets")

is := executor.GetInfoSchema(tk.Se.(sessionctx.Context))
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
tbl := dom.StatsHandle().GetTableStats(tableInfo)
sTbl := fmt.Sprintln(tbl)
matched := false
if sTbl == "Table:37 Count:3000\n"+
"column:1 ndv:3000 totColSize:3639\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:2 ndv:3000 totColSize:3639\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" ||
sTbl == "Table:37 Count:3000\n"+
"column:2 ndv:3000 totColSize:3639\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:1 ndv:3000 totColSize:3639\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" {
matched = true
}
c.Assert(matched, Equals, true)
}
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,9 +1435,11 @@ func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercor
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "fast analyze columns"},
})
}
}
Expand All @@ -1464,9 +1466,11 @@ func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore
physicalTableID: task.PhysicalTableID,
idxsInfo: []*model.IndexInfo{task.IndexInfo},
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: "fast analyze index " + task.IndexInfo.Name.O},
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,20 @@ type analyzeInfo struct {
PartitionName string
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
}

// AnalyzeColumnsTask is used for analyze columns.
type AnalyzeColumnsTask struct {
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
TblInfo *model.TableInfo
analyzeInfo
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
IndexInfo *model.IndexInfo
TblInfo *model.TableInfo
analyzeInfo
}

Expand Down
6 changes: 4 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
IndexInfo: idx,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -822,6 +823,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
PKInfo: pkInfo,
ColsInfo: colInfo,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -843,7 +845,7 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
}
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
return p, nil
Expand All @@ -860,7 +862,7 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) (Plan, erro
if idx.State == model.StatePublic {
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
}
Expand Down
32 changes: 26 additions & 6 deletions statistics/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package statistics

import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -93,23 +95,20 @@ func (b *SortedBuilder) Iterate(data types.Datum) error {
return nil
}

// BuildColumn builds histogram from samples for column.
func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) {
count := collector.Count
ndv := collector.FMSketch.NDV()
func buildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType, count int64, ndv int64, nullCount int64) (*Histogram, error) {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if ndv > count {
ndv = count
}
if count == 0 || len(collector.Samples) == 0 {
return NewHistogram(id, ndv, collector.NullCount, 0, tp, 0, collector.TotalSize), nil
return NewHistogram(id, ndv, nullCount, 0, tp, 0, collector.TotalSize), nil
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
err := SortSampleItems(sc, samples)
if err != nil {
return nil, err
}
hg := NewHistogram(id, ndv, collector.NullCount, 0, tp, int(numBuckets), collector.TotalSize)
hg := NewHistogram(id, ndv, nullCount, 0, tp, int(numBuckets), collector.TotalSize)

sampleNum := int64(len(samples))
// As we use samples to build the histogram, the bucket number and repeat should multiply a factor.
Expand Down Expand Up @@ -174,3 +173,24 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
return hg, nil
}

// BuildColumn builds histogram from samples for column.
func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) {
return buildColumnHist(ctx, numBuckets, id, collector, tp, collector.Count, collector.FMSketch.NDV(), collector.NullCount)
}

// BuildColumnWithSamples builds histogram from samples for column.
// It was used in that collector.Count is not the entire count but the sample count.
func BuildColumnWithSamples(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType, count int64) (*Histogram, error) {
samplesBytes := make([][]byte, 0, len(collector.Samples))
for _, sample := range collector.Samples {
str, err := sample.Value.ToString()
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Trace(err)
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}
samplesBytes = append(samplesBytes, []byte(str))
}
ndv, _ := calculateEstimateNDV(newTopNHelper(samplesBytes, 0), uint64(count))
nullCount := collector.NullCount * int64(math.Round(float64(count)/float64(collector.Count)))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
return buildColumnHist(ctx, numBuckets, id, collector, tp, count, int64(ndv), nullCount)
}
14 changes: 14 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"sort"
"unicode/utf8"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -173,6 +174,19 @@ func (c *SampleCollector) collect(sc *stmtctx.StatementContext, d types.Datum) e
return nil
}

// UpdateTotalSize is to calculate total size based on samples.
func (c *SampleCollector) UpdateTotalSize() error {
c.TotalSize = 0
for _, item := range c.Samples {
str, err := item.Value.ToString()
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
c.TotalSize += int64(utf8.RuneCountInString(str))
}
return nil
}

// SampleBuilder is used to build samples for columns.
// Also, if primary key is handle, it will directly build histogram for it.
type SampleBuilder struct {
Expand Down