Skip to content

Commit

Permalink
tests: fix more events could be sent after a table is removed (#2569) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 21, 2021
1 parent 7a0530d commit eeecc30
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
defer c.rowsMu.Unlock()
var newRows []*model.RowChangedEvent
for _, row := range c.rows {
c.Assert(row.CommitTs, check.Greater, c.lastResolvedTs)
if row.CommitTs <= c.lastResolvedTs {
return c.lastResolvedTs, errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs)
}
if row.CommitTs > resolvedTs {
newRows = append(newRows, row)
}
Expand Down Expand Up @@ -146,20 +148,19 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
defer manager.Close(ctx)
goroutineNum := 100
goroutineNum := 200
var wg sync.WaitGroup
const ExitSignal = uint64(math.MaxUint64)

var maxResolvedTs uint64
tableSinks := make([]Sink, 0, goroutineNum)
closeChs := make([]chan struct{}, 0, goroutineNum)
runTableSink := func(index int64, sink Sink, startTs uint64, close chan struct{}) {
tableCancels := make([]context.CancelFunc, 0, goroutineNum)
runTableSink := func(ctx context.Context, index int64, sink Sink, startTs uint64) {
defer wg.Done()
ctx := context.Background()
lastResolvedTs := startTs
for {
select {
case <-close:
case <-ctx.Done():
return
default:
}
Expand All @@ -179,7 +180,9 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
c.Assert(err, check.IsNil)
}
_, err := sink.FlushRowChangedEvents(ctx, resolvedTs)
c.Assert(err, check.IsNil)
if err != nil {
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
}
lastResolvedTs = resolvedTs
}
}
Expand All @@ -188,23 +191,25 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
go func() {
defer wg.Done()
// add three table and then remote one table
for i := 0; i < 200; i++ {
for i := 0; i < goroutineNum; i++ {
if i%4 != 3 {
// add table
table := manager.CreateTableSink(model.TableID(i), maxResolvedTs)
close := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
tableCancels = append(tableCancels, cancel)
tableSinks = append(tableSinks, table)
closeChs = append(closeChs, close)
atomic.AddUint64(&maxResolvedTs, 20)
wg.Add(1)
go runTableSink(int64(i), table, maxResolvedTs, close)
go runTableSink(ctx, int64(i), table, maxResolvedTs)
} else {
// remove table
table := tableSinks[0]
close(closeChs[0])
// note when a table is removed, no more data can be sent to the
// backend sink, so we cancel the context of this table sink.
tableCancels[0]()
c.Assert(table.Close(ctx), check.IsNil)
tableSinks = tableSinks[1:]
closeChs = closeChs[1:]
tableCancels = tableCancels[1:]
}
time.Sleep(10 * time.Millisecond)
}
Expand Down

0 comments on commit eeecc30

Please sign in to comment.