Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support building stats for fast analyze. #10258

Merged
merged 16 commits into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ type AnalyzeFastExec struct {
idxsInfo []*model.IndexInfo
concurrency int
maxNumBuckets uint64
tblInfo *model.TableInfo
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
Expand Down Expand Up @@ -897,8 +898,33 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
}

func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) {
// TODO: build histogram and cmsketch here for one collector.
return nil, nil
// build collector properties.
collector.UpdateTotalSize()
collector.Samples = collector.Samples[:e.sampCursor]
collector.Count = int64(e.sampCursor)
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
data := make([][]byte, 0, len(collector.Samples))
for _, sample := range collector.Samples {
bytes, err := sample.Value.ToString()
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Trace(err)
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}
data = append(data, []byte(bytes))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if sample.Value.IsNull() {
collector.NullCount++
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
}
}
rowCount := domain.GetDomain(e.ctx).StatsHandle().GetTableStats(e.tblInfo).Count
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if int64(e.rowCount) < rowCount {
rowCount = int64(e.rowCount)
}
// build CMSketch
collector.CMSketch = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, uint32(e.sampCursor), uint64(rowCount))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
// build Histogram
hist, err := statistics.BuildColumnWithSamples(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount)
if err != nil {
return nil, errors.Trace(err)
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}
return hist, nil
}

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) {
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,7 @@ func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercor
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
Expand Down Expand Up @@ -1464,6 +1465,7 @@ func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore
physicalTableID: task.PhysicalTableID,
idxsInfo: []*model.IndexInfo{task.IndexInfo},
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,20 @@ type analyzeInfo struct {
PartitionName string
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
}

// AnalyzeColumnsTask is used for analyze columns.
type AnalyzeColumnsTask struct {
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
TblInfo *model.TableInfo
analyzeInfo
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
IndexInfo *model.IndexInfo
TblInfo *model.TableInfo
analyzeInfo
}

Expand Down
6 changes: 4 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
IndexInfo: idx,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -822,6 +823,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
PKInfo: pkInfo,
ColsInfo: colInfo,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -843,7 +845,7 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
}
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
return p, nil
Expand All @@ -860,7 +862,7 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) (Plan, erro
if idx.State == model.StatePublic {
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
}
Expand Down
32 changes: 26 additions & 6 deletions statistics/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package statistics

import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -93,23 +95,20 @@ func (b *SortedBuilder) Iterate(data types.Datum) error {
return nil
}

// BuildColumn builds histogram from samples for column.
func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) {
count := collector.Count
ndv := collector.FMSketch.NDV()
func buildColumnHist(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType, count int64, ndv int64, nullCount int64) (*Histogram, error) {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
if ndv > count {
ndv = count
}
if count == 0 || len(collector.Samples) == 0 {
return NewHistogram(id, ndv, collector.NullCount, 0, tp, 0, collector.TotalSize), nil
return NewHistogram(id, ndv, nullCount, 0, tp, 0, collector.TotalSize), nil
}
sc := ctx.GetSessionVars().StmtCtx
samples := collector.Samples
err := SortSampleItems(sc, samples)
if err != nil {
return nil, err
}
hg := NewHistogram(id, ndv, collector.NullCount, 0, tp, int(numBuckets), collector.TotalSize)
hg := NewHistogram(id, ndv, nullCount, 0, tp, int(numBuckets), collector.TotalSize)

sampleNum := int64(len(samples))
// As we use samples to build the histogram, the bucket number and repeat should multiply a factor.
Expand Down Expand Up @@ -174,3 +173,24 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample
hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
return hg, nil
}

// BuildColumn builds histogram from samples for column.
func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType) (*Histogram, error) {
return buildColumnHist(ctx, numBuckets, id, collector, tp, collector.Count, collector.FMSketch.NDV(), collector.NullCount)
}

// BuildColumnWithSamples builds histogram from samples for column.
// It was used in that collector.Count is not the entire count but the sample count.
func BuildColumnWithSamples(ctx sessionctx.Context, numBuckets, id int64, collector *SampleCollector, tp *types.FieldType, count int64) (*Histogram, error) {
samplesBytes := make([][]byte, 0, len(collector.Samples))
for _, sample := range collector.Samples {
str, err := sample.Value.ToString()
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Trace(err)
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
}
samplesBytes = append(samplesBytes, []byte(str))
}
ndv, _ := calculateEstimateNDV(newTopNHelper(samplesBytes, 0), uint64(count))
nullCount := collector.NullCount * int64(math.Round(float64(count)/float64(collector.Count)))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
return buildColumnHist(ctx, numBuckets, id, collector, tp, count, int64(ndv), nullCount)
}
8 changes: 8 additions & 0 deletions statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ func (c *SampleCollector) collect(sc *stmtctx.StatementContext, d types.Datum) e
return nil
}

// UpdateTotalSize is to calculate total size based on samples.
func (c *SampleCollector) UpdateTotalSize() {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
c.TotalSize = 0
for _, item := range c.Samples {
c.TotalSize += int64(len(item.Value.GetBytes()))
}
}

// SampleBuilder is used to build samples for columns.
// Also, if primary key is handle, it will directly build histogram for it.
type SampleBuilder struct {
Expand Down