sink(ticdc): add GetActiveTopics support for event router #4743

Merged
9 commits, merged Mar 7, 2022
18 changes: 17 additions & 1 deletion cdc/owner/changefeed.go
@@ -59,6 +59,10 @@ type changefeed struct {
// ddlEventCache is not nil when the changefeed is executing a DDL event asynchronously
// After the DDL event has been executed, ddlEventCache will be set to nil.
ddlEventCache *model.DDLEvent
// currentTableNames is the list of table names that the changefeed is watching.
// It only reflects the DDLs that have already been processed;
// tables changed by DDLs that have not been executed yet are not included.
currentTableNames []model.TableName

errCh chan error
// cancel the running goroutine start by `DDLPuller`
@@ -189,7 +193,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
default:
}

c.sink.emitCheckpointTs(ctx, checkpointTs)
// This means that the cached DDL has been executed,
// and we need to use the latest table names.
if c.currentTableNames == nil {
c.currentTableNames = c.schema.AllTableNames()
}
c.sink.emitCheckpointTs(checkpointTs, c.currentTableNames)
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
@@ -522,6 +531,10 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
if err != nil {
return false, errors.Trace(err)
}
// We can't use the latest schema directly:
// the DDL must be received downstream before we start or stop broadcasting checkpoint ts for the affected tables.
// So remember the table names before processing the DDL, and cache the DDL event.
c.currentTableNames = c.schema.AllTableNames()
err = c.schema.HandleDDL(job)
if err != nil {
return false, errors.Trace(err)
@@ -549,6 +562,9 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
}
if done {
c.ddlEventCache = nil
// The cached table names have expired.
// We should use the latest table names from now on.
c.currentTableNames = nil
}
return done, nil
}
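
Taken together, the changefeed changes implement one pattern: snapshot the table names right before a DDL is applied to the schema, keep emitting the checkpoint ts with that snapshot while the DDL event is still cached, and fall back to the live schema once the DDL has been executed downstream. The following is a minimal, self-contained sketch of that pattern, not the real TiCDC code; TableName, liveTables, startDDL, and finishDDL are illustrative stand-ins for the actual types and methods.

```go
package main

import "fmt"

// TableName is a stand-in for model.TableName.
type TableName struct{ Schema, Table string }

// changefeed keeps a snapshot of table names pinned while a DDL is still
// waiting to be executed downstream.
type changefeed struct {
	liveTables        []TableName // what the schema currently contains
	currentTableNames []TableName // nil means "read from the live schema"
}

// emitCheckpointTs mirrors the tick logic: refresh the names from the live
// schema only when no pre-DDL snapshot is pinned.
func (c *changefeed) emitCheckpointTs(ts uint64) {
	if c.currentTableNames == nil {
		c.currentTableNames = append([]TableName(nil), c.liveTables...)
	}
	fmt.Printf("checkpoint %d broadcast for tables %v\n", ts, c.currentTableNames)
}

// startDDL snapshots the names before the schema moves ahead, so checkpoints
// keep describing what the downstream has actually seen.
func (c *changefeed) startDDL(newTables []TableName) {
	c.currentTableNames = append([]TableName(nil), c.liveTables...)
	c.liveTables = newTables
}

// finishDDL is called once the cached DDL has been executed downstream:
// the snapshot expires and the next checkpoint picks up the live names.
func (c *changefeed) finishDDL() {
	c.currentTableNames = nil
}

func main() {
	cf := &changefeed{liveTables: []TableName{{"test0", "table0"}}}
	cf.emitCheckpointTs(100) // reports test0.table0

	cf.startDDL(nil)         // "drop table test0.table0": the schema is empty immediately
	cf.emitCheckpointTs(200) // still reports test0.table0, the DDL is not executed yet

	cf.finishDDL()
	cf.emitCheckpointTs(300) // reports no tables
}
```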
93 changes: 89 additions & 4 deletions cdc/owner/changefeed_test.go
@@ -19,7 +19,6 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

@@ -70,7 +69,11 @@ type mockDDLSink struct {
// DDLSink
ddlExecuting *model.DDLEvent
ddlDone bool
checkpointTs model.Ts
mu struct {
sync.Mutex
checkpointTs model.Ts
currentTableNames []model.TableName
}
syncPoint model.Ts
syncPointHis []model.Ts

@@ -100,8 +103,17 @@ func (m *mockDDLSink) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64)
return nil
}

func (m *mockDDLSink) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
atomic.StoreUint64(&m.checkpointTs, ts)
func (m *mockDDLSink) emitCheckpointTs(ts uint64, tableNames []model.TableName) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.checkpointTs = ts
m.mu.currentTableNames = tableNames
}

func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []model.TableName) {
m.mu.Lock()
defer m.mu.Unlock()
return m.mu.checkpointTs, m.mu.currentTableNames
}

func (m *mockDDLSink) close(ctx context.Context) error {
@@ -296,6 +308,79 @@ func TestExecDDL(t *testing.T) {
require.Contains(t, state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, job.TableID)
}

func TestEmitCheckpointTs(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
// Creates a table, which will be deleted at the start-ts of the changefeed.
// It is expected that the changefeed DOES NOT replicate this table.
helper.DDL2Job("create database test0")
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
KVStorage: helper.Storage(),
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
PDClock: pdtime.NewClock4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})

cf, state, captures, tester := createChangefeed4Test(ctx, t)
defer cf.Close(ctx)
tickThreeTime := func() {
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
}
// pre check and initialize
tickThreeTime()
mockDDLSink := cf.sink.(*mockDDLSink)

require.Len(t, cf.schema.AllTableNames(), 1)
require.Len(t, state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Operation, 0)
require.Len(t, state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, 0)
ts, names := mockDDLSink.getCheckpointTsAndTableNames()
require.Equal(t, ts, startTs)
require.Len(t, names, 1)

job = helper.DDL2Job("drop table test0.table0")
// the DDL puller's resolved ts advances
mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
mockDDLPuller.resolvedTs = startTs
job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
// tick three times to make sure all barriers set during initialization are handled
tickThreeTime()
require.Equal(t, state.Status.CheckpointTs, mockDDLPuller.resolvedTs)
// The ephemeral table should have left no trace in the schema cache
require.Len(t, cf.schema.AllTableNames(), 0)
// We can't use the new schema because the ddl hasn't been executed yet.
ts, names = mockDDLSink.getCheckpointTsAndTableNames()
require.Equal(t, ts, mockDDLPuller.resolvedTs)
require.Len(t, names, 1)

// the DDL has finished executing
mockDDLSink.ddlDone = true
mockDDLPuller.resolvedTs += 1000
tickThreeTime()
require.Equal(t, state.Status.CheckpointTs, mockDDLPuller.resolvedTs)
ts, names = mockDDLSink.getCheckpointTsAndTableNames()
require.Equal(t, ts, mockDDLPuller.resolvedTs)
require.Len(t, names, 0)
}

func TestSyncPoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.SyncPointEnabled = true
24 changes: 18 additions & 6 deletions cdc/owner/ddl_sink.go
@@ -44,7 +44,7 @@ type DDLSink interface {
// emitCheckpointTs emits the checkpoint ts to the downstream data source.
// This function returns immediately after recording the given checkpointTs in memory;
// the recorded checkpointTs is then sent to the downstream data source every second.
emitCheckpointTs(ctx cdcContext.Context, ts uint64)
emitCheckpointTs(ts uint64, tableNames []model.TableName)
// emitDDLEvent emits a DDL event and returns true once the DDL has been executed.
// The DDL event is sent to another goroutine and executed against the downstream;
// the caller can keep calling this function until it returns true.
@@ -58,7 +58,12 @@ type ddlSinkImpl struct {
lastSyncPoint model.Ts
syncPointStore sink.SyncpointStore

checkpointTs model.Ts
// It is used to record the checkpointTs and the table names at that time.
mu struct {
sync.Mutex
checkpointTs model.Ts
currentTableNames []model.TableName
}
ddlFinishedTs model.Ts
ddlSentTs model.Ts

@@ -144,12 +149,16 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
ctx.Throw(err)
return
case <-ticker.C:
checkpointTs := atomic.LoadUint64(&s.checkpointTs)
s.mu.Lock()
checkpointTs := s.mu.checkpointTs
if checkpointTs == 0 || checkpointTs <= lastCheckpointTs {
s.mu.Unlock()
continue
}
tables := s.mu.currentTableNames
s.mu.Unlock()
lastCheckpointTs = checkpointTs
if err := s.sink.EmitCheckpointTs(ctx, checkpointTs); err != nil {
if err := s.sink.EmitCheckpointTs(ctx, checkpointTs, tables); err != nil {
ctx.Throw(errors.Trace(err))
return
}
@@ -178,8 +187,11 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
}()
}

func (s *ddlSinkImpl) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
atomic.StoreUint64(&s.checkpointTs, ts)
func (s *ddlSinkImpl) emitCheckpointTs(ts uint64, tableNames []model.TableName) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.checkpointTs = ts
s.mu.currentTableNames = tableNames
}

func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
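
The emitCheckpointTs contract above (record the latest checkpoint ts and table names in memory, then flush them to the downstream about once per second and only when the checkpoint advances) can be illustrated with a small standalone recorder. The mutex-guarded fields and the ticker loop follow the same shape as ddlSinkImpl, but checkpointRecorder and its injected emit callback are assumptions made for this example, not the real sink API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TableName is a stand-in for model.TableName.
type TableName struct{ Schema, Table string }

// checkpointRecorder records the latest checkpoint ts and table names in
// memory and flushes them downstream at most once per tick, and only when
// the checkpoint has advanced.
type checkpointRecorder struct {
	mu struct {
		sync.Mutex
		checkpointTs      uint64
		currentTableNames []TableName
	}
	emit func(ts uint64, tables []TableName) // downstream flush, injected for the example
}

// emitCheckpointTs only records the values; it never blocks on the downstream.
func (r *checkpointRecorder) emitCheckpointTs(ts uint64, tables []TableName) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.checkpointTs = ts
	r.mu.currentTableNames = tables
}

// run flushes the recorded checkpoint on every tick if it has grown.
func (r *checkpointRecorder) run(stop <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var lastCheckpointTs uint64
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			r.mu.Lock()
			ts := r.mu.checkpointTs
			if ts == 0 || ts <= lastCheckpointTs {
				r.mu.Unlock()
				continue
			}
			tables := r.mu.currentTableNames
			r.mu.Unlock()
			lastCheckpointTs = ts
			r.emit(ts, tables)
		}
	}
}

func main() {
	r := &checkpointRecorder{emit: func(ts uint64, tables []TableName) {
		fmt.Printf("flushed checkpoint %d for %v\n", ts, tables)
	}}
	stop := make(chan struct{})
	go r.run(stop, 50*time.Millisecond)

	r.emitCheckpointTs(1, []TableName{{"test", "t1"}})
	time.Sleep(120 * time.Millisecond)
	r.emitCheckpointTs(10, nil)
	time.Sleep(120 * time.Millisecond)
	close(stop)
}
```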
6 changes: 3 additions & 3 deletions cdc/owner/ddl_sink_test.go
@@ -37,7 +37,7 @@ type mockSink struct {
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
func (m *mockSink) EmitCheckpointTs(_ context.Context, ts uint64, _ []model.TableName) error {
atomic.StoreUint64(&m.checkpointTs, ts)
return nil
}
@@ -92,9 +92,9 @@ func TestCheckpoint(t *testing.T) {
return nil
}, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(30))
}
ddlSink.emitCheckpointTs(ctx, 1)
ddlSink.emitCheckpointTs(1, nil)
require.Nil(t, waitCheckpointGrowingUp(mSink, 1))
ddlSink.emitCheckpointTs(ctx, 10)
ddlSink.emitCheckpointTs(10, nil)
require.Nil(t, waitCheckpointGrowingUp(mSink, 10))
}

15 changes: 15 additions & 0 deletions cdc/owner/schema.go
@@ -85,6 +85,21 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID {
return s.allPhysicalTablesCache
}

// AllTableNames returns the table names of all tables that are being replicated.
func (s *schemaWrap4Owner) AllTableNames() []model.TableName {
tables := s.schemaSnapshot.Tables()
names := make([]model.TableName, 0, len(tables))
for _, tblInfo := range tables {
if s.shouldIgnoreTable(tblInfo) {
continue
}

names = append(names, tblInfo.TableName)
}

return names
}

func (s *schemaWrap4Owner) HandleDDL(job *timodel.Job) error {
if job.BinlogInfo.FinishedTS <= s.ddlHandledTs {
return nil
17 changes: 17 additions & 0 deletions cdc/owner/schema_test.go
@@ -72,6 +72,23 @@ func TestAllPhysicalTables(t *testing.T) {
require.Equal(t, schema.AllPhysicalTables(), expectedTableIDs)
}

func TestAllTableNames(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
require.Nil(t, err)
require.Len(t, schema.AllTableNames(), 0)
// add normal table
job := helper.DDL2Job("create table test.t1(id int primary key)")
require.Nil(t, schema.HandleDDL(job))
require.Equal(t, []model.TableName{{Schema: "test", Table: "t1"}}, schema.AllTableNames())
// add ineligible table
require.Nil(t, schema.HandleDDL(helper.DDL2Job("create table test.t2(id int)")))
require.Equal(t, []model.TableName{{Schema: "test", Table: "t1"}}, schema.AllTableNames())
}

func TestIsIneligibleTableID(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sink_test.go
@@ -80,7 +80,7 @@ func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, r
return resolvedTs, nil
}

func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
panic("unreachable")
}

4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
@@ -66,8 +66,8 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.Table
return resolvedTs, err
}

func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
log.Debug("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts))
func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error {
log.Debug("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts), zap.Any("tables", tables))
return nil
}
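
The reason EmitCheckpointTs now carries the table names is, per the PR title, to let an MQ sink's event router work out which topics are currently active so the checkpoint ts can be broadcast to each of them. The router itself is not part of this excerpt, so the sketch below only illustrates the idea: topicFor and activeTopics are hypothetical helpers with a made-up schema_table naming rule, not the real GetActiveTopics API.

```go
package main

import "fmt"

// TableName is a stand-in for model.TableName.
type TableName struct{ Schema, Table string }

// topicFor is a hypothetical dispatch rule mapping a table to a topic;
// the real event router supports configurable routing rules.
func topicFor(t TableName) string {
	return t.Schema + "_" + t.Table
}

// activeTopics deduplicates the topics for the tables currently being
// replicated, so a checkpoint ts can be broadcast to each topic once.
func activeTopics(tables []TableName) []string {
	seen := make(map[string]struct{}, len(tables))
	topics := make([]string, 0, len(tables))
	for _, t := range tables {
		topic := topicFor(t)
		if _, ok := seen[topic]; ok {
			continue
		}
		seen[topic] = struct{}{}
		topics = append(topics, topic)
	}
	return topics
}

func main() {
	tables := []TableName{{"test", "t1"}, {"test", "t2"}}
	fmt.Println(activeTopics(tables)) // [test_t1 test_t2]
}
```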
