From c90168e20749015b6138d52d89a2cf5d7edeca05 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 5 Sep 2024 20:04:04 +0800 Subject: [PATCH] ddl, stats: switch to new struct for create/truncate table (#55811) ref pingcap/tidb#55723 --- pkg/ddl/add_column.go | 2 +- pkg/ddl/cluster.go | 4 +- pkg/ddl/create_table.go | 29 +- pkg/ddl/ddl.go | 9 +- pkg/ddl/modify_column.go | 2 +- pkg/ddl/partition.go | 12 +- pkg/ddl/table.go | 17 +- pkg/ddl/util/BUILD.bazel | 1 + pkg/ddl/util/schema_change_notifier.go | 80 +++++- .../bootstraptest/bootstrap_upgrade_test.go | 4 +- pkg/statistics/handle/ddl/BUILD.bazel | 2 +- pkg/statistics/handle/ddl/ddl.go | 82 +++--- pkg/statistics/handle/ddl/ddl_test.go | 252 ++---------------- pkg/statistics/handle/util/ddl_event.go | 42 +-- 14 files changed, 191 insertions(+), 347 deletions(-) diff --git a/pkg/ddl/add_column.go b/pkg/ddl/add_column.go index 396c2cc04c2f9..3b6db8f386190 100644 --- a/pkg/ddl/add_column.go +++ b/pkg/ddl/add_column.go @@ -137,7 +137,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, e tblInfo, []*model.ColumnInfo{columnInfo}, ) - asyncNotifyEvent(jobCtx, addColumnEvent) + asyncNotifyEvent(jobCtx, addColumnEvent, job) default: err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State) } diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index 93e8e4849abc9..b52deef85a6d2 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -821,7 +821,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model case model.StateWriteReorganization: // TODO: Support flashback in unistore. if inFlashbackTest { - asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent()) + asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job) job.State = model.JobStateDone job.SchemaState = model.StatePublic return ver, nil @@ -844,7 +844,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model } } - asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent()) + asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job) job.State = model.JobStateDone job.SchemaState = model.StatePublic return updateSchemaVersion(jobCtx, t, job) diff --git a/pkg/ddl/create_table.go b/pkg/ddl/create_table.go index e436336e85185..af4dafa0f7198 100644 --- a/pkg/ddl/create_table.go +++ b/pkg/ddl/create_table.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/ddl/placement" + "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" @@ -181,11 +182,10 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo) - createTableEvent := statsutil.NewCreateTableEvent( - job.SchemaID, - tbInfo, - ) - asyncNotifyEvent(jobCtx, createTableEvent) + createTableEvent := &statsutil.DDLEvent{ + SchemaChangeEvent: util.NewCreateTableEvent(tbInfo), + } + asyncNotifyEvent(jobCtx, createTableEvent, job) return ver, errors.Trace(err) } @@ -213,11 +213,10 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo) - createTableEvent := statsutil.NewCreateTableEvent( - job.SchemaID, - tbInfo, - ) - asyncNotifyEvent(jobCtx, createTableEvent) + createTableEvent := &statsutil.DDLEvent{ + SchemaChangeEvent: util.NewCreateTableEvent(tbInfo), + } + asyncNotifyEvent(jobCtx, createTableEvent, job) return ver, nil default: return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tbInfo.State)) @@ -270,13 +269,11 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er job.State = model.JobStateDone job.SchemaState = model.StatePublic job.BinlogInfo.SetTableInfos(ver, args) - for i := range args { - createTableEvent := statsutil.NewCreateTableEvent( - job.SchemaID, - args[i], - ) - asyncNotifyEvent(jobCtx, createTableEvent) + createTableEvent := &statsutil.DDLEvent{ + SchemaChangeEvent: util.NewCreateTableEvent(args[i]), + } + asyncNotifyEvent(jobCtx, createTableEvent, job) } return ver, errors.Trace(err) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index a671504d717bf..9fb1bcdeec913 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -556,7 +556,14 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) { // asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may // give up notify and log it. -func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent) { +func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent, job *model.Job) { + // skip notify for system databases, system databases are expected to change at + // bootstrap and other nodes can also handle the changing in its bootstrap rather + // than be notified. + if tidbutil.IsMemOrSysDB(job.SchemaName) { + return + } + ch := jobCtx.oldDDLCtx.ddlEventCh if ch != nil { for i := 0; i < 10; i++ { diff --git a/pkg/ddl/modify_column.go b/pkg/ddl/modify_column.go index 2665fdd9c7846..167c1b8e95a4e 100644 --- a/pkg/ddl/modify_column.go +++ b/pkg/ddl/modify_column.go @@ -535,7 +535,7 @@ func (w *worker) doModifyColumnTypeWithData( tblInfo, []*model.ColumnInfo{changingCol}, ) - asyncNotifyEvent(jobCtx, modifyColumnEvent) + asyncNotifyEvent(jobCtx, modifyColumnEvent, job) default: err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State) } diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 32b60954682d7..32bae65319c80 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -234,7 +234,7 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, t *meta.Meta, job *mode tblInfo, partInfo, ) - asyncNotifyEvent(jobCtx, addPartitionEvent) + asyncNotifyEvent(jobCtx, addPartitionEvent, job) default: err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) } @@ -2338,7 +2338,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, t *meta.Meta, job *mod tblInfo, &model.PartitionInfo{Definitions: droppedDefs}, ) - asyncNotifyEvent(jobCtx, dropPartitionEvent) + asyncNotifyEvent(jobCtx, dropPartitionEvent, job) // A background job will be created to delete old partition data. job.Args = []any{physicalTableIDs} default: @@ -2431,7 +2431,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job &model.PartitionInfo{Definitions: newPartitions}, &model.PartitionInfo{Definitions: oldPartitions}, ) - asyncNotifyEvent(jobCtx, truncatePartitionEvent) + asyncNotifyEvent(jobCtx, truncatePartitionEvent, job) // A background job will be created to delete old partition data. job.Args = []any{oldIDs} @@ -2570,7 +2570,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job &model.PartitionInfo{Definitions: newPartitions}, &model.PartitionInfo{Definitions: oldPartitions}, ) - asyncNotifyEvent(jobCtx, truncatePartitionEvent) + asyncNotifyEvent(jobCtx, truncatePartitionEvent, job) // A background job will be created to delete old partition data. job.Args = []any{oldIDs} default: @@ -2943,7 +2943,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job &model.PartitionInfo{Definitions: []model.PartitionDefinition{originalPartitionDef}}, originalNt, ) - asyncNotifyEvent(jobCtx, exchangePartitionEvent) + asyncNotifyEvent(jobCtx, exchangePartitionEvent, job) return ver, nil } @@ -3481,7 +3481,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo if err != nil { return ver, errors.Trace(err) } - asyncNotifyEvent(jobCtx, event) + asyncNotifyEvent(jobCtx, event, job) // A background job will be created to delete old partition data. job.Args = []any{physicalTableIDs} diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index af6d984d54e2d..954877bd8361c 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/ddl/placement" sess "github.com/pingcap/tidb/pkg/ddl/session" + "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/meta" @@ -42,7 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" - tidb_util "github.com/pingcap/tidb/pkg/util" + tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/gcutil" "go.uber.org/zap" @@ -127,7 +128,7 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in job.SchemaID, tblInfo, ) - asyncNotifyEvent(jobCtx, dropTableEvent) + asyncNotifyEvent(jobCtx, dropTableEvent, job) } default: return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State)) @@ -569,12 +570,10 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) - truncateTableEvent := statsutil.NewTruncateTableEvent( - job.SchemaID, - tblInfo, - oldTblInfo, - ) - asyncNotifyEvent(jobCtx, truncateTableEvent) + truncateTableEvent := &statsutil.DDLEvent{ + SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo), + } + asyncNotifyEvent(jobCtx, truncateTableEvent, job) startKey := tablecodec.EncodeTablePrefix(tableID) job.Args = []any{startKey, oldPartitionIDs} return ver, nil @@ -1074,7 +1073,7 @@ func (w *worker) onSetTableFlashReplica(jobCtx *jobContext, t *meta.Meta, job *m } // Ban setting replica count for tables in system database. - if tidb_util.IsMemOrSysDB(job.SchemaName) { + if tidbutil.IsMemOrSysDB(job.SchemaName) { return ver, errors.Trace(dbterror.ErrUnsupportedTiFlashOperationForSysOrMemTable) } diff --git a/pkg/ddl/util/BUILD.bazel b/pkg/ddl/util/BUILD.bazel index 2d49cf8e28ff0..277f63044c813 100644 --- a/pkg/ddl/util/BUILD.bazel +++ b/pkg/ddl/util/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/sessionctx/variable", "//pkg/table/tables", "//pkg/util/chunk", + "//pkg/util/intest", "//pkg/util/mock", "//pkg/util/sqlexec", "@com_github_pingcap_errors//:errors", diff --git a/pkg/ddl/util/schema_change_notifier.go b/pkg/ddl/util/schema_change_notifier.go index 9509c94b870ae..752d650227fb5 100644 --- a/pkg/ddl/util/schema_change_notifier.go +++ b/pkg/ddl/util/schema_change_notifier.go @@ -14,16 +14,90 @@ package util -import "github.com/pingcap/tidb/pkg/meta/model" +import ( + "fmt" + "strings" -// SchemaChangeEvent stands for a schema change event. DDL will generate one event or multiple events (only for multi-schema change DDL). -// The caller should check the Type field of SchemaChange and call the corresponding getter function to retrieve the needed information. + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/util/intest" +) + +// SchemaChangeEvent stands for a schema change event. DDL will generate one +// event or multiple events (only for multi-schema change DDL). The caller should +// check the GetType of SchemaChange and call the corresponding getter function +// to retrieve the needed information. type SchemaChangeEvent struct { // todo: field and method will be added in the next few pr on demand + newTableInfo *model.TableInfo + oldTableInfo *model.TableInfo + tp model.ActionType } +// String implements fmt.Stringer interface. +func (s *SchemaChangeEvent) String() string { + if s == nil { + return "nil SchemaChangeEvent" + } + + var sb strings.Builder + _, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.tp) + if s.newTableInfo != nil { + _, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.newTableInfo.ID, s.newTableInfo.Name) + } + if s.oldTableInfo != nil { + _, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.oldTableInfo.ID, s.oldTableInfo.Name) + } + sb.WriteString(")") + + return sb.String() +} + // GetType returns the type of the schema change event. func (s *SchemaChangeEvent) GetType() model.ActionType { + if s == nil { + return model.ActionNone + } return s.tp } + +// NewCreateTableEvent creates a SchemaChangeEvent whose type is +// ActionCreateTable. +func NewCreateTableEvent( + newTableInfo *model.TableInfo, +) *SchemaChangeEvent { + return &SchemaChangeEvent{ + tp: model.ActionCreateTable, + newTableInfo: newTableInfo, + } +} + +// GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type +// is ActionCreateTable. +func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo { + intest.Assert(s.tp == model.ActionCreateTable) + return s.newTableInfo +} + +// NewTruncateTableEvent creates a SchemaChangeEvent whose type is +// ActionTruncateTable. +func NewTruncateTableEvent( + newTableInfo *model.TableInfo, + droppedTableInfo *model.TableInfo, +) *SchemaChangeEvent { + return &SchemaChangeEvent{ + tp: model.ActionTruncateTable, + newTableInfo: newTableInfo, + oldTableInfo: droppedTableInfo, + } +} + +// GetTruncateTableInfo returns the new and old table info of the +// SchemaChangeEvent whose type is ActionTruncateTable. +func (s *SchemaChangeEvent) GetTruncateTableInfo() ( + newTableInfo *model.TableInfo, + droppedTableInfo *model.TableInfo, +) { + intest.Assert(s.tp == model.ActionTruncateTable) + return s.newTableInfo, s.oldTableInfo +} diff --git a/pkg/session/bootstraptest/bootstrap_upgrade_test.go b/pkg/session/bootstraptest/bootstrap_upgrade_test.go index 445d8c3196379..29107c9739675 100644 --- a/pkg/session/bootstraptest/bootstrap_upgrade_test.go +++ b/pkg/session/bootstraptest/bootstrap_upgrade_test.go @@ -37,7 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" - tidb_util "github.com/pingcap/tidb/pkg/util" + tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" @@ -718,7 +718,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) { require.NoError(t, err) cmt := fmt.Sprintf("job: %s", runJob.String()) isPause := runJob.IsPausedBySystem() || runJob.IsPausing() - if tidb_util.IsSysDB(runJob.SchemaName) { + if tidbutil.IsSysDB(runJob.SchemaName) { require.False(t, isPause, cmt) } else { require.True(t, isPause, cmt) diff --git a/pkg/statistics/handle/ddl/BUILD.bazel b/pkg/statistics/handle/ddl/BUILD.bazel index b833cf1204fc0..4976d6676941e 100644 --- a/pkg/statistics/handle/ddl/BUILD.bazel +++ b/pkg/statistics/handle/ddl/BUILD.bazel @@ -31,7 +31,7 @@ go_test( timeout = "short", srcs = ["ddl_test.go"], flaky = True, - shard_count = 27, + shard_count = 18, deps = [ "//pkg/meta/model", "//pkg/parser/model", diff --git a/pkg/statistics/handle/ddl/ddl.go b/pkg/statistics/handle/ddl/ddl.go index 6b70f73968cb1..b84cb1bef3657 100644 --- a/pkg/statistics/handle/ddl/ddl.go +++ b/pkg/statistics/handle/ddl/ddl.go @@ -58,7 +58,7 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error { // ActionFlashbackCluster will not create any new stats info // and it's SchemaID alwayws equals to 0, so skip check it. - if t.GetType() != model.ActionFlashbackCluster { + if t.GetType() != model.ActionFlashbackCluster && t.SchemaChangeEvent == nil { if isSysDB, err := t.IsMemOrSysDB(sctx.(sessionctx.Context)); err != nil { return err } else if isSysDB { @@ -75,42 +75,13 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error { return nil } } - logutil.StatsLogger().Info("Handle ddl event", zap.Stringer("event", t)) + if t.SchemaChangeEvent == nil { + // when SchemaChangeEvent is set, it will be printed in the default branch of + // below switch. + logutil.StatsLogger().Info("Handle ddl event", zap.Stringer("event", t)) + } switch t.GetType() { - case model.ActionCreateTable: - newTableInfo := t.GetCreateTableInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { - return err - } - } - case model.ActionTruncateTable: - newTableInfo, droppedTableInfo := t.GetTruncateTableInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { - return err - } - } - - // Remove the old table stats. - droppedIDs, err := h.getTableIDs(droppedTableInfo) - if err != nil { - return err - } - for _, id := range droppedIDs { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil { - return err - } - } case model.ActionDropTable: droppedTableInfo := t.GetDropTableInfo() ids, err := h.getTableIDs(droppedTableInfo) @@ -197,13 +168,46 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error { } case model.ActionFlashbackCluster: return h.statsWriter.UpdateStatsVersion() + default: + logutil.StatsLogger().Info("Handle schema change event", zap.Stringer("event", t.SchemaChangeEvent)) } - //revive:disable:empty-block - switch t.SchemaChangeEvent.GetType() { - // todo: we will replace the DDLEvent with SchemaChangeEvent, gradually move above switch-case logical to here + e := t.SchemaChangeEvent + switch e.GetType() { + case model.ActionCreateTable: + newTableInfo := e.GetCreateTableInfo() + ids, err := h.getTableIDs(newTableInfo) + if err != nil { + return err + } + for _, id := range ids { + if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { + return err + } + } + case model.ActionTruncateTable: + newTableInfo, droppedTableInfo := e.GetTruncateTableInfo() + ids, err := h.getTableIDs(newTableInfo) + if err != nil { + return err + } + for _, id := range ids { + if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { + return err + } + } + + // Remove the old table stats. + droppedIDs, err := h.getTableIDs(droppedTableInfo) + if err != nil { + return err + } + for _, id := range droppedIDs { + if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil { + return err + } + } } - //revive:enable:empty-block return nil } diff --git a/pkg/statistics/handle/ddl/ddl_test.go b/pkg/statistics/handle/ddl/ddl_test.go index df036e1d9b499..c85d0cfb97095 100644 --- a/pkg/statistics/handle/ddl/ddl_test.go +++ b/pkg/statistics/handle/ddl/ddl_test.go @@ -114,246 +114,41 @@ func TestDDLTable(t *testing.T) { require.False(t, statsTbl.Pseudo) } -func TestCreateASystemTable(t *testing.T) { +func TestSystemTableDDLHasNoEvent(t *testing.T) { store, do := testkit.CreateMockStoreAndDomain(t) testKit := testkit.NewTestKit(t, store) testKit.MustExec("use test") // Test create a system table. testKit.MustExec("create table mysql.test (c1 int, c2 int)") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() h := do.StatsHandle() - err = h.HandleDDLEvent(<-h.DDLEventCh()) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") -} - -func TestTruncateASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - testKit.MustExec("use test") - // Test truncate a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int)") + require.Len(t, h.DDLEventCh(), 0) testKit.MustExec("truncate table mysql.test") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - h := do.StatsHandle() - // Find the truncate table partition event. - truncateTableEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTable) - err = h.HandleDDLEvent(truncateTableEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") -} - -func TestDropASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - testKit.MustExec("use test") - // Test drop a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int)") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - tableID := tableInfo.ID - testKit.MustExec("drop table mysql.test") - h := do.StatsHandle() - // Find the drop table partition event. - dropTableEvent := findEvent(h.DDLEventCh(), model.ActionDropTable) - err = h.HandleDDLEvent(dropTableEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - // No stats for the table. - testKit.MustQuery("select count(*) from mysql.stats_meta where table_id = ?", tableID).Check(testkit.Rows("0")) -} - -func TestAddColumnToASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - testKit.MustExec("use test") - // Test add column to a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int)") + require.Len(t, h.DDLEventCh(), 0) testKit.MustExec("alter table mysql.test add column c3 int") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - h := do.StatsHandle() - // Find the add column event. - addColumnEvent := findEvent(h.DDLEventCh(), model.ActionAddColumn) - err = h.HandleDDLEvent(addColumnEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") -} - -func TestModifyColumnOfASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - testKit.MustExec("use test") - // Test modify column of a system table. - // NOTE: Types have to be different, otherwise it won't trigger the modify column event. - testKit.MustExec("create table mysql.test (c1 varchar(255), c2 int)") - testKit.MustExec("insert into mysql.test values ('1',2)") + require.Len(t, h.DDLEventCh(), 0) testKit.MustExec("alter table mysql.test modify column c1 int") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - h := do.StatsHandle() - // Find the modify column event. - modifyColumnEvent := findEvent(h.DDLEventCh(), model.ActionModifyColumn) - err = h.HandleDDLEvent(modifyColumnEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") -} - -func TestAddNewPartitionToASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - testKit.MustExec("use test") - // Test add new partition to a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int) partition by range (c1) (partition p0 values less than (6))") - // Add partition p1. - testKit.MustExec("alter table mysql.test add partition (partition p1 values less than (11))") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - h := do.StatsHandle() - // Find the add partition event. - addPartitionEvent := findEvent(h.DDLEventCh(), model.ActionAddTablePartition) - err = h.HandleDDLEvent(addPartitionEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") - // Check the partitions' stats. - pi := tableInfo.GetPartitionInfo() - for _, def := range pi.Definitions { - statsTbl := h.GetPartitionStats(tableInfo, def.ID) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") - } -} - -func TestDropPartitionOfASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - h := do.StatsHandle() - testKit.MustExec("use test") - // Test drop partition of a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int) partition by range (c1) (partition p0 values less than (6), partition p1 values less than (11))") - // Drop partition p1. - testKit.MustExec("alter table mysql.test drop partition p1") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - // Find the drop partition event. - dropPartitionEvent := findEvent(h.DDLEventCh(), model.ActionDropTablePartition) - err = h.HandleDDLEvent(dropPartitionEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") - // Check the partitions' stats. - pi := tableInfo.GetPartitionInfo() - for _, def := range pi.Definitions { - statsTbl := h.GetPartitionStats(tableInfo, def.ID) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") - } -} + require.Len(t, h.DDLEventCh(), 0) + testKit.MustExec("drop table mysql.test") + require.Len(t, h.DDLEventCh(), 0) + + testKit.MustExec("create table mysql.test2 (c1 int, c2 int) partition by range (c1) (partition p0 values less than (6))") + require.Len(t, h.DDLEventCh(), 0) + testKit.MustExec("alter table mysql.test2 add partition (partition p1 values less than (11))") + require.Len(t, h.DDLEventCh(), 0) + testKit.MustExec("alter table mysql.test2 truncate partition p1") + require.Len(t, h.DDLEventCh(), 0) + testKit.MustExec("alter table mysql.test2 drop partition p1") + require.Len(t, h.DDLEventCh(), 0) + testKit.MustExec("alter table mysql.test2 remove partitioning") + require.Len(t, h.DDLEventCh(), 0) -func TestExchangePartitionWithASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - h := do.StatsHandle() - testKit.MustExec("use test") - // Test exchange partition with a system table. testKit.MustExec("create table t (c1 int, c2 int, index idx(c1, c2)) partition by range (c1) (partition p0 values less than (6))") - testKit.MustExec("create table mysql.test (c1 int, c2 int, index idx(c1, c2))") - // Insert some data to table t. - testKit.MustExec("insert into t values (1,2),(2,2)") - // Analyze table t. - testKit.MustExec("analyze table t") - // Insert some data to table mysql.test. - testKit.MustExec("insert into mysql.test values (1,2),(2,2)") + <-h.DDLEventCh() + testKit.MustExec("create table mysql.test3 (c1 int, c2 int, index idx(c1, c2))") // Exchange partition. - testKit.MustExec("alter table t exchange partition p0 with table mysql.test") - // Find the exchange partition event. - exchangePartitionEvent := findEvent(h.DDLEventCh(), model.ActionExchangeTablePartition) - err := h.HandleDDLEvent(exchangePartitionEvent) - require.NoError(t, err) - is := do.InfoSchema() - require.Nil(t, h.Update(context.Background(), is)) - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - require.NoError(t, err) - statsTbl := h.GetTableStats(tableInfo) // NOTE: This is a rare case and the effort required to address it outweighs the benefits, hence it is not prioritized for a fix. - require.False(t, statsTbl.Pseudo, "even we skip the DDL event, but the table ID is still changed, so we can see the stats") -} - -func TestRemovePartitioningOfASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - h := do.StatsHandle() - testKit.MustExec("use test") - // Test remove partitioning of a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int) partition by range (c1) (partition p0 values less than (6))") - // Remove partitioning. - testKit.MustExec("alter table mysql.test remove partitioning") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - // Find the remove partitioning event. - removePartitioningEvent := findEvent(h.DDLEventCh(), model.ActionRemovePartitioning) - err = h.HandleDDLEvent(removePartitioningEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") -} - -func TestTruncateAPartitionOfASystemTable(t *testing.T) { - store, do := testkit.CreateMockStoreAndDomain(t) - testKit := testkit.NewTestKit(t, store) - h := do.StatsHandle() - testKit.MustExec("use test") - // Test truncate a partition of a system table. - testKit.MustExec("create table mysql.test (c1 int, c2 int) partition by range (c1) (partition p0 values less than (6), partition p1 values less than (11))") - // Truncate partition p1. - testKit.MustExec("alter table mysql.test truncate partition p1") - is := do.InfoSchema() - tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("mysql"), pmodel.NewCIStr("test")) - require.NoError(t, err) - tableInfo := tbl.Meta() - // Find the truncate partition event. - truncatePartitionEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTablePartition) - err = h.HandleDDLEvent(truncatePartitionEvent) - require.NoError(t, err) - require.Nil(t, h.Update(context.Background(), is)) - statsTbl := h.GetTableStats(tableInfo) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") - // Check the partitions' stats. - pi := tableInfo.GetPartitionInfo() - for _, def := range pi.Definitions { - statsTbl := h.GetPartitionStats(tableInfo, def.ID) - require.True(t, statsTbl.Pseudo, "we should not collect stats for system tables") - } + testKit.MustExec("alter table t exchange partition p0 with table mysql.test3") + require.Len(t, h.DDLEventCh(), 0) } func TestTruncateTable(t *testing.T) { @@ -1511,6 +1306,9 @@ func findEvent(eventCh <-chan *util.DDLEvent, eventType model.ActionType) *util. // Find the target event. for { event := <-eventCh + if event.SchemaChangeEvent.GetType() == eventType { + return event + } if event.GetType() == eventType { return event } diff --git a/pkg/statistics/handle/util/ddl_event.go b/pkg/statistics/handle/util/ddl_event.go index aed9dcf76edd6..f7036693df451 100644 --- a/pkg/statistics/handle/util/ddl_event.go +++ b/pkg/statistics/handle/util/ddl_event.go @@ -29,6 +29,8 @@ import ( // DDLEvent contains the information of a ddl event that is used to update stats. type DDLEvent struct { + // todo: replace DDLEvent by SchemaChangeEvent gradually + SchemaChangeEvent *ddlutil.SchemaChangeEvent // For different ddl types, the following fields are used. // They have different meanings for different ddl types. // Please do **not** use these fields directly, use the corresponding @@ -38,6 +40,7 @@ type DDLEvent struct { oldTableInfo *model.TableInfo oldPartInfo *model.PartitionInfo columnInfos []*model.ColumnInfo + // schemaID is the ID of the schema that the table belongs to. // Used to filter out the system or memory tables. schemaID int64 @@ -45,9 +48,6 @@ type DDLEvent struct { // It applies when a table structure is being changed from partitioned to non-partitioned, or vice versa. oldTableID int64 tp model.ActionType - - // todo: replace DDLEvent by SchemaChangeEvent gradually - SchemaChangeEvent ddlutil.SchemaChangeEvent } // IsMemOrSysDB checks whether the table is in the memory or system database. @@ -61,42 +61,6 @@ func (e *DDLEvent) IsMemOrSysDB(sctx sessionctx.Context) (bool, error) { return util.IsMemOrSysDB(schema.Name.L), nil } -// NewCreateTableEvent creates a new ddl event that creates a table. -func NewCreateTableEvent( - schemaID int64, - newTableInfo *model.TableInfo, -) *DDLEvent { - return &DDLEvent{ - tp: model.ActionCreateTable, - schemaID: schemaID, - tableInfo: newTableInfo, - } -} - -// GetCreateTableInfo gets the table info of the table that is created. -func (e *DDLEvent) GetCreateTableInfo() (newTableInfo *model.TableInfo) { - return e.tableInfo -} - -// NewTruncateTableEvent creates a new ddl event that truncates a table. -func NewTruncateTableEvent( - schemaID int64, - newTableInfo *model.TableInfo, - droppedTableInfo *model.TableInfo, -) *DDLEvent { - return &DDLEvent{ - tp: model.ActionTruncateTable, - schemaID: schemaID, - tableInfo: newTableInfo, - oldTableInfo: droppedTableInfo, - } -} - -// GetTruncateTableInfo gets the table info of the table that is truncated. -func (e *DDLEvent) GetTruncateTableInfo() (newTableInfo *model.TableInfo, droppedTableInfo *model.TableInfo) { - return e.tableInfo, e.oldTableInfo -} - // NewDropTableEvent creates a new ddl event that drops a table. func NewDropTableEvent( schemaID int64,