Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: handle ddl event for partition table #7903

Merged
merged 2 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions statistics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,43 @@ import (
func (h *Handle) HandleDDLEvent(t *util.Event) error {
switch t.Tp {
case model.ActionCreateTable, model.ActionTruncateTable:
return h.insertTableStats2KV(t.TableInfo)
ids := getPhysicalIDs(t.TableInfo)
for _, id := range ids {
if err := h.insertTableStats2KV(t.TableInfo, id); err != nil {
return err
}
}
case model.ActionAddColumn:
return h.insertColStats2KV(t.TableInfo.ID, t.ColumnInfo)
ids := getPhysicalIDs(t.TableInfo)
for _, id := range ids {
if err := h.insertColStats2KV(id, t.ColumnInfo); err != nil {
return err
}
}
}
return nil
}

func getPhysicalIDs(tblInfo *model.TableInfo) []int64 {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
return []int64{tblInfo.ID}
}
ids := make([]int64, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
ids = append(ids, def.ID)
}
return ids
}

// DDLEventCh returns ddl events channel in handle.
func (h *Handle) DDLEventCh() chan *util.Event {
return h.ddlEventCh
}

// insertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
// new columns and indices which belong to this table.
func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) {
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
Expand All @@ -56,18 +78,18 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) {
defer func() {
err = finishTransaction(context.Background(), exec, err)
}()
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.mu.ctx.Txn().StartTS(), info.ID))
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.mu.ctx.Txn().StartTS(), physicalID))
if err != nil {
return
}
for _, col := range info.Columns {
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", info.ID, col.ID, h.mu.ctx.Txn().StartTS()))
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, h.mu.ctx.Txn().StartTS()))
if err != nil {
return
}
}
for _, idx := range info.Indices {
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", info.ID, idx.ID, h.mu.ctx.Txn().StartTS()))
_, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, h.mu.ctx.Txn().StartTS()))
if err != nil {
return
}
Expand All @@ -77,7 +99,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo) (err error) {

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (err error) {
func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
Expand All @@ -89,7 +111,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
err = finishTransaction(context.Background(), exec, err)
}()
// First of all, we update the version.
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), tableID))
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.mu.ctx.Txn().StartTS(), physicalID))
if err != nil {
return
}
Expand All @@ -98,7 +120,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs []ast.RecordSet
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", tableID))
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
Expand All @@ -118,13 +140,13 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
}
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.mu.ctx.Txn().StartTS(), tableID, colInfo.ID, count))
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.mu.ctx.Txn().StartTS(), physicalID, colInfo.ID, count))
if err != nil {
return
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.mu.ctx.Txn().StartTS(), tableID, colInfo.ID, int64(len(value.GetBytes()))*count))
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.mu.ctx.Txn().StartTS(), physicalID, colInfo.ID, int64(len(value.GetBytes()))*count))
if err != nil {
return
}
Expand All @@ -133,7 +155,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) (er
return
}
// There must be only one bucket for this new column and the value is the default value.
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", tableID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
_, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
if err != nil {
return
}
Expand Down
47 changes: 47 additions & 0 deletions statistics/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,50 @@ func (s *testStatsCacheSuite) TestDDLHistogram(c *C) {
rs = testKit.MustQuery("select count(*) from mysql.stats_buckets where table_id = ? and hist_id = 1 and is_index = 1", tableInfo.ID)
rs.Check(testkit.Rows("2"))
}

func (s *testStatsCacheSuite) TestDDLPartition(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("set @@session.tidb_enable_table_partition=1")
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
createTable := `CREATE TABLE t (a int, b int, primary key(a), index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11),
PARTITION p2 VALUES LESS THAN (16),
PARTITION p3 VALUES LESS THAN (21)
)`
testKit.MustExec(createTable)
do := s.do
is := do.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := tbl.Meta()
h := do.StatsHandle()
err = h.HandleDDLEvent(<-h.DDLEventCh())
c.Assert(err, IsNil)
h.Update(is)
pi := tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
c.Assert(statsTbl.Pseudo, IsFalse)
}

testKit.MustExec("insert into t values (1,2),(6,2),(11,2),(16,2)")
testKit.MustExec("analyze table t")
testKit.MustExec("alter table t add column c varchar(15) DEFAULT '123'")
err = h.HandleDDLEvent(<-h.DDLEventCh())
c.Assert(err, IsNil)
is = do.InfoSchema()
h.Update(is)
tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo = tbl.Meta()
pi = tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
c.Assert(statsTbl.Pseudo, IsFalse)
c.Check(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSize(statsTbl.Count), Equals, 3.0)
}
}