diff --git a/cdc/api/validator.go b/cdc/api/validator.go index 8c7a5878c68..f2f053e0f17 100644 --- a/cdc/api/validator.go +++ b/cdc/api/validator.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tiflow/cdc/capture" - "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink" @@ -215,25 +215,25 @@ func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s if err != nil { return nil, nil, errors.Trace(err) } - snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */) + snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */) if err != nil { return nil, nil, errors.Trace(err) } - for _, tableInfo := range snap.Tables() { + snap.IterTables(true, func(tableInfo *model.TableInfo) { if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) { - continue + return } // Sequence is not supported yet, TiCDC needs to filter all sequence tables. // See https://github.com/pingcap/tiflow/issues/4559 if tableInfo.IsSequence() { - continue + return } if !tableInfo.IsEligible(false /* forceReplicate */) { ineligibleTables = append(ineligibleTables, tableInfo.TableName) } else { eligibleTables = append(eligibleTables, tableInfo.TableName) } - } + }) return } diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index f6f4394a7ed..6396b9bfe60 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -276,7 +276,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct { err := scheamStorage.HandleDDLJob(job) require.Nil(t, err) } - tableInfo, ok := scheamStorage.GetLastSnapshot().GetTableByName("test", tc.tableName) + tableInfo, ok := scheamStorage.GetLastSnapshot().TableByName("test", tc.tableName) require.True(t, ok) if tableInfo.IsCommonHandle { // we can check this log to make sure if the clustered-index is enabled diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go new file mode 100644 index 00000000000..0636e1bf817 --- /dev/null +++ b/cdc/entry/schema/snapshot.go @@ -0,0 +1,1156 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "fmt" + "math" + "strings" + "sync" + + "github.com/google/btree" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" + + timeta "github.com/pingcap/tidb/meta" + timodel "github.com/pingcap/tidb/parser/model" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +// Snapshot stores the source TiDB all schema information. +// If no special comments, all public methods are thread-safe. 
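The first hunk above switches VerifyTables from ranging over the map returned by snap.Tables() to the callback-based IterTables, so loop control changes from `continue` to `return` inside the closure. A minimal caller-side sketch of the same pattern (illustrative only; the helper name is not part of this diff):

```go
package example

import (
	"github.com/pingcap/tiflow/cdc/entry/schema"
	"github.com/pingcap/tiflow/cdc/model"
)

// eligibleTableNames mirrors the VerifyTables change: iterate every table
// (including ineligible ones) through the callback API and filter inside
// the closure, where `return` plays the role of the old `continue`.
func eligibleTableNames(snap *schema.Snapshot) (names []model.TableName) {
	snap.IterTables(true /* includeIneligible */, func(tbl *model.TableInfo) {
		if tbl.IsSequence() {
			return // sequences are skipped, as in the hunk above
		}
		if tbl.IsEligible(false /* forceReplicate */) {
			names = append(names, tbl.TableName)
		}
	})
	return
}
```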
+type Snapshot struct { + inner snapshot + rwlock *sync.RWMutex +} + +// PreTableInfo returns the table info which will be overwritten by the specified job +func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) { + switch job.Type { + case timodel.ActionCreateSchema, timodel.ActionModifySchemaCharsetAndCollate, timodel.ActionDropSchema: + return nil, nil + case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable: + // no pre table info + return nil, nil + case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable: + // get the table will be dropped + table, ok := s.PhysicalTableByID(job.TableID) + if !ok { + return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(job.TableID) + } + return table, nil + case timodel.ActionRenameTables: + // DDL on multiple tables, ignore pre table info + return nil, nil + default: + binlogInfo := job.BinlogInfo + if binlogInfo == nil { + log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) + return nil, nil + } + tbInfo := binlogInfo.TableInfo + if tbInfo == nil { + log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) + return nil, nil + } + tableID := tbInfo.ID + table, ok := s.PhysicalTableByID(tableID) + if !ok { + return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(job.TableID) + } + return table, nil + } +} + +// FillSchemaName fills the schema name in ddl job. +func (s *Snapshot) FillSchemaName(job *timodel.Job) error { + if job.Type == timodel.ActionRenameTables { + // DDLs on multiple schema or tables, ignore them. + return nil + } + if job.Type == timodel.ActionCreateSchema || + job.Type == timodel.ActionDropSchema { + job.SchemaName = job.BinlogInfo.DBInfo.Name.O + return nil + } + dbInfo, exist := s.SchemaByID(job.SchemaID) + if !exist { + return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(job.SchemaID) + } + job.SchemaName = dbInfo.Name.O + return nil +} + +// NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta +func NewSingleSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*Snapshot, error) { + // meta is nil only in unit tests + if meta == nil { + snap := NewEmptySnapshot(forceReplicate) + snap.inner.currentTs = currentTs + return snap, nil + } + return NewSnapshotFromMeta(meta, currentTs, forceReplicate) +} + +// NewSnapshotFromMeta creates a schema snapshot from meta. +func NewSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*Snapshot, error) { + snap := NewEmptySnapshot(forceReplicate) + dbinfos, err := meta.ListDatabases() + if err != nil { + return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) + } + // `tag` is used to reverse sort all versions in the generated snapshot. + tag := negative(currentTs) + + for _, dbinfo := range dbinfos { + vid := newVersionedID(dbinfo.ID, tag) + vid.target = dbinfo + snap.inner.schemas.ReplaceOrInsert(vid) + + vname := newVersionedEntityName(-1, dbinfo.Name.O, tag) // -1 means the entity is a schema. 
+ vname.target = dbinfo.ID + snap.inner.schemaNameToID.ReplaceOrInsert(vname) + + tableInfos, err := meta.ListTables(dbinfo.ID) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) + } + for _, tableInfo := range tableInfos { + tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) + snap.inner.tables.ReplaceOrInsert(versionedID{ + id: tableInfo.ID, + tag: tag, + target: tableInfo, + }) + snap.inner.tableNameToID.ReplaceOrInsert(versionedEntityName{ + prefix: dbinfo.ID, + entity: tableInfo.Name.O, + tag: tag, + target: tableInfo.ID, + }) + + ineligible := !tableInfo.IsEligible(forceReplicate) + if ineligible { + snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag}) + } + if pi := tableInfo.GetPartitionInfo(); pi != nil { + for _, partition := range pi.Definitions { + vid := newVersionedID(partition.ID, tag) + vid.target = tableInfo + snap.inner.partitions.ReplaceOrInsert(vid) + if ineligible { + snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: partition.ID, tag: tag}) + } + } + } + } + } + + snap.inner.currentTs = currentTs + return snap, nil +} + +// NewEmptySnapshot creates an empty schema snapshot. +func NewEmptySnapshot(forceReplicate bool) *Snapshot { + inner := snapshot{ + tableNameToID: btree.New(16), + schemaNameToID: btree.New(16), + schemas: btree.New(16), + tables: btree.New(16), + partitions: btree.New(16), + truncatedTables: btree.New(16), + ineligibleTables: btree.New(16), + forceReplicate: forceReplicate, + currentTs: 0, + } + return &Snapshot{inner: inner, rwlock: new(sync.RWMutex)} +} + +// Copy creates a new schema snapshot based on the given one. The copied one shares same internal +// data structures with the old one to save memory usage. +func (s *Snapshot) Copy() *Snapshot { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return &Snapshot{inner: s.inner, rwlock: s.rwlock} +} + +// PrintStatus prints the schema snapshot. 
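Copy() above only copies `inner` by value and shares the underlying B-trees, so each copy keeps its own currentTs and remains a consistent point-in-time view. A small sketch of the intended usage under that assumption (the helper is hypothetical, not from this PR):

```go
package example

import (
	timodel "github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tiflow/cdc/entry/schema"
)

// applyAndKeepOldView shows the multi-version semantics behind Copy(): the
// copy shares the B-trees but keeps the pre-DDL currentTs, so it still
// resolves name/ID lookups as of the older timestamp after the original
// snapshot applies a newer DDL job.
func applyAndKeepOldView(snap *schema.Snapshot, job *timodel.Job) (*schema.Snapshot, error) {
	before := snap.Copy() // cheap: no deep copy of schemas/tables
	if err := snap.HandleDDL(job); err != nil {
		return nil, err
	}
	// snap.CurrentTs() is now job.BinlogInfo.FinishedTS, while
	// before.CurrentTs() is unchanged, so lookups on `before` still see
	// the pre-DDL schema versions.
	return before, nil
}
```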
+func (s *Snapshot) PrintStatus(logger func(msg string, fields ...zap.Field)) { + logger("[SchemaSnap] Start to print status", zap.Uint64("currentTs", s.CurrentTs())) + + availableSchemas := make(map[int64]string, s.inner.schemas.Len()) + s.IterSchemas(func(dbInfo *timodel.DBInfo) { + availableSchemas[dbInfo.ID] = dbInfo.Name.O + logger("[SchemaSnap] --> Schemas", zap.Int64("schemaID", dbInfo.ID), zap.Reflect("dbInfo", dbInfo)) + // check schemaNameToID + id, ok := s.inner.schemaIDByName(dbInfo.Name.O) + if !ok || id != dbInfo.ID { + logger("[SchemaSnap] ----> schemaNameToID item lost", zap.String("name", dbInfo.Name.O), zap.Int64("schemaNameToID", id)) + } + }) + s.IterSchemaNames(func(schema string, target int64) { + if _, ok := availableSchemas[target]; !ok { + logger("[SchemaSnap] ----> schemas item lost", zap.String("name", schema), zap.Int64("schema", target)) + } + }) + + availableTables := make(map[int64]struct{}, s.inner.tables.Len()) + s.IterTables(true, func(tableInfo *model.TableInfo) { + availableTables[tableInfo.ID] = struct{}{} + logger("[SchemaSnap] --> Tables", zap.Int64("tableID", tableInfo.ID), + zap.Stringer("tableInfo", tableInfo), + zap.Bool("ineligible", s.inner.isIneligibleTableID(tableInfo.ID))) + id, ok := s.inner.tableIDByName(tableInfo.TableName.Schema, tableInfo.TableName.Table) + if !ok || id != tableInfo.ID { + logger("[SchemaSnap] ----> tableNameToID item lost", zap.Stringer("name", tableInfo.TableName), zap.Int64("tableNameToID", id)) + } + }) + s.IterTableNames(func(schemaID int64, table string, target int64) { + if _, ok := availableTables[target]; !ok { + name := fmt.Sprintf("%s.%s", availableSchemas[schemaID], table) + logger("[SchemaSnap] ----> tables item lost", zap.String("name", name), zap.Int64("table", target)) + } + }) + + s.IterPartitions(true, func(pid int64, table *model.TableInfo) { + logger("[SchemaSnap] --> Partitions", zap.Int64("partitionID", pid), zap.Int64("tableID", table.ID), + zap.Bool("ineligible", s.inner.isIneligibleTableID(pid))) + }) +} + +// IterSchemas iterates all schemas in the snapshot. +func (s *Snapshot) IterSchemas(f func(i *timodel.DBInfo)) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + s.inner.iterSchemas(f) +} + +// IterSchemaNames iterates all schema names in the snapshot. +func (s *Snapshot) IterSchemaNames(f func(schema string, target int64)) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + s.inner.iterSchemaNames(f) +} + +// IterTables iterates all tables in the snapshot. +func (s *Snapshot) IterTables(includeIneligible bool, f func(i *model.TableInfo)) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + s.inner.iterTables(includeIneligible, f) +} + +// IterTableNames iterates all table names in the snapshot. +func (s *Snapshot) IterTableNames(f func(schema int64, table string, target int64)) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + s.inner.iterTableNames(f) +} + +// IterPartitions iterates all partitions in the snapshot. +func (s *Snapshot) IterPartitions(includeIneligible bool, f func(id int64, i *model.TableInfo)) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + s.inner.iterPartitions(includeIneligible, f) +} + +// SchemaByID returns the DBInfo by schema id. +// The second returned value is false if no schema with the specified id is found. +// NOTE: The returned table info should always be READ-ONLY! 
+func (s *Snapshot) SchemaByID(id int64) (*timodel.DBInfo, bool) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return s.inner.schemaByID(id) +} + +// PhysicalTableByID returns the TableInfo by table id or partition id. +// The second returned value is false if no table with the specified id is found. +// NOTE: The returned table info should always be READ-ONLY! +func (s *Snapshot) PhysicalTableByID(id int64) (*model.TableInfo, bool) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return s.inner.physicalTableByID(id) +} + +// SchemaIDByName gets the schema id from the given schema name. +func (s *Snapshot) SchemaIDByName(schema string) (int64, bool) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return s.inner.schemaIDByName(schema) +} + +// TableIDByName returns the tableID by table schemaName and tableName. +// The second returned value is false if no table with the specified name is found. +func (s *Snapshot) TableIDByName(schema string, table string) (int64, bool) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return s.inner.tableIDByName(schema, table) +} + +// TableByName queries a table by name, +// The second returned value is false if no table with the specified name is found. +// NOTE: The returned table info should always be READ-ONLY! +func (s *Snapshot) TableByName(schema, table string) (*model.TableInfo, bool) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return s.inner.tableByName(schema, table) +} + +// SchemaByTableID returns the schema ID by table ID. +func (s *Snapshot) SchemaByTableID(tableID int64) (*timodel.DBInfo, bool) { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + tableInfo, ok := s.inner.physicalTableByID(tableID) + if !ok { + return nil, false + } + return s.inner.schemaByID(tableInfo.SchemaID) +} + +// IsTruncateTableID returns true if the table id have been truncated by truncate table DDL. +func (s *Snapshot) IsTruncateTableID(id int64) bool { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + tag, ok := s.inner.tableTagByID(id, true) + return ok && s.inner.truncatedTables.Get(newVersionedID(id, tag)) != nil +} + +// IsIneligibleTableID returns true if the table is ineligible. +func (s *Snapshot) IsIneligibleTableID(id int64) bool { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + return s.inner.isIneligibleTableID(id) +} + +// HandleDDL handles the given job. +func (s *Snapshot) HandleDDL(job *timodel.Job) error { + if err := s.FillSchemaName(job); err != nil { + return errors.Trace(err) + } + return s.DoHandleDDL(job) +} + +// CurrentTs returns the finish timestamp of the schema snapshot. +func (s *Snapshot) CurrentTs() uint64 { + return s.inner.currentTs +} + +// Drop drops the snapshot. It must be called when GC some snapshots. +// Drop a snapshot will also drop all snapshots with a less timestamp. +func (s *Snapshot) Drop() { + s.rwlock.Lock() + defer s.rwlock.Unlock() + s.inner.drop() +} + +// DoHandleDDL is like HandleDDL but doesn't fill schema name into job. +// NOTE: it's public because some tests in the upper package need this. 
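Drop() above documents that dropping a snapshot also releases every older version it shares with previously copied snapshots. A hypothetical GC helper built on that contract (the function and the sorted-slice assumption are illustrative, not part of this change):

```go
package example

import (
	"sort"

	"github.com/pingcap/tiflow/cdc/entry/schema"
)

// gcSnapshots assumes `snaps` is sorted by CurrentTs in ascending order.
// To GC up to gcTs, keep the newest snapshot at or before gcTs and call
// Drop() on it, which also drops the versions owned by the discarded,
// older snapshots that share the same B-trees.
func gcSnapshots(snaps []*schema.Snapshot, gcTs uint64) []*schema.Snapshot {
	// Index of the first snapshot newer than gcTs.
	i := sort.Search(len(snaps), func(i int) bool { return snaps[i].CurrentTs() > gcTs })
	if i == 0 {
		return snaps // every snapshot is still needed
	}
	kept := snaps[i-1:] // keep the newest snapshot at or before gcTs
	kept[0].Drop()      // also releases versions belonging to snaps[:i-1]
	return kept
}
```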
+func (s *Snapshot) DoHandleDDL(job *timodel.Job) error { + s.rwlock.Lock() + defer s.rwlock.Unlock() + + getWrapTableInfo := func(job *timodel.Job) *model.TableInfo { + return model.WrapTableInfo(job.SchemaID, job.SchemaName, + job.BinlogInfo.FinishedTS, + job.BinlogInfo.TableInfo) + } + switch job.Type { + case timodel.ActionCreateSchema: + // get the DBInfo from job rawArgs + err := s.inner.createSchema(job.BinlogInfo.DBInfo, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionModifySchemaCharsetAndCollate: + err := s.inner.replaceSchema(job.BinlogInfo.DBInfo, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionDropSchema: + err := s.inner.dropSchema(job.SchemaID, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionRenameTable: + // first drop the table + err := s.inner.dropTable(job.TableID, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + // create table + err = s.inner.createTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionRenameTables: + err := s.inner.renameTables(job, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable: + err := s.inner.createTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionDropTable, timodel.ActionDropView: + err := s.inner.dropTable(job.TableID, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + + case timodel.ActionTruncateTable: + // job.TableID is the old table id, different from table.ID + err := s.inner.truncateTable(job.TableID, getWrapTableInfo(job), job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionTruncateTablePartition, timodel.ActionAddTablePartition, timodel.ActionDropTablePartition: + err := s.inner.updatePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + default: + binlogInfo := job.BinlogInfo + if binlogInfo == nil { + log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) + return nil + } + tbInfo := binlogInfo.TableInfo + if tbInfo == nil { + log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) + return nil + } + err := s.inner.replaceTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + } + if s.inner.currentTs != job.BinlogInfo.FinishedTS { + panic("HandleDDL should update currentTs") + } + return nil +} + +// TableCount counts tables in the snapshot. It's only for tests. +func (s *Snapshot) TableCount(includeIneligible bool) (count int) { + s.IterTables(includeIneligible, func(i *model.TableInfo) { count += 1 }) + return +} + +// SchemaCount counts schemas in the snapshot. It's only for tests. +func (s *Snapshot) SchemaCount() (count int) { + s.IterSchemas(func(i *timodel.DBInfo) { count += 1 }) + return +} + +// DumpToString dumps the snapshot to a string. 
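DoHandleDDL above dispatches on the job type and finally asserts that currentTs has advanced to the job's FinishedTS. A self-contained sketch that feeds a hand-built create-schema job through the public HandleDDL (the job field values are assumptions chosen for illustration):

```go
package main

import (
	"fmt"

	timodel "github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tiflow/cdc/entry/schema"
)

// HandleDDL fills the schema name from BinlogInfo, dispatches on the job
// type (here ActionCreateSchema -> createSchema), and advances currentTs
// to the job's FinishedTS.
func main() {
	snap := schema.NewEmptySnapshot(false)
	job := &timodel.Job{
		Type:     timodel.ActionCreateSchema,
		SchemaID: 1,
		BinlogInfo: &timodel.HistoryInfo{
			DBInfo:     &timodel.DBInfo{ID: 1, Name: timodel.NewCIStr("test")},
			FinishedTS: 100,
		},
	}
	if err := snap.HandleDDL(job); err != nil {
		panic(err)
	}
	id, ok := snap.SchemaIDByName("test")
	fmt.Println(id, ok, snap.CurrentTs()) // 1 true 100
}
```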
+func (s *Snapshot) DumpToString() string { + schemas := make([]string, 0, s.inner.schemas.Len()) + s.IterSchemas(func(dbInfo *timodel.DBInfo) { + schemas = append(schemas, fmt.Sprintf("%v", dbInfo)) + }) + + tables := make([]string, 0, s.inner.tables.Len()) + s.IterTables(true, func(tbInfo *model.TableInfo) { + tables = append(tables, fmt.Sprintf("%v", tbInfo)) + }) + + partitions := make([]string, 0, s.inner.partitions.Len()) + s.IterPartitions(true, func(id int64, _ *model.TableInfo) { + partitions = append(partitions, fmt.Sprintf("%d", id)) + }) + + schemaNames := make([]string, 0, s.inner.schemaNameToID.Len()) + s.IterSchemaNames(func(schema string, target int64) { + schemaNames = append(schemaNames, fmt.Sprintf("%s:%d", schema, target)) + }) + + tableNames := make([]string, 0, s.inner.tableNameToID.Len()) + s.IterTableNames(func(schemaID int64, table string, target int64) { + schema, _ := s.inner.schemaByID(schemaID) + tableNames = append(tableNames, fmt.Sprintf("%s.%s:%d", schema.Name.O, table, target)) + }) + + return fmt.Sprintf("%s\n%s\n%s\n%s\n%s", + strings.Join(schemas, "\t"), + strings.Join(tables, "\t"), + strings.Join(partitions, "\t"), + strings.Join(schemaNames, "\t"), + strings.Join(tableNames, "\t")) +} + +type snapshot struct { + // map[versionedEntityName] -> int64 + // The ID can be `-1` which means the table is deleted. + tableNameToID *btree.BTree + + // map[versionedEntityName] -> int64 + // The ID can be `-1` which means the table is deleted. + schemaNameToID *btree.BTree + + // map[versionedID] -> *timodel.DBInfo + // The target can be `nil` which means the entity is deleted. + schemas *btree.BTree + + // map[versionedID] -> *model.TableInfo + // The target can be `nil` which means the entity is deleted. + tables *btree.BTree + + // map[versionedID] -> *model.TableInfo + partitions *btree.BTree + + // map[versionedID] -> struct{} + truncatedTables *btree.BTree + + // map[versionedID] -> struct{} + // Partitions and tables share ineligibleTables because their IDs won't conflict. + ineligibleTables *btree.BTree + + // if forceReplicate is true, treat ineligible tables as eligible. + forceReplicate bool + + currentTs uint64 +} + +func (s *snapshot) schemaByID(id int64) (val *timodel.DBInfo, ok bool) { + tag := negative(s.currentTs) + start := versionedID{id: id, tag: tag, target: nil} + end := versionedID{id: id, tag: negative(uint64(0)), target: nil} + s.schemas.AscendRange(start, end, func(i btree.Item) bool { + val = targetToDBInfo(i.(versionedID).target) + ok = val != nil + return false + }) + return +} + +func (s *snapshot) physicalTableByID(id int64) (tableInfo *model.TableInfo, ok bool) { + tag := negative(s.currentTs) + start := versionedID{id: id, tag: tag, target: nil} + end := versionedID{id: id, tag: negative(uint64(0)), target: nil} + s.tables.AscendRange(start, end, func(i btree.Item) bool { + tableInfo = targetToTableInfo(i.(versionedID).target) + ok = tableInfo != nil + return false + }) + if !ok { + // Try partition, it could be a partition table. 
+ s.partitions.AscendRange(start, end, func(i btree.Item) bool { + tableInfo = targetToTableInfo(i.(versionedID).target) + ok = tableInfo != nil + return false + }) + } + return +} + +func (s *snapshot) schemaIDByName(schema string) (id int64, ok bool) { + tag := negative(s.currentTs) + start := newVersionedEntityName(-1, schema, tag) + end := newVersionedEntityName(-1, schema, negative(uint64(0))) + s.schemaNameToID.AscendRange(start, end, func(i btree.Item) bool { + id = i.(versionedEntityName).target + ok = id >= 0 // negative values are treated as invalid. + return false + }) + return +} + +func (s *snapshot) tableIDByName(schema string, table string) (id int64, ok bool) { + var prefix int64 + prefix, ok = s.schemaIDByName(schema) + if ok { + tag := negative(s.currentTs) + start := newVersionedEntityName(prefix, table, tag) + end := newVersionedEntityName(prefix, table, negative(uint64(0))) + s.tableNameToID.AscendRange(start, end, func(i btree.Item) bool { + id = i.(versionedEntityName).target + ok = id >= 0 // negative values are treated as invalid. + return false + }) + } + return +} + +func (s *snapshot) tableByName(schema, table string) (info *model.TableInfo, ok bool) { + id, ok := s.tableIDByName(schema, table) + if !ok { + return nil, ok + } + return s.physicalTableByID(id) +} + +func (s *snapshot) isIneligibleTableID(id int64) (ok bool) { + tag, ok := s.tableTagByID(id, false) + return ok && s.ineligibleTables.Get(newVersionedID(id, tag)) != nil +} + +func (s *snapshot) tableTagByID(id int64, nilAcceptable bool) (foundTag uint64, ok bool) { + tag := negative(s.currentTs) + start := newVersionedID(id, tag) + end := newVersionedID(id, negative(uint64(0))) + s.tables.AscendRange(start, end, func(i btree.Item) bool { + tableInfo := targetToTableInfo(i.(versionedID).target) + if nilAcceptable || tableInfo != nil { + foundTag = i.(versionedID).tag + ok = true + } + return false + }) + if !ok { + // Try partition, it could be a partition table. + s.partitions.AscendRange(start, end, func(i btree.Item) bool { + tableInfo := targetToTableInfo(i.(versionedID).target) + if nilAcceptable || tableInfo != nil { + foundTag = i.(versionedID).tag + ok = true + } + return false + }) + } + return +} + +// dropSchema removes a schema from the snapshot. +// Tables in the schema will also be dropped. +func (s *snapshot) dropSchema(id int64, currentTs uint64) error { + dbInfo, ok := s.schemaByID(id) + if !ok { + return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(id) + } + tag := negative(currentTs) + s.schemas.ReplaceOrInsert(newVersionedID(id, tag)) + s.schemaNameToID.ReplaceOrInsert(newVersionedEntityName(-1, dbInfo.Name.O, tag)) + for _, id := range s.tablesInSchema(dbInfo.Name.O) { + tbInfo, _ := s.physicalTableByID(id) + s.doDropTable(tbInfo, currentTs) + } + s.currentTs = currentTs + log.Debug("drop schema success", zap.String("name", dbInfo.Name.O), zap.Int64("id", dbInfo.ID)) + return nil +} + +// Create a new schema in the snapshot. `dbInfo` will be deep copied. 
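The lookups above (schemaByID, physicalTableByID, schemaIDByName, tableIDByName, tableTagByID) all follow one pattern: keys carry tag = negative(ts), so AscendRange from negative(currentTs) to negative(0) yields the newest version visible at currentTs, and a nil (or -1) target written at a later ts acts as a tombstone. A standalone sketch of that ordering using local stand-in types, since the snapshot's own versionedID is unexported:

```go
package main

import (
	"fmt"
	"math"

	"github.com/google/btree"
)

// versioned is a local stand-in for versionedID: tag = MaxUint64 - ts makes
// newer versions sort first within the same id.
type versioned struct {
	id     int64
	tag    uint64
	target interface{}
}

func (a versioned) Less(b btree.Item) bool {
	o := b.(versioned)
	return a.id < o.id || (a.id == o.id && a.tag < o.tag)
}

func negative(ts uint64) uint64 { return math.MaxUint64 - ts }

// lookup returns the newest version of id visible at currentTs; a nil
// target means the entity was deleted at or before currentTs.
func lookup(tr *btree.BTree, id int64, currentTs uint64) (interface{}, bool) {
	var target interface{}
	start := versioned{id: id, tag: negative(currentTs)}
	end := versioned{id: id, tag: negative(0)}
	tr.AscendRange(start, end, func(i btree.Item) bool {
		target = i.(versioned).target
		return false // stop at the first, i.e. newest visible, version
	})
	return target, target != nil
}

func main() {
	tr := btree.New(16)
	tr.ReplaceOrInsert(versioned{id: 1, tag: negative(100), target: "created@100"})
	tr.ReplaceOrInsert(versioned{id: 1, tag: negative(130), target: nil}) // dropped at ts=130

	fmt.Println(lookup(tr, 1, 120)) // created@100 true  (the drop is not visible yet)
	fmt.Println(lookup(tr, 1, 140)) // <nil> false       (the tombstone wins)
}
```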
+func (s *snapshot) createSchema(dbInfo *timodel.DBInfo, currentTs uint64) error { + x, ok := s.schemaByID(dbInfo.ID) + if ok { + return cerror.ErrSnapshotSchemaExists.GenWithStackByArgs(x.Name, x.ID) + } + if id, ok := s.schemaIDByName(dbInfo.Name.O); ok { + return cerror.ErrSnapshotSchemaExists.GenWithStackByArgs(dbInfo.Name.O, id) + } + s.doCreateSchema(dbInfo, currentTs) + s.currentTs = currentTs + log.Debug("create schema success", zap.String("name", dbInfo.Name.O), zap.Int64("id", dbInfo.ID)) + return nil +} + +// Replace a schema. dbInfo will be deep copied. +// Callers should ensure `dbInfo` information not conflict with other schemas. +func (s *snapshot) replaceSchema(dbInfo *timodel.DBInfo, currentTs uint64) error { + old, ok := s.schemaByID(dbInfo.ID) + if !ok { + return cerror.ErrSnapshotSchemaNotFound.GenWithStack("schema %s(%d) not found", dbInfo.Name, dbInfo.ID) + } + s.doCreateSchema(dbInfo, currentTs) + if old.Name.O != dbInfo.Name.O { + tag := negative(currentTs) + s.schemaNameToID.ReplaceOrInsert(newVersionedEntityName(-1, old.Name.O, tag)) + } + s.currentTs = currentTs + log.Debug("replace schema success", zap.String("name", dbInfo.Name.O), zap.Int64("id", dbInfo.ID)) + return nil +} + +func (s *snapshot) doCreateSchema(dbInfo *timodel.DBInfo, currentTs uint64) { + tag := negative(currentTs) + vid := newVersionedID(dbInfo.ID, tag) + vid.target = dbInfo.Clone() + s.schemas.ReplaceOrInsert(vid) + vname := newVersionedEntityName(-1, dbInfo.Name.O, tag) + vname.target = dbInfo.ID + s.schemaNameToID.ReplaceOrInsert(vname) +} + +// dropTable removes a table(NOT partition) from the snapshot. +func (s *snapshot) dropTable(id int64, currentTs uint64) error { + tbInfo, ok := s.physicalTableByID(id) + if !ok { + return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id) + } + s.doDropTable(tbInfo, currentTs) + s.currentTs = currentTs + log.Debug("drop table success", + zap.String("schema", tbInfo.TableName.Schema), + zap.String("table", tbInfo.TableName.Table), + zap.Int64("id", tbInfo.ID)) + return nil +} + +func (s *snapshot) doDropTable(tbInfo *model.TableInfo, currentTs uint64) { + tag := negative(currentTs) + s.tables.ReplaceOrInsert(newVersionedID(tbInfo.ID, tag)) + s.tableNameToID.ReplaceOrInsert(newVersionedEntityName(tbInfo.SchemaID, tbInfo.TableName.Table, tag)) + if pi := tbInfo.GetPartitionInfo(); pi != nil { + for _, partition := range pi.Definitions { + s.partitions.ReplaceOrInsert(newVersionedID(partition.ID, tag)) + } + } +} + +// truncateTable truncate the table with the given ID, and replace it with a new `tbInfo`. +// NOTE: after a table is truncated: +// * physicalTableByID(id) will return nil; +// * IsTruncateTableID(id) should return true. +func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs uint64) (err error) { + old, ok := s.physicalTableByID(id) + if !ok { + return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id) + } + s.doDropTable(old, currentTs) + s.doCreateTable(tbInfo, currentTs) + s.truncatedTables.ReplaceOrInsert(newVersionedID(id, negative(currentTs))) + s.currentTs = currentTs + log.Debug("truncate table success", + zap.String("schema", tbInfo.TableName.Schema), + zap.String("table", tbInfo.TableName.Table), + zap.Int64("id", tbInfo.ID)) + return +} + +// Create a new table in the snapshot. `tbInfo` will be deep copied. 
+func (s *snapshot) createTable(tbInfo *model.TableInfo, currentTs uint64) error { + if _, ok := s.schemaByID(tbInfo.SchemaID); !ok { + return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", tbInfo.SchemaID) + } + if _, ok := s.physicalTableByID(tbInfo.ID); ok { + return cerror.ErrSnapshotTableExists.GenWithStackByArgs(tbInfo.TableName.Schema, tbInfo.TableName.Table) + } + s.doCreateTable(tbInfo, currentTs) + s.currentTs = currentTs + log.Debug("create table success", zap.Int64("id", tbInfo.ID), + zap.String("name", fmt.Sprintf("%s.%s", tbInfo.TableName.Schema, tbInfo.TableName.Table))) + return nil +} + +// ReplaceTable replace the table by new tableInfo +func (s *snapshot) replaceTable(tbInfo *model.TableInfo, currentTs uint64) error { + if _, ok := s.schemaByID(tbInfo.SchemaID); !ok { + return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", tbInfo.SchemaID) + } + if _, ok := s.physicalTableByID(tbInfo.ID); !ok { + return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", tbInfo.Name, tbInfo.ID) + } + s.doCreateTable(tbInfo, currentTs) + s.currentTs = currentTs + log.Debug("replace table success", zap.String("name", tbInfo.Name.O), zap.Int64("id", tbInfo.ID)) + return nil +} + +func (s *snapshot) doCreateTable(tbInfo *model.TableInfo, currentTs uint64) { + tbInfo = tbInfo.Clone() + tag := negative(currentTs) + vid := newVersionedID(tbInfo.ID, tag) + vid.target = tbInfo + s.tables.ReplaceOrInsert(vid) + + vname := newVersionedEntityName(tbInfo.SchemaID, tbInfo.TableName.Table, tag) + vname.target = tbInfo.ID + s.tableNameToID.ReplaceOrInsert(vname) + + ineligible := !tbInfo.IsEligible(s.forceReplicate) + if ineligible { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. + // See https://github.com/pingcap/tiflow/issues/4559 + if !tbInfo.IsSequence() { + log.Warn("this table is ineligible to replicate", + zap.String("tableName", tbInfo.Name.O), zap.Int64("tableID", tbInfo.ID)) + } + s.ineligibleTables.ReplaceOrInsert(newVersionedID(tbInfo.ID, tag)) + } + if pi := tbInfo.GetPartitionInfo(); pi != nil { + for _, partition := range pi.Definitions { + vid := newVersionedID(partition.ID, tag) + vid.target = tbInfo + s.partitions.ReplaceOrInsert(vid) + if ineligible { + s.ineligibleTables.ReplaceOrInsert(newVersionedID(partition.ID, tag)) + } + } + } +} + +// updatePartition updates partition info for `tbInfo`. 
+func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) error { + oldTbInfo, ok := s.physicalTableByID(tbInfo.ID) + if !ok { + return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(tbInfo.ID) + } + oldPi := oldTbInfo.GetPartitionInfo() + if oldPi == nil { + return cerror.ErrSnapshotTableNotFound.GenWithStack("table %d is not a partition table", tbInfo.ID) + } + newPi := tbInfo.GetPartitionInfo() + if newPi == nil { + return cerror.ErrSnapshotTableNotFound.GenWithStack("table %d is not a partition table", tbInfo.ID) + } + + tag := negative(currentTs) + vid := newVersionedID(tbInfo.ID, tag) + vid.target = tbInfo.Clone() + s.tables.ReplaceOrInsert(vid) + ineligible := !tbInfo.IsEligible(s.forceReplicate) + if ineligible { + s.ineligibleTables.ReplaceOrInsert(newVersionedID(tbInfo.ID, tag)) + } + for _, partition := range oldPi.Definitions { + s.partitions.ReplaceOrInsert(newVersionedID(partition.ID, tag)) + } + for _, partition := range newPi.Definitions { + vid := newVersionedID(partition.ID, tag) + vid.target = tbInfo + s.partitions.ReplaceOrInsert(vid) + if ineligible { + s.ineligibleTables.ReplaceOrInsert(newVersionedID(partition.ID, tag)) + } + } + s.currentTs = currentTs + // TODO: is it necessary to print changes detailly? + log.Debug("adjust partition success", + zap.String("schema", tbInfo.TableName.Schema), + zap.String("table", tbInfo.TableName.Table)) + return nil +} + +func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error { + var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64 + var newTableNames, oldSchemaNames []*timodel.CIStr + err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &newTableNames, &oldTableIDs, &oldSchemaNames) + if err != nil { + return errors.Trace(err) + } + if len(job.BinlogInfo.MultipleTableInfos) < len(newTableNames) { + return cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID) + } + // NOTE: should handle failures in halfway better. 
+ for _, tableID := range oldTableIDs { + if err := s.dropTable(tableID, currentTs); err != nil { + return errors.Trace(err) + } + } + for i, tableInfo := range job.BinlogInfo.MultipleTableInfos { + newSchema, ok := s.schemaByID(newSchemaIDs[i]) + if !ok { + return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i]) + } + newSchemaName := newSchema.Name.L + tbInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo) + err = s.createTable(tbInfo, currentTs) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func (s *snapshot) iterTables(includeIneligible bool, f func(i *model.TableInfo)) { + tag := negative(s.currentTs) + var tableID int64 = -1 + s.tables.Ascend(func(i btree.Item) bool { + x := i.(versionedID) + if x.id != tableID && x.tag >= tag { + tableID = x.id + if x.target != nil && (includeIneligible || s.ineligibleTables.Get(newVersionedID(x.id, x.tag)) == nil) { + f(targetToTableInfo(x.target)) + } + } + return true + }) + return +} + +func (s *snapshot) iterPartitions(includeIneligible bool, f func(id int64, i *model.TableInfo)) { + tag := negative(s.currentTs) + var partitionID int64 = -1 + s.partitions.Ascend(func(i btree.Item) bool { + x := i.(versionedID) + if x.id != partitionID && x.tag >= tag { + partitionID = x.id + if x.target != nil && (includeIneligible || s.ineligibleTables.Get(newVersionedID(x.id, x.tag)) == nil) { + f(partitionID, targetToTableInfo(x.target)) + } + } + return true + }) + return +} + +func (s *snapshot) iterSchemas(f func(i *timodel.DBInfo)) { + tag := negative(s.currentTs) + var schemaID int64 = -1 + s.schemas.Ascend(func(i btree.Item) bool { + x := i.(versionedID) + if x.id != schemaID && x.tag >= tag { + schemaID = x.id + if x.target != nil { + f(targetToDBInfo(x.target)) + } + } + return true + }) +} + +func (s *snapshot) iterTableNames(f func(schema int64, table string, target int64)) { + tag := negative(s.currentTs) + var prefix int64 = -1 + entity := "" + s.tableNameToID.Ascend(func(i btree.Item) bool { + x := i.(versionedEntityName) + if (x.prefix != prefix || x.entity != entity) && x.tag >= tag { + prefix = x.prefix + entity = x.entity + if x.target > 0 { + f(prefix, entity, x.target) + } + } + return true + }) +} + +func (s *snapshot) iterSchemaNames(f func(schema string, target int64)) { + tag := negative(s.currentTs) + entity := "" + s.schemaNameToID.Ascend(func(i btree.Item) bool { + x := i.(versionedEntityName) + if x.entity != entity && x.tag >= tag { + entity = x.entity + if x.target > 0 { + f(entity, x.target) + } + } + return true + }) +} + +func (s *snapshot) tablesInSchema(schema string) (tables []int64) { + schemaID, ok := s.schemaIDByName(schema) + if !ok { + return + } + start := newVersionedEntityName(schemaID, "", 0) + end := newVersionedEntityName(schemaID+1, "", 0) + tag := negative(s.currentTs) + currTable := "" + s.tableNameToID.AscendRange(start, end, func(i btree.Item) bool { + x := i.(versionedEntityName) + if x.tag >= tag && x.entity != currTable { + currTable = x.entity + if x.target > 0 { + tables = append(tables, x.target) + } + } + return true + }) + return +} + +func (s *snapshot) drop() { + tag := negative(s.currentTs) + + schemas := make([]versionedID, 0, s.schemas.Len()) + var schemaID int64 = -1 + var schemaDroped bool = false + s.schemas.Ascend(func(i btree.Item) bool { + x := i.(versionedID) + if x.tag >= tag { + if x.id != schemaID { + schemaID = x.id + schemaDroped = false + } + if schemaDroped || x.target == nil { + schemas = 
append(schemas, newVersionedID(x.id, x.tag)) + } + schemaDroped = true + } + return true + }) + for _, vid := range schemas { + s.schemas.Delete(vid) + } + + tables := make([]versionedID, 0, s.tables.Len()) + var tableID int64 = -1 + var tableDroped bool = false + s.tables.Ascend(func(i btree.Item) bool { + x := i.(versionedID) + if x.tag >= tag { + if x.id != tableID { + tableID = x.id + tableDroped = false + } + if tableDroped || x.target == nil { + tables = append(tables, newVersionedID(x.id, x.tag)) + } + tableDroped = true + } + return true + }) + for _, vid := range tables { + x := s.tables.Delete(vid).(versionedID) + info := targetToTableInfo(x.target) + if info != nil { + ineligible := !info.IsEligible(s.forceReplicate) + if ineligible { + s.ineligibleTables.Delete(vid) + } + } else { + // Maybe the table is truncated. + s.truncatedTables.Delete(vid) + } + } + + partitions := make([]versionedID, 0, s.partitions.Len()) + var partitionID int64 = -1 + var partitionDroped bool = false + s.partitions.Ascend(func(i btree.Item) bool { + x := i.(versionedID) + if x.tag >= tag { + if x.id != partitionID { + partitionID = x.id + partitionDroped = false + } + if partitionDroped || x.target == nil { + partitions = append(partitions, newVersionedID(x.id, x.tag)) + } + partitionDroped = true + } + return true + }) + for _, vid := range partitions { + x := s.partitions.Delete(vid).(versionedID) + info := targetToTableInfo(x.target) + if info != nil { + ineligible := !info.IsEligible(s.forceReplicate) + if ineligible { + s.ineligibleTables.Delete(vid) + } + } + } + + schemaNames := make([]versionedEntityName, 0, s.schemaNameToID.Len()) + var schemaName string = "" + var schemaNameDroped bool = false + s.schemaNameToID.Ascend(func(i btree.Item) bool { + x := i.(versionedEntityName) + if x.tag >= tag { + if x.entity != schemaName { + schemaName = x.entity + schemaNameDroped = false + } + if schemaNameDroped || x.target < 0 { + schemaNames = append(schemaNames, newVersionedEntityName(x.prefix, x.entity, x.tag)) + } + schemaNameDroped = true + } + return true + }) + for _, vname := range schemaNames { + s.schemaNameToID.Delete(vname) + } + + tableNames := make([]versionedEntityName, 0, s.tableNameToID.Len()) + schemaID = -1 + var tableName string = "" + var tableNameDroped bool = false + s.tableNameToID.Ascend(func(i btree.Item) bool { + x := i.(versionedEntityName) + if x.tag >= tag { + if x.prefix != schemaID || x.entity != tableName { + schemaID = x.prefix + tableName = x.entity + tableNameDroped = false + } + if tableNameDroped || x.target < 0 { + tableNames = append(tableNames, newVersionedEntityName(x.prefix, x.entity, x.tag)) + } + tableNameDroped = true + } + return true + }) + for _, vname := range tableNames { + s.tableNameToID.Delete(vname) + } +} + +// Entity(schema or table) name with finish timestamp of the associated DDL job. +type versionedEntityName struct { + prefix int64 // schema ID if the entity is a table, or -1 if it's a schema. + entity string + tag uint64 // A transform of timestamp to reverse sort versions. + // the associated entity id, negative values are treated as invalid. + target int64 +} + +// ID with finish timestamp of the associated DDL job. +type versionedID struct { + id int64 + tag uint64 // A transform of timestamp to reverse sort versions. + // the associated entity pointer. 
+ target interface{} +} + +func (v1 versionedEntityName) Less(than btree.Item) bool { + v2 := than.(versionedEntityName) + return v1.prefix < v2.prefix || (v1.prefix == v2.prefix && v1.entity < v2.entity) || (v1.prefix == v2.prefix && v1.entity == v2.entity && v1.tag < v2.tag) +} + +func (v1 versionedID) Less(than btree.Item) bool { + v2 := than.(versionedID) + return v1.id < v2.id || (v1.id == v2.id && v1.tag < v2.tag) +} + +// negative transforms `x` for reverse sorting based on it. +func negative(x uint64) uint64 { + return math.MaxUint64 - x +} + +// newVersionedEntityName creates an instance with target -1, which means it's deleted from +// the associated snapshot. +func newVersionedEntityName(prefix int64, entity string, tag uint64) versionedEntityName { + var target int64 = -1 + return versionedEntityName{prefix, entity, tag, target} +} + +// newVersionedID creates an instance with target nil, which means it's deleted from the +// associated snapshot. +func newVersionedID(id int64, tag uint64) versionedID { + var target interface{} = nil + return versionedID{id, tag, target} +} + +func targetToTableInfo(target interface{}) *model.TableInfo { + if target == nil { + return nil + } + return target.(*model.TableInfo) +} + +func targetToDBInfo(target interface{}) *timodel.DBInfo { + if target == nil { + return nil + } + return target.(*timodel.DBInfo) +} diff --git a/cdc/entry/schema/snapshot_test.go b/cdc/entry/schema/snapshot_test.go new file mode 100644 index 00000000000..107800f4041 --- /dev/null +++ b/cdc/entry/schema/snapshot_test.go @@ -0,0 +1,312 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schema + +import ( + "fmt" + "testing" + + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestTablesInSchema(t *testing.T) { + snap := NewEmptySnapshot(true) + require.Nil(t, snap.inner.createSchema(newDBInfo(1), 100)) + var vname versionedEntityName + + vname = newVersionedEntityName(1, "tb1", negative(80)) + vname.target = 1 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + + vname = newVersionedEntityName(1, "tb1", negative(90)) + vname.target = 2 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + + vname = newVersionedEntityName(1, "tb1", negative(110)) + vname.target = 3 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + + vname = newVersionedEntityName(1, "tb2", negative(100)) + vname.target = 4 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + + vname = newVersionedEntityName(1, "tb3", negative(120)) + vname.target = 5 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + + vname = newVersionedEntityName(2, "tb1", negative(80)) + vname.target = 6 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + + require.Equal(t, []int64{2, 4}, snap.inner.tablesInSchema("DB_1")) + + vname = newVersionedEntityName(1, "tb1", negative(130)) + vname.target = -1 + snap.inner.tableNameToID.ReplaceOrInsert(vname) + snap.inner.currentTs = 130 + require.Equal(t, []int64{4, 5}, snap.inner.tablesInSchema("DB_1")) +} + +func TestIterSchemas(t *testing.T) { + snap := NewEmptySnapshot(true) + require.Nil(t, snap.inner.createSchema(newDBInfo(1), 90)) + require.Nil(t, snap.inner.replaceSchema(newDBInfo(1), 100)) + require.Nil(t, snap.inner.createSchema(newDBInfo(2), 110)) + require.Nil(t, snap.inner.createSchema(newDBInfo(3), 90)) + snap.inner.currentTs = 100 + + var schemas []int64 = make([]int64, 0, 3) + snap.IterSchemas(func(i *timodel.DBInfo) { + schemas = append(schemas, i.ID) + }) + require.Equal(t, []int64{1, 3}, schemas) +} + +func TestSchema(t *testing.T) { + snap := NewEmptySnapshot(true) + + // createSchema fails if the schema ID or name already exist. + dbName := timodel.CIStr{O: "DB_1", L: "db_1"} + require.Nil(t, snap.inner.createSchema(&timodel.DBInfo{ID: 1, Name: dbName}, 100)) + require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 100)) + require.Error(t, snap.inner.createSchema(&timodel.DBInfo{ID: 1}, 110)) + require.Error(t, snap.inner.createSchema(&timodel.DBInfo{ID: 2, Name: dbName}, 120)) + snap1 := snap.Copy() + + // replaceSchema only success if the schema ID exists. + dbName = timodel.CIStr{O: "DB_2", L: "db_2"} + require.Error(t, snap.inner.replaceSchema(&timodel.DBInfo{ID: 2}, 130)) + require.Nil(t, snap.inner.replaceSchema(&timodel.DBInfo{ID: 1, Name: dbName}, 140)) + snap2 := snap.Copy() + + // dropSchema only success if the schema ID exists. + require.Error(t, snap.inner.dropSchema(2, 150)) + require.Nil(t, snap.inner.dropSchema(1, 170)) + snap3 := snap.Copy() + + var db *timodel.DBInfo + var ok bool + + // The schema and table should be available based on snap1. + db, ok = snap1.SchemaByID(1) + require.True(t, ok) + require.Equal(t, db.Name.O, "DB_1") + _, ok = snap1.TableIDByName("DB_1", "TB_11") + require.True(t, ok) + _, ok = snap1.PhysicalTableByID(11) + require.True(t, ok) + + // The schema and table should be available based on snap2, but with a different schema name. 
+ db, ok = snap2.SchemaByID(1) + require.True(t, ok) + require.Equal(t, db.Name.O, "DB_2") + _, ok = snap2.TableIDByName("DB_2", "TB_11") + require.True(t, ok) + _, ok = snap2.PhysicalTableByID(11) + require.True(t, ok) + _, ok = snap2.TableIDByName("DB_1", "TB_11") + require.False(t, ok) + + // The schema and table should be unavailable based on snap3. + _, ok = snap3.SchemaByID(1) + require.False(t, ok) + _, ok = snap3.PhysicalTableByID(11) + require.False(t, ok) + _, ok = snap3.TableIDByName("DB_2", "TB_11") + require.False(t, ok) +} + +func TestTable(t *testing.T) { + var ok bool + for _, forceReplicate := range []bool{true, false} { + snap := NewEmptySnapshot(forceReplicate) + + // createTable should check whether the schema or table exist or not. + require.Error(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 100)) + snap.inner.createSchema(newDBInfo(1), 110) + require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 120)) + require.Error(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 130)) + _, ok = snap.PhysicalTableByID(11) + require.True(t, ok) + _, ok = snap.PhysicalTableByID(11 + 65536) + require.True(t, ok) + _, ok = snap.TableByName("DB_1", "TB_11") + require.True(t, ok) + if !forceReplicate { + require.True(t, snap.IsIneligibleTableID(11)) + require.True(t, snap.IsIneligibleTableID(11+65536)) + } + + // replaceTable should check whether the schema or table exist or not. + require.Error(t, snap.inner.replaceTable(newTbInfo(2, "DB_2", 11), 140)) + require.Error(t, snap.inner.replaceTable(newTbInfo(1, "DB_1", 12), 150)) + require.Nil(t, snap.inner.replaceTable(newTbInfo(1, "DB_1", 11), 160)) + _, ok = snap.PhysicalTableByID(11) + require.True(t, ok) + _, ok = snap.PhysicalTableByID(11 + 65536) + require.True(t, ok) + _, ok = snap.TableByName("DB_1", "TB_11") + require.True(t, ok) + if !forceReplicate { + require.True(t, snap.IsIneligibleTableID(11)) + require.True(t, snap.IsIneligibleTableID(11+65536)) + } + + // truncateTable should replace the old one. + require.Error(t, snap.inner.truncateTable(12, newTbInfo(1, "DB_1", 13), 170)) + require.Nil(t, snap.inner.truncateTable(11, newTbInfo(1, "DB_1", 12), 180)) + _, ok = snap.PhysicalTableByID(11) + require.False(t, ok) + _, ok = snap.PhysicalTableByID(11 + 65536) + require.False(t, ok) + require.True(t, snap.IsTruncateTableID(11)) + _, ok = snap.PhysicalTableByID(12) + require.True(t, ok) + _, ok = snap.PhysicalTableByID(12 + 65536) + require.True(t, ok) + _, ok = snap.TableByName("DB_1", "TB_12") + require.True(t, ok) + if !forceReplicate { + require.False(t, snap.IsIneligibleTableID(11)) + require.False(t, snap.IsIneligibleTableID(11+65536)) + require.True(t, snap.IsIneligibleTableID(12)) + require.True(t, snap.IsIneligibleTableID(12+65536)) + } + + // dropTable should check the table exists or not. + require.Error(t, snap.inner.dropTable(11, 190)) + require.Nil(t, snap.inner.dropTable(12, 200)) + _, ok = snap.PhysicalTableByID(12) + require.False(t, ok) + _, ok = snap.PhysicalTableByID(12 + 65536) + require.False(t, ok) + _, ok = snap.TableByName("DB_1", "TB_12") + require.False(t, ok) + if !forceReplicate { + require.False(t, snap.IsIneligibleTableID(12)) + require.False(t, snap.IsIneligibleTableID(12+65536)) + } + + // IterTables should get no available tables. 
+ require.Equal(t, snap.TableCount(true), 0) + } +} + +func TestUpdatePartition(t *testing.T) { + var oldTb, newTb *model.TableInfo + var snap1, snap2 *Snapshot + var info *model.TableInfo + var ok bool + + snap := NewEmptySnapshot(false) + require.Nil(t, snap.inner.createSchema(newDBInfo(1), 100)) + + // updatePartition fails if the old table is not partitioned. + oldTb = newTbInfo(1, "DB_1", 11) + oldTb.Partition = nil + require.Nil(t, snap.inner.createTable(oldTb, 110)) + require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), 120)) + + // updatePartition fails if the new table is not partitioned. + require.Nil(t, snap.inner.dropTable(11, 130)) + require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 140)) + newTb = newTbInfo(1, "DB_1", 11) + newTb.Partition = nil + require.Error(t, snap.inner.updatePartition(newTb, 150)) + snap1 = snap.Copy() + + newTb = newTbInfo(1, "DB_1", 11) + newTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: 11 + 65536*2} + require.Nil(t, snap.inner.updatePartition(newTb, 160)) + snap2 = snap.Copy() + + info, _ = snap1.PhysicalTableByID(11) + require.Equal(t, info.Partition.Definitions[0].ID, int64(11+65536)) + _, ok = snap1.PhysicalTableByID(11 + 65536) + require.True(t, ok) + require.True(t, snap1.IsIneligibleTableID(11+65536)) + _, ok = snap1.PhysicalTableByID(11 + 65536*2) + require.False(t, ok) + require.False(t, snap1.IsIneligibleTableID(11+65536*2)) + + info, _ = snap2.PhysicalTableByID(11) + require.Equal(t, info.Partition.Definitions[0].ID, int64(11+65536*2)) + _, ok = snap2.PhysicalTableByID(11 + 65536) + require.False(t, ok) + require.False(t, snap2.IsIneligibleTableID(11+65536)) + _, ok = snap2.PhysicalTableByID(11 + 65536*2) + require.True(t, ok) + require.True(t, snap2.IsIneligibleTableID(11+65536*2)) +} + +func TestDrop(t *testing.T) { + snap := NewEmptySnapshot(false) + + require.Nil(t, snap.inner.createSchema(newDBInfo(1), 11)) + require.Nil(t, snap.inner.createSchema(newDBInfo(2), 12)) + require.Nil(t, snap.inner.replaceSchema(newDBInfo(2), 13)) + require.Nil(t, snap.inner.dropSchema(2, 14)) + + require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 3), 15)) + require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 4), 16)) + require.Nil(t, snap.inner.replaceTable(newTbInfo(1, "DB_1", 4), 17)) + require.Nil(t, snap.inner.truncateTable(4, newTbInfo(1, "DB_1", 5), 18)) + require.Nil(t, snap.inner.dropTable(5, 19)) + snap.Drop() + + // After the latest snapshot is dropped, check schema and table count. + require.Equal(t, 1, snap.inner.schemas.Len()) + require.Equal(t, 1, snap.inner.tables.Len()) + require.Equal(t, 1, snap.inner.schemaNameToID.Len()) + require.Equal(t, 1, snap.inner.tableNameToID.Len()) + require.Equal(t, 1, snap.inner.partitions.Len()) + require.Equal(t, 0, snap.inner.truncatedTables.Len()) + require.Equal(t, 2, snap.inner.ineligibleTables.Len()) +} + +func newDBInfo(id int64) *timodel.DBInfo { + return &timodel.DBInfo{ + ID: id, + Name: timodel.CIStr{ + O: fmt.Sprintf("DB_%d", id), + L: fmt.Sprintf("db_%d", id), + }, + } +} + +// newTbInfo constructs a test TableInfo with a partition and a sequence. +// The partition ID will be tableID + 65536. 
+func newTbInfo(schemaID int64, schemaName string, tableID int64) *model.TableInfo { + return &model.TableInfo{ + TableInfo: &timodel.TableInfo{ + ID: tableID, + Name: timodel.CIStr{ + O: fmt.Sprintf("TB_%d", tableID), + L: fmt.Sprintf("TB_%d", tableID), + }, + Partition: &timodel.PartitionInfo{ + Enable: true, + Definitions: []timodel.PartitionDefinition{{ID: 65536 + tableID}}, + }, + Sequence: &timodel.SequenceInfo{Start: 0}, + }, + SchemaID: schemaID, + TableName: model.TableName{ + Schema: schemaName, + Table: fmt.Sprintf("TB_%d", tableID), + }, + } +} diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 7e9d481ac11..651e80a6617 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" timeta "github.com/pingcap/tidb/meta" timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" @@ -32,685 +33,13 @@ import ( "go.uber.org/zap/zapcore" ) -// schemaSnapshot stores the source TiDB all schema information -// schemaSnapshot is a READ ONLY struct -type schemaSnapshot struct { - tableNameToID map[model.TableName]int64 - schemaNameToID map[string]int64 - - schemas map[int64]*timodel.DBInfo - tables map[int64]*model.TableInfo - partitionTable map[int64]*model.TableInfo - - // key is schemaID and value is tableIDs - tableInSchema map[int64][]int64 - - truncateTableID map[int64]struct{} - ineligibleTableID map[int64]struct{} - - currentTs uint64 - - // if forceReplicate is true, treat ineligible tables as eligible. - forceReplicate bool -} - -// SingleSchemaSnapshot is a single schema snapshot independent of schema storage -type SingleSchemaSnapshot = schemaSnapshot - -// HandleDDL handles the ddl job -func (s *SingleSchemaSnapshot) HandleDDL(job *timodel.Job) error { - return s.handleDDL(job) -} - -// PreTableInfo returns the table info which will be overwritten by the specified job -func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) { - switch job.Type { - case timodel.ActionCreateSchema, timodel.ActionModifySchemaCharsetAndCollate, timodel.ActionDropSchema: - return nil, nil - case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable: - // no pre table info - return nil, nil - case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable: - // get the table will be dropped - table, ok := s.TableByID(job.TableID) - if !ok { - return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(job.TableID) - } - return table, nil - case timodel.ActionRenameTables: - // DDL on multiple tables, ignore pre table info - return nil, nil - default: - binlogInfo := job.BinlogInfo - if binlogInfo == nil { - log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) - return nil, nil - } - tbInfo := binlogInfo.TableInfo - if tbInfo == nil { - log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) - return nil, nil - } - tableID := tbInfo.ID - table, ok := s.TableByID(tableID) - if !ok { - return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(job.TableID) - } - return table, nil - } -} - -// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta -func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) { - // meta is nil only in unit 
tests - if meta == nil { - snap := newEmptySchemaSnapshot(forceReplicate) - snap.currentTs = currentTs - return snap, nil - } - return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate) -} - -func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot { - return &schemaSnapshot{ - tableNameToID: make(map[model.TableName]int64), - schemaNameToID: make(map[string]int64), - - schemas: make(map[int64]*timodel.DBInfo), - tables: make(map[int64]*model.TableInfo), - partitionTable: make(map[int64]*model.TableInfo), - - tableInSchema: make(map[int64][]int64), - truncateTableID: make(map[int64]struct{}), - ineligibleTableID: make(map[int64]struct{}), - - forceReplicate: forceReplicate, - } -} - -func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) { - snap := newEmptySchemaSnapshot(forceReplicate) - dbinfos, err := meta.ListDatabases() - if err != nil { - return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) - } - for _, dbinfo := range dbinfos { - snap.schemas[dbinfo.ID] = dbinfo - snap.schemaNameToID[dbinfo.Name.O] = dbinfo.ID - } - for schemaID, dbinfo := range snap.schemas { - tableInfos, err := meta.ListTables(schemaID) - if err != nil { - return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) - } - snap.tableInSchema[schemaID] = make([]int64, 0, len(tableInfos)) - for _, tableInfo := range tableInfos { - snap.tableInSchema[schemaID] = append(snap.tableInSchema[schemaID], tableInfo.ID) - tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) - snap.tables[tableInfo.ID] = tableInfo - snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID - isEligible := tableInfo.IsEligible(forceReplicate) - if !isEligible { - snap.ineligibleTableID[tableInfo.ID] = struct{}{} - } - if pi := tableInfo.GetPartitionInfo(); pi != nil { - for _, partition := range pi.Definitions { - snap.partitionTable[partition.ID] = tableInfo - if !isEligible { - snap.ineligibleTableID[partition.ID] = struct{}{} - } - } - } - } - } - snap.currentTs = currentTs - return snap, nil -} - -func (s *schemaSnapshot) PrintStatus(logger func(msg string, fields ...zap.Field)) { - logger("[SchemaSnap] Start to print status", zap.Uint64("currentTs", s.currentTs)) - for id, dbInfo := range s.schemas { - logger("[SchemaSnap] --> Schemas", zap.Int64("schemaID", id), zap.Reflect("dbInfo", dbInfo)) - // check schemaNameToID - if schemaID, exist := s.schemaNameToID[dbInfo.Name.O]; !exist || schemaID != id { - logger("[SchemaSnap] ----> schemaNameToID item lost", zap.String("name", dbInfo.Name.O), zap.Int64("schemaNameToID", s.schemaNameToID[dbInfo.Name.O])) - } - } - if len(s.schemaNameToID) != len(s.schemas) { - logger("[SchemaSnap] schemaNameToID length mismatch schemas") - for schemaName, schemaID := range s.schemaNameToID { - logger("[SchemaSnap] --> schemaNameToID", zap.String("schemaName", schemaName), zap.Int64("schemaID", schemaID)) - } - } - for id, tableInfo := range s.tables { - logger("[SchemaSnap] --> Tables", zap.Int64("tableID", id), zap.Stringer("tableInfo", tableInfo)) - // check tableNameToID - if tableID, exist := s.tableNameToID[tableInfo.TableName]; !exist || tableID != id { - logger("[SchemaSnap] ----> tableNameToID item lost", zap.Stringer("name", tableInfo.TableName), zap.Int64("tableNameToID", s.tableNameToID[tableInfo.TableName])) - } - } - if len(s.tableNameToID) != len(s.tables) { - logger("[SchemaSnap] tableNameToID length mismatch tables") - for tableName, 
tableID := range s.tableNameToID { - logger("[SchemaSnap] --> tableNameToID", zap.Stringer("tableName", tableName), zap.Int64("tableID", tableID)) - } - } - for pid, table := range s.partitionTable { - logger("[SchemaSnap] --> Partitions", zap.Int64("partitionID", pid), zap.Int64("tableID", table.ID)) - } - truncateTableID := make([]int64, 0, len(s.truncateTableID)) - for id := range s.truncateTableID { - truncateTableID = append(truncateTableID, id) - } - logger("[SchemaSnap] TruncateTableIDs", zap.Int64s("ids", truncateTableID)) - - ineligibleTableID := make([]int64, 0, len(s.ineligibleTableID)) - for id := range s.ineligibleTableID { - ineligibleTableID = append(ineligibleTableID, id) - } - logger("[SchemaSnap] IneligibleTableIDs", zap.Int64s("ids", ineligibleTableID)) -} - -// Clone clones Storage -func (s *schemaSnapshot) Clone() *schemaSnapshot { - clone := *s - - tableNameToID := make(map[model.TableName]int64, len(s.tableNameToID)) - for k, v := range s.tableNameToID { - tableNameToID[k] = v - } - clone.tableNameToID = tableNameToID - - schemaNameToID := make(map[string]int64, len(s.schemaNameToID)) - for k, v := range s.schemaNameToID { - schemaNameToID[k] = v - } - clone.schemaNameToID = schemaNameToID - - schemas := make(map[int64]*timodel.DBInfo, len(s.schemas)) - for k, v := range s.schemas { - // DBInfo is readonly in TiCDC, shallow copy to reduce memory - schemas[k] = v.Copy() - } - clone.schemas = schemas - - tables := make(map[int64]*model.TableInfo, len(s.tables)) - for k, v := range s.tables { - tables[k] = v - } - clone.tables = tables - - tableInSchema := make(map[int64][]int64, len(s.tableInSchema)) - for k, v := range s.tableInSchema { - cloneV := make([]int64, len(v)) - copy(cloneV, v) - tableInSchema[k] = cloneV - } - clone.tableInSchema = tableInSchema - - partitionTable := make(map[int64]*model.TableInfo, len(s.partitionTable)) - for k, v := range s.partitionTable { - partitionTable[k] = v - } - clone.partitionTable = partitionTable - - truncateTableID := make(map[int64]struct{}, len(s.truncateTableID)) - for k, v := range s.truncateTableID { - truncateTableID[k] = v - } - clone.truncateTableID = truncateTableID - - ineligibleTableID := make(map[int64]struct{}, len(s.ineligibleTableID)) - for k, v := range s.ineligibleTableID { - ineligibleTableID[k] = v - } - clone.ineligibleTableID = ineligibleTableID - - return &clone -} - -// GetTableNameByID looks up a TableName with the given table id -func (s *schemaSnapshot) GetTableNameByID(id int64) (model.TableName, bool) { - tableInfo, ok := s.tables[id] - if !ok { - // Try partition, it could be a partition table. - partInfo, ok := s.partitionTable[id] - if !ok { - return model.TableName{}, false - } - // Must exists an table that contains the partition. - tableInfo = s.tables[partInfo.ID] - } - return tableInfo.TableName, true -} - -// GetTableIDByName returns the tableID by table schemaName and tableName -func (s *schemaSnapshot) GetTableIDByName(schemaName string, tableName string) (int64, bool) { - id, ok := s.tableNameToID[model.TableName{ - Schema: schemaName, - Table: tableName, - }] - return id, ok -} - -// GetTableByName queries a table by name, -// the second returned value is false if no table with the specified name is found. 
-func (s *schemaSnapshot) GetTableByName(schema, table string) (info *model.TableInfo, ok bool) { - id, ok := s.GetTableIDByName(schema, table) - if !ok { - return nil, ok - } - return s.TableByID(id) -} - -// SchemaByID returns the DBInfo by schema id -func (s *schemaSnapshot) SchemaByID(id int64) (val *timodel.DBInfo, ok bool) { - val, ok = s.schemas[id] - return -} - -// SchemaByTableID returns the schema ID by table ID -func (s *schemaSnapshot) SchemaByTableID(tableID int64) (*timodel.DBInfo, bool) { - tableInfo, ok := s.tables[tableID] - if !ok { - return nil, false - } - schemaID, ok := s.schemaNameToID[tableInfo.TableName.Schema] - if !ok { - return nil, false - } - return s.SchemaByID(schemaID) -} - -// TableByID returns the TableInfo by table id -func (s *schemaSnapshot) TableByID(id int64) (val *model.TableInfo, ok bool) { - val, ok = s.tables[id] - return -} - -// PhysicalTableByID returns the TableInfo by table id or partition ID. -func (s *schemaSnapshot) PhysicalTableByID(id int64) (val *model.TableInfo, ok bool) { - val, ok = s.tables[id] - if !ok { - val, ok = s.partitionTable[id] - } - return -} - -// IsTruncateTableID returns true if the table id have been truncated by truncate table DDL -func (s *schemaSnapshot) IsTruncateTableID(id int64) bool { - _, ok := s.truncateTableID[id] - return ok -} - -// IsIneligibleTableID returns true if the table is ineligible -func (s *schemaSnapshot) IsIneligibleTableID(id int64) bool { - _, ok := s.ineligibleTableID[id] - return ok -} - -// FillSchemaName fills the schema name in ddl job -func (s *schemaSnapshot) FillSchemaName(job *timodel.Job) error { - if job.Type == timodel.ActionRenameTables { - // DDLs on multiple schema or tables, ignore them. - return nil - } - if job.Type == timodel.ActionCreateSchema || - job.Type == timodel.ActionDropSchema { - job.SchemaName = job.BinlogInfo.DBInfo.Name.O - return nil - } - dbInfo, exist := s.SchemaByID(job.SchemaID) - if !exist { - return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(job.SchemaID) - } - job.SchemaName = dbInfo.Name.O - return nil -} - -func (s *schemaSnapshot) dropSchema(id int64) error { - schema, ok := s.schemas[id] - if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(id) - } - - for _, tableID := range s.tableInSchema[id] { - tableName := s.tables[tableID].TableName - if pi := s.tables[tableID].GetPartitionInfo(); pi != nil { - for _, partition := range pi.Definitions { - delete(s.partitionTable, partition.ID) - } - } - delete(s.tables, tableID) - delete(s.tableNameToID, tableName) - } - - delete(s.schemas, id) - delete(s.tableInSchema, id) - delete(s.schemaNameToID, schema.Name.O) - - return nil -} - -func (s *schemaSnapshot) createSchema(db *timodel.DBInfo) error { - if _, ok := s.schemas[db.ID]; ok { - return cerror.ErrSnapshotSchemaExists.GenWithStackByArgs(db.Name, db.ID) - } - - s.schemas[db.ID] = db.Copy() - s.schemaNameToID[db.Name.O] = db.ID - s.tableInSchema[db.ID] = []int64{} - - log.Debug("create schema success, schema id", zap.String("name", db.Name.O), zap.Int64("id", db.ID)) - return nil -} - -func (s *schemaSnapshot) replaceSchema(db *timodel.DBInfo) error { - _, ok := s.schemas[db.ID] - if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStack("schema %s(%d) not found", db.Name, db.ID) - } - s.schemas[db.ID] = db.Copy() - s.schemaNameToID[db.Name.O] = db.ID - return nil -} - -func (s *schemaSnapshot) dropTable(id int64) error { - table, ok := s.tables[id] - if !ok { - return 
cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id) - } - tableInSchema, ok := s.tableInSchema[table.SchemaID] - if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table(%d)'s schema", id) - } - - for i, tableID := range tableInSchema { - if tableID == id { - copy(tableInSchema[i:], tableInSchema[i+1:]) - s.tableInSchema[table.SchemaID] = tableInSchema[:len(tableInSchema)-1] - break - } - } - - tableName := s.tables[id].TableName - delete(s.tables, id) - if pi := table.GetPartitionInfo(); pi != nil { - for _, partition := range pi.Definitions { - delete(s.partitionTable, partition.ID) - delete(s.ineligibleTableID, partition.ID) - } - } - delete(s.tableNameToID, tableName) - delete(s.ineligibleTableID, id) - - log.Debug("drop table success", zap.String("name", table.Name.O), zap.Int64("id", id)) - return nil -} - -func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error { - id := tbl.ID - table, ok := s.tables[id] - if !ok { - return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id) - } - oldPi := table.GetPartitionInfo() - if oldPi == nil { - return cerror.ErrSnapshotTableNotFound.GenWithStack("table %d is not a partition table", id) - } - oldIDs := make(map[int64]struct{}, len(oldPi.Definitions)) - for _, p := range oldPi.Definitions { - oldIDs[p.ID] = struct{}{} - } - - newPi := tbl.GetPartitionInfo() - if newPi == nil { - return cerror.ErrSnapshotTableNotFound.GenWithStack("table %d is not a partition table", id) - } - s.tables[id] = tbl - for _, partition := range newPi.Definitions { - // update table info. - if _, ok := s.partitionTable[partition.ID]; ok { - log.Debug("add table partition success", - zap.String("name", tbl.Name.O), zap.Int64("tid", id), - zap.Int64("add partition id", partition.ID)) - } - s.partitionTable[partition.ID] = tbl - if !tbl.IsEligible(s.forceReplicate) { - s.ineligibleTableID[partition.ID] = struct{}{} - } - delete(oldIDs, partition.ID) - } - - // drop old partition. - for pid := range oldIDs { - s.truncateTableID[pid] = struct{}{} - delete(s.partitionTable, pid) - delete(s.ineligibleTableID, pid) - log.Debug("drop table partition success", - zap.String("name", tbl.Name.O), zap.Int64("tid", id), - zap.Int64("truncated partition id", pid)) - } - - return nil -} - -func (s *schemaSnapshot) createTable(table *model.TableInfo) error { - schema, ok := s.schemas[table.SchemaID] - if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", table.SchemaID) - } - tableInSchema, ok := s.tableInSchema[table.SchemaID] - if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStack("table's schema(%d)", table.SchemaID) - } - _, ok = s.tables[table.ID] - if ok { - return cerror.ErrSnapshotTableExists.GenWithStackByArgs(schema.Name, table.Name) - } - tableInSchema = append(tableInSchema, table.ID) - s.tableInSchema[table.SchemaID] = tableInSchema - - s.tables[table.ID] = table - if !table.IsEligible(s.forceReplicate) { - // Sequence is not supported yet, and always ineligible. - // Skip Warn to avoid confusion. 
- // See https://github.com/pingcap/tiflow/issues/4559 - if !table.IsSequence() { - log.Warn("this table is ineligible to replicate", - zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) - } - s.ineligibleTableID[table.ID] = struct{}{} - } - if pi := table.GetPartitionInfo(); pi != nil { - for _, partition := range pi.Definitions { - s.partitionTable[partition.ID] = table - if !table.IsEligible(s.forceReplicate) { - s.ineligibleTableID[partition.ID] = struct{}{} - } - } - } - s.tableNameToID[table.TableName] = table.ID - - log.Debug("create table success", zap.String("name", schema.Name.O+"."+table.Name.O), zap.Int64("id", table.ID)) - return nil -} - -// ReplaceTable replace the table by new tableInfo -func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error { - _, ok := s.tables[table.ID] - if !ok { - return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID) - } - s.tables[table.ID] = table - if !table.IsEligible(s.forceReplicate) { - // Sequence is not supported yet, and always ineligible. - // Skip Warn to avoid confusion. - // See https://github.com/pingcap/tiflow/issues/4559 - if !table.IsSequence() { - log.Warn("this table is ineligible to replicate", - zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) - } - s.ineligibleTableID[table.ID] = struct{}{} - } - if pi := table.GetPartitionInfo(); pi != nil { - for _, partition := range pi.Definitions { - s.partitionTable[partition.ID] = table - if !table.IsEligible(s.forceReplicate) { - s.ineligibleTableID[partition.ID] = struct{}{} - } - } - } - - return nil -} - -func (s *schemaSnapshot) handleDDL(job *timodel.Job) error { - if err := s.FillSchemaName(job); err != nil { - return errors.Trace(err) - } - getWrapTableInfo := func(job *timodel.Job) *model.TableInfo { - return model.WrapTableInfo(job.SchemaID, job.SchemaName, - job.BinlogInfo.FinishedTS, - job.BinlogInfo.TableInfo) - } - switch job.Type { - case timodel.ActionCreateSchema: - // get the DBInfo from job rawArgs - err := s.createSchema(job.BinlogInfo.DBInfo) - if err != nil { - return errors.Trace(err) - } - case timodel.ActionModifySchemaCharsetAndCollate: - err := s.replaceSchema(job.BinlogInfo.DBInfo) - if err != nil { - return errors.Trace(err) - } - case timodel.ActionDropSchema: - err := s.dropSchema(job.SchemaID) - if err != nil { - return errors.Trace(err) - } - case timodel.ActionRenameTable: - // first drop the table - err := s.dropTable(job.TableID) - if err != nil { - return errors.Trace(err) - } - // create table - err = s.createTable(getWrapTableInfo(job)) - if err != nil { - return errors.Trace(err) - } - case timodel.ActionRenameTables: - err := s.renameTables(job) - if err != nil { - return errors.Trace(err) - } - case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable: - err := s.createTable(getWrapTableInfo(job)) - if err != nil { - return errors.Trace(err) - } - case timodel.ActionDropTable, timodel.ActionDropView: - err := s.dropTable(job.TableID) - if err != nil { - return errors.Trace(err) - } - - case timodel.ActionTruncateTable: - // job.TableID is the old table id, different from table.ID - err := s.dropTable(job.TableID) - if err != nil { - return errors.Trace(err) - } - - err = s.createTable(getWrapTableInfo(job)) - if err != nil { - return errors.Trace(err) - } - - s.truncateTableID[job.TableID] = struct{}{} - case timodel.ActionTruncateTablePartition, timodel.ActionAddTablePartition, timodel.ActionDropTablePartition: - err := 
s.updatePartition(getWrapTableInfo(job)) - if err != nil { - return errors.Trace(err) - } - default: - binlogInfo := job.BinlogInfo - if binlogInfo == nil { - log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) - return nil - } - tbInfo := binlogInfo.TableInfo - if tbInfo == nil { - log.Warn("ignore a invalid DDL job", zap.Reflect("job", job)) - return nil - } - err := s.replaceTable(getWrapTableInfo(job)) - if err != nil { - return errors.Trace(err) - } - } - s.currentTs = job.BinlogInfo.FinishedTS - return nil -} - -func (s *schemaSnapshot) renameTables(job *timodel.Job) error { - var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64 - var newTableNames, oldSchemaNames []*timodel.CIStr - err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &newTableNames, &oldTableIDs, &oldSchemaNames) - if err != nil { - return errors.Trace(err) - } - if len(job.BinlogInfo.MultipleTableInfos) < len(newTableNames) { - return cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID) - } - // NOTE: should handle failures in halfway better. - for _, tableID := range oldTableIDs { - if err := s.dropTable(tableID); err != nil { - return errors.Trace(err) - } - } - for i, tableInfo := range job.BinlogInfo.MultipleTableInfos { - newSchema, ok := s.SchemaByID(newSchemaIDs[i]) - if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i]) - } - newSchemaName := newSchema.Name.L - err = s.createTable(model.WrapTableInfo( - newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -// CloneTables return a clone of the existing tables. -func (s *schemaSnapshot) CloneTables() map[model.TableID]model.TableName { - mp := make(map[model.TableID]model.TableName, len(s.tables)) - - for id, table := range s.tables { - mp[id] = table.TableName - } - - return mp -} - -// Tables return a map between table id and table info -// the returned map must be READ-ONLY. Any modified of this map will lead to the internal state confusion in schema storage -func (s *schemaSnapshot) Tables() map[model.TableID]*model.TableInfo { - return s.tables -} - // SchemaStorage stores the schema information with multi-version type SchemaStorage interface { // GetSnapshot returns the snapshot which of ts is specified. // It may block caller when ts is larger than ResolvedTs. 
- GetSnapshot(ctx context.Context, ts uint64) (*SingleSchemaSnapshot, error) + GetSnapshot(ctx context.Context, ts uint64) (*schema.Snapshot, error) // GetLastSnapshot returns the last snapshot - GetLastSnapshot() *schemaSnapshot + GetLastSnapshot() *schema.Snapshot // HandleDDLJob creates a new snapshot in storage and handles the ddl job HandleDDLJob(job *timodel.Job) error // AdvanceResolvedTs advances the resolved @@ -723,7 +52,7 @@ type SchemaStorage interface { } type schemaStorageImpl struct { - snaps []*schemaSnapshot + snaps []*schema.Snapshot snapsMu sync.RWMutex gcTs uint64 resolvedTs uint64 @@ -739,18 +68,18 @@ func NewSchemaStorage( meta *timeta.Meta, startTs uint64, filter *filter.Filter, forceReplicate bool, id model.ChangeFeedID, ) (SchemaStorage, error) { - var snap *schemaSnapshot + var snap *schema.Snapshot var err error if meta == nil { - snap = newEmptySchemaSnapshot(forceReplicate) + snap = schema.NewEmptySnapshot(forceReplicate) } else { - snap, err = newSchemaSnapshotFromMeta(meta, startTs, forceReplicate) + snap, err = schema.NewSnapshotFromMeta(meta, startTs, forceReplicate) } if err != nil { return nil, errors.Trace(err) } schema := &schemaStorageImpl{ - snaps: []*schemaSnapshot{snap}, + snaps: []*schema.Snapshot{snap}, resolvedTs: startTs, filter: filter, forceReplicate: forceReplicate, @@ -759,7 +88,7 @@ func NewSchemaStorage( return schema, nil } -func (s *schemaStorageImpl) getSnapshot(ts uint64) (*schemaSnapshot, error) { +func (s *schemaStorageImpl) getSnapshot(ts uint64) (*schema.Snapshot, error) { gcTs := atomic.LoadUint64(&s.gcTs) if ts < gcTs { // Unexpected error, caller should fail immediately. @@ -773,7 +102,7 @@ func (s *schemaStorageImpl) getSnapshot(ts uint64) (*schemaSnapshot, error) { s.snapsMu.RLock() defer s.snapsMu.RUnlock() i := sort.Search(len(s.snaps), func(i int) bool { - return s.snaps[i].currentTs > ts + return s.snaps[i].CurrentTs() > ts }) if i <= 0 { // Unexpected error, caller should fail immediately. @@ -783,8 +112,8 @@ func (s *schemaStorageImpl) getSnapshot(ts uint64) (*schemaSnapshot, error) { } // GetSnapshot returns the snapshot which of ts is specified -func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schemaSnapshot, error) { - var snap *schemaSnapshot +func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema.Snapshot, error) { + var snap *schema.Snapshot // The infinite retry here is a temporary solution to the `ErrSchemaStorageUnresolved` caused by // DDL puller lagging too much. 
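As an aside on the lookup that the retry loop above depends on: schemaStorageImpl keeps its snapshots ordered by CurrentTs, so resolving a query timestamp is a binary search for the first snapshot whose CurrentTs is strictly greater than ts, then stepping back one element. Below is a minimal, self-contained sketch of that idea; the fakeSnap type and snapshotAt helper are hypothetical stand-ins for illustration only, not TiCDC code.

package main

import (
	"fmt"
	"sort"
)

// fakeSnap is a stand-in for a schema snapshot that records the ts it was built at.
type fakeSnap struct{ currentTs uint64 }

func (s fakeSnap) CurrentTs() uint64 { return s.currentTs }

// snapshotAt returns the snapshot that was current at ts, assuming snaps is
// sorted by CurrentTs in ascending order. It reports false when ts predates
// the oldest retained snapshot (i.e. that version has already been GC'ed).
func snapshotAt(snaps []fakeSnap, ts uint64) (fakeSnap, bool) {
	i := sort.Search(len(snaps), func(i int) bool {
		return snaps[i].CurrentTs() > ts
	})
	if i <= 0 {
		return fakeSnap{}, false
	}
	return snaps[i-1], true
}

func main() {
	snaps := []fakeSnap{{currentTs: 100}, {currentTs: 120}, {currentTs: 130}}
	snap, ok := snapshotAt(snaps, 125)
	fmt.Println(snap.CurrentTs(), ok) // prints: 120 true
	_, ok = snapshotAt(snaps, 90)
	fmt.Println(ok) // prints: false, 90 is older than any retained version
}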
@@ -813,7 +142,7 @@ func isRetryable(err error) bool { } // GetLastSnapshot returns the last snapshot -func (s *schemaStorageImpl) GetLastSnapshot() *schemaSnapshot { +func (s *schemaStorageImpl) GetLastSnapshot() *schema.Snapshot { s.snapsMu.RLock() defer s.snapsMu.RUnlock() return s.snaps[len(s.snaps)-1] @@ -827,20 +156,20 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { } s.snapsMu.Lock() defer s.snapsMu.Unlock() - var snap *schemaSnapshot + var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] - if job.BinlogInfo.FinishedTS <= lastSnap.currentTs { + if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { log.Info("ignore foregone DDL", zap.Int64("jobID", job.ID), zap.String("DDL", job.Query), zap.String("changefeed", s.id), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) return nil } - snap = lastSnap.Clone() + snap = lastSnap.Copy() } else { - snap = newEmptySchemaSnapshot(s.forceReplicate) + snap = schema.NewEmptySnapshot(s.forceReplicate) } - if err := snap.handleDDL(job); err != nil { + if err := snap.HandleDDL(job); err != nil { log.Error("handle DDL failed", zap.String("DDL", job.Query), zap.Stringer("job", job), zap.Error(err), zap.String("changefeed", s.id), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS)) @@ -878,13 +207,13 @@ func (s *schemaStorageImpl) DoGC(ts uint64) (lastSchemaTs uint64) { defer s.snapsMu.Unlock() var startIdx int for i, snap := range s.snaps { - if snap.currentTs > ts { + if snap.CurrentTs() > ts { break } startIdx = i } if startIdx == 0 { - return s.snaps[0].currentTs + return s.snaps[0].CurrentTs() } if log.GetLevel() == zapcore.DebugLevel { log.Debug("Do GC in schema storage") @@ -893,13 +222,16 @@ func (s *schemaStorageImpl) DoGC(ts uint64) (lastSchemaTs uint64) { } } + // NOTE: Drop must be called to remove stale versions. + s.snaps[startIdx-1].Drop() + // copy the part of the slice that is needed instead of re-slicing it // to maximize efficiency of Go runtime GC. 
- newSnaps := make([]*schemaSnapshot, len(s.snaps)-startIdx) + newSnaps := make([]*schema.Snapshot, len(s.snaps)-startIdx) copy(newSnaps, s.snaps[startIdx:]) s.snaps = newSnaps - lastSchemaTs = s.snaps[0].currentTs + lastSchemaTs = s.snaps[0].CurrentTs() atomic.StoreUint64(&s.gcTs, lastSchemaTs) return } diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index 4d8765213c7..8e737ffed52 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -17,7 +17,6 @@ import ( "context" "encoding/json" "fmt" - "sort" "testing" "github.com/pingcap/errors" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" @@ -56,8 +56,8 @@ func TestSchema(t *testing.T) { Query: "create database test", } // reconstruct the local schema - snap := newEmptySchemaSnapshot(false) - err := snap.handleDDL(job) + snap := schema.NewEmptySnapshot(false) + err := snap.HandleDDL(job) require.Nil(t, err) _, exist := snap.SchemaByID(job.SchemaID) require.True(t, exist) @@ -71,7 +71,7 @@ func TestSchema(t *testing.T) { BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 3, DBInfo: dbInfo, FinishedTS: 124}, Query: "drop database test", } - err = snap.handleDDL(job) + err = snap.HandleDDL(job) require.Nil(t, err) _, exist = snap.SchemaByID(job.SchemaID) require.False(t, exist) @@ -85,9 +85,9 @@ func TestSchema(t *testing.T) { Query: "create database test", } - err = snap.handleDDL(job) + err = snap.HandleDDL(job) require.Nil(t, err) - err = snap.handleDDL(job) + err = snap.HandleDDL(job) require.True(t, errors.IsAlreadyExists(err)) // test schema drop schema error @@ -99,9 +99,9 @@ func TestSchema(t *testing.T) { BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, DBInfo: dbInfo, FinishedTS: 123}, Query: "drop database test", } - err = snap.handleDDL(job) + err = snap.HandleDDL(job) require.Nil(t, err) - err = snap.handleDDL(job) + err = snap.HandleDDL(job) require.True(t, errors.IsNotFound(err)) } @@ -198,9 +198,9 @@ func TestTable(t *testing.T) { jobs = append(jobs, job) // reconstruct the local schema - snap := newEmptySchemaSnapshot(false) + snap := schema.NewEmptySnapshot(false) for _, job := range jobs { - err := snap.handleDDL(job) + err := snap.HandleDDL(job) require.Nil(t, err) } @@ -208,7 +208,7 @@ func TestTable(t *testing.T) { _, ok := snap.SchemaByID(dbInfo.ID) require.True(t, ok) // check the historical table that constructed above whether in the table list of local schema - table, ok := snap.TableByID(tblInfo.ID) + table, ok := snap.PhysicalTableByID(tblInfo.ID) require.True(t, ok) require.Len(t, table.Columns, 1) require.Len(t, table.Indices, 1) @@ -236,13 +236,13 @@ func TestTable(t *testing.T) { require.Equal(t, preTableInfo.TableName, model.TableName{Schema: "Test", Table: "T"}) require.Equal(t, preTableInfo.ID, int64(2)) - err = snap.handleDDL(job) + err = snap.HandleDDL(job) require.Nil(t, err) - _, ok = snap.TableByID(tblInfo1.ID) + _, ok = snap.PhysicalTableByID(tblInfo1.ID) require.True(t, ok) - _, ok = snap.TableByID(2) + _, ok = snap.PhysicalTableByID(2) require.False(t, ok) // test ineligible tables @@ -263,22 +263,30 @@ func TestTable(t *testing.T) { require.Equal(t, preTableInfo.TableName, model.TableName{Schema: "Test", Table: "T"}) require.Equal(t, preTableInfo.ID, int64(9)) - err = snap.handleDDL(job) + err = 
snap.HandleDDL(job) require.Nil(t, err) - _, ok = snap.TableByID(tblInfo.ID) + _, ok = snap.PhysicalTableByID(tblInfo.ID) require.False(t, ok) // test ineligible tables require.False(t, snap.IsIneligibleTableID(9)) // drop schema - err = snap.dropSchema(3) + job = &timodel.Job{ + ID: 10, + State: timodel.JobStateSynced, + SchemaID: 3, + Type: timodel.ActionDropSchema, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 7, FinishedTS: 129}, + Query: "drop table " + dbName.O, + } + err = snap.DoHandleDDL(job) require.Nil(t, err) } func TestHandleDDL(t *testing.T) { - snap := newEmptySchemaSnapshot(false) + snap := schema.NewEmptySnapshot(false) dbName := timodel.NewCIStr("Test") colName := timodel.NewCIStr("A") tbName := timodel.NewCIStr("T") @@ -357,18 +365,18 @@ func TestHandleDDL(t *testing.T) { _, ok := snap.SchemaByID(dbInfo.ID) require.True(t, ok) case "createTable": - _, ok := snap.TableByID(tblInfo.ID) + _, ok := snap.PhysicalTableByID(tblInfo.ID) require.True(t, ok) case "renameTable": - tb, ok := snap.TableByID(tblInfo.ID) + tb, ok := snap.PhysicalTableByID(tblInfo.ID) require.True(t, ok) require.Equal(t, tblInfo.Name, tb.Name) case "addColumn", "truncateTable": - tb, ok := snap.TableByID(tblInfo.ID) + tb, ok := snap.PhysicalTableByID(tblInfo.ID) require.True(t, ok) require.Len(t, tb.Columns, 1) case "dropTable": - _, ok := snap.TableByID(tblInfo.ID) + _, ok := snap.PhysicalTableByID(tblInfo.ID) require.False(t, ok) case "dropSchema": _, ok := snap.SchemaByID(job.SchemaID) @@ -379,7 +387,7 @@ func TestHandleDDL(t *testing.T) { func TestHandleRenameTables(t *testing.T) { // Initial schema: db_1.table_1 and db_2.table_2. - snap := newEmptySchemaSnapshot(true) + snap := schema.NewEmptySnapshot(true) var i int64 for i = 1; i < 3; i++ { dbInfo := &timodel.DBInfo{ @@ -387,7 +395,15 @@ func TestHandleRenameTables(t *testing.T) { Name: timodel.NewCIStr(fmt.Sprintf("db_%d", i)), State: timodel.StatePublic, } - err := snap.createSchema(dbInfo) + job := &timodel.Job{ + ID: i, + State: timodel.JobStateSynced, + SchemaID: i, + Type: timodel.ActionCreateSchema, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: i, DBInfo: dbInfo, FinishedTS: uint64(i)}, + Query: fmt.Sprintf("create database %s", dbInfo.Name.O), + } + err := snap.HandleDDL(job) require.Nil(t, err) } for i = 1; i < 3; i++ { @@ -396,7 +412,16 @@ func TestHandleRenameTables(t *testing.T) { Name: timodel.NewCIStr(fmt.Sprintf("table_%d", i)), State: timodel.StatePublic, } - err := snap.createTable(model.WrapTableInfo(i, fmt.Sprintf("db_%d", i), 1, tblInfo)) + job := &timodel.Job{ + ID: i, + State: timodel.JobStateSynced, + SchemaID: i, + TableID: 10 + i, + Type: timodel.ActionCreateTable, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: i, TableInfo: tblInfo, FinishedTS: uint64(10 + i)}, + Query: "create table " + tblInfo.Name.O, + } + err := snap.HandleDDL(job) require.Nil(t, err) } @@ -431,24 +456,24 @@ func TestHandleRenameTables(t *testing.T) { testDoDDLAndCheck(t, snap, job, false) var ok bool - _, ok = snap.TableByID(13) + _, ok = snap.PhysicalTableByID(13) require.True(t, ok) - _, ok = snap.TableByID(14) + _, ok = snap.PhysicalTableByID(14) require.True(t, ok) - _, ok = snap.TableByID(11) + _, ok = snap.PhysicalTableByID(11) require.False(t, ok) - _, ok = snap.TableByID(12) + _, ok = snap.PhysicalTableByID(12) require.False(t, ok) - t1 := model.TableName{Schema: "db_2", Table: "x"} - t2 := model.TableName{Schema: "db_1", Table: "y"} - require.Equal(t, snap.tableNameToID[t1], int64(13)) - require.Equal(t, snap.tableNameToID[t2], 
int64(14)) - require.Equal(t, uint64(11112222), snap.currentTs) + n1, _ := snap.TableIDByName("db_2", "x") + require.Equal(t, n1, int64(13)) + n2, _ := snap.TableIDByName("db_1", "y") + require.Equal(t, n2, int64(14)) + require.Equal(t, uint64(11112222), snap.CurrentTs()) } -func testDoDDLAndCheck(t *testing.T, snap *schemaSnapshot, job *timodel.Job, isErr bool) { - err := snap.handleDDL(job) +func testDoDDLAndCheck(t *testing.T, snap *schema.Snapshot, job *timodel.Job, isErr bool) { + err := snap.HandleDDL(job) require.Equal(t, err != nil, isErr) } @@ -636,45 +661,45 @@ func TestMultiVersionStorage(t *testing.T) { require.Nil(t, err) _, exist := snap.SchemaByID(1) require.True(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.False(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.False(t, exist) snap, err = storage.GetSnapshot(ctx, 115) require.Nil(t, err) _, exist = snap.SchemaByID(1) require.True(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.True(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.False(t, exist) snap, err = storage.GetSnapshot(ctx, 125) require.Nil(t, err) _, exist = snap.SchemaByID(1) require.True(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.True(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.True(t, exist) snap, err = storage.GetSnapshot(ctx, 135) require.Nil(t, err) _, exist = snap.SchemaByID(1) require.True(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.False(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.True(t, exist) snap, err = storage.GetSnapshot(ctx, 140) require.Nil(t, err) _, exist = snap.SchemaByID(1) require.False(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.False(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.False(t, exist) lastSchemaTs := storage.DoGC(0) @@ -684,9 +709,9 @@ func TestMultiVersionStorage(t *testing.T) { require.Nil(t, err) _, exist = snap.SchemaByID(1) require.True(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.False(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.False(t, exist) storage.DoGC(115) _, err = storage.GetSnapshot(ctx, 100) @@ -695,9 +720,9 @@ func TestMultiVersionStorage(t *testing.T) { require.Nil(t, err) _, exist = snap.SchemaByID(1) require.True(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.True(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.False(t, exist) lastSchemaTs = storage.DoGC(155) @@ -709,9 +734,9 @@ func TestMultiVersionStorage(t *testing.T) { require.Nil(t, err) _, exist = snap.SchemaByID(1) require.False(t, exist) - _, exist = snap.TableByID(2) + _, exist = snap.PhysicalTableByID(2) require.False(t, exist) - _, exist = snap.TableByID(3) + _, exist = snap.PhysicalTableByID(3) require.False(t, exist) _, err = storage.GetSnapshot(ctx, 130) require.NotNil(t, err) @@ -743,58 +768,16 @@ func TestCreateSnapFromMeta(t *testing.T) { require.Nil(t, err) meta, err := kv.GetSnapshotMeta(store, ver.Ver) require.Nil(t, err) - snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false) + snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false) require.Nil(t, 
err) - _, ok := snap.GetTableByName("test", "simple_test1") + _, ok := snap.TableByName("test", "simple_test1") require.True(t, ok) - tableID, ok := snap.GetTableIDByName("test2", "simple_test5") + tableID, ok := snap.TableIDByName("test2", "simple_test5") require.True(t, ok) require.True(t, snap.IsIneligibleTableID(tableID)) dbInfo, ok := snap.SchemaByTableID(tableID) require.True(t, ok) require.Equal(t, dbInfo.Name.O, "test2") - require.Len(t, snap.tableInSchema, 3) -} - -func TestSnapshotClone(t *testing.T) { - store, err := mockstore.NewMockStore() - require.Nil(t, err) - defer store.Close() //nolint:errcheck - - session.SetSchemaLease(0) - session.DisableStats4Test() - domain, err := session.BootstrapSession(store) - require.Nil(t, err) - defer domain.Close() - domain.SetStatsUpdating(true) - tk := testkit.NewTestKit(t, store) - tk.MustExec("create database test2") - tk.MustExec("create table test.simple_test1 (id bigint primary key)") - tk.MustExec("create table test.simple_test2 (id bigint primary key)") - tk.MustExec("create table test2.simple_test3 (id bigint primary key)") - tk.MustExec("create table test2.simple_test4 (id bigint primary key)") - tk.MustExec("create table test2.simple_test5 (a bigint)") - ver, err := store.CurrentVersion(oracle.GlobalTxnScope) - require.Nil(t, err) - meta, err := kv.GetSnapshotMeta(store, ver.Ver) - require.Nil(t, err) - snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* forceReplicate */) - require.Nil(t, err) - - clone := snap.Clone() - require.Equal(t, clone.tableNameToID, snap.tableNameToID) - require.Equal(t, clone.schemaNameToID, snap.schemaNameToID) - require.Equal(t, clone.truncateTableID, snap.truncateTableID) - require.Equal(t, clone.ineligibleTableID, snap.ineligibleTableID) - require.Equal(t, clone.currentTs, snap.currentTs) - require.Equal(t, clone.forceReplicate, snap.forceReplicate) - require.Equal(t, len(clone.tables), len(snap.tables)) - require.Equal(t, len(clone.schemas), len(snap.schemas)) - require.Equal(t, len(clone.partitionTable), len(snap.partitionTable)) - - tableCount := len(snap.tables) - clone.tables = make(map[int64]*model.TableInfo) - require.Len(t, snap.tables, tableCount) } func TestExplicitTables(t *testing.T) { @@ -821,21 +804,21 @@ func TestExplicitTables(t *testing.T) { require.Nil(t, err) meta1, err := kv.GetSnapshotMeta(store, ver1.Ver) require.Nil(t, err) - snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */) + snap1, err := schema.NewSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */) require.Nil(t, err) meta2, err := kv.GetSnapshotMeta(store, ver2.Ver) require.Nil(t, err) - snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */) + snap2, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */) require.Nil(t, err) - snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */) + snap3, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */) require.Nil(t, err) - require.Equal(t, len(snap2.tables)-len(snap1.tables), 5) + require.Equal(t, snap2.TableCount(true)-snap1.TableCount(true), 5) // some system tables are also ineligible - require.GreaterOrEqual(t, len(snap2.ineligibleTableID), 4) + require.GreaterOrEqual(t, snap2.TableCount(false), 4) - require.Equal(t, len(snap3.tables)-len(snap1.tables), 5) - require.Len(t, snap3.ineligibleTableID, 0) + require.Equal(t, snap3.TableCount(true)-snap1.TableCount(true), 5) + require.Equal(t, snap3.TableCount(false), 36) } 
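To make the shape of the new read-only accessors concrete, here is a hedged, standalone sketch of building and querying a schema.Snapshot by hand, using only names that appear in this patch (NewEmptySnapshot, HandleDDL, SchemaByID, TableByName, CurrentTs). The DDL job literal is a made-up minimal example in the style of the tests above, not code from the repository.

package main

import (
	"fmt"

	timodel "github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tiflow/cdc/entry/schema"
)

func main() {
	// Start from an empty snapshot and apply a "create database" DDL job,
	// the same way the unit tests above reconstruct schema state.
	snap := schema.NewEmptySnapshot(false /* forceReplicate */)

	dbInfo := &timodel.DBInfo{
		ID:    1,
		Name:  timodel.NewCIStr("demo"),
		State: timodel.StatePublic,
	}
	job := &timodel.Job{
		ID:         1,
		State:      timodel.JobStateSynced,
		SchemaID:   dbInfo.ID,
		Type:       timodel.ActionCreateSchema,
		BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, DBInfo: dbInfo, FinishedTS: 100},
		Query:      "create database demo",
	}
	if err := snap.HandleDDL(job); err != nil {
		panic(err)
	}

	// Query through the new accessor names instead of the removed map fields.
	if _, ok := snap.SchemaByID(dbInfo.ID); ok {
		fmt.Println("schema exists at ts", snap.CurrentTs()) // 100
	}
	if _, ok := snap.TableByName("demo", "t"); !ok {
		fmt.Println("no table named demo.t yet")
	}
}

Eligibility of tables without a usable handle is still governed by the forceReplicate flag passed to the constructor, which the updated TestExplicitTables assertions above compare across snapshots built with and without it.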
/* @@ -967,14 +950,14 @@ func TestSchemaStorage(t *testing.T) { ts := job.BinlogInfo.FinishedTS meta, err := kv.GetSnapshotMeta(store, ts) require.Nil(t, err) - snapFromMeta, err := newSchemaSnapshotFromMeta(meta, ts, false) + snapFromMeta, err := schema.NewSnapshotFromMeta(meta, ts, false) require.Nil(t, err) snapFromSchemaStore, err := schemaStorage.GetSnapshot(ctx, ts) require.Nil(t, err) - tidySchemaSnapshot(snapFromMeta) - tidySchemaSnapshot(snapFromSchemaStore) - require.Equal(t, snapFromMeta, snapFromSchemaStore) + s1 := snapFromMeta.DumpToString() + s2 := snapFromSchemaStore.DumpToString() + require.Equal(t, s1, s2) } } @@ -983,38 +966,6 @@ func TestSchemaStorage(t *testing.T) { } } -func tidySchemaSnapshot(snap *schemaSnapshot) { - for _, dbInfo := range snap.schemas { - if len(dbInfo.Tables) == 0 { - dbInfo.Tables = nil - } - } - for _, tableInfo := range snap.tables { - tableInfo.TableInfoVersion = 0 - if len(tableInfo.Columns) == 0 { - tableInfo.Columns = nil - } - if len(tableInfo.Indices) == 0 { - tableInfo.Indices = nil - } - if len(tableInfo.ForeignKeys) == 0 { - tableInfo.ForeignKeys = nil - } - } - // the snapshot from meta doesn't know which ineligible tables that have existed in history - // so we delete the ineligible tables which are already not exist - for tableID := range snap.ineligibleTableID { - if _, ok := snap.tables[tableID]; !ok { - delete(snap.ineligibleTableID, tableID) - } - } - // the snapshot from meta doesn't know which tables are truncated, so we just ignore it - snap.truncateTableID = nil - for _, v := range snap.tableInSchema { - sort.Slice(v, func(i, j int) bool { return v[i] < v[j] }) - } -} - func getAllHistoryDDLJob(storage tidbkv.Storage) ([]*timodel.Job, error) { s, err := session.CreateSession(storage) if err != nil { diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index c655ab9cc76..0a81d229245 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -19,7 +19,7 @@ import ( tidbkv "github.com/pingcap/tidb/kv" timeta "github.com/pingcap/tidb/meta" timodel "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -30,7 +30,7 @@ import ( ) type schemaWrap4Owner struct { - schemaSnapshot *entry.SingleSchemaSnapshot + schemaSnapshot *schema.Snapshot filter *filter.Filter config *config.ReplicaConfig @@ -52,7 +52,7 @@ func newSchemaWrap4Owner( return nil, errors.Trace(err) } } - schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, config.ForceReplicate) + schemaSnap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, config.ForceReplicate) if err != nil { return nil, errors.Trace(err) } @@ -74,13 +74,13 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID { if s.allPhysicalTablesCache != nil { return s.allPhysicalTablesCache } - tables := s.schemaSnapshot.Tables() - s.allPhysicalTablesCache = make([]model.TableID, 0, len(tables)) - for _, tblInfo := range tables { + // NOTE: it's better to pre-allocate the vector. However in the current implementation + // we can't know how many valid tables in the snapshot. 
+ s.allPhysicalTablesCache = make([]model.TableID, 0) + s.schemaSnapshot.IterTables(true, func(tblInfo *model.TableInfo) { if s.shouldIgnoreTable(tblInfo) { - continue + return } - if pi := tblInfo.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { s.allPhysicalTablesCache = append(s.allPhysicalTablesCache, partition.ID) @@ -88,22 +88,18 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID { } else { s.allPhysicalTablesCache = append(s.allPhysicalTablesCache, tblInfo.ID) } - } + }) return s.allPhysicalTablesCache } // AllTableNames returns the table names of all tables that are being replicated. func (s *schemaWrap4Owner) AllTableNames() []model.TableName { - tables := s.schemaSnapshot.Tables() - names := make([]model.TableName, 0, len(tables)) - for _, tblInfo := range tables { - if s.shouldIgnoreTable(tblInfo) { - continue + names := make([]model.TableName, 0, len(s.allPhysicalTablesCache)) + s.schemaSnapshot.IterTables(true, func(tblInfo *model.TableInfo) { + if !s.shouldIgnoreTable(tblInfo) { + names = append(names, tblInfo.TableName) } - - names = append(names, tblInfo.TableName) - } - + }) return names } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 784499009eb..47f113f5b35 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -935,8 +935,8 @@ func (p *processor) getTableName(ctx cdcContext.Context, // after `rename table` DDL, since `rename table` keeps the tableID unchanged var tableName *model.TableName retry.Do(ctx, func() error { //nolint:errcheck - if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok { - tableName = &name + if x, ok := p.schemaStorage.GetLastSnapshot().PhysicalTableByID(tableID); ok { + tableName = &x.TableName return nil } return errors.Errorf("failed to get table name, fallback to use table id: %d", @@ -950,18 +950,15 @@ func (p *processor) getTableName(ctx cdcContext.Context, var markTableID model.TableID err := retry.Do(context.Background(), func() error { if tableName == nil { - name, exist := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID) + x, exist := p.schemaStorage.GetLastSnapshot().PhysicalTableByID(tableID) if !exist { return cerror.ErrProcessorTableNotFound. GenWithStack("normal table(%s)", tableID) } - tableName = &name + tableName = &x.TableName } - markTableSchemaName, markTableTableName := mark.GetMarkTableName( - tableName.Schema, tableName.Table) - tableInfo, exist := p.schemaStorage. - GetLastSnapshot(). - GetTableByName(markTableSchemaName, markTableTableName) + markTableSchemaName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table) + tableInfo, exist := p.schemaStorage.GetLastSnapshot().TableByName(markTableSchemaName, markTableTableName) if !exist { return cerror.ErrProcessorTableNotFound. GenWithStack("normal table(%s) and mark table not match",