Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-flush-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Feb 23, 2022
2 parents 9279488 + 036c6ae commit 29cfb10
Show file tree
Hide file tree
Showing 28 changed files with 9,877 additions and 4,822 deletions.
11 changes: 9 additions & 2 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func verifyCreateChangefeedConfig(
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.Storage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.Storage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -204,7 +204,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
// VerifyTables catalog tables specified by ReplicaConfig into
// eligible (has an unique index or primary key) and ineligible tables.
func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -222,6 +224,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
Expand Down
52 changes: 32 additions & 20 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type schemaSnapshot struct {

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
// if forceReplicate is true, treat ineligible tables as eligible.
forceReplicate bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -101,17 +101,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) {
// meta is nil only in unit tests
if meta == nil {
snap := newEmptySchemaSnapshot(explicitTables)
snap := newEmptySchemaSnapshot(forceReplicate)
snap.currentTs = currentTs
return snap, nil
}
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate)
}

func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -124,12 +124,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
forceReplicate: forceReplicate,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -149,7 +149,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible(explicitTables)
isEligible := tableInfo.IsEligible(forceReplicate)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
zap.Int64("add partition id", partition.ID))
}
s.partitionTable[partition.ID] = tbl
if !tbl.IsEligible(s.explicitTables) {
if !tbl.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
delete(oldIDs, partition.ID)
Expand Down Expand Up @@ -512,14 +512,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
s.tableInSchema[table.SchemaID] = tableInSchema

s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -537,14 +543,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -715,7 +727,7 @@ type schemaStorageImpl struct {
resolvedTs uint64

filter *filter.Filter
explicitTables bool
forceReplicate bool
}

// NewSchemaStorage creates a new schema storage
Expand All @@ -734,7 +746,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
forceReplicate: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -814,7 +826,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot(s.explicitTables)
snap = newEmptySchemaSnapshot(s.forceReplicate)
}
if err := snap.handleDDL(job); err != nil {
return errors.Trace(err)
Expand Down
10 changes: 5 additions & 5 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func TestSnapshotClone(t *testing.T) {
require.Nil(t, err)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
require.Nil(t, err)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* explicitTables */)
snap, err := newSchemaSnapshotFromMeta(meta, ver.Ver, false /* forceReplicate */)
require.Nil(t, err)

clone := snap.Clone()
Expand All @@ -784,7 +784,7 @@ func TestSnapshotClone(t *testing.T) {
require.Equal(t, clone.truncateTableID, snap.truncateTableID)
require.Equal(t, clone.ineligibleTableID, snap.ineligibleTableID)
require.Equal(t, clone.currentTs, snap.currentTs)
require.Equal(t, clone.explicitTables, snap.explicitTables)
require.Equal(t, clone.forceReplicate, snap.forceReplicate)
require.Equal(t, len(clone.tables), len(snap.tables))
require.Equal(t, len(clone.schemas), len(snap.schemas))
require.Equal(t, len(clone.partitionTable), len(snap.partitionTable))
Expand Down Expand Up @@ -818,13 +818,13 @@ func TestExplicitTables(t *testing.T) {
require.Nil(t, err)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
require.Nil(t, err)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* explicitTables */)
snap1, err := newSchemaSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */)
require.Nil(t, err)
meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
require.Nil(t, err)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* explicitTables */)
snap2, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */)
require.Nil(t, err)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* explicitTables */)
snap3, err := newSchemaSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */)
require.Nil(t, err)

require.Equal(t, len(snap2.tables)-len(snap1.tables), 5)
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if ti.IsSequence() {
return false
}
if forceReplicate {
return true
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,19 @@ func TestTableInfoGetterFuncs(t *testing.T) {
info = WrapTableInfo(1, "test", 0, &tbl)
require.False(t, info.IsEligible(false))
require.True(t, info.IsEligible(true))

// View is eligible.
tbl.View = &timodel.ViewInfo{}
info = WrapTableInfo(1, "test", 0, &tbl)
require.True(t, info.IsView())
require.True(t, info.IsEligible(false))

// Sequence is ineligible.
tbl.Sequence = &timodel.SequenceInfo{}
info = WrapTableInfo(1, "test", 0, &tbl)
require.True(t, info.IsSequence())
require.False(t, info.IsEligible(false))
require.False(t, info.IsEligible(true))
}

func TestTableInfoClone(t *testing.T) {
Expand Down
46 changes: 11 additions & 35 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,48 +116,24 @@ func (s *schemaWrap4Owner) BuildDDLEvent(job *timodel.Job) (*model.DDLEvent, err
return ddlEvent, nil
}

func (s *schemaWrap4Owner) SinkTableInfos() []*model.SimpleTableInfo {
var sinkTableInfos []*model.SimpleTableInfo
for tableID := range s.schemaSnapshot.CloneTables() {
tblInfo, ok := s.schemaSnapshot.TableByID(tableID)
if !ok {
log.Panic("table not found for table ID", zap.Int64("tid", tableID))
}
if s.shouldIgnoreTable(tblInfo) {
continue
}
dbInfo, ok := s.schemaSnapshot.SchemaByTableID(tableID)
if !ok {
log.Panic("schema not found for table ID", zap.Int64("tid", tableID))
}

// TODO separate function for initializing SimpleTableInfo
sinkTableInfo := new(model.SimpleTableInfo)
sinkTableInfo.Schema = dbInfo.Name.O
sinkTableInfo.TableID = tableID
sinkTableInfo.Table = tblInfo.TableName.Table
sinkTableInfo.ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols()))
for i, colInfo := range tblInfo.Cols() {
sinkTableInfo.ColumnInfo[i] = new(model.ColumnInfo)
sinkTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
sinkTableInfos = append(sinkTableInfos, sinkTableInfo)
}
return sinkTableInfos
}

func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool {
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
func (s *schemaWrap4Owner) shouldIgnoreTable(t *model.TableInfo) bool {
schemaName := t.TableName.Schema
tableName := t.TableName.Table
if s.filter.ShouldIgnoreTable(schemaName, tableName) {
return true
}
if s.config.Cyclic.IsEnabled() && mark.IsMarkTable(schemaName, tableName) {
// skip the mark table if cyclic is enabled
return true
}
if !tableInfo.IsEligible(s.config.ForceReplicate) {
log.Warn("skip ineligible table", zap.Int64("tid", tableInfo.ID), zap.Stringer("table", tableInfo.TableName))
if !t.IsEligible(s.config.ForceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !t.IsSequence() {
log.Warn("skip ineligible table",
zap.Int64("tableID", t.ID), zap.Stringer("tableName", t.TableName))
}
return true
}
return false
Expand Down
24 changes: 0 additions & 24 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,3 @@ func TestBuildDDLEvent(t *testing.T) {
},
})
}

func TestSinkTableInfos(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
require.Nil(t, err)
// add normal table
job := helper.DDL2Job("create table test.t1(id int primary key)")
tableIDT1 := job.BinlogInfo.TableInfo.ID
require.Nil(t, schema.HandleDDL(job))
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
require.Nil(t, schema.HandleDDL(job))
require.Equal(t, schema.SinkTableInfos(), []*model.SimpleTableInfo{
{
Schema: "test",
Table: "t1",
TableID: tableIDT1,
ColumnInfo: []*model.ColumnInfo{{Name: "id", Type: mysql.TypeLong}},
},
})
}
Loading

0 comments on commit 29cfb10

Please sign in to comment.