From d62e2e70e4e558751f69cbefae66cddec6a8376a Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 15 Mar 2022 17:05:52 +0800 Subject: [PATCH] sorter/leveldb(ticdc): fix system stop blocks forever (#4831) close pingcap/tiflow#4699 --- cdc/processor/pipeline/sorter.go | 12 +--- cdc/sorter/leveldb/leveldb.go | 3 + cdc/sorter/leveldb/message/task.go | 9 +++ cdc/sorter/leveldb/reader.go | 5 ++ cdc/sorter/leveldb/sorter.go | 42 +++++------ cdc/sorter/leveldb/sorter_test.go | 46 ++++++------ cdc/sorter/leveldb/system/system.go | 43 +++-------- cdc/sorter/leveldb/system/system_test.go | 92 ++++++++++++++++++++++++ 8 files changed, 164 insertions(+), 88 deletions(-) diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 81d279d80b1..eca42590a36 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -55,8 +55,6 @@ type sorterNode struct { eg *errgroup.Group cancel context.CancelFunc - cleanup func(context.Context) error - // The latest resolved ts that sorter has received. resolvedTs model.Ts @@ -119,7 +117,6 @@ func (n *sorterNode) start(ctx pipeline.NodeContext, isTableActorMode bool, eg * if err != nil { return errors.Trace(err) } - n.cleanup = levelSorter.CleanupFunc() eventSorter = levelSorter } else { // Sorter dir has been set and checked when server starts. @@ -304,15 +301,8 @@ func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) { } } -func (n *sorterNode) releaseResource(ctx context.Context, changefeedID string) { +func (n *sorterNode) releaseResource(_ context.Context, changefeedID string) { defer tableMemoryHistogram.DeleteLabelValues(changefeedID) - if n.cleanup != nil { - // Clean up data when the table sorter is canceled. - err := n.cleanup(ctx) - if err != nil { - log.Warn("schedule table cleanup task failed", zap.Error(err)) - } - } // Since the flowController is implemented by `Cond`, it is not cancelable by a context // the flowController will be blocked in a background goroutine, // We need to abort the flowController manually in the nodeRunner diff --git a/cdc/sorter/leveldb/leveldb.go b/cdc/sorter/leveldb/leveldb.go index 75a5c579463..8e9998e47ac 100644 --- a/cdc/sorter/leveldb/leveldb.go +++ b/cdc/sorter/leveldb/leveldb.go @@ -244,6 +244,9 @@ func (ldb *DBActor) Poll(ctx context.Context, tasks []actormsg.Message) bool { ldb.iterQ.push(task.UID, task.TableID, task.IterReq) requireIter = true } + if task.Test != nil { + time.Sleep(task.Test.Sleep) + } } // Force write only if there is a task requires an iterator. diff --git a/cdc/sorter/leveldb/message/task.go b/cdc/sorter/leveldb/message/task.go index d94951c7114..108df25a384 100644 --- a/cdc/sorter/leveldb/message/task.go +++ b/cdc/sorter/leveldb/message/task.go @@ -15,6 +15,7 @@ package message import ( "fmt" + "time" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" @@ -44,6 +45,9 @@ type Task struct { // Deletes all of the key-values in the range. // reader -> leveldb and leveldb -> compactor DeleteReq *DeleteRequest + + // A test message. + Test *Test } // DeleteRequest a request to delete range. @@ -72,6 +76,11 @@ type IterRequest struct { IterCallback func(*LimitedIterator) `json:"-"` // Make Task JSON printable. } +// Test is a message for testing actors. +type Test struct { + Sleep time.Duration +} + // Key is the key that is written to db. 
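+// Keys embed the sorter UID and the table ID as their prefix (see the
+// encoding.EncodeTsKey calls in sorter.go below), so all events of a table
+// occupy one contiguous key range, which is what Sorter.cleanup deletes with
+// a single DeleteRequest.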
type Key string diff --git a/cdc/sorter/leveldb/reader.go b/cdc/sorter/leveldb/reader.go index 8ac4f356675..b55883198a5 100644 --- a/cdc/sorter/leveldb/reader.go +++ b/cdc/sorter/leveldb/reader.go @@ -405,6 +405,11 @@ func (r *reader) Poll(ctx context.Context, msgs []actormsg.Message) (running boo // Update the max commit ts and resolved ts of all received events. ts := msgs[i].SorterTask.ReadTs r.state.advanceMaxTs(ts.MaxCommitTs, ts.MaxResolvedTs) + + // Test only message. + if msgs[i].SorterTask.Test != nil { + time.Sleep(msgs[i].SorterTask.Test.Sleep) + } } // Length of buffered resolved events. diff --git a/cdc/sorter/leveldb/sorter.go b/cdc/sorter/leveldb/sorter.go index 669e9b7ac6e..f03f40f5d37 100644 --- a/cdc/sorter/leveldb/sorter.go +++ b/cdc/sorter/leveldb/sorter.go @@ -79,7 +79,7 @@ type Sorter struct { writerActorID actor.ID readerRouter *actor.Router - readerActorID actor.ID + ReaderActorID actor.ID outputCh chan *model.PolymorphicEvent @@ -174,7 +174,7 @@ func NewSorter( writerRouter: writerRouter, writerActorID: actorID, readerRouter: readerRouter, - readerActorID: actorID, + ReaderActorID: actorID, outputCh: outputCh, }, nil } @@ -187,14 +187,18 @@ func (ls *Sorter) Run(ctx context.Context) error { err = ctx.Err() case err = <-ls.errCh: } - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() atomic.StoreInt32(&ls.closed, 1) + // We should never lost message, make sure StopMessage is sent. + ctx1 := context.TODO() + // As the context can't be cancelled. SendB can only return an error + // ActorStopped or ActorNotFound, and they mean actors have closed. _ = ls.writerRouter.SendB( - ctx, ls.writerActorID, actormsg.StopMessage()) + ctx1, ls.writerActorID, actormsg.StopMessage()) _ = ls.readerRouter.SendB( - ctx, ls.readerActorID, actormsg.StopMessage()) + ctx1, ls.ReaderActorID, actormsg.StopMessage()) ls.closedWg.Wait() + + _ = ls.cleanup(ctx1) return errors.Trace(err) } @@ -247,22 +251,20 @@ func (ls *Sorter) Output() <-chan *model.PolymorphicEvent { // // TODO: Consider if we are sending too many msgs here. // It may waste CPU and be a bottleneck. - _ = ls.readerRouter.Send(ls.readerActorID, msg) + _ = ls.readerRouter.Send(ls.ReaderActorID, msg) return ls.outputCh } -// CleanupFunc returns a function that cleans up sorter's data. -func (ls *Sorter) CleanupFunc() func(context.Context) error { - return func(ctx context.Context) error { - task := message.Task{UID: ls.uid, TableID: ls.tableID} - task.DeleteReq = &message.DeleteRequest{ - // We do not set task.Delete.Count, because we don't know - // how many key-value pairs in the range. - Range: [2][]byte{ - encoding.EncodeTsKey(ls.uid, ls.tableID, 0), - encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0), - }, - } - return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task)) +// cleanup cleans up sorter's data. +func (ls *Sorter) cleanup(ctx context.Context) error { + task := message.Task{UID: ls.uid, TableID: ls.tableID} + task.DeleteReq = &message.DeleteRequest{ + // We do not set task.Delete.Count, because we don't know + // how many key-value pairs in the range. 
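+		// The range below goes from the first possible key of this table
+		// (encoding.EncodeTsKey(uid, tableID, 0)) up to the first key of the
+		// next table ID, so it covers every key this table has written.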
+ Range: [2][]byte{ + encoding.EncodeTsKey(ls.uid, ls.tableID, 0), + encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0), + }, } + return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task)) } diff --git a/cdc/sorter/leveldb/sorter_test.go b/cdc/sorter/leveldb/sorter_test.go index 8fa6bcf485c..3cb286850dc 100644 --- a/cdc/sorter/leveldb/sorter_test.go +++ b/cdc/sorter/leveldb/sorter_test.go @@ -43,7 +43,7 @@ func newTestSorter(name string, capacity int) (Sorter, actor.Mailbox) { writerRouter: router, writerActorID: mb.ID(), readerRouter: router, - readerActorID: mb.ID(), + ReaderActorID: mb.ID(), } return s, mb } @@ -103,32 +103,12 @@ func TestOutput(t *testing.T) { }, task.SorterTask) } -func TestCleanupFunc(t *testing.T) { - t.Parallel() - - s, mb := newTestSorter(t.Name(), 1) - - fn := s.CleanupFunc() - require.Nil(t, fn(context.Background())) - task, ok := mb.Receive() - require.True(t, ok) - require.EqualValues(t, - message.Task{ - UID: s.uid, - TableID: s.tableID, - DeleteReq: &message.DeleteRequest{ - Range: [2][]byte{ - encoding.EncodeTsKey(s.uid, s.tableID, 0), - encoding.EncodeTsKey(s.uid, s.tableID+1, 0), - }, - }, - }, task.SorterTask) -} - func TestRunAndReportError(t *testing.T) { t.Parallel() - s, mb := newTestSorter(t.Name(), 2) + // Run exits with three messages + cap := 3 + s, mb := newTestSorter(t.Name(), cap) go func() { time.Sleep(100 * time.Millisecond) s.common.reportError( @@ -143,6 +123,24 @@ func TestRunAndReportError(t *testing.T) { msg, ok = mb.Receive() require.True(t, ok) require.EqualValues(t, actormsg.StopMessage(), msg) + // Cleanup + msg, ok = mb.Receive() + require.True(t, ok) + require.EqualValues(t, + message.Task{ + UID: s.uid, + TableID: s.tableID, + DeleteReq: &message.DeleteRequest{ + Range: [2][]byte{ + encoding.EncodeTsKey(s.uid, s.tableID, 0), + encoding.EncodeTsKey(s.uid, s.tableID+1, 0), + }, + }, + }, msg.SorterTask) + + // No more message. + msg, ok = mb.Receive() + require.False(t, ok) // Must be nonblock. s.common.reportError( diff --git a/cdc/sorter/leveldb/system/system.go b/cdc/sorter/leveldb/system/system.go index 25e344f6012..b8ac96471c7 100644 --- a/cdc/sorter/leveldb/system/system.go +++ b/cdc/sorter/leveldb/system/system.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/util/memory" lsorter "github.com/pingcap/tiflow/cdc/sorter/leveldb" "github.com/pingcap/tiflow/pkg/actor" - "github.com/pingcap/tiflow/pkg/actor/message" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/db" cerrors "github.com/pingcap/tiflow/pkg/errors" @@ -115,19 +114,6 @@ func (s *System) CompactScheduler() *lsorter.CompactScheduler { return s.compactSched } -// broadcase messages to actors in the router. -// Caveats it may lose messages quietly. -func (s *System) broadcast(ctx context.Context, router *actor.Router, msg message.Message) { - dbCount := s.cfg.Count - for id := 0; id < dbCount; id++ { - err := router.SendB(ctx, actor.ID(id), msg) - if err != nil { - log.Warn("broadcast message failed", - zap.Int("ID", id), zap.Any("message", msg)) - } - } -} - // Start starts a system. func (s *System) Start(ctx context.Context) error { s.stateMu.Lock() @@ -211,37 +197,26 @@ func (s *System) Stop() error { } s.state = sysStateStopped - // TODO caller should pass context. 
- deadline := time.Now().Add(1 * time.Second) - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() - // Close actors - s.broadcast(ctx, s.DBRouter, message.StopMessage()) - s.broadcast(ctx, s.WriterRouter, message.StopMessage()) - s.broadcast(ctx, s.ReaderRouter, message.StopMessage()) - s.broadcast(ctx, s.compactRouter, message.StopMessage()) - // Close metrics goroutine. - close(s.closedCh) - // Wait actors and metrics goroutine. - s.closedWg.Wait() - - // Stop systems. - err := s.dbSystem.Stop() + // Stop all actors and system to release resource. + err := s.WriterSystem.Stop() if err != nil { return errors.Trace(err) } - err = s.WriterSystem.Stop() + err = s.ReaderSystem.Stop() if err != nil { return errors.Trace(err) } - err = s.ReaderSystem.Stop() + // TODO: compact is not context-aware, it may block. + err = s.compactSystem.Stop() if err != nil { return errors.Trace(err) } - err = s.compactSystem.Stop() + err = s.dbSystem.Stop() if err != nil { return errors.Trace(err) } + // Close metrics goroutine. + close(s.closedCh) // Close dbs. for _, db := range s.dbs { @@ -250,6 +225,8 @@ func (s *System) Stop() error { log.Warn("db close error", zap.Error(err)) } } + // Wait actors and metrics goroutine. + s.closedWg.Wait() return nil } diff --git a/cdc/sorter/leveldb/system/system_test.go b/cdc/sorter/leveldb/system/system_test.go index 46961328dff..108c7cf7737 100644 --- a/cdc/sorter/leveldb/system/system_test.go +++ b/cdc/sorter/leveldb/system/system_test.go @@ -16,7 +16,12 @@ package system import ( "context" "testing" + "time" + "github.com/pingcap/tiflow/cdc/sorter/leveldb" + "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" + "github.com/pingcap/tiflow/pkg/actor" + actormsg "github.com/pingcap/tiflow/pkg/actor/message" "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" ) @@ -72,3 +77,90 @@ func TestDBActorID(t *testing.T) { require.Equal(t, id1, id2) require.Nil(t, sys.Stop()) } + +// Slow actor should not block system.Stop() forever. +func TestSystemStopSlowly(t *testing.T) { + t.Parallel() + ctx := context.Background() + cfg := config.GetDefaultServerConfig().Clone().Debug.DB + cfg.Count = 2 + + sys := NewSystem(t.TempDir(), 1, cfg) + require.Nil(t, sys.Start(ctx)) + msg := message.Task{Test: &message.Test{Sleep: 2 * time.Second}} + sys.DBRouter.Broadcast(ctx, actormsg.SorterMessage(msg)) + require.Nil(t, sys.Stop()) +} + +// Mailbox full should not cause system.Stop() being blocked forever. 
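+// The test below parks both DB actors in a slow Test task, fills DB actor 1's
+// mailbox with Tick messages until Send starts failing, and then expects
+// Stop() to return instead of hanging on the backlogged actor.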
+func TestSystemStopMailboxFull(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	cfg := config.GetDefaultServerConfig().Clone().Debug.DB
+	cfg.Count = 2
+
+	sys := NewSystem(t.TempDir(), 1, cfg)
+	require.Nil(t, sys.Start(ctx))
+	msg := message.Task{Test: &message.Test{Sleep: 2 * time.Second}}
+	sys.DBRouter.Broadcast(ctx, actormsg.SorterMessage(msg))
+	for {
+		err := sys.DBRouter.Send(actor.ID(1), actormsg.TickMessage())
+		if err != nil {
+			break
+		}
+	}
+	require.Nil(t, sys.Stop())
+}
+
+func TestSystemStopWithManyTablesAndFewStragglers(t *testing.T) {
+	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cfg := config.GetDefaultServerConfig().Clone().Debug.DB
+	cfg.Count = 8
+
+	sys := NewSystem(t.TempDir(), 1, cfg)
+	require.Nil(t, sys.Start(ctx))
+
+	ss := make([]*leveldb.Sorter, 0, 1000)
+	scancels := make([]context.CancelFunc, 0, 1000)
+	for i := uint64(0); i < 1000; i++ {
+		dbActorID := sys.DBActorID(i)
+		s, err := leveldb.NewSorter(
+			ctx, int64(i), i, sys.DBRouter, dbActorID,
+			sys.WriterSystem, sys.WriterRouter,
+			sys.ReaderSystem, sys.ReaderRouter,
+			sys.CompactScheduler(), cfg)
+		require.Nil(t, err)
+		ss = append(ss, s)
+		sctx, scancel := context.WithCancel(ctx)
+		scancels = append(scancels, scancel)
+		go func() {
+			_ = s.Run(sctx)
+		}()
+	}
+	time.Sleep(500 * time.Millisecond)
+
+	// Close 100 tables before stopping the system.
+	for i := 0; i < 100; i++ {
+		scancels[i]()
+	}
+	// Make 10 straggler tables whose readers are stuck in a slow task.
+	for i := 100; i < 110; i++ {
+		id := ss[i].ReaderActorID
+		sleep := message.Task{Test: &message.Test{Sleep: 2 * time.Second}}
+		require.Nil(t, sys.ReaderRouter.SendB(ctx, id, actormsg.SorterMessage(sleep)))
+		if i%2 == 0 {
+			continue
+		}
+		// Fill the reader's mailbox until Send fails.
+		for {
+			err := sys.ReaderRouter.Send(id, actormsg.SorterMessage(message.Task{}))
+			if err != nil {
+				break
+			}
+		}
+	}
+	// Stopping the system must not block on the stragglers.
+	require.Nil(t, sys.Stop())
+}
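
After this patch a table sorter owns its own cleanup: cancelling the context passed to Sorter.Run stops the writer and reader actors and then schedules the range delete, and System.Stop no longer broadcasts StopMessage to every actor, which is what could previously hang forever. A minimal caller-side sketch of the resulting lifecycle follows; it mirrors the constructor calls in the tests above, while the function name, the errgroup wiring, and the literal arguments to NewSystem and NewSorter are illustrative assumptions rather than confirmed usage.

// Sketch: running and tearing down one table sorter after this change.
package main

import (
	"context"

	"github.com/pingcap/tiflow/cdc/sorter/leveldb"
	"github.com/pingcap/tiflow/cdc/sorter/leveldb/system"
	"github.com/pingcap/tiflow/pkg/config"
	"golang.org/x/sync/errgroup"
)

func runOneTable(dir string, tableID int64) error {
	cfg := config.GetDefaultServerConfig().Clone().Debug.DB
	sys := system.NewSystem(dir, 1, cfg) // arguments mirror the tests above
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := sys.Start(ctx); err != nil {
		return err
	}

	s, err := leveldb.NewSorter(
		ctx, tableID, uint64(0), // third argument is a uint64 ts in the tests above; 0 is assumed here
		sys.DBRouter, sys.DBActorID(uint64(tableID)),
		sys.WriterSystem, sys.WriterRouter,
		sys.ReaderSystem, sys.ReaderRouter,
		sys.CompactScheduler(), cfg)
	if err != nil {
		return err
	}

	var eg errgroup.Group
	sctx, stopTable := context.WithCancel(ctx)
	eg.Go(func() error { return s.Run(sctx) })

	// ... feed entries to the sorter and consume s.Output() ...

	// Stopping the table is now just cancelling its context: Run sends
	// StopMessage to the writer and reader actors and schedules the
	// range-delete cleanup itself before returning (CleanupFunc is gone).
	stopTable()
	_ = eg.Wait()

	// Stop() now shuts down the writer, reader, compactor and db actor
	// systems in that order instead of broadcasting StopMessage to each actor.
	return sys.Stop()
}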