From 8e33fcd86b3ed2e9104f3cfc5427ed8849e0f2ce Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 30 Sep 2024 19:08:07 +0800 Subject: [PATCH] stats: implement stats handler for DDL notifier part 1 (#56419) ref pingcap/tidb#55722 --- pkg/ddl/notifier/subscribe.go | 4 + pkg/statistics/handle/ddl/BUILD.bazel | 2 + pkg/statistics/handle/ddl/subscriber.go | 221 ++++++++++++++++++ .../handle/history/history_stats.go | 36 ++- 4 files changed, 256 insertions(+), 7 deletions(-) create mode 100644 pkg/statistics/handle/ddl/subscriber.go diff --git a/pkg/ddl/notifier/subscribe.go b/pkg/ddl/notifier/subscribe.go index 6bdf50c7d5405..149286eb4ea72 100644 --- a/pkg/ddl/notifier/subscribe.go +++ b/pkg/ddl/notifier/subscribe.go @@ -61,6 +61,8 @@ type HandlerID int const ( // TestHandlerID is used for testing only. TestHandlerID HandlerID = 0 + // StatsMetaHandlerID is used to update statistics system table. + StatsMetaHandlerID HandlerID = 1 ) // String implements fmt.Stringer interface. @@ -68,6 +70,8 @@ func (id HandlerID) String() string { switch id { case TestHandlerID: return "TestHandler" + case StatsMetaHandlerID: + return "StatsMetaHandler" default: return fmt.Sprintf("HandlerID(%d)", id) } diff --git a/pkg/statistics/handle/ddl/BUILD.bazel b/pkg/statistics/handle/ddl/BUILD.bazel index e685889c181e8..9fd64c0c3a183 100644 --- a/pkg/statistics/handle/ddl/BUILD.bazel +++ b/pkg/statistics/handle/ddl/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "drop_partition.go", "exchange_partition.go", "reorganize_partition.go", + "subscriber.go", "truncate_partition.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/ddl", @@ -17,6 +18,7 @@ go_library( "//pkg/meta/model", "//pkg/sessionctx", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/history", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/storage", diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go new file mode 100644 index 0000000000000..d4e5d71822a0a --- /dev/null +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -0,0 +1,221 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/ddl/notifier" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/history" + "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util/intest" + "go.uber.org/zap" +) + +type handler struct { + statsCache types.StatsCache +} + +// NewHandlerAndRegister creates a new handler and registers it to the DDL +// notifier. +func NewHandlerAndRegister(statsCache types.StatsCache) { + h := handler{statsCache: statsCache} + notifier.RegisterHandler(notifier.StatsMetaHandlerID, h.handle) +} + +func (h handler) handle( + ctx context.Context, + sctx sessionctx.Context, + change *notifier.SchemaChangeEvent, +) error { + switch change.GetType() { + case model.ActionCreateTable: + info := change.GetCreateTableInfo() + ids, err := getPhysicalIDs(sctx, info) + if err != nil { + return err + } + for _, id := range ids { + err = h.insertStats4PhysicalID(ctx, sctx, info, id) + if err != nil { + return errors.Trace(err) + } + } + case model.ActionTruncateTable: + newTableInfo, droppedTableInfo := change.GetTruncateTableInfo() + ids, err := getPhysicalIDs(sctx, newTableInfo) + if err != nil { + return err + } + for _, id := range ids { + err = h.insertStats4PhysicalID(ctx, sctx, newTableInfo, id) + if err != nil { + return errors.Trace(err) + } + } + + // Remove the old table stats. + droppedIDs, err2 := getPhysicalIDs(sctx, droppedTableInfo) + if err2 != nil { + return err2 + } + for _, id := range droppedIDs { + err2 = h.delayedDeleteStats4PhysicalID(ctx, sctx, id) + if err2 != nil { + return errors.Trace(err2) + } + } + case model.ActionDropTable: + droppedTableInfo := change.GetDropTableInfo() + ids, err := getPhysicalIDs(sctx, droppedTableInfo) + if err != nil { + return err + } + for _, id := range ids { + err = h.delayedDeleteStats4PhysicalID(ctx, sctx, id) + if err != nil { + return errors.Trace(err) + } + } + case model.ActionAddColumn: + // TODO: implement me + case model.ActionModifyColumn: + // TODO: implement me + case model.ActionAddTablePartition: + // TODO: implement me + case model.ActionTruncateTablePartition: + // TODO: implement me + case model.ActionDropTablePartition: + // TODO: implement me + case model.ActionExchangeTablePartition: + // TODO: implement me + case model.ActionReorganizePartition: + // TODO: implement me + case model.ActionAlterTablePartitioning: + // TODO: implement me + case model.ActionRemovePartitioning: + // TODO: implement me + case model.ActionFlashbackCluster: + // TODO: implement me + default: + intest.Assert(false) + logutil.StatsLogger().Error("Unhandled schema change event", + zap.Stringer("type", change)) + } + return nil +} + +func (h handler) insertStats4PhysicalID( + ctx context.Context, + sctx sessionctx.Context, + info *model.TableInfo, + id int64, +) error { + startTS, err := storage.InsertTableStats2KV(ctx, sctx, info, id) + if err != nil { + return errors.Trace(err) + } + return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) +} + +func (h handler) recordHistoricalStatsMeta( + ctx context.Context, + sctx sessionctx.Context, + id int64, + startTS uint64, +) error { + if startTS == 0 { + return nil + } + enableHistoricalStats, err2 := getEnableHistoricalStats(sctx) + if err2 != nil { + return err2 + } + if !enableHistoricalStats { + return nil + } + + tbl, ok := h.statsCache.Get(id) + if !ok || !tbl.IsInitialized() { + return nil + } + + return history.RecordHistoricalStatsMeta( + ctx, + sctx, + id, + startTS, + util.StatsMetaHistorySourceSchemaChange, + ) +} + +func (h handler) delayedDeleteStats4PhysicalID( + ctx context.Context, + sctx sessionctx.Context, + id int64, +) error { + startTS, err2 := storage.UpdateStatsMetaVersionForGC(ctx, sctx, id) + if err2 != nil { + return errors.Trace(err2) + } + return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) +} + +func getPhysicalIDs( + sctx sessionctx.Context, + tblInfo *model.TableInfo, +) (ids []int64, err error) { + pi := tblInfo.GetPartitionInfo() + if pi == nil { + return []int64{tblInfo.ID}, nil + } + ids = make([]int64, 0, len(pi.Definitions)+1) + for _, def := range pi.Definitions { + ids = append(ids, def.ID) + } + pruneMode, err := getCurrentPruneMode(sctx) + if err != nil { + return nil, err + } + if pruneMode == variable.Dynamic { + ids = append(ids, tblInfo.ID) + } + return ids, nil +} + +func getCurrentPruneMode( + sctx sessionctx.Context, +) (variable.PartitionPruneMode, error) { + pruneMode, err := sctx.GetSessionVars(). + GlobalVarsAccessor. + GetGlobalSysVar(variable.TiDBPartitionPruneMode) + return variable.PartitionPruneMode(pruneMode), errors.Trace(err) +} + +func getEnableHistoricalStats( + sctx sessionctx.Context, +) (bool, error) { + val, err := sctx.GetSessionVars(). + GlobalVarsAccessor. + GetGlobalSysVar(variable.TiDBEnableHistoricalStats) + return variable.TiDBOptOn(val), errors.Trace(err) +} diff --git a/pkg/statistics/handle/history/history_stats.go b/pkg/statistics/handle/history/history_stats.go index 54e109b78b2ea..1382d6ddbde4f 100644 --- a/pkg/statistics/handle/history/history_stats.go +++ b/pkg/statistics/handle/history/history_stats.go @@ -15,6 +15,7 @@ package history import ( + "context" "time" "github.com/pingcap/errors" @@ -80,7 +81,10 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin } } err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { - return RecordHistoricalStatsMeta(sctx, tableID, version, source) + if !sctx.GetSessionVars().EnableHistoricalStats { + return nil + } + return RecordHistoricalStatsMeta(util.StatsCtx, sctx, tableID, version, source) }, util.FlagWrapTxn) if err != nil { // just log the error, hide the error from the outside caller. logutil.BgLogger().Error("record historical stats meta failed", @@ -101,14 +105,23 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error } // RecordHistoricalStatsMeta records the historical stats meta. -func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version uint64, source string) error { +func RecordHistoricalStatsMeta( + ctx context.Context, + sctx sessionctx.Context, + tableID int64, + version uint64, + source string, +) error { if tableID == 0 || version == 0 { return errors.Errorf("tableID %d, version %d are invalid", tableID, version) } - if !sctx.GetSessionVars().EnableHistoricalStats { - return nil - } - rows, _, err := util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) + rows, _, err := util.ExecRowsWithCtx( + ctx, + sctx, + "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", + tableID, + version, + ) if err != nil { return errors.Trace(err) } @@ -118,7 +131,16 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())" - if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil { + if _, err := util.ExecWithCtx( + ctx, + sctx, + sql, + tableID, + modifyCount, + count, + version, + source, + ); err != nil { return errors.Trace(err) } cache.TableRowStatsCache.Invalidate(tableID)