
ddl, stats: switch to new struct for create/truncate table #55811

Merged
merged 19 commits on Sep 5, 2024
2 changes: 1 addition & 1 deletion pkg/ddl/add_column.go
@@ -137,7 +137,7 @@ func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, e
tblInfo,
[]*model.ColumnInfo{columnInfo},
)
-asyncNotifyEvent(jobCtx, addColumnEvent)
+asyncNotifyEvent(jobCtx, addColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
}
4 changes: 2 additions & 2 deletions pkg/ddl/cluster.go
@@ -821,7 +821,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
if inFlashbackTest {
-asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent())
+asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
@@ -844,7 +844,7 @@ func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model
}
}

-asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent())
+asyncNotifyEvent(jobCtx, statsutil.NewFlashbackClusterEvent(), job)
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return updateSchemaVersion(jobCtx, t, job)
29 changes: 13 additions & 16 deletions pkg/ddl/create_table.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
@@ -181,11 +182,10 @@ func onCreateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
-createTableEvent := statsutil.NewCreateTableEvent(
-job.SchemaID,
-tbInfo,
-)
-asyncNotifyEvent(jobCtx, createTableEvent)
+createTableEvent := &statsutil.DDLEvent{
+SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
+}
+asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, errors.Trace(err)
}

@@ -213,11 +213,10 @@ func createTableWithForeignKeys(jobCtx *jobContext, t *meta.Meta, job *model.Job
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
-createTableEvent := statsutil.NewCreateTableEvent(
-job.SchemaID,
-tbInfo,
-)
-asyncNotifyEvent(jobCtx, createTableEvent)
+createTableEvent := &statsutil.DDLEvent{
+SchemaChangeEvent: util.NewCreateTableEvent(tbInfo),
+}
+asyncNotifyEvent(jobCtx, createTableEvent, job)
return ver, nil
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tbInfo.State))
@@ -270,13 +269,11 @@ func onCreateTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int64, er
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
job.BinlogInfo.SetTableInfos(ver, args)

for i := range args {
-createTableEvent := statsutil.NewCreateTableEvent(
-job.SchemaID,
-args[i],
-)
-asyncNotifyEvent(jobCtx, createTableEvent)
+createTableEvent := &statsutil.DDLEvent{
+SchemaChangeEvent: util.NewCreateTableEvent(args[i]),
+}
+asyncNotifyEvent(jobCtx, createTableEvent, job)
}

return ver, errors.Trace(err)
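
The pattern above repeats in every creation path: instead of calling statsutil.NewCreateTableEvent(job.SchemaID, tbInfo), the DDL worker now wraps the new util.SchemaChangeEvent inside a statsutil.DDLEvent and passes the job along so asyncNotifyEvent can filter by schema name. Below is a minimal sketch of that construction, assuming statsutil resolves to pkg/statistics/handle/util and that DDLEvent exposes the SchemaChangeEvent field exactly as the diff shows:

```go
package example

import (
	ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
	"github.com/pingcap/tidb/pkg/meta/model"
	// Assumption: this is the package the DDL code aliases as statsutil.
	statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
)

// buildCreateTableEvent mirrors what onCreateTable does after this PR:
// the schema ID is no longer packed into the event, only the table info is,
// and the schema-change payload travels inside the stats DDLEvent wrapper.
func buildCreateTableEvent(tbInfo *model.TableInfo) *statsutil.DDLEvent {
	return &statsutil.DDLEvent{
		SchemaChangeEvent: ddlutil.NewCreateTableEvent(tbInfo),
	}
}
```
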
9 changes: 8 additions & 1 deletion pkg/ddl/ddl.go
@@ -556,7 +556,14 @@ func (d *ddl) RegisterStatsHandle(h *handle.Handle) {

// asyncNotifyEvent will notify the ddl event to outside world, say statistic handle. When the channel is full, we may
// give up notify and log it.
-func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent) {
+func asyncNotifyEvent(jobCtx *jobContext, e *statsutil.DDLEvent, job *model.Job) {
+// skip notify for system databases, system databases are expected to change at
+// bootstrap and other nodes can also handle the changing in its bootstrap rather
+// than be notified.
+if tidbutil.IsMemOrSysDB(job.SchemaName) {
+return
+}
+
ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
for i := 0; i < 10; i++ {
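
The new guard above drops events for system schemas before the existing bounded, non-blocking send loop (truncated by the diff view) tries to hand the event to the stats channel. A self-contained sketch of that flow follows; Event, eventCh, isMemOrSysDB and the schema list are stand-ins for illustration, not TiDB's actual types.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a stand-in for *statsutil.DDLEvent.
type Event struct{ Schema string }

// eventCh plays the role of jobCtx.oldDDLCtx.ddlEventCh; it is deliberately
// tiny so the give-up path below is reachable.
var eventCh = make(chan *Event, 1)

// isMemOrSysDB approximates tidbutil.IsMemOrSysDB; the schema list here is an
// illustrative assumption.
func isMemOrSysDB(schema string) bool {
	switch schema {
	case "mysql", "information_schema", "performance_schema", "metrics_schema":
		return true
	}
	return false
}

func asyncNotify(e *Event) {
	// System schemas change during bootstrap on every node, so peers do not
	// need to be notified about them.
	if isMemOrSysDB(e.Schema) {
		return
	}
	// Try a bounded number of non-blocking sends, then give up and log.
	for i := 0; i < 10; i++ {
		select {
		case eventCh <- e:
			return
		default:
			time.Sleep(10 * time.Microsecond)
		}
	}
	fmt.Printf("notify DDL event failed, channel full: %+v\n", e)
}

func main() {
	asyncNotify(&Event{Schema: "mysql"}) // skipped: system schema
	asyncNotify(&Event{Schema: "test"})  // delivered into the buffer
	asyncNotify(&Event{Schema: "test"})  // buffer full, retries, then logs
}
```
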
2 changes: 1 addition & 1 deletion pkg/ddl/modify_column.go
@@ -535,7 +535,7 @@ func (w *worker) doModifyColumnTypeWithData(
tblInfo,
[]*model.ColumnInfo{changingCol},
)
-asyncNotifyEvent(jobCtx, modifyColumnEvent)
+asyncNotifyEvent(jobCtx, modifyColumnEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)
}
12 changes: 6 additions & 6 deletions pkg/ddl/partition.go
@@ -234,7 +234,7 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, t *meta.Meta, job *mode
tblInfo,
partInfo,
)
-asyncNotifyEvent(jobCtx, addPartitionEvent)
+asyncNotifyEvent(jobCtx, addPartitionEvent, job)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState)
}
@@ -2338,7 +2338,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, t *meta.Meta, job *mod
tblInfo,
&model.PartitionInfo{Definitions: droppedDefs},
)
-asyncNotifyEvent(jobCtx, dropPartitionEvent)
+asyncNotifyEvent(jobCtx, dropPartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}
default:
@@ -2431,7 +2431,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
-asyncNotifyEvent(jobCtx, truncatePartitionEvent)
+asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}

@@ -2570,7 +2570,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: newPartitions},
&model.PartitionInfo{Definitions: oldPartitions},
)
-asyncNotifyEvent(jobCtx, truncatePartitionEvent)
+asyncNotifyEvent(jobCtx, truncatePartitionEvent, job)
// A background job will be created to delete old partition data.
job.Args = []any{oldIDs}
default:
@@ -2943,7 +2943,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job
&model.PartitionInfo{Definitions: []model.PartitionDefinition{originalPartitionDef}},
originalNt,
)
-asyncNotifyEvent(jobCtx, exchangePartitionEvent)
+asyncNotifyEvent(jobCtx, exchangePartitionEvent, job)
return ver, nil
}

@@ -3481,7 +3481,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo
if err != nil {
return ver, errors.Trace(err)
}
-asyncNotifyEvent(jobCtx, event)
+asyncNotifyEvent(jobCtx, event, job)
// A background job will be created to delete old partition data.
job.Args = []any{physicalTableIDs}

17 changes: 8 additions & 9 deletions pkg/ddl/table.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/ddl/placement"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta"
@@ -42,7 +43,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
tidb_util "github.com/pingcap/tidb/pkg/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/gcutil"
"go.uber.org/zap"
@@ -127,7 +128,7 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in
job.SchemaID,
tblInfo,
)
-asyncNotifyEvent(jobCtx, dropTableEvent)
+asyncNotifyEvent(jobCtx, dropTableEvent, job)
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State))
@@ -569,12 +570,10 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
-truncateTableEvent := statsutil.NewTruncateTableEvent(
-job.SchemaID,
-tblInfo,
-oldTblInfo,
-)
-asyncNotifyEvent(jobCtx, truncateTableEvent)
+truncateTableEvent := &statsutil.DDLEvent{
+SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo),
+}
+asyncNotifyEvent(jobCtx, truncateTableEvent, job)
startKey := tablecodec.EncodeTablePrefix(tableID)
job.Args = []any{startKey, oldPartitionIDs}
return ver, nil
@@ -1074,7 +1073,7 @@ func (w *worker) onSetTableFlashReplica(jobCtx *jobContext, t *meta.Meta, job *m
}

// Ban setting replica count for tables in system database.
-if tidb_util.IsMemOrSysDB(job.SchemaName) {
+if tidbutil.IsMemOrSysDB(job.SchemaName) {
return ver, errors.Trace(dbterror.ErrUnsupportedTiFlashOperationForSysOrMemTable)
}

1 change: 1 addition & 0 deletions pkg/ddl/util/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/table/tables",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/mock",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
80 changes: 77 additions & 3 deletions pkg/ddl/util/schema_change_notifier.go
@@ -14,16 +14,90 @@

package util

import "github.com/pingcap/tidb/pkg/meta/model"
import (
"fmt"
"strings"

// SchemaChangeEvent stands for a schema change event. DDL will generate one event or multiple events (only for multi-schema change DDL).
// The caller should check the Type field of SchemaChange and call the corresponding getter function to retrieve the needed information.
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util/intest"
)

// SchemaChangeEvent stands for a schema change event. DDL will generate one
// event or multiple events (only for multi-schema change DDL). The caller should
// check the GetType of SchemaChange and call the corresponding getter function
// to retrieve the needed information.
type SchemaChangeEvent struct {
// todo: field and method will be added in the next few pr on demand
newTableInfo *model.TableInfo
oldTableInfo *model.TableInfo

tp model.ActionType
}

// String implements fmt.Stringer interface.
func (s *SchemaChangeEvent) String() string {
if s == nil {
return "nil SchemaChangeEvent"
}

var sb strings.Builder
_, _ = fmt.Fprintf(&sb, "(Event Type: %s", s.tp)
if s.newTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Table ID: %d, Table Name: %s", s.newTableInfo.ID, s.newTableInfo.Name)
}
if s.oldTableInfo != nil {
_, _ = fmt.Fprintf(&sb, ", Old Table ID: %d, Old Table Name: %s", s.oldTableInfo.ID, s.oldTableInfo.Name)
}
sb.WriteString(")")

return sb.String()
}

// GetType returns the type of the schema change event.
func (s *SchemaChangeEvent) GetType() model.ActionType {
if s == nil {
return model.ActionNone
}
return s.tp
}

// NewCreateTableEvent creates a SchemaChangeEvent whose type is
// ActionCreateTable.
func NewCreateTableEvent(
newTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionCreateTable,
newTableInfo: newTableInfo,
}
}

// GetCreateTableInfo returns the table info of the SchemaChangeEvent whose type
// is ActionCreateTable.
func (s *SchemaChangeEvent) GetCreateTableInfo() *model.TableInfo {
intest.Assert(s.tp == model.ActionCreateTable)
return s.newTableInfo
}

// NewTruncateTableEvent creates a SchemaChangeEvent whose type is
// ActionTruncateTable.
func NewTruncateTableEvent(
newTableInfo *model.TableInfo,
droppedTableInfo *model.TableInfo,
) *SchemaChangeEvent {
return &SchemaChangeEvent{
tp: model.ActionTruncateTable,
newTableInfo: newTableInfo,
oldTableInfo: droppedTableInfo,
}
}

// GetTruncateTableInfo returns the new and old table info of the
// SchemaChangeEvent whose type is ActionTruncateTable.
func (s *SchemaChangeEvent) GetTruncateTableInfo() (
newTableInfo *model.TableInfo,
droppedTableInfo *model.TableInfo,
) {
intest.Assert(s.tp == model.ActionTruncateTable)
return s.newTableInfo, s.oldTableInfo
}
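
For context on how these getters are meant to be consumed, here is a hedged sketch of a receiver (such as the stats handle) dispatching on the event type; the handler bodies and package name are placeholders, not code from this PR.

```go
package consumer

import (
	"github.com/pingcap/tidb/pkg/ddl/util"
	"github.com/pingcap/tidb/pkg/meta/model"
)

// handleSchemaChange dispatches on the action type and uses the matching
// getter, relying on the intest assertions above to catch mismatched calls.
func handleSchemaChange(e *util.SchemaChangeEvent) {
	switch e.GetType() {
	case model.ActionCreateTable:
		newTbl := e.GetCreateTableInfo()
		_ = newTbl // e.g. create stats meta entries for the new table
	case model.ActionTruncateTable:
		newTbl, droppedTbl := e.GetTruncateTableInfo()
		_, _ = newTbl, droppedTbl // e.g. reset stats for the new ID, drop the old
	default:
		// More event types are expected to arrive in follow-up PRs.
	}
}
```
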
4 changes: 2 additions & 2 deletions pkg/session/bootstraptest/bootstrap_upgrade_test.go
@@ -37,7 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
tidb_util "github.com/pingcap/tidb/pkg/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
@@ -718,7 +718,7 @@ func TestUpgradeWithPauseDDL(t *testing.T) {
require.NoError(t, err)
cmt := fmt.Sprintf("job: %s", runJob.String())
isPause := runJob.IsPausedBySystem() || runJob.IsPausing()
-if tidb_util.IsSysDB(runJob.SchemaName) {
+if tidbutil.IsSysDB(runJob.SchemaName) {
require.False(t, isPause, cmt)
} else {
require.True(t, isPause, cmt)
2 changes: 1 addition & 1 deletion pkg/statistics/handle/ddl/BUILD.bazel
@@ -31,7 +31,7 @@ go_test(
timeout = "short",
srcs = ["ddl_test.go"],
flaky = True,
-shard_count = 27,
+shard_count = 18,
deps = [
"//pkg/meta/model",
"//pkg/parser/model",