Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support building stats for fast analyze. #10258

Merged
merged 16 commits into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -190,6 +191,8 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
task.job.Start()
resultCh <- analyzeIndexPushdown(task.idxExec)
case fastTask:
task.fastExec.job = task.job
task.job.Start()
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
for _, result := range analyzeFastExec(task.fastExec) {
resultCh <- result
}
Expand Down Expand Up @@ -507,7 +510,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
hists, cms, err := exec.buildStats()
if err != nil {
return []analyzeResult{{Err: err}}
return []analyzeResult{{Err: err, job: exec.job}}
}
var results []analyzeResult
hasPKInfo := 0
Expand All @@ -522,6 +525,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Cms: []*statistics.CMSketch{cms[i]},
IsIndex: 1,
Count: hists[i].NullCount,
job: exec.job,
}
if hists[i].Len() > 0 {
idxResult.Count += hists[i].Buckets[hists[i].Len()-1].Count
Expand All @@ -535,6 +539,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Hist: hists[:hasPKInfo+len(exec.colsInfo)],
Cms: cms[:hasPKInfo+len(exec.colsInfo)],
Count: hist.NullCount,
job: exec.job,
}
if hist.Len() > 0 {
colResult.Count += hist.Buckets[hist.Len()-1].Count
Expand All @@ -560,6 +565,7 @@ type AnalyzeFastExec struct {
idxsInfo []*model.IndexInfo
concurrency int
maxNumBuckets uint64
tblInfo *model.TableInfo
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
Expand All @@ -569,6 +575,7 @@ type AnalyzeFastExec struct {
scanTasks []*tikv.KeyLocation
collectors []*statistics.SampleCollector
randSeed int64
job *statistics.AnalyzeJob
}

func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild *bool, err *error, sampTasks *[]*AnalyzeFastTask) {
Expand Down Expand Up @@ -627,12 +634,12 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
}
}

// buildSampTask return tow variable, the first bool is whether the task meeting region error
// buildSampTask returns tow variables, the first bool is whether the task meets region error
// and need to rebuild.
func (e *AnalyzeFastExec) buildSampTask() (needRebuild bool, err error) {
// Do get regions row count.
bo := tikv.NewBackoffer(context.Background(), 500)
atomic.StoreUint64(&e.rowCount, 0)
e.rowCount = 0
needRebuildForRoutine := make([]bool, e.concurrency)
errs := make([]error, e.concurrency)
sampTasksForRoutine := make([][]*AnalyzeFastTask, e.concurrency)
Expand Down Expand Up @@ -897,8 +904,35 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
}

func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) {
// TODO: build histogram and cmsketch here for one collector.
return nil, nil
// build collector properties.
collector.Samples = collector.Samples[:e.sampCursor]
err := collector.CalcTotalSize()
if err != nil {
return nil, err
}
collector.Count = int64(e.sampCursor)
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
data := make([][]byte, 0, len(collector.Samples))
for _, sample := range collector.Samples {
if sample.Value.IsNull() {
collector.NullCount++
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
continue
}
bytes, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, sample.Value)
if err != nil {
return nil, err
}
data = append(data, bytes)
}
rowCount := mathutil.MinInt64(domain.GetDomain(e.ctx).StatsHandle().GetTableStats(e.tblInfo).Count, int64(e.rowCount))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
// build CMSketch
var ndv, scaleRatio uint64
collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, uint32(len(data)), uint64(rowCount))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use len(data) as topN?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea about the number of topN. So just keep the len(data) here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erjiaqing PTAL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use a relative small number, maybe 20 or 40.
Finding element in TopN is not cheap, we should keep size of TopN relative small.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a constant ok? Or can we calculate it by the sample size?

// build Histogram
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount, int64(ndv), int64(scaleRatio))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
return hist, nil
}

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) {
Expand Down Expand Up @@ -1009,6 +1043,7 @@ type AnalyzeTestFastExec struct {
IdxsInfo []*model.IndexInfo
Concurrency int
Collectors []*statistics.SampleCollector
TblInfo *model.TableInfo
}

// TestFastSample only test the fast sample in unit test.
Expand All @@ -1020,6 +1055,7 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {
e.concurrency = e.Concurrency
e.physicalTableID = e.PhysicalTableID
e.wg = &sync.WaitGroup{}
e.tblInfo = e.TblInfo
_, _, err := e.buildStats()
e.Collectors = e.collectors
return err
Expand Down
85 changes: 84 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
IdxsInfo: indicesInfo,
Concurrency: 1,
PhysicalTableID: tbl.(table.PhysicalTable).GetPhysicalID(),
TblInfo: tblInfo,
}
err = mockExec.TestFastSample()
c.Assert(err, IsNil)
Expand All @@ -197,5 +198,87 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
vals[i] = append(vals[i], s)
}
}
c.Assert(fmt.Sprintln(vals), Equals, "[[0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54] [0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54]]\n")
c.Assert(fmt.Sprintln(vals), Equals, "[[0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58] [0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58]]\n")
}

func (s *testSuite1) TestFastAnalyze(c *C) {
cluster := mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(cluster)
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(cluster),
)
c.Assert(err, IsNil)
var dom *domain.Domain
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
executor.MaxSampleSize = 1000
executor.RandSeed = 123

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
for i := 0; i < 3000; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID

// construct 5 regions split by {600, 1200, 1800, 2400}
splitKeys := generateTableSplitKeyForInt(tid, []int{600, 1200, 1800, 2400})
manipulateCluster(cluster, splitKeys)

tk.MustExec("analyze table t with 5 buckets")

is := executor.GetInfoSchema(tk.Se.(sessionctx.Context))
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
tbl := dom.StatsHandle().GetTableStats(tableInfo)
sTbl := fmt.Sprintln(tbl)
matched := false
if sTbl == "Table:37 Count:3001\n"+
"column:1 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:2 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" ||
sTbl == "Table:37 Count:3001\n"+
"column:2 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:1 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" {
matched = true
}
c.Assert(matched, Equals, true)
}
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,9 +1435,11 @@ func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercor
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "fast analyze columns"},
})
}
}
Expand All @@ -1464,9 +1466,11 @@ func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore
physicalTableID: task.PhysicalTableID,
idxsInfo: []*model.IndexInfo{task.IndexInfo},
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: "fast analyze index " + task.IndexInfo.Name.O},
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,20 @@ type analyzeInfo struct {
PartitionName string
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
}

// AnalyzeColumnsTask is used for analyze columns.
type AnalyzeColumnsTask struct {
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
TblInfo *model.TableInfo
analyzeInfo
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
IndexInfo *model.IndexInfo
TblInfo *model.TableInfo
analyzeInfo
}

Expand Down
6 changes: 4 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
IndexInfo: idx,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -822,6 +823,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
PKInfo: pkInfo,
ColsInfo: colInfo,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -843,7 +845,7 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
}
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
return p, nil
Expand All @@ -860,7 +862,7 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) (Plan, erro
if idx.State == model.StatePublic {
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
}
Expand Down
22 changes: 16 additions & 6 deletions statistics/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,28 @@ func (b *SortedBuilder) Iterate(data types.Datum) error {
return nil
}

// BuildColumn builds histogram from samples for column.
func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) {
count := collector.Count
ndv := collector.FMSketch.NDV()
// BuildColumnHist build a histogram for a column.
// numBuckets: number of buckets for the histogram.
// id: the id of the table.
// collector: the collector of samples.
// tp: the FieldType for the column.
// count: represents the row count for the column.
// ndv: represents the number of distinct values for the column.
// nullCount: represents the number of null values for the column.
func BuildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType, count int64, ndv int64, nullCount int64) (*Histogram, error) {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if ndv > count {
ndv = count
}
if count == 0 || len(collector.Samples) == 0 {
return NewHistogram(id, ndv, collector.NullCount, 0, tp, 0, collector.TotalSize), nil
return NewHistogram(id, ndv, nullCount, 0, tp, 0, collector.TotalSize), nil
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
err := SortSampleItems(sc, samples)
if err != nil {
return nil, err
}
hg := NewHistogram(id, ndv, collector.NullCount, 0, tp, int(numBuckets), collector.TotalSize)
hg := NewHistogram(id, ndv, nullCount, 0, tp, int(numBuckets), collector.TotalSize)

sampleNum := int64(len(samples))
// As we use samples to build the histogram, the bucket number and repeat should multiply a factor.
Expand Down Expand Up @@ -174,3 +179,8 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
return hg, nil
}

// BuildColumn builds histogram from samples for column.
func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) {
return BuildColumnHist(ctx, numBuckets, id, collector, tp, collector.Count, collector.FMSketch.NDV(), collector.NullCount)
}
6 changes: 3 additions & 3 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ func newTopNHelper(sample [][]byte, numTop uint32) *topNHelper {
return &topNHelper{uint64(len(sample)), numTop, counter, sorted, onlyOnceItems, sumTopN, last}
}

// NewCMSketchWithTopN returns a new CM sketch with TopN elements.
func NewCMSketchWithTopN(d, w int32, sample [][]byte, numTop uint32, rowCount uint64) *CMSketch {
// NewCMSketchWithTopN returns a new CM sketch with TopN elements, the estimate NDV and the scale ratio.
func NewCMSketchWithTopN(d, w int32, sample [][]byte, numTop uint32, rowCount uint64) (*CMSketch, uint64, uint64) {
helper := newTopNHelper(sample, numTop)
// rowCount is not a accurate value when fast analyzing
// In some cases, if user triggers fast analyze when rowCount is close to sampleSize, unexpected bahavior might happen.
rowCount = mathutil.MaxUint64(rowCount, uint64(len(sample)))
estimateNDV, scaleRatio := calculateEstimateNDV(helper, rowCount)
c := buildCMSWithTopN(helper, d, w, scaleRatio)
c.calculateDefaultVal(helper, estimateNDV, scaleRatio, rowCount)
return c
return c, estimateNDV, scaleRatio
}

func buildCMSWithTopN(helper *topNHelper, d, w int32, scaleRatio uint64) (c *CMSketch) {
Expand Down
3 changes: 2 additions & 1 deletion statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func prepareCMSWithTopN(d, w int32, vals []*types.Datum, n uint32, total uint64)
}
data = append(data, bytes)
}
return NewCMSketchWithTopN(d, w, data, n, total), nil
cms, _, _ := NewCMSketchWithTopN(d, w, data, n, total)
return cms, nil
}

func buildCMSketchAndMap(d, w int32, seed int64, total, imax uint64, s float64) (*CMSketch, map[int64]uint32, error) {
Expand Down
9 changes: 9 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ func (c *SampleCollector) collect(sc *stmtctx.StatementContext, d types.Datum) e
return nil
}

// CalcTotalSize is to calculate total size based on samples.
func (c *SampleCollector) CalcTotalSize() error {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
c.TotalSize = 0
for _, item := range c.Samples {
c.TotalSize += int64(len(item.Value.GetBytes()))
}
return nil
}

// SampleBuilder is used to build samples for columns.
// Also, if primary key is handle, it will directly build histogram for it.
type SampleBuilder struct {
Expand Down