Skip to content

Commit

Permalink
statistics: update stats per table in the pipeline (#19370)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu authored Sep 15, 2020
1 parent 8ae5b1c commit 411eba6
Showing 1 changed file with 53 additions and 18 deletions.
71 changes: 53 additions & 18 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -502,28 +503,62 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {

// HandleUpdateStats update the stats using feedback.
func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
sql := "select table_id, hist_id, is_index, feedback from mysql.stats_feedback order by table_id, hist_id, is_index"
rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
if len(rows) == 0 || err != nil {
sql := "SELECT distinct table_id from mysql.stats_feedback"
tables, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
if err != nil {
return errors.Trace(err)
}

var groupedRows [][]chunk.Row
preIdx := 0
tableID, histID, isIndex := rows[0].GetInt64(0), rows[0].GetInt64(1), rows[0].GetInt64(2)
for i := 1; i < len(rows); i++ {
row := rows[i]
if row.GetInt64(0) != tableID || row.GetInt64(1) != histID || row.GetInt64(2) != isIndex {
groupedRows = append(groupedRows, rows[preIdx:i])
tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
preIdx = i
}
if len(tables) == 0 {
return nil
}
groupedRows = append(groupedRows, rows[preIdx:])

for _, rows := range groupedRows {
if err := h.handleSingleHistogramUpdate(is, rows); err != nil {
return errors.Trace(err)
for _, ptbl := range tables {
// this func lets `defer` works normally, where `Close()` should be called before any return
err = func() error {
tbl := ptbl.GetInt64(0)
sql = fmt.Sprintf("select table_id, hist_id, is_index, feedback from mysql.stats_feedback where table_id=%d order by hist_id, is_index", tbl)
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
if err != nil {
return errors.Trace(err)
}
tableID, histID, isIndex := int64(-1), int64(-1), int64(-1)
var rows []chunk.Row
for {
req := rc[0].NewChunk()
iter := chunk.NewIterator4Chunk(req)
err := rc[0].Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
if len(rows) > 0 {
if err := h.handleSingleHistogramUpdate(is, rows); err != nil {
return errors.Trace(err)
}
}
break
}
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
// len(rows) > 100000 limits the rows to avoid OOM
if row.GetInt64(0) != tableID || row.GetInt64(1) != histID || row.GetInt64(2) != isIndex || len(rows) > 100000 {
if len(rows) > 0 {
if err := h.handleSingleHistogramUpdate(is, rows); err != nil {
return errors.Trace(err)
}
}
tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
rows = rows[:0]
}
rows = append(rows, row)
}
}
return nil
}()
if err != nil {
return err
}
}
return nil
Expand Down

0 comments on commit 411eba6

Please sign in to comment.