Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add GetActiveTopics support for event router #4743

Merged
merged 9 commits into from
Mar 7, 2022
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
default:
}

c.sink.emitCheckpointTs(ctx, checkpointTs)
c.sink.emitCheckpointTs(checkpointTs, c.schema.AllTableNames())
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
Expand Down
89 changes: 85 additions & 4 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -70,7 +69,11 @@ type mockDDLSink struct {
// DDLSink
ddlExecuting *model.DDLEvent
ddlDone bool
checkpointTs model.Ts
mu struct {
sync.Mutex
checkpointTs model.Ts
currentTableNames []model.TableName
}
syncPoint model.Ts
syncPointHis []model.Ts

Expand Down Expand Up @@ -100,8 +103,17 @@ func (m *mockDDLSink) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64)
return nil
}

func (m *mockDDLSink) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
atomic.StoreUint64(&m.checkpointTs, ts)
func (m *mockDDLSink) emitCheckpointTs(ts uint64, tableNames []model.TableName) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.checkpointTs = ts
m.mu.currentTableNames = tableNames
}

func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []model.TableName) {
m.mu.Lock()
defer m.mu.Unlock()
return m.mu.checkpointTs, m.mu.currentTableNames
}

func (m *mockDDLSink) close(ctx context.Context) error {
Expand Down Expand Up @@ -296,6 +308,75 @@ func TestExecDDL(t *testing.T) {
require.Contains(t, state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, job.TableID)
}

func TestEmitCheckpointTs(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
// Creates a table, which will be deleted at the start-ts of the changefeed.
// It is expected that the changefeed DOES NOT replicate this table.
helper.DDL2Job("create database test0")
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
KVStorage: helper.Storage(),
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
PDClock: pdtime.NewClock4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})

cf, state, captures, tester := createChangefeed4Test(ctx, t)
defer cf.Close(ctx)
tickThreeTime := func() {
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
}
// pre check and initialize
tickThreeTime()
mockDDLSink := cf.sink.(*mockDDLSink)

require.Len(t, cf.schema.AllPhysicalTables(), 1)
require.Len(t, state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Operation, 0)
require.Len(t, state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, 0)
ts, names := mockDDLSink.getCheckpointTsAndTableNames()
require.Equal(t, ts, startTs)
require.Len(t, names, 1)

job = helper.DDL2Job("drop table test0.table0")
// ddl puller resolved ts grow up
mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
mockDDLPuller.resolvedTs = startTs
job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
// three tick to make sure all barriers set in initialize is handled
tickThreeTime()
require.Equal(t, state.Status.CheckpointTs, mockDDLPuller.resolvedTs)
// The ephemeral table should have left no trace in the schema cache
require.Len(t, cf.schema.AllPhysicalTables(), 0)

// executing the ddl finished
mockDDLSink.ddlDone = true
mockDDLPuller.resolvedTs += 1000
tickThreeTime()
require.Equal(t, state.Status.CheckpointTs, mockDDLPuller.resolvedTs)
ts, names = mockDDLSink.getCheckpointTsAndTableNames()
require.Equal(t, ts, mockDDLPuller.resolvedTs)
require.Len(t, names, 0)
}

func TestSyncPoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.SyncPointEnabled = true
Expand Down
24 changes: 18 additions & 6 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type DDLSink interface {
// emitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
emitCheckpointTs(ctx cdcContext.Context, ts uint64)
emitCheckpointTs(ts uint64, tableNames []model.TableName)
// emitDDLEvent emits DDL event and return true if the DDL is executed
// the DDL event will be sent to another goroutine and execute to downstream
// the caller of this function can call again and again until a true returned
Expand All @@ -58,7 +58,12 @@ type ddlSinkImpl struct {
lastSyncPoint model.Ts
syncPointStore sink.SyncpointStore

checkpointTs model.Ts
// It is used to record the checkpointTs and the names of the table at that time.
mu struct {
sync.Mutex
checkpointTs model.Ts
currentTableNames []model.TableName
}
ddlFinishedTs model.Ts
ddlSentTs model.Ts

Expand Down Expand Up @@ -144,12 +149,16 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
ctx.Throw(err)
return
case <-ticker.C:
checkpointTs := atomic.LoadUint64(&s.checkpointTs)
s.mu.Lock()
checkpointTs := s.mu.checkpointTs
if checkpointTs == 0 || checkpointTs <= lastCheckpointTs {
s.mu.Unlock()
continue
}
tables := s.mu.currentTableNames
s.mu.Unlock()
lastCheckpointTs = checkpointTs
if err := s.sink.EmitCheckpointTs(ctx, checkpointTs); err != nil {
if err := s.sink.EmitCheckpointTs(ctx, checkpointTs, tables); err != nil {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
ctx.Throw(errors.Trace(err))
return
}
Expand Down Expand Up @@ -178,8 +187,11 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
}()
}

func (s *ddlSinkImpl) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
atomic.StoreUint64(&s.checkpointTs, ts)
func (s *ddlSinkImpl) emitCheckpointTs(ts uint64, tableNames []model.TableName) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.checkpointTs = ts
s.mu.currentTableNames = tableNames
}

func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type mockSink struct {
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
func (m *mockSink) EmitCheckpointTs(_ context.Context, ts uint64, _ []model.TableName) error {
atomic.StoreUint64(&m.checkpointTs, ts)
return nil
}
Expand Down Expand Up @@ -92,9 +92,9 @@ func TestCheckpoint(t *testing.T) {
return nil
}, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(30))
}
ddlSink.emitCheckpointTs(ctx, 1)
ddlSink.emitCheckpointTs(1, nil)
require.Nil(t, waitCheckpointGrowingUp(mSink, 1))
ddlSink.emitCheckpointTs(ctx, 10)
ddlSink.emitCheckpointTs(10, nil)
require.Nil(t, waitCheckpointGrowingUp(mSink, 10))
}

Expand Down
22 changes: 22 additions & 0 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type schemaWrap4Owner struct {
config *config.ReplicaConfig

allPhysicalTablesCache []model.TableID
allTableNamesCache []model.TableName
ddlHandledTs model.Ts
}

Expand Down Expand Up @@ -85,11 +86,32 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID {
return s.allPhysicalTablesCache
}

// AllTableNames returns the table names of all tables that are being replicated.
// NOTICE: AllTableNames is not thread-safe.
func (s *schemaWrap4Owner) AllTableNames() []model.TableName {
if s.allTableNamesCache != nil {
return s.allTableNamesCache
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
}

tables := s.schemaSnapshot.Tables()
s.allTableNamesCache = make([]model.TableName, 0, len(tables))
for _, tblInfo := range tables {
if s.shouldIgnoreTable(tblInfo) {
continue
}

s.allTableNamesCache = append(s.allTableNamesCache, tblInfo.TableName)
}

return s.allTableNamesCache
}

func (s *schemaWrap4Owner) HandleDDL(job *timodel.Job) error {
if job.BinlogInfo.FinishedTS <= s.ddlHandledTs {
return nil
}
s.allPhysicalTablesCache = nil
s.allTableNamesCache = nil
err := s.schemaSnapshot.HandleDDL(job)
if err != nil {
return errors.Trace(err)
Expand Down
17 changes: 17 additions & 0 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ func TestAllPhysicalTables(t *testing.T) {
require.Equal(t, schema.AllPhysicalTables(), expectedTableIDs)
}

func TestAllTableNames(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
require.Nil(t, err)
require.Len(t, schema.AllTableNames(), 0)
// add normal table
job := helper.DDL2Job("create table test.t1(id int primary key)")
require.Nil(t, schema.HandleDDL(job))
require.Equal(t, []model.TableName{{Schema: "test", Table: "t1"}}, schema.AllTableNames())
// add ineligible table
require.Nil(t, schema.HandleDDL(helper.DDL2Job("create table test.t2(id int)")))
require.Equal(t, []model.TableName{{Schema: "test", Table: "t1"}}, schema.AllTableNames())
}

func TestIsIneligibleTableID(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, r
return resolvedTs, nil
}

func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
panic("unreachable")
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.Table
return resolvedTs, err
}

func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
log.Debug("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts))
func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error {
log.Debug("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts), zap.Any("tables", tables))
return nil
}

Expand Down
Loading