From d260a11e0704735eb444d526c32390b229aaadfb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 30 Sep 2024 14:12:38 +0800 Subject: [PATCH 1/4] stats: implement stats handler for DDL notifier part 1 Signed-off-by: lance6716 --- pkg/ddl/notifier/subscribe.go | 4 + pkg/statistics/handle/ddl/subscriber.go | 221 ++++++++++++++++++ .../handle/history/history_stats.go | 36 ++- 3 files changed, 254 insertions(+), 7 deletions(-) create mode 100644 pkg/statistics/handle/ddl/subscriber.go diff --git a/pkg/ddl/notifier/subscribe.go b/pkg/ddl/notifier/subscribe.go index 6bdf50c7d5405..cb29f64d1282e 100644 --- a/pkg/ddl/notifier/subscribe.go +++ b/pkg/ddl/notifier/subscribe.go @@ -61,6 +61,8 @@ type HandlerID int const ( // TestHandlerID is used for testing only. TestHandlerID HandlerID = 0 + // StatsHandlerID is used to update statistics system table. + StatsHandlerID HandlerID = 1 ) // String implements fmt.Stringer interface. @@ -68,6 +70,8 @@ func (id HandlerID) String() string { switch id { case TestHandlerID: return "TestHandler" + case StatsHandlerID: + return "StatsHandler" default: return fmt.Sprintf("HandlerID(%d)", id) } diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go new file mode 100644 index 0000000000000..8b841584b7a04 --- /dev/null +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -0,0 +1,221 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/ddl/notifier" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/history" + "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util/intest" + "go.uber.org/zap" +) + +type handler struct { + statsCache types.StatsCache +} + +// NewHandlerAndRegister creates a new handler and registers it to the DDL +// notifier. +func NewHandlerAndRegister(statsCache types.StatsCache) { + h := handler{statsCache: statsCache} + notifier.RegisterHandler(notifier.StatsHandlerID, h.handle) +} + +func (h handler) handle( + ctx context.Context, + sctx sessionctx.Context, + change *notifier.SchemaChangeEvent, +) error { + switch change.GetType() { + case model.ActionCreateTable: + info := change.GetCreateTableInfo() + ids, err := h.getPhysicalIDs(sctx, info) + if err != nil { + return err + } + for _, id := range ids { + err = h.insertStats4PhysicalID(ctx, sctx, info, id) + if err != nil { + return errors.Trace(err) + } + } + case model.ActionTruncateTable: + newTableInfo, droppedTableInfo := change.GetTruncateTableInfo() + ids, err := h.getPhysicalIDs(sctx, newTableInfo) + if err != nil { + return err + } + for _, id := range ids { + err = h.insertStats4PhysicalID(ctx, sctx, newTableInfo, id) + if err != nil { + return errors.Trace(err) + } + } + + // Remove the old table stats. + droppedIDs, err2 := h.getPhysicalIDs(sctx, droppedTableInfo) + if err2 != nil { + return err2 + } + for _, id := range droppedIDs { + err2 = h.delayedDeleteStats4PhysicalID(ctx, sctx, id) + if err2 != nil { + return errors.Trace(err2) + } + } + case model.ActionDropTable: + droppedTableInfo := change.GetDropTableInfo() + ids, err := h.getPhysicalIDs(sctx, droppedTableInfo) + if err != nil { + return err + } + for _, id := range ids { + err = h.delayedDeleteStats4PhysicalID(ctx, sctx, id) + if err != nil { + return errors.Trace(err) + } + } + case model.ActionAddColumn: + // TODO: implement me + case model.ActionModifyColumn: + // TODO: implement me + case model.ActionAddTablePartition: + // TODO: implement me + case model.ActionTruncateTablePartition: + // TODO: implement me + case model.ActionDropTablePartition: + // TODO: implement me + case model.ActionExchangeTablePartition: + // TODO: implement me + case model.ActionReorganizePartition: + // TODO: implement me + case model.ActionAlterTablePartitioning: + // TODO: implement me + case model.ActionRemovePartitioning: + // TODO: implement me + case model.ActionFlashbackCluster: + // TODO: implement me + default: + intest.Assert(false) + logutil.StatsLogger().Error("Unhandled schema change event", + zap.Stringer("type", change)) + } + return nil +} + +func (h handler) insertStats4PhysicalID( + ctx context.Context, + sctx sessionctx.Context, + info *model.TableInfo, + id int64, +) error { + startTS, err := storage.InsertTableStats2KV(ctx, sctx, info, id) + if err != nil { + return errors.Trace(err) + } + return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) +} + +func (h handler) recordHistoricalStatsMeta( + ctx context.Context, + sctx sessionctx.Context, + id int64, + startTS uint64, +) error { + if startTS == 0 { + return nil + } + enableHistoricalStats, err2 := h.getEnableHistoricalStats(sctx) + if err2 != nil { + return err2 + } + if !enableHistoricalStats { + return nil + } + + tbl, ok := h.statsCache.Get(id) + if !ok || !tbl.IsInitialized() { + return nil + } + + return history.RecordHistoricalStatsMeta( + ctx, + sctx, + id, + startTS, + util.StatsMetaHistorySourceSchemaChange, + ) +} + +func (h handler) delayedDeleteStats4PhysicalID( + ctx context.Context, + sctx sessionctx.Context, + id int64, +) error { + startTS, err2 := storage.UpdateStatsMetaVersionForGC(ctx, sctx, id) + if err2 != nil { + return errors.Trace(err2) + } + return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) +} + +func (h handler) getPhysicalIDs( + sctx sessionctx.Context, + tblInfo *model.TableInfo, +) (ids []int64, err error) { + pi := tblInfo.GetPartitionInfo() + if pi == nil { + return []int64{tblInfo.ID}, nil + } + ids = make([]int64, 0, len(pi.Definitions)+1) + for _, def := range pi.Definitions { + ids = append(ids, def.ID) + } + pruneMode, err := h.getCurrentPruneMode(sctx) + if err != nil { + return nil, err + } + if pruneMode == variable.Dynamic { + ids = append(ids, tblInfo.ID) + } + return ids, nil +} + +func (h handler) getCurrentPruneMode( + sctx sessionctx.Context, +) (variable.PartitionPruneMode, error) { + pruneMode, err := sctx.GetSessionVars(). + GlobalVarsAccessor. + GetGlobalSysVar(variable.TiDBPartitionPruneMode) + return variable.PartitionPruneMode(pruneMode), errors.Trace(err) +} + +func (h handler) getEnableHistoricalStats( + sctx sessionctx.Context, +) (bool, error) { + val, err := sctx.GetSessionVars(). + GlobalVarsAccessor. + GetGlobalSysVar(variable.TiDBEnableHistoricalStats) + return variable.TiDBOptOn(val), errors.Trace(err) +} diff --git a/pkg/statistics/handle/history/history_stats.go b/pkg/statistics/handle/history/history_stats.go index 54e109b78b2ea..1382d6ddbde4f 100644 --- a/pkg/statistics/handle/history/history_stats.go +++ b/pkg/statistics/handle/history/history_stats.go @@ -15,6 +15,7 @@ package history import ( + "context" "time" "github.com/pingcap/errors" @@ -80,7 +81,10 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin } } err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { - return RecordHistoricalStatsMeta(sctx, tableID, version, source) + if !sctx.GetSessionVars().EnableHistoricalStats { + return nil + } + return RecordHistoricalStatsMeta(util.StatsCtx, sctx, tableID, version, source) }, util.FlagWrapTxn) if err != nil { // just log the error, hide the error from the outside caller. logutil.BgLogger().Error("record historical stats meta failed", @@ -101,14 +105,23 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error } // RecordHistoricalStatsMeta records the historical stats meta. -func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version uint64, source string) error { +func RecordHistoricalStatsMeta( + ctx context.Context, + sctx sessionctx.Context, + tableID int64, + version uint64, + source string, +) error { if tableID == 0 || version == 0 { return errors.Errorf("tableID %d, version %d are invalid", tableID, version) } - if !sctx.GetSessionVars().EnableHistoricalStats { - return nil - } - rows, _, err := util.ExecRows(sctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) + rows, _, err := util.ExecRowsWithCtx( + ctx, + sctx, + "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", + tableID, + version, + ) if err != nil { return errors.Trace(err) } @@ -118,7 +131,16 @@ func RecordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())" - if _, err := util.Exec(sctx, sql, tableID, modifyCount, count, version, source); err != nil { + if _, err := util.ExecWithCtx( + ctx, + sctx, + sql, + tableID, + modifyCount, + count, + version, + source, + ); err != nil { return errors.Trace(err) } cache.TableRowStatsCache.Invalidate(tableID) From 034a10a21eb4f5f9adea956ee7d0777573d18aba Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 30 Sep 2024 14:24:40 +0800 Subject: [PATCH 2/4] fix bazel Signed-off-by: lance6716 --- pkg/statistics/handle/ddl/BUILD.bazel | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/statistics/handle/ddl/BUILD.bazel b/pkg/statistics/handle/ddl/BUILD.bazel index e685889c181e8..9fd64c0c3a183 100644 --- a/pkg/statistics/handle/ddl/BUILD.bazel +++ b/pkg/statistics/handle/ddl/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "drop_partition.go", "exchange_partition.go", "reorganize_partition.go", + "subscriber.go", "truncate_partition.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/ddl", @@ -17,6 +18,7 @@ go_library( "//pkg/meta/model", "//pkg/sessionctx", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/history", "//pkg/statistics/handle/lockstats", "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/storage", From 09fd78903aa71189f5df362cf15218b4ca010486 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 30 Sep 2024 14:35:24 +0800 Subject: [PATCH 3/4] fix lint Signed-off-by: lance6716 --- pkg/statistics/handle/ddl/subscriber.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go index 8b841584b7a04..94862259bcfc2 100644 --- a/pkg/statistics/handle/ddl/subscriber.go +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -50,7 +50,7 @@ func (h handler) handle( switch change.GetType() { case model.ActionCreateTable: info := change.GetCreateTableInfo() - ids, err := h.getPhysicalIDs(sctx, info) + ids, err := getPhysicalIDs(sctx, info) if err != nil { return err } @@ -62,7 +62,7 @@ func (h handler) handle( } case model.ActionTruncateTable: newTableInfo, droppedTableInfo := change.GetTruncateTableInfo() - ids, err := h.getPhysicalIDs(sctx, newTableInfo) + ids, err := getPhysicalIDs(sctx, newTableInfo) if err != nil { return err } @@ -74,7 +74,7 @@ func (h handler) handle( } // Remove the old table stats. - droppedIDs, err2 := h.getPhysicalIDs(sctx, droppedTableInfo) + droppedIDs, err2 := getPhysicalIDs(sctx, droppedTableInfo) if err2 != nil { return err2 } @@ -86,7 +86,7 @@ func (h handler) handle( } case model.ActionDropTable: droppedTableInfo := change.GetDropTableInfo() - ids, err := h.getPhysicalIDs(sctx, droppedTableInfo) + ids, err := getPhysicalIDs(sctx, droppedTableInfo) if err != nil { return err } @@ -146,7 +146,7 @@ func (h handler) recordHistoricalStatsMeta( if startTS == 0 { return nil } - enableHistoricalStats, err2 := h.getEnableHistoricalStats(sctx) + enableHistoricalStats, err2 := getEnableHistoricalStats(sctx) if err2 != nil { return err2 } @@ -180,7 +180,7 @@ func (h handler) delayedDeleteStats4PhysicalID( return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) } -func (h handler) getPhysicalIDs( +func getPhysicalIDs( sctx sessionctx.Context, tblInfo *model.TableInfo, ) (ids []int64, err error) { @@ -192,7 +192,7 @@ func (h handler) getPhysicalIDs( for _, def := range pi.Definitions { ids = append(ids, def.ID) } - pruneMode, err := h.getCurrentPruneMode(sctx) + pruneMode, err := getCurrentPruneMode(sctx) if err != nil { return nil, err } @@ -202,7 +202,7 @@ func (h handler) getPhysicalIDs( return ids, nil } -func (h handler) getCurrentPruneMode( +func getCurrentPruneMode( sctx sessionctx.Context, ) (variable.PartitionPruneMode, error) { pruneMode, err := sctx.GetSessionVars(). @@ -211,7 +211,7 @@ func (h handler) getCurrentPruneMode( return variable.PartitionPruneMode(pruneMode), errors.Trace(err) } -func (h handler) getEnableHistoricalStats( +func getEnableHistoricalStats( sctx sessionctx.Context, ) (bool, error) { val, err := sctx.GetSessionVars(). From 09ce01dec5b26137f6cf075cd0e0f90089ce1c75 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 30 Sep 2024 17:15:13 +0800 Subject: [PATCH 4/4] address comment Signed-off-by: lance6716 --- pkg/ddl/notifier/subscribe.go | 8 ++++---- pkg/statistics/handle/ddl/subscriber.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ddl/notifier/subscribe.go b/pkg/ddl/notifier/subscribe.go index cb29f64d1282e..149286eb4ea72 100644 --- a/pkg/ddl/notifier/subscribe.go +++ b/pkg/ddl/notifier/subscribe.go @@ -61,8 +61,8 @@ type HandlerID int const ( // TestHandlerID is used for testing only. TestHandlerID HandlerID = 0 - // StatsHandlerID is used to update statistics system table. - StatsHandlerID HandlerID = 1 + // StatsMetaHandlerID is used to update statistics system table. + StatsMetaHandlerID HandlerID = 1 ) // String implements fmt.Stringer interface. @@ -70,8 +70,8 @@ func (id HandlerID) String() string { switch id { case TestHandlerID: return "TestHandler" - case StatsHandlerID: - return "StatsHandler" + case StatsMetaHandlerID: + return "StatsMetaHandler" default: return fmt.Sprintf("HandlerID(%d)", id) } diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go index 94862259bcfc2..d4e5d71822a0a 100644 --- a/pkg/statistics/handle/ddl/subscriber.go +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -39,7 +39,7 @@ type handler struct { // notifier. func NewHandlerAndRegister(statsCache types.StatsCache) { h := handler{statsCache: statsCache} - notifier.RegisterHandler(notifier.StatsHandlerID, h.handle) + notifier.RegisterHandler(notifier.StatsMetaHandlerID, h.handle) } func (h handler) handle(