Skip to content

Commit

Permalink
ddl, stats: switch to new struct for create/truncate table (#55811)
Browse files Browse the repository at this point in the history
ref #55723
  • Loading branch information
lance6716 committed Sep 5, 2024
1 parent 9c7f4eb commit c90168e
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 347 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, e
tblInfo,
[]*model.ColumnInfo{columnInfo},
)
asyncNotifyEvent(jobCtx, addColumnEvent)
asyncNotifyEvent(jobCtx, addColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
if inFlashbackTest {
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent())
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
Expand All @@ -844,7 +844,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
}
}

asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent())
asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, t, job)
Expand Down
29 changes: 13 additions & 16 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down Expand Up @@ -181,11 +182,10 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
createTableEvent := statsutil.NewCreateTableEvent(
job.SchemaID,
tbInfo,
)
asyncNotifyEvent(jobCtx, createTableEvent)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, errors.Trace(err)
}

Expand Down Expand Up @@ -213,11 +213,10 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
createTableEvent := statsutil.NewCreateTableEvent(
job.SchemaID,
tbInfo,
)
asyncNotifyEvent(jobCtx, createTableEvent)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, nil
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tbInfo.State))
Expand Down Expand Up @@ -270,13 +269,11 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, args)

for i := range args {
createTableEvent := statsutil.NewCreateTableEvent(
job.SchemaID,
args[i],
)
asyncNotifyEvent(jobCtx, createTableEvent)
createTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewCreateTableEvent(args[i]),
}
asyncNotifyEvent(jobCtx, createTableEvent, job)
}

return ver, errors.Trace(err)
Expand Down
9 changes: 8 additions & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,14 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {

// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent) {
func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent, job *model.Job) {
// skip notify for system databases, system databases are expected to change at
// bootstrap and other nodes can also handle the changing in its bootstrap rather
// than be notified.
if tidbutil.IsMemOrSysDB(job.SchemaName) {
return
}

ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
for i := 0; i < 10; i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (w *worker) doModifyColumnTypeWithData(
tblInfo,
[]*model.ColumnInfo{changingCol},
)
asyncNotifyEvent(jobCtx, modifyColumnEvent)
asyncNotifyEvent(jobCtx, modifyColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, t *meta.Meta, job *mode
tblInfo,
partInfo,
)
asyncNotifyEvent(jobCtx, addPartitionEvent)
asyncNotifyEvent(jobCtx, addPartitionEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
}
Expand Down Expand Up @@ -2338,7 +2338,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, t *meta.Meta, job *mod
tblInfo,
&model.PartitionInfo{Definitions: droppedDefs},
)
asyncNotifyEvent(jobCtx, dropPartitionEvent)
asyncNotifyEvent(jobCtx, dropPartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}
default:
Expand Down Expand Up @@ -2431,7 +2431,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
asyncNotifyEvent(jobCtx, truncatePartitionEvent)
asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}

Expand Down Expand Up @@ -2570,7 +2570,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
asyncNotifyEvent(jobCtx, truncatePartitionEvent)
asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}
default:
Expand Down Expand Up @@ -2943,7 +2943,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: []model.PartitionDefinition{originalPartitionDef}},
originalNt,
)
asyncNotifyEvent(jobCtx, exchangePartitionEvent)
asyncNotifyEvent(jobCtx, exchangePartitionEvent, job)
return ver, nil
}

Expand Down Expand Up @@ -3481,7 +3481,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo
if err != nil {
return ver, errors.Trace(err)
}
asyncNotifyEvent(jobCtx, event)
asyncNotifyEvent(jobCtx, event, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}

Expand Down
17 changes: 8 additions & 9 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/placement"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta"
Expand All @@ -42,7 +43,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
tidb_util "github.com/pingcap/tidb/pkg/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gcutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -127,7 +128,7 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in
job.SchemaID,
tblInfo,
)
asyncNotifyEvent(jobCtx, dropTableEvent)
asyncNotifyEvent(jobCtx, dropTableEvent, job)
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State))
Expand Down Expand Up @@ -569,12 +570,10 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
truncateTableEvent := statsutil.NewTruncateTableEvent(
job.SchemaID,
tblInfo,
oldTblInfo,
)
asyncNotifyEvent(jobCtx, truncateTableEvent)
truncateTableEvent := &statsutil.DDLEvent{
SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo),
}
asyncNotifyEvent(jobCtx, truncateTableEvent, job)
startKey := tablecodec.EncodeTablePrefix(tableID)
job.Args = []any{startKey, oldPartitionIDs}
return ver, nil
Expand Down Expand Up @@ -1074,7 +1073,7 @@ func (w *worker) onSetTableFlashReplica(jobCtx *jobContext, t *meta.Meta, job *m
}

// Ban setting replica count for tables in system database.
if tidb_util.IsMemOrSysDB(job.SchemaName) {
if tidbutil.IsMemOrSysDB(job.SchemaName) {
return ver, errors.Trace(dbterror.ErrUnsupportedTiFlashOperationForSysOrMemTable)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/table/tables",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/mock",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
Expand Down
80 changes: 77 additions & 3 deletions pkg/ddl/util/schema_change_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,90 @@

package util

import "github.com/pingcap/tidb/pkg/meta/model"
import (
"fmt"
"strings"

// SchemaChangeEvent stands for a schema change event. DDL will generate one event or multiple events (only for multi-schema change DDL).
// The caller should check the Type field of SchemaChange and call the corresponding getter function to retrieve the needed information.
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util/intest"
)

// SchemaChangeEvent stands for a schema change event. DDL will generate one
// event or multiple events (only for multi-schema change DDL). The caller should
// check the GetType of SchemaChange and call the corresponding getter function
// to retrieve the needed information.
type SchemaChangeEvent struct {
// todo: field and method will be added in the next few pr on demand
newTableInfo *model.TableInfo
oldTableInfo *model.TableInfo

tp model.ActionType
}

// String implements fmt.Stringer interface.
func (s *SchemaChangeEvent) String() string {
if s == nil {
return "nil SchemaChangeEvent"
}

var sb strings.Builder
_, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.tp)
if s.newTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.newTableInfo.ID, s.newTableInfo.Name)
}
if s.oldTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.oldTableInfo.ID, s.oldTableInfo.Name)
}
sb.WriteString(")")

return sb.String()
}

// GetType returns the type of the schema change event.
func (s *SchemaChangeEvent) GetType() model.ActionType {
if s == nil {
return model.ActionNone
}
return s.tp
}

// NewCreateTableEvent creates a SchemaChangeEvent whose type is
// ActionCreateTable.
func NewCreateTableEvent(
newTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionCreateTable,
newTableInfo: newTableInfo,
}
}

// GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type
// is ActionCreateTable.
func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo {
intest.Assert(s.tp == model.ActionCreateTable)
return s.newTableInfo
}

// NewTruncateTableEvent creates a SchemaChangeEvent whose type is
// ActionTruncateTable.
func NewTruncateTableEvent(
newTableInfo *model.TableInfo,
droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionTruncateTable,
newTableInfo: newTableInfo,
oldTableInfo: droppedTableInfo,
}
}

// GetTruncateTableInfo returns the new and old table info of the
// SchemaChangeEvent whose type is ActionTruncateTable.
func (s *SchemaChangeEvent) GetTruncateTableInfo() (
newTableInfo *model.TableInfo,
droppedTableInfo *model.TableInfo,
) {
intest.Assert(s.tp == model.ActionTruncateTable)
return s.newTableInfo, s.oldTableInfo
}
4 changes: 2 additions & 2 deletions pkg/session/bootstraptest/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
tidb_util "github.com/pingcap/tidb/pkg/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
cmt := fmt.Sprintf("job: %s", runJob.String())
isPause := runJob.IsPausedBySystem() || runJob.IsPausing()
if tidb_util.IsSysDB(runJob.SchemaName) {
if tidbutil.IsSysDB(runJob.SchemaName) {
require.False(t, isPause, cmt)
} else {
require.True(t, isPause, cmt)
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ go_test(
timeout = "short",
srcs = ["ddl_test.go"],
flaky = True,
shard_count = 27,
shard_count = 18,
deps = [
"//pkg/meta/model",
"//pkg/parser/model",
Expand Down
Loading

0 comments on commit c90168e

Please sign in to comment.