Skip to content

Commit

Permalink
executor: support building stats for fast analyze. (#10258)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 authored and zz-jason committed Apr 28, 2019
1 parent 45c0e51 commit 5fa16a8
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 30 deletions.
71 changes: 56 additions & 15 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -160,8 +161,8 @@ var errAnalyzeWorkerPanic = errors.New("analyze worker panic")
func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- analyzeResult, isCloseChanThread bool) {
defer func() {
e.wg.Done()
e.wg.Wait()
if isCloseChanThread {
e.wg.Wait()
close(resultCh)
}
if r := recover(); r != nil {
Expand Down Expand Up @@ -190,6 +191,8 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
task.job.Start()
resultCh <- analyzeIndexPushdown(task.idxExec)
case fastTask:
task.fastExec.job = task.job
task.job.Start()
for _, result := range analyzeFastExec(task.fastExec) {
resultCh <- result
}
Expand Down Expand Up @@ -507,7 +510,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
hists, cms, err := exec.buildStats()
if err != nil {
return []analyzeResult{{Err: err}}
return []analyzeResult{{Err: err, job: exec.job}}
}
var results []analyzeResult
hasPKInfo := 0
Expand All @@ -522,6 +525,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Cms: []*statistics.CMSketch{cms[i]},
IsIndex: 1,
Count: hists[i].NullCount,
job: exec.job,
}
if hists[i].Len() > 0 {
idxResult.Count += hists[i].Buckets[hists[i].Len()-1].Count
Expand All @@ -535,6 +539,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Hist: hists[:hasPKInfo+len(exec.colsInfo)],
Cms: cms[:hasPKInfo+len(exec.colsInfo)],
Count: hist.NullCount,
job: exec.job,
}
if hist.Len() > 0 {
colResult.Count += hist.Buckets[hist.Len()-1].Count
Expand All @@ -560,6 +565,7 @@ type AnalyzeFastExec struct {
idxsInfo []*model.IndexInfo
concurrency int
maxNumBuckets uint64
tblInfo *model.TableInfo
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
Expand All @@ -569,6 +575,7 @@ type AnalyzeFastExec struct {
scanTasks []*tikv.KeyLocation
collectors []*statistics.SampleCollector
randSeed int64
job *statistics.AnalyzeJob
}

func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild *bool, err *error, sampTasks *[]*AnalyzeFastTask) {
Expand Down Expand Up @@ -627,12 +634,12 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
}
}

// buildSampTask return tow variable, the first bool is whether the task meeting region error
// buildSampTask returns tow variables, the first bool is whether the task meets region error
// and need to rebuild.
func (e *AnalyzeFastExec) buildSampTask() (needRebuild bool, err error) {
// Do get regions row count.
bo := tikv.NewBackoffer(context.Background(), 500)
atomic.StoreUint64(&e.rowCount, 0)
e.rowCount = 0
needRebuildForRoutine := make([]bool, e.concurrency)
errs := make([]error, e.concurrency)
sampTasksForRoutine := make([][]*AnalyzeFastTask, e.concurrency)
Expand Down Expand Up @@ -721,6 +728,11 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if err != nil {
return err
}
var rowID int64
rowID, err = tablecodec.DecodeRowKey(sKey)
if err != nil {
return err
}
// Update the primary key collector.
if hasPKInfo > 0 {
v, ok := values[e.pkInfo.ID]
Expand All @@ -735,7 +747,7 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if e.collectors[0].Samples[samplePos] == nil {
e.collectors[0].Samples[samplePos] = &statistics.SampleItem{}
}
e.collectors[0].Samples[samplePos].Ordinal = int(samplePos)
e.collectors[0].Samples[samplePos].RowID = rowID
e.collectors[0].Samples[samplePos].Value = v
}
// Update the columns' collectors.
Expand All @@ -747,7 +759,7 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if e.collectors[hasPKInfo+j].Samples[samplePos] == nil {
e.collectors[hasPKInfo+j].Samples[samplePos] = &statistics.SampleItem{}
}
e.collectors[hasPKInfo+j].Samples[samplePos].Ordinal = int(samplePos)
e.collectors[hasPKInfo+j].Samples[samplePos].RowID = rowID
e.collectors[hasPKInfo+j].Samples[samplePos].Value = v
}
// Update the indexes' collectors.
Expand All @@ -773,7 +785,7 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos] == nil {
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos] = &statistics.SampleItem{}
}
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos].Ordinal = int(samplePos)
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos].RowID = rowID
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos].Value = types.NewBytesDatum(bytes)
}
return nil
Expand Down Expand Up @@ -844,9 +856,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error {
}

func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *error) {
defer func() {
e.wg.Done()
}()
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
Expand All @@ -869,9 +879,6 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
if *err != nil {
return
}
if maxRowID <= minRowID {
continue
}

keys := make([]kv.Key, 0, task.SampSize)
for i := 0; i < int(task.SampSize); i++ {
Expand All @@ -897,8 +904,37 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
}

func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) {
// TODO: build histogram and cmsketch here for one collector.
return nil, nil
// build collector properties.
collector.Samples = collector.Samples[:e.sampCursor]
sort.Slice(collector.Samples, func(i, j int) bool { return collector.Samples[i].RowID < collector.Samples[j].RowID })
collector.CalcTotalSize()
data := make([][]byte, 0, len(collector.Samples))
for i, sample := range collector.Samples {
sample.Ordinal = i
if sample.Value.IsNull() {
collector.NullCount++
continue
}
bytes, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, sample.Value)
if err != nil {
return nil, err
}
data = append(data, bytes)
}
stats := domain.GetDomain(e.ctx).StatsHandle()
rowCount := int64(e.rowCount)
if stats.Lease > 0 {
rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
}
// build CMSketch
var ndv, scaleRatio uint64
collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount))
// build Histogram
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
if err != nil {
return nil, err
}
return hist, nil
}

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) {
Expand Down Expand Up @@ -974,6 +1010,8 @@ func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*st
return nil, nil, errors.Errorf(errMsg, maxBuildTimes)
}

defer e.job.Update(int64(e.rowCount))

// If total row count of the table is smaller than 2*MaxSampleSize, we
// translate all the sample tasks to scan tasks.
if e.rowCount < uint64(MaxSampleSize)*2 {
Expand Down Expand Up @@ -1009,6 +1047,7 @@ type AnalyzeTestFastExec struct {
IdxsInfo []*model.IndexInfo
Concurrency int
Collectors []*statistics.SampleCollector
TblInfo *model.TableInfo
}

// TestFastSample only test the fast sample in unit test.
Expand All @@ -1020,6 +1059,8 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {
e.concurrency = e.Concurrency
e.physicalTableID = e.PhysicalTableID
e.wg = &sync.WaitGroup{}
e.job = &statistics.AnalyzeJob{}
e.tblInfo = e.TblInfo
_, _, err := e.buildStats()
e.Collectors = e.collectors
return err
Expand Down
85 changes: 84 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
IdxsInfo: indicesInfo,
Concurrency: 1,
PhysicalTableID: tbl.(table.PhysicalTable).GetPhysicalID(),
TblInfo: tblInfo,
}
err = mockExec.TestFastSample()
c.Assert(err, IsNil)
Expand All @@ -197,5 +198,87 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
vals[i] = append(vals[i], s)
}
}
c.Assert(fmt.Sprintln(vals), Equals, "[[0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54] [0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54]]\n")
c.Assert(fmt.Sprintln(vals), Equals, "[[0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58] [0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58]]\n")
}

func (s *testSuite1) TestFastAnalyze(c *C) {
cluster := mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(cluster)
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(cluster),
)
c.Assert(err, IsNil)
var dom *domain.Domain
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
executor.MaxSampleSize = 1000
executor.RandSeed = 123

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
for i := 0; i < 3000; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID

// construct 5 regions split by {600, 1200, 1800, 2400}
splitKeys := generateTableSplitKeyForInt(tid, []int{600, 1200, 1800, 2400})
manipulateCluster(cluster, splitKeys)

tk.MustExec("analyze table t with 5 buckets")

is := executor.GetInfoSchema(tk.Se.(sessionctx.Context))
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
tbl := dom.StatsHandle().GetTableStats(tableInfo)
sTbl := fmt.Sprintln(tbl)
matched := false
if sTbl == "Table:37 Count:3000\n"+
"column:1 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:2 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" ||
sTbl == "Table:37 Count:3000\n"+
"column:2 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:1 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" {
matched = true
}
c.Assert(matched, Equals, true)
}
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,9 +1435,11 @@ func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercor
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "fast analyze columns"},
})
}
}
Expand All @@ -1464,9 +1466,11 @@ func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore
physicalTableID: task.PhysicalTableID,
idxsInfo: []*model.IndexInfo{task.IndexInfo},
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: "fast analyze index " + task.IndexInfo.Name.O},
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,20 @@ type analyzeInfo struct {
PartitionName string
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
}

// AnalyzeColumnsTask is used for analyze columns.
type AnalyzeColumnsTask struct {
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
TblInfo *model.TableInfo
analyzeInfo
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
IndexInfo *model.IndexInfo
TblInfo *model.TableInfo
analyzeInfo
}

Expand Down
6 changes: 4 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
IndexInfo: idx,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -822,6 +823,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
PKInfo: pkInfo,
ColsInfo: colInfo,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
Expand All @@ -843,7 +845,7 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
}
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
return p, nil
Expand All @@ -860,7 +862,7 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) (Plan, erro
if idx.State == model.StatePublic {
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
}
Expand Down
Loading

0 comments on commit 5fa16a8

Please sign in to comment.