pipeline(ticdc): Init table actor (#4140)
ref #3881
sdojjy authored Feb 22, 2022
1 parent efc0658 commit 97c5305
Showing 16 changed files with 696 additions and 150 deletions.
27 changes: 19 additions & 8 deletions cdc/processor/pipeline/actor_node_context.go
@@ -42,13 +42,17 @@ type actorNodeContext struct {
tickMessageThreshold int32
// noTickMessageCount is the number of pipeline messages sent since the last tick message to the actor
noTickMessageCount int32
+ tableName string
+ throw func(error)
}

func newContext(stdCtx sdtContext.Context,
+ tableName string,
tableActorRouter *actor.Router,
tableActorID actor.ID,
changefeedVars *context.ChangefeedVars,
- globalVars *context.GlobalVars) *actorNodeContext {
+ globalVars *context.GlobalVars,
+ throw func(error)) *actorNodeContext {
return &actorNodeContext{
Context: stdCtx,
outputCh: make(chan pipeline.Message, defaultOutputChannelSize),
@@ -58,6 +62,8 @@ func newContext(stdCtx sdtContext.Context,
globalVars: globalVars,
tickMessageThreshold: messagesPerTick,
noTickMessageCount: 0,
+ tableName: tableName,
+ throw: throw,
}
}

@@ -74,18 +80,23 @@ func (c *actorNodeContext) ChangefeedVars() *context.ChangefeedVars {
}

func (c *actorNodeContext) Throw(err error) {
- if err == nil {
- return
- }
- log.Error("puller stopped", zap.Error(err))
- _ = c.tableActorRouter.SendB(c, c.tableActorID, message.StopMessage())
+ // node errors are reported to the processor, which then cancels the table
+ c.throw(err)
}

// SendToNextNode sends msg to the outputCh and notifies the actor system;
// to reduce actor messages, a tick message is sent only once per threshold
func (c *actorNodeContext) SendToNextNode(msg pipeline.Message) {
- c.outputCh <- msg
- c.trySendTickMessage()
+ select {
+ // if the processor context is canceled, return directly;
+ // otherwise the processor tick loop would block when the chan is full, because the actor is stopped
+ case <-c.Context.Done():
+ log.Info("context is canceled",
+ zap.String("tableName", c.tableName),
+ zap.String("changefeed", c.changefeedVars.ID))
+ case c.outputCh <- msg:
+ c.trySendTickMessage()
+ }
}

func (c *actorNodeContext) TrySendToNextNode(msg pipeline.Message) bool {
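
Note: the select added to SendToNextNode is the behavioral core of this file's change. The old code blocked unconditionally on outputCh; once the table actor is stopped nothing drains that channel, so a full channel could wedge the processor's tick loop. Below is a minimal standalone sketch of the pattern, not code from this commit; the names trySend and out are illustrative.

package sketch

import "context"

// trySend mirrors the select in SendToNextNode: the channel send races the
// cancellation check, so a stopped consumer can no longer block the producer.
func trySend(ctx context.Context, out chan<- string, msg string) bool {
	select {
	case <-ctx.Done():
		return false // consumer is gone; give up instead of blocking forever
	case out <- msg:
		return true
	}
}

On the success path the real code still calls trySendTickMessage, which notifies the actor only once per tickMessageThreshold messages to keep actor traffic low.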
41 changes: 13 additions & 28 deletions cdc/processor/pipeline/actor_node_context_test.go
@@ -29,7 +29,7 @@ import (

func TestContext(t *testing.T) {
t.Parallel()
- ctx := newContext(sdtContext.TODO(), nil, 1, &context.ChangefeedVars{ID: "zzz"}, &context.GlobalVars{})
+ ctx := newContext(sdtContext.TODO(), t.Name(), nil, 1, &context.ChangefeedVars{ID: "zzz"}, &context.GlobalVars{}, throwDoNothing)
require.NotNil(t, ctx.GlobalVars())
require.Equal(t, "zzz", ctx.ChangefeedVars().ID)
require.Equal(t, actor.ID(1), ctx.tableActorID)
@@ -43,7 +43,7 @@ func TestContext(t *testing.T) {

func TestTryGetProcessedMessageFromChan(t *testing.T) {
t.Parallel()
- ctx := newContext(sdtContext.TODO(), nil, 1, nil, nil)
+ ctx := newContext(sdtContext.TODO(), t.Name(), nil, 1, nil, nil, throwDoNothing)
ctx.outputCh = make(chan pipeline.Message, 1)
require.Nil(t, ctx.tryGetProcessedMessage())
ctx.outputCh <- pipeline.TickMessage()
@@ -54,36 +54,18 @@ func TestTryGetProcessedMessageFromChan(t *testing.T) {

func TestThrow(t *testing.T) {
t.Parallel()
- ctx, cancel := sdtContext.WithCancel(sdtContext.TODO())
- sys := system.NewSystem()
- defer func() {
- cancel()
- require.Nil(t, sys.Stop())
- }()
-
- require.Nil(t, sys.Start(ctx))
- actorID := sys.ActorID("abc", 1)
- mb := actor.NewMailbox(actorID, defaultOutputChannelSize)
- ch := make(chan message.Message, defaultOutputChannelSize)
- fa := &forwardActor{ch: ch}
- require.Nil(t, sys.System().Spawn(mb, fa))
- actorContext := newContext(ctx, sys.Router(), actorID, nil, nil)
+ a := 0
+ tf := func(err error) { a++ }
+ actorContext := newContext(sdtContext.TODO(), t.Name(), nil, 1, nil, nil, tf)
actorContext.Throw(nil)
- time.Sleep(100 * time.Millisecond)
- require.Equal(t, 0, len(ch))
+ require.Equal(t, 1, a)
actorContext.Throw(errors.New("error"))
- tick := time.After(500 * time.Millisecond)
- select {
- case <-tick:
- t.Fatal("timeout")
- case m := <-ch:
- require.Equal(t, message.TypeStop, m.Tp)
- }
+ require.Equal(t, 2, a)
}

func TestActorNodeContextTrySendToNextNode(t *testing.T) {
t.Parallel()
- ctx := newContext(sdtContext.TODO(), nil, 1, &context.ChangefeedVars{ID: "zzz"}, &context.GlobalVars{})
+ ctx := newContext(sdtContext.TODO(), t.Name(), nil, 1, &context.ChangefeedVars{ID: "zzz"}, &context.GlobalVars{}, throwDoNothing)
ctx.outputCh = make(chan pipeline.Message, 1)
require.True(t, ctx.TrySendToNextNode(pipeline.BarrierMessage(1)))
require.False(t, ctx.TrySendToNextNode(pipeline.BarrierMessage(1)))
@@ -102,12 +84,12 @@ func TestSendToNextNodeNoTickMessage(t *testing.T) {
}()

require.Nil(t, sys.Start(ctx))
- actorID := sys.ActorID("abc", 1)
+ actorID := sys.ActorID()
mb := actor.NewMailbox(actorID, defaultOutputChannelSize)
ch := make(chan message.Message, defaultOutputChannelSize)
fa := &forwardActor{ch: ch}
require.Nil(t, sys.System().Spawn(mb, fa))
- actorContext := newContext(ctx, sys.Router(), actorID, nil, nil)
+ actorContext := newContext(ctx, t.Name(), sys.Router(), actorID, &context.ChangefeedVars{ID: "abc"}, &context.GlobalVars{}, throwDoNothing)
actorContext.setTickMessageThreshold(2)
actorContext.SendToNextNode(pipeline.BarrierMessage(1))
time.Sleep(100 * time.Millisecond)
@@ -157,3 +139,6 @@ func wait(t *testing.T, timeout time.Duration, f func()) {
t.Fatal("Timed out")
}
}

+ func throwDoNothing(_ error) {
+ }
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/cyclic_mark.go
@@ -43,7 +43,7 @@ type cyclicMarkNode struct {
isTableActorMode bool
}

- func newCyclicMarkNode(markTableID model.TableID) pipeline.Node {
+ func newCyclicMarkNode(markTableID model.TableID) *cyclicMarkNode {
return &cyclicMarkNode{
markTableID: markTableID,
unknownReplicaIDEvents: make(map[model.Ts][]*model.PolymorphicEvent),
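
Note: newCyclicMarkNode now returns the concrete *cyclicMarkNode instead of the pipeline.Node interface, so table-actor code can call methods that are not part of the interface; the test change below drops the .(*cyclicMarkNode) type assertion accordingly. A small sketch of the idiom, with illustrative names rather than code from this repo:

package sketch

// Node stands in for pipeline.Node.
type Node interface{ Init() error }

type markNode struct{}

func (n *markNode) Init() error { return nil }

// extra is not part of the Node interface.
func (n *markNode) extra() {}

// Returning the concrete type still satisfies Node wherever one is expected,
// but callers can reach extra() without a type assertion.
func newMarkNode() *markNode { return &markNode{} }

var _ Node = newMarkNode() // compile-time check that *markNode implements Node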
6 changes: 3 additions & 3 deletions cdc/processor/pipeline/cyclic_mark_test.go
@@ -193,7 +193,7 @@ func TestCyclicMarkNode(t *testing.T) {

// table actor
for _, tc := range testCases {
- ctx := newCyclicNodeContext(newContext(context.TODO(), nil, 1, &cdcContext.ChangefeedVars{
+ ctx := newCyclicNodeContext(newContext(context.TODO(), "a.test", nil, 1, &cdcContext.ChangefeedVars{
Info: &model.ChangeFeedInfo{
Config: &config.ReplicaConfig{
Cyclic: &config.CyclicConfig{
@@ -203,8 +203,8 @@ },
},
},
},
- }, nil))
- n := newCyclicMarkNode(markTableID).(*cyclicMarkNode)
+ }, nil, throwDoNothing))
+ n := newCyclicMarkNode(markTableID)
err := n.Init(ctx)
require.Nil(t, err)
output := []*model.RowChangedEvent{}
12 changes: 8 additions & 4 deletions cdc/processor/pipeline/puller.go
@@ -39,7 +39,7 @@ type pullerNode struct {
func newPullerNode(
tableID model.TableID, replicaInfo *model.TableReplicaInfo,
tableName, changefeed string,
- ) pipeline.Node {
+ ) *pullerNode {
return &pullerNode{
tableID: tableID,
replicaInfo: replicaInfo,
@@ -61,10 +61,10 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
}

func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
- return n.InitWithWaitGroup(ctx, new(errgroup.Group))
+ return n.start(ctx, new(errgroup.Group), false, nil)
}

- func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Group) error {
+ func (n *pullerNode) start(ctx pipeline.NodeContext, wg *errgroup.Group, isActorMode bool, sorter *sorterNode) error {
n.wg = wg
ctxC, cancel := context.WithCancel(ctx)
ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
@@ -95,7 +95,11 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr
continue
}
pEvent := model.NewPolymorphicEvent(rawKV)
- ctx.SendToNextNode(pipeline.PolymorphicEventMessage(pEvent))
+ if isActorMode {
+ sorter.handleRawEvent(ctx, pEvent)
+ } else {
+ ctx.SendToNextNode(pipeline.PolymorphicEventMessage(pEvent))
+ }
}
}
})
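
Note: in actor mode the puller now hands each event straight to the sorter node instead of enqueuing a pipeline message, skipping the outputCh hop. sorterNode.handleRawEvent itself is not shown in this diff, so the sketch below treats its shape as an assumption; types and names are illustrative.

package sketch

type event struct{ ts uint64 }

// sorter stands in for sorterNode; handleRaw plays the role of the assumed
// handleRawEvent and simply feeds the sorter's input channel here.
type sorter struct{ in chan event }

func (s *sorter) handleRaw(ev event) { s.in <- ev }

// dispatch routes an event the way pullerNode.start does after this commit:
// directly to the sorter in actor mode, through the pipeline channel otherwise.
func dispatch(isActorMode bool, s *sorter, pipelineCh chan<- event, ev event) {
	if isActorMode {
		s.handleRaw(ev) // actor mode: no intermediate pipeline message
	} else {
		pipelineCh <- ev // classic pipeline: the next node reads this channel
	}
}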
24 changes: 18 additions & 6 deletions cdc/processor/pipeline/sink.go
@@ -99,17 +99,18 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target

func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
+ func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
- return n.InitWithReplicaConfig(false, ctx.ChangefeedVars().Info.Config)
+ n.initWithReplicaConfig(false, ctx.ChangefeedVars().Info.Config)
+ return nil
}

- func (n *sinkNode) InitWithReplicaConfig(isTableActorMode bool, replicaConfig *config.ReplicaConfig) error {
- n.replicaConfig = replicaConfig
+ func (n *sinkNode) initWithReplicaConfig(isTableActorMode bool, replicaConfig *config.ReplicaConfig) {
n.isTableActorMode = isTableActorMode
- return nil
+ n.replicaConfig = replicaConfig
}

// stop is called when sink receives a stop command or checkpointTs reaches targetTs.
@@ -351,15 +352,26 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pipeline.Message) (boo
}
}
case pipeline.MessageTypeBarrier:
- n.barrierTs = msg.BarrierTs
- if err := n.flushSink(ctx, n.resolvedTs); err != nil {
+ if err := n.updateBarrierTs(ctx, msg.BarrierTs); err != nil {
return false, errors.Trace(err)
}
}
return true, nil
}

+ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
+ atomic.StoreUint64(&n.barrierTs, ts)
+ if err := n.flushSink(ctx, n.resolvedTs); err != nil {
+ return errors.Trace(err)
+ }
+ return nil
+ }

func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
+ return n.releaseResource(ctx)
+ }
+
+ func (n *sinkNode) releaseResource(ctx context.Context) error {
n.status.Store(TableStatusStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
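
Note: barrierTs now follows the same convention as resolvedTs and checkpointTs above: updateBarrierTs stores it with atomic.StoreUint64 and the new BarrierTs() getter loads it atomically, so another goroutine (the table actor, or a test) can read it without a lock. A minimal sketch of the accessor pair; the tsHolder type is illustrative, not code from this commit.

package sketch

import "sync/atomic"

// tsHolder mirrors how sinkNode publishes a timestamp: one side stores, any
// goroutine may load, and a single uint64 needs no mutex when accessed atomically.
type tsHolder struct{ barrierTs uint64 }

func (h *tsHolder) update(ts uint64) { atomic.StoreUint64(&h.barrierTs, ts) }

func (h *tsHolder) BarrierTs() uint64 { return atomic.LoadUint64(&h.barrierTs) }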
2 changes: 2 additions & 0 deletions cdc/processor/pipeline/sink_test.go
@@ -134,6 +134,7 @@ func TestStatus(t *testing.T) {

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)))
require.Equal(t, TableStatusInitializing, node.Status())
+ require.Equal(t, model.Ts(20), node.BarrierTs())

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{}}), nil)))
@@ -160,6 +161,7 @@

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)))
require.Equal(t, TableStatusInitializing, node.Status())
+ require.Equal(t, model.Ts(20), node.BarrierTs())

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)))