Skip to content

Commit

Permalink
schema(cdc): refactor schema snapshot to reduce memory usage (#5144)
Browse files Browse the repository at this point in the history
close #1386
  • Loading branch information
hicqu authored Apr 26, 2022
1 parent 65d1bbf commit be428b7
Show file tree
Hide file tree
Showing 8 changed files with 1,611 additions and 867 deletions.
12 changes: 6 additions & 6 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
Expand Down Expand Up @@ -215,25 +215,25 @@ func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if err != nil {
return nil, nil, errors.Trace(err)
}
snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */)
snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */)
if err != nil {
return nil, nil, errors.Trace(err)
}

for _, tableInfo := range snap.Tables() {
snap.IterTables(true, func(tableInfo *model.TableInfo) {
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
return
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
return
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
eligibleTables = append(eligibleTables, tableInfo.TableName)
}
}
})
return
}
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
err := scheamStorage.HandleDDLJob(job)
require.Nil(t, err)
}
tableInfo, ok := scheamStorage.GetLastSnapshot().GetTableByName("test", tc.tableName)
tableInfo, ok := scheamStorage.GetLastSnapshot().TableByName("test", tc.tableName)
require.True(t, ok)
if tableInfo.IsCommonHandle {
// we can check this log to make sure if the clustered-index is enabled
Expand Down
Loading

0 comments on commit be428b7

Please sign in to comment.