Merge branch 'master' into rustin-patch-sink-flush-worker
ti-chi-bot authored Mar 15, 2022
2 parents e8287f5 + d62e2e7 commit c210381
Showing 8 changed files with 164 additions and 88 deletions.
12 changes: 1 addition & 11 deletions cdc/processor/pipeline/sorter.go
@@ -55,8 +55,6 @@ type sorterNode struct {
eg *errgroup.Group
cancel context.CancelFunc

cleanup func(context.Context) error

// The latest resolved ts that sorter has received.
resolvedTs model.Ts

@@ -119,7 +117,6 @@ func (n *sorterNode) start(ctx pipeline.NodeContext, isTableActorMode bool, eg *
if err != nil {
return errors.Trace(err)
}
n.cleanup = levelSorter.CleanupFunc()
eventSorter = levelSorter
} else {
// Sorter dir has been set and checked when server starts.
@@ -304,15 +301,8 @@ func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) {
}
}

func (n *sorterNode) releaseResource(ctx context.Context, changefeedID string) {
func (n *sorterNode) releaseResource(_ context.Context, changefeedID string) {
defer tableMemoryHistogram.DeleteLabelValues(changefeedID)
if n.cleanup != nil {
// Clean up data when the table sorter is canceled.
err := n.cleanup(ctx)
if err != nil {
log.Warn("schedule table cleanup task failed", zap.Error(err))
}
}
// Since the flowController is implemented by `Cond`, it is not cancelable by a context
// the flowController will be blocked in a background goroutine,
// We need to abort the flowController manually in the nodeRunner
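Note on the hunks above: data cleanup moves out of the pipeline node. sorterNode no longer keeps the closure obtained from levelSorter.CleanupFunc(), and releaseResource only tears down node-local state (metrics, flow control); deleting the table's sorted data is now the leveldb Sorter's own job (see cdc/sorter/leveldb/sorter.go below). A minimal, self-contained sketch of that ownership transfer (the stub types and names are illustrative, not the TiCDC API):

```go
package main

import (
	"context"
	"fmt"
)

// sorter stands in for the leveldb Sorter: it owns its on-disk key range
// and now deletes it itself when its Run loop exits.
type sorter struct{ table int64 }

func (s *sorter) Run(ctx context.Context) error {
	<-ctx.Done() // work until cancelled
	// Cleanup happens here, unconditionally, when Run returns.
	return s.cleanup(context.TODO())
}

func (s *sorter) cleanup(_ context.Context) error {
	fmt.Printf("delete key range of table %d\n", s.table)
	return nil
}

// node stands in for sorterNode: it no longer holds a cleanup func.
type node struct{}

func (n *node) releaseResource() {
	// Only node-local teardown remains: metrics, aborting flow control, etc.
	fmt.Println("release node resources")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s, n := &sorter{table: 1}, &node{}
	done := make(chan error, 1)
	go func() { done <- s.Run(ctx) }()
	cancel()
	fmt.Println("sorter Run returned:", <-done)
	n.releaseResource()
}
```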
3 changes: 3 additions & 0 deletions cdc/sorter/leveldb/leveldb.go
@@ -244,6 +244,9 @@ func (ldb *DBActor) Poll(ctx context.Context, tasks []actormsg.Message) bool {
ldb.iterQ.push(task.UID, task.TableID, task.IterReq)
requireIter = true
}
if task.Test != nil {
time.Sleep(task.Test.Sleep)
}
}

// Force write only if there is a task requires an iterator.
9 changes: 9 additions & 0 deletions cdc/sorter/leveldb/message/task.go
@@ -15,6 +15,7 @@ package message

import (
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
@@ -44,6 +45,9 @@ type Task struct {
// Deletes all of the key-values in the range.
// reader -> leveldb and leveldb -> compactor
DeleteReq *DeleteRequest

// A test message.
Test *Test
}

// DeleteRequest a request to delete range.
@@ -72,6 +76,11 @@ type IterRequest struct {
IterCallback func(*LimitedIterator) `json:"-"` // Make Task JSON printable.
}

// Test is a message for testing actors.
type Test struct {
Sleep time.Duration
}

// Key is the key that is written to db.
type Key string

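The new Test field is a test-only hook: when a Task carries it, the db actor (leveldb.go above) and the reader actor (reader.go below) sleep for Test.Sleep inside Poll, which lets tests hold an actor busy and exercise timeouts or backpressure. A rough, self-contained sketch of the pattern; the poll loop and types below are stand-ins, not the real actor framework:

```go
package main

import (
	"fmt"
	"time"
)

// testMsg mirrors the idea of message.Test: an optional, test-only payload.
type testMsg struct{ Sleep time.Duration }

type task struct {
	Payload string
	Test    *testMsg // nil in production; set only by tests
}

// poll is a stand-in for an actor's Poll: it handles a batch of tasks and,
// if a task carries a test message, sleeps to simulate a slow actor.
func poll(tasks []task) {
	for _, t := range tasks {
		fmt.Println("handled", t.Payload)
		if t.Test != nil {
			time.Sleep(t.Test.Sleep)
		}
	}
}

func main() {
	start := time.Now()
	poll([]task{
		{Payload: "write"},
		{Payload: "slow", Test: &testMsg{Sleep: 50 * time.Millisecond}},
	})
	fmt.Println("poll took", time.Since(start).Round(10*time.Millisecond))
}
```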
5 changes: 5 additions & 0 deletions cdc/sorter/leveldb/reader.go
@@ -405,6 +405,11 @@ func (r *reader) Poll(ctx context.Context, msgs []actormsg.Message) (running boo
// Update the max commit ts and resolved ts of all received events.
ts := msgs[i].SorterTask.ReadTs
r.state.advanceMaxTs(ts.MaxCommitTs, ts.MaxResolvedTs)

// Test only message.
if msgs[i].SorterTask.Test != nil {
time.Sleep(msgs[i].SorterTask.Test.Sleep)
}
}

// Length of buffered resolved events.
42 changes: 22 additions & 20 deletions cdc/sorter/leveldb/sorter.go
@@ -79,7 +79,7 @@ type Sorter struct {
writerActorID actor.ID

readerRouter *actor.Router
readerActorID actor.ID
ReaderActorID actor.ID

outputCh chan *model.PolymorphicEvent

@@ -174,7 +174,7 @@ func NewSorter(
writerRouter: writerRouter,
writerActorID: actorID,
readerRouter: readerRouter,
readerActorID: actorID,
ReaderActorID: actorID,
outputCh: outputCh,
}, nil
}
@@ -187,14 +187,18 @@ func (ls *Sorter) Run(ctx context.Context) error {
err = ctx.Err()
case err = <-ls.errCh:
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
atomic.StoreInt32(&ls.closed, 1)
// We should never lose messages; make sure StopMessage is sent.
ctx1 := context.TODO()
// As the context can't be cancelled, SendB can only return an
// ActorStopped or ActorNotFound error, and either means the actors have closed.
_ = ls.writerRouter.SendB(
ctx, ls.writerActorID, actormsg.StopMessage())
ctx1, ls.writerActorID, actormsg.StopMessage())
_ = ls.readerRouter.SendB(
ctx, ls.readerActorID, actormsg.StopMessage())
ctx1, ls.ReaderActorID, actormsg.StopMessage())
ls.closedWg.Wait()

_ = ls.cleanup(ctx1)
return errors.Trace(err)
}

@@ -247,22 +251,20 @@ func (ls *Sorter) Output() <-chan *model.PolymorphicEvent {
//
// TODO: Consider if we are sending too many msgs here.
// It may waste CPU and be a bottleneck.
_ = ls.readerRouter.Send(ls.readerActorID, msg)
_ = ls.readerRouter.Send(ls.ReaderActorID, msg)
return ls.outputCh
}

// CleanupFunc returns a function that cleans up sorter's data.
func (ls *Sorter) CleanupFunc() func(context.Context) error {
return func(ctx context.Context) error {
task := message.Task{UID: ls.uid, TableID: ls.tableID}
task.DeleteReq = &message.DeleteRequest{
// We do not set task.Delete.Count, because we don't know
// how many key-value pairs in the range.
Range: [2][]byte{
encoding.EncodeTsKey(ls.uid, ls.tableID, 0),
encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0),
},
}
return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
// cleanup cleans up sorter's data.
func (ls *Sorter) cleanup(ctx context.Context) error {
task := message.Task{UID: ls.uid, TableID: ls.tableID}
task.DeleteReq = &message.DeleteRequest{
// We do not set task.Delete.Count, because we don't know
// how many key-value pairs in the range.
Range: [2][]byte{
encoding.EncodeTsKey(ls.uid, ls.tableID, 0),
encoding.EncodeTsKey(ls.uid, ls.tableID+1, 0),
},
}
return ls.dbRouter.SendB(ctx, ls.dbActorID, actormsg.SorterMessage(task))
}
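With CleanupFunc gone, Sorter.Run now owns the full shutdown sequence: send StopMessage to the writer and reader actors, wait for them on closedWg, then schedule the delete of the table's key range. Both the stop messages and the cleanup task use a fresh context.TODO() because the caller's context is already cancelled by the time Run is unwinding. A small sketch of that ordering (the names below are stand-ins for the router/actor API, not the real one):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fakeSorter stands in for the leveldb Sorter; sendStop and cleanup are
// placeholders for SendB(StopMessage) and the DeleteRequest task.
type fakeSorter struct {
	wg      sync.WaitGroup
	stopped []string
}

func (s *fakeSorter) sendStop(name string) { s.stopped = append(s.stopped, name) }

func (s *fakeSorter) cleanup(_ context.Context) error {
	fmt.Println("schedule delete of the table's key range")
	return nil
}

func (s *fakeSorter) run(ctx context.Context) error {
	<-ctx.Done()
	// The caller's ctx is already cancelled here, so the real code sends the
	// stop messages and the cleanup task with a fresh context.TODO().
	s.sendStop("writer")
	s.sendStop("reader")
	s.wg.Wait()                   // writer and reader have fully exited
	_ = s.cleanup(context.TODO()) // only then is it safe to drop the data
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &fakeSorter{}
	cancel()
	fmt.Println(s.run(ctx), s.stopped)
}
```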
46 changes: 22 additions & 24 deletions cdc/sorter/leveldb/sorter_test.go
@@ -43,7 +43,7 @@ func newTestSorter(name string, capacity int) (Sorter, actor.Mailbox) {
writerRouter: router,
writerActorID: mb.ID(),
readerRouter: router,
readerActorID: mb.ID(),
ReaderActorID: mb.ID(),
}
return s, mb
}
@@ -103,32 +103,12 @@ func TestOutput(t *testing.T) {
}, task.SorterTask)
}

func TestCleanupFunc(t *testing.T) {
t.Parallel()

s, mb := newTestSorter(t.Name(), 1)

fn := s.CleanupFunc()
require.Nil(t, fn(context.Background()))
task, ok := mb.Receive()
require.True(t, ok)
require.EqualValues(t,
message.Task{
UID: s.uid,
TableID: s.tableID,
DeleteReq: &message.DeleteRequest{
Range: [2][]byte{
encoding.EncodeTsKey(s.uid, s.tableID, 0),
encoding.EncodeTsKey(s.uid, s.tableID+1, 0),
},
},
}, task.SorterTask)
}

func TestRunAndReportError(t *testing.T) {
t.Parallel()

s, mb := newTestSorter(t.Name(), 2)
// Run exits with three messages
cap := 3
s, mb := newTestSorter(t.Name(), cap)
go func() {
time.Sleep(100 * time.Millisecond)
s.common.reportError(
@@ -143,6 +123,24 @@ func TestRunAndReportError(t *testing.T) {
msg, ok = mb.Receive()
require.True(t, ok)
require.EqualValues(t, actormsg.StopMessage(), msg)
// Cleanup
msg, ok = mb.Receive()
require.True(t, ok)
require.EqualValues(t,
message.Task{
UID: s.uid,
TableID: s.tableID,
DeleteReq: &message.DeleteRequest{
Range: [2][]byte{
encoding.EncodeTsKey(s.uid, s.tableID, 0),
encoding.EncodeTsKey(s.uid, s.tableID+1, 0),
},
},
}, msg.SorterTask)

// No more message.
msg, ok = mb.Receive()
require.False(t, ok)

// Must be nonblock.
s.common.reportError(
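The rewritten TestRunAndReportError drains the mailbox in order: StopMessage for the writer, StopMessage for the reader, then the cleanup DeleteRequest task, and asserts that nothing follows. The mailbox capacity is raised from 2 to 3 because Run now emits that third message while nothing is consuming the mailbox. A toy illustration of why the capacity matters (a plain channel stands in for actor.Mailbox):

```go
package main

import "fmt"

// mailbox is a channel-based stand-in for actor.Mailbox: bounded and,
// when nobody is draining it, unable to accept more than its capacity.
type mailbox chan string

func (m mailbox) send(msg string) bool {
	select {
	case m <- msg:
		return true
	default:
		return false // full: the message would be lost
	}
}

func main() {
	small := make(mailbox, 2) // the old test capacity
	for _, msg := range []string{"stop-writer", "stop-reader", "cleanup-task"} {
		fmt.Println(msg, "accepted:", small.send(msg))
	}
	// The third message is rejected; with capacity 3 (the new test value) it fits.
}
```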
43 changes: 10 additions & 33 deletions cdc/sorter/leveldb/system/system.go
@@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/util/memory"
lsorter "github.com/pingcap/tiflow/cdc/sorter/leveldb"
"github.com/pingcap/tiflow/pkg/actor"
"github.com/pingcap/tiflow/pkg/actor/message"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/db"
cerrors "github.com/pingcap/tiflow/pkg/errors"
@@ -115,19 +114,6 @@ func (s *System) CompactScheduler() *lsorter.CompactScheduler {
return s.compactSched
}

// broadcast messages to actors in the router.
// Caveats it may lose messages quietly.
func (s *System) broadcast(ctx context.Context, router *actor.Router, msg message.Message) {
dbCount := s.cfg.Count
for id := 0; id < dbCount; id++ {
err := router.SendB(ctx, actor.ID(id), msg)
if err != nil {
log.Warn("broadcast message failed",
zap.Int("ID", id), zap.Any("message", msg))
}
}
}

// Start starts a system.
func (s *System) Start(ctx context.Context) error {
s.stateMu.Lock()
@@ -211,37 +197,26 @@ func (s *System) Stop() error {
}
s.state = sysStateStopped

// TODO caller should pass context.
deadline := time.Now().Add(1 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// Close actors
s.broadcast(ctx, s.DBRouter, message.StopMessage())
s.broadcast(ctx, s.WriterRouter, message.StopMessage())
s.broadcast(ctx, s.ReaderRouter, message.StopMessage())
s.broadcast(ctx, s.compactRouter, message.StopMessage())
// Close metrics goroutine.
close(s.closedCh)
// Wait actors and metrics goroutine.
s.closedWg.Wait()

// Stop systems.
err := s.dbSystem.Stop()
// Stop all actors and system to release resource.
err := s.WriterSystem.Stop()
if err != nil {
return errors.Trace(err)
}
err = s.WriterSystem.Stop()
err = s.ReaderSystem.Stop()
if err != nil {
return errors.Trace(err)
}
err = s.ReaderSystem.Stop()
// TODO: compact is not context-aware, it may block.
err = s.compactSystem.Stop()
if err != nil {
return errors.Trace(err)
}
err = s.compactSystem.Stop()
err = s.dbSystem.Stop()
if err != nil {
return errors.Trace(err)
}
// Close metrics goroutine.
close(s.closedCh)

// Close dbs.
for _, db := range s.dbs {
Expand All @@ -250,6 +225,8 @@ func (s *System) Stop() error {
log.Warn("db close error", zap.Error(err))
}
}
// Wait actors and metrics goroutine.
s.closedWg.Wait()
return nil
}

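System.Stop no longer broadcasts StopMessage to every actor under a one-second deadline; it now stops the actor systems in dependency order so producers shut down before the components they feed: writer, then reader, then the compactor, then the db system. The metrics channel and the dbs are closed after that, and closedWg.Wait() comes last. A compact sketch of that ordered shutdown (step and the step list are illustrative, not the *actor.System API):

```go
package main

import "fmt"

// step stands in for a subsystem (an *actor.System or similar) to be stopped.
type step struct {
	name string
	stop func() error
}

// stopAll stops subsystems in order and fails fast, mirroring the new Stop():
// producers (writer, reader) go first so nothing writes into a stopped db.
func stopAll(steps []step) error {
	for _, s := range steps {
		if err := s.stop(); err != nil {
			return fmt.Errorf("stop %s: %w", s.name, err)
		}
		fmt.Println("stopped", s.name)
	}
	return nil
}

func main() {
	noop := func() error { return nil }
	err := stopAll([]step{
		{"writer system", noop},
		{"reader system", noop},
		{"compactor system", noop}, // not context-aware in the real code; may block
		{"db system", noop},
	})
	fmt.Println("then: close metrics channel, close dbs, wait on closedWg; err =", err)
}
```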
