Skip to content

Commit

Permalink
stats: extract some functions for future SchemaChangeHandler use (#56379
Browse files Browse the repository at this point in the history
)

ref #55722
  • Loading branch information
lance6716 committed Sep 30, 2024
1 parent 762a6f2 commit 5ad55c2
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 123 deletions.
1 change: 1 addition & 0 deletions pkg/statistics/handle/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
"//pkg/parser/model",
"//pkg/planner/cardinality",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/util",
"//pkg/testkit",
"//pkg/types",
"//pkg/util",
Expand Down
21 changes: 11 additions & 10 deletions pkg/statistics/handle/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,20 @@ import (
)

type ddlHandlerImpl struct {
ddlEventCh chan *notifier.SchemaChangeEvent
statsWriter types.StatsReadWriter
statsHandler types.StatsHandle
globalStatsHandler types.StatsGlobal
ddlEventCh chan *notifier.SchemaChangeEvent
statsWriter types.StatsReadWriter
statsHandler types.StatsHandle
}

// NewDDLHandler creates a new ddl handler.
func NewDDLHandler(
statsWriter types.StatsReadWriter,
statsHandler types.StatsHandle,
globalStatsHandler types.StatsGlobal,
) types.DDL {
return &ddlHandlerImpl{
ddlEventCh: make(chan *notifier.SchemaChangeEvent, 1000),
statsWriter: statsWriter,
statsHandler: statsHandler,
globalStatsHandler: globalStatsHandler,
ddlEventCh: make(chan *notifier.SchemaChangeEvent, 1000),
statsWriter: statsWriter,
statsHandler: statsHandler,
}
}

Expand Down Expand Up @@ -235,7 +232,11 @@ func updateStatsWithCountDeltaAndModifyCountDelta(
}

// Because count can not be negative, so we need to get the current and calculate the delta.
count, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCountForUpdate(sctx, tableID)
count, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCountForUpdate(
util.StatsCtx,
sctx,
tableID,
)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/statistics/handle/ddl"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -1334,7 +1335,7 @@ func TestExchangePartition(t *testing.T) {
})
}
wg.Wait()
count, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCount(tk.Session(), tbl.Meta().ID)
count, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCount(statsutil.StatsCtx, tk.Session(), tbl.Meta().ID)
require.NoError(t, err)
require.False(t, isNull)
require.Equal(t, int64(200), count)
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/ddl/drop_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (h *ddlHandlerImpl) onDropPartitions(t *notifier.SchemaChangeEvent) error {
count := int64(0)
for _, def := range droppedPartitionInfo.Definitions {
// Get the count and modify count of the partition.
tableCount, _, _, err := storage.StatsMetaCountAndModifyCount(sctx, def.ID)
tableCount, _, _, err := storage.StatsMetaCountAndModifyCount(util.StatsCtx, sctx, def.ID)
if err != nil {
return err
}
Expand All @@ -54,6 +54,7 @@ func (h *ddlHandlerImpl) onDropPartitions(t *notifier.SchemaChangeEvent) error {
// Because we drop the partition, we should subtract the count from the global stats.
delta := -count
err = storage.UpdateStatsMeta(
util.StatsCtx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/handle/ddl/exchange_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ func getCountsAndModifyCounts(
sctx sessionctx.Context,
partitionID, tableID int64,
) (partCount, partModifyCount, tableCount, tableModifyCount int64, err error) {
partCount, partModifyCount, _, err = storage.StatsMetaCountAndModifyCount(sctx, partitionID)
partCount, partModifyCount, _, err = storage.StatsMetaCountAndModifyCount(util.StatsCtx, sctx, partitionID)
if err != nil {
return
}

tableCount, tableModifyCount, _, err = storage.StatsMetaCountAndModifyCount(sctx, tableID)
tableCount, tableModifyCount, _, err = storage.StatsMetaCountAndModifyCount(util.StatsCtx, sctx, tableID)
if err != nil {
return
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/ddl/truncate_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (h *ddlHandlerImpl) onTruncatePartitions(t *notifier.SchemaChangeEvent) err
partitionNames := make([]string, 0, len(droppedPartInfo.Definitions))
for _, def := range droppedPartInfo.Definitions {
// Get the count and modify count of the partition.
tableCount, _, _, err := storage.StatsMetaCountAndModifyCount(sctx, def.ID)
tableCount, _, _, err := storage.StatsMetaCountAndModifyCount(util.StatsCtx, sctx, def.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -83,6 +83,7 @@ func (h *ddlHandlerImpl) onTruncatePartitions(t *notifier.SchemaChangeEvent) err
// 5. The global stats should not be `count` and 0 modify count. We need to keep the modify count.
delta := -count
err = storage.UpdateStatsMeta(
util.StatsCtx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
Expand Down
6 changes: 5 additions & 1 deletion pkg/statistics/handle/globalstats/global_stats_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ func (a *AsyncMergePartitionStats2GlobalStats) prepare(sctx sessionctx.Context,
}
tableInfo := partitionTable.Meta()
a.tableInfo[partitionID] = tableInfo
realtimeCount, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCount(sctx, partitionID)
realtimeCount, modifyCount, isNull, err := storage.StatsMetaCountAndModifyCount(
util.StatsCtx,
sctx,
partitionID,
)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func NewHandle(
handle.DDL = ddl.NewDDLHandler(
handle.StatsReadWriter,
handle,
handle.StatsGlobal,
)
return handle, nil
}
Expand Down
28 changes: 21 additions & 7 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"context"
"encoding/json"
"strconv"
"time"
Expand Down Expand Up @@ -42,21 +43,34 @@ import (
)

// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
func StatsMetaCountAndModifyCount(sctx sessionctx.Context, tableID int64) (count, modifyCount int64, isNull bool, err error) {
return statsMetaCountAndModifyCount(sctx, tableID, false)
func StatsMetaCountAndModifyCount(
ctx context.Context,
sctx sessionctx.Context,
tableID int64,
) (count, modifyCount int64, isNull bool, err error) {
return statsMetaCountAndModifyCount(ctx, sctx, tableID, false)
}

// StatsMetaCountAndModifyCountForUpdate reads count and modify_count for the given table from mysql.stats_meta with lock.
func StatsMetaCountAndModifyCountForUpdate(sctx sessionctx.Context, tableID int64) (count, modifyCount int64, isNull bool, err error) {
return statsMetaCountAndModifyCount(sctx, tableID, true)
func StatsMetaCountAndModifyCountForUpdate(
ctx context.Context,
sctx sessionctx.Context,
tableID int64,
) (count, modifyCount int64, isNull bool, err error) {
return statsMetaCountAndModifyCount(ctx, sctx, tableID, true)
}

func statsMetaCountAndModifyCount(sctx sessionctx.Context, tableID int64, forUpdate bool) (count, modifyCount int64, isNull bool, err error) {
func statsMetaCountAndModifyCount(
ctx context.Context,
sctx sessionctx.Context,
tableID int64,
forUpdate bool,
) (count, modifyCount int64, isNull bool, err error) {
sql := "select count, modify_count from mysql.stats_meta where table_id = %?"
if forUpdate {
sql += " for update"
}
rows, _, err := util.ExecRows(sctx, sql, tableID)
rows, _, err := util.ExecRowsWithCtx(ctx, sctx, sql, tableID)
if err != nil {
return 0, 0, false, err
}
Expand Down Expand Up @@ -517,7 +531,7 @@ func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo *
}
table.Pseudo = false

realtimeCount, modidyCount, isNull, err := StatsMetaCountAndModifyCount(sctx, tableID)
realtimeCount, modidyCount, isNull, err := StatsMetaCountAndModifyCount(util.StatsCtx, sctx, tableID)
if err != nil || isNull {
return nil, err
}
Expand Down
141 changes: 141 additions & 0 deletions pkg/statistics/handle/storage/save.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package storage

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
Expand Down Expand Up @@ -409,3 +412,141 @@ func SaveMetaToStorage(
cache.TableRowStatsCache.Invalidate(tableID)
return
}

// InsertColStats2KV insert a record to stats_histograms with distinct_count 1
// and insert a bucket to stats_buckets with default value. This operation also
// updates version.
func InsertColStats2KV(
ctx context.Context,
sctx sessionctx.Context,
physicalID int64,
colInfos []*model.ColumnInfo,
) (uint64, error) {
startTS, err := util.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
}

// First of all, we update the version.
_, err = util.ExecWithCtx(
ctx, sctx,
"update mysql.stats_meta set version = %? where table_id = %?",
startTS, physicalID,
)
if err != nil {
return 0, errors.Trace(err)
}
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if sctx.GetSessionVars().StmtCtx.AffectedRows() == 0 {
return startTS, nil
}

// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs sqlexec.RecordSet
rs, err = util.ExecWithCtx(
ctx, sctx,
"select count from mysql.stats_meta where table_id = %?",
physicalID,
)
if err != nil {
return 0, errors.Trace(err)
}
defer terror.Call(rs.Close)
req := rs.NewChunk(nil)
err = rs.Next(ctx, req)
if err != nil {
return 0, errors.Trace(err)
}
count := req.GetRow(0).GetInt64(0)
for _, colInfo := range colInfos {
value := types.NewDatum(colInfo.GetOriginDefaultValue())
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx.TypeCtx(), &colInfo.FieldType)
if err != nil {
return 0, errors.Trace(err)
}
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
if _, err = util.ExecWithCtx(
ctx, sctx,
`insert into mysql.stats_histograms
(version, table_id, is_index, hist_id, distinct_count, null_count)
values (%?, %?, 0, %?, 0, %?)`,
startTS, physicalID, colInfo.ID, count,
); err != nil {
return 0, errors.Trace(err)
}
continue
}

// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
if _, err = util.ExecWithCtx(
ctx, sctx,
`insert into mysql.stats_histograms
(version, table_id, is_index, hist_id, distinct_count, tot_col_size)
values (%?, %?, 0, %?, 1, GREATEST(%?, 0))`,
startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count,
); err != nil {
return 0, errors.Trace(err)
}
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx.TypeCtx(), types.NewFieldType(mysql.TypeBlob))
if err != nil {
return 0, errors.Trace(err)
}
// There must be only one bucket for this new column and the value is the default value.
if _, err = util.ExecWithCtx(
ctx, sctx,
`insert into mysql.stats_buckets
(table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound)
values (%?, 0, %?, 0, %?, %?, %?, %?)`,
physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes(),
); err != nil {
return 0, errors.Trace(err)
}
}
return startTS, nil
}

// InsertTableStats2KV inserts a record standing for a new table to stats_meta
// and inserts some records standing for the new columns and indices which belong
// to this table.
func InsertTableStats2KV(
ctx context.Context,
sctx sessionctx.Context,
info *model.TableInfo,
physicalID int64,
) (uint64, error) {
startTS, err := util.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
}
if _, err = util.ExecWithCtx(
ctx, sctx,
"insert into mysql.stats_meta (version, table_id) values(%?, %?)",
startTS, physicalID,
); err != nil {
return 0, errors.Trace(err)
}
for _, col := range info.Columns {
if _, err = util.ExecWithCtx(
ctx, sctx,
`insert into mysql.stats_histograms
(table_id, is_index, hist_id, distinct_count, version)
values (%?, 0, %?, 0, %?)`,
physicalID, col.ID, startTS,
); err != nil {
return 0, errors.Trace(err)
}
}
for _, idx := range info.Indices {
if _, err = util.ExecWithCtx(
ctx, sctx,
`insert into mysql.stats_histograms
(table_id, is_index, hist_id, distinct_count, version)
values(%?, 1, %?, 0, %?)`,
physicalID, idx.ID, startTS,
); err != nil {
return 0, errors.Trace(err)
}
}
return startTS, nil
}
Loading

0 comments on commit 5ad55c2

Please sign in to comment.