Skip to content

Commit

Permalink
context: refine error handler (#1468) (#1701)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored May 26, 2021
1 parent 2e9812c commit c8e5331
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 73 deletions.
16 changes: 8 additions & 8 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type TablePipeline interface {
Status() TableStatus
// Cancel stops this table pipeline immediately and destroy all resources created by this table pipeline
Cancel()
// Wait waits for all node destroyed and returns errors
Wait() []error
// Wait waits for table pipeline destroyed
Wait()
}

type tablePipelineImpl struct {
Expand Down Expand Up @@ -125,9 +125,9 @@ func (t *tablePipelineImpl) Cancel() {
t.cancel()
}

// Wait waits for all node destroyed and returns errors
func (t *tablePipelineImpl) Wait() []error {
return t.p.Wait()
// Wait waits for table pipeline destroyed
func (t *tablePipelineImpl) Wait() {
t.p.Wait()
}

// NewTablePipeline creates a table pipeline
Expand All @@ -145,7 +145,7 @@ func NewTablePipeline(ctx context.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
targetTs model.Ts) (context.Context, TablePipeline) {
targetTs model.Ts) TablePipeline {
ctx, cancel := context.WithCancel(ctx)
tablePipeline := &tablePipelineImpl{
tableID: tableID,
Expand All @@ -154,7 +154,7 @@ func NewTablePipeline(ctx context.Context,
cancel: cancel,
}

ctx, p := pipeline.NewPipeline(ctx, 500*time.Millisecond)
p := pipeline.NewPipeline(ctx, 500*time.Millisecond)
p.AppendNode(ctx, "puller", newPullerNode(changefeedID, credential, kvStorage, limitter, tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, changefeedID, tableName, tableID))
p.AppendNode(ctx, "mounter", newMounterNode(mounter))
Expand All @@ -165,5 +165,5 @@ func NewTablePipeline(ctx context.Context,
tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs)
p.AppendNode(ctx, "sink", tablePipeline.sinkNode)
tablePipeline.p = p
return ctx, tablePipeline
return tablePipeline
}
18 changes: 11 additions & 7 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,15 @@ func (p *processor) createTablePipelineImpl(ctx context.Context, tableID model.T
SchemaStorage: p.schemaStorage,
Config: p.changefeed.Info.Config,
})
cdcCtx = cdccontext.WithErrorHandler(cdcCtx, func(err error) error {
if cerror.ErrTableProcessorStoppedSafely.Equal(err) ||
errors.Cause(errors.Cause(err)) == context.Canceled {
return nil
}
p.sendError(err)
return nil
})

kvStorage, err := util.KVStorageFromCtx(ctx)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -680,7 +689,7 @@ func (p *processor) createTablePipelineImpl(ctx context.Context, tableID model.T
}
sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs)

_, table := tablepipeline.NewTablePipeline(
table := tablepipeline.NewTablePipeline(
cdcCtx,
p.changefeed.ID,
p.credential,
Expand All @@ -698,12 +707,7 @@ func (p *processor) createTablePipelineImpl(ctx context.Context, tableID model.T
p.wg.Add(1)
p.metricSyncTableNumGauge.Inc()
go func() {
for _, err := range table.Wait() {
if cerror.ErrTableProcessorStoppedSafely.Equal(err) || errors.Cause(err) == context.Canceled {
continue
}
p.sendError(err)
}
table.Wait()
p.wg.Done()
p.metricSyncTableNumGauge.Dec()
log.Debug("Table pipeline exited", zap.Int64("tableID", tableID),
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (m *mockTablePipeline) Cancel() {
m.canceled = true
}

func (m *mockTablePipeline) Wait() []error {
func (m *mockTablePipeline) Wait() {
panic("not implemented") // TODO: Implement
}

Expand Down
21 changes: 16 additions & 5 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package context

import (
"context"
"log"

"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/pkg/config"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

// Vars contains some vars which can be used anywhere in a pipeline
Expand Down Expand Up @@ -73,7 +75,13 @@ func (ctx *rootContext) Vars() *Vars {
return ctx.vars
}

func (ctx *rootContext) Throw(error) { /* do nothing */ }
func (ctx *rootContext) Throw(err error) {
if err == nil {
return
}
// make sure all error has been catched
log.Panic("an error has escaped, please report a bug", zap.Error(err))
}

type stdContext struct {
stdCtx context.Context
Expand Down Expand Up @@ -104,11 +112,13 @@ func WithCancel(ctx Context) (Context, context.CancelFunc) {

type throwContext struct {
Context
f func(error)
f func(error) error
}

// WithErrorHandler creates a new context that can watch the Throw function
func WithErrorHandler(ctx Context, f func(error)) Context {
// if the function `f` specified in WithErrorHandler returns an error,
// the error will be thrown to the parent context.
func WithErrorHandler(ctx Context, f func(error) error) Context {
return &throwContext{
Context: ctx,
f: f,
Expand All @@ -119,6 +129,7 @@ func (ctx *throwContext) Throw(err error) {
if err == nil {
return
}
ctx.f(err)
ctx.Context.Throw(err)
if err := ctx.f(err); err != nil {
ctx.Context.Throw(err)
}
}
37 changes: 31 additions & 6 deletions pkg/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func (s *contextSuite) TestThrow(c *check.C) {
stdCtx := context.Background()
ctx := NewContext(stdCtx, &Vars{})
ctx, cancel := WithCancel(ctx)
ctx = WithErrorHandler(ctx, func(err error) {
ctx = WithErrorHandler(ctx, func(err error) error {
c.Assert(err.Error(), check.Equals, "mock error")
cancel()
return nil
})
ctx.Throw(nil)
ctx.Throw(errors.New("mock error"))
Expand All @@ -98,22 +99,46 @@ func (s *contextSuite) TestThrowCascade(c *check.C) {
stdCtx := context.Background()
ctx := NewContext(stdCtx, &Vars{})
ctx, cancel := WithCancel(ctx)
var errNum1, errNum2 int
ctx = WithErrorHandler(ctx, func(err error) {
var errNum1, errNum2, errNum3 int
ctx = WithErrorHandler(ctx, func(err error) error {
if err.Error() == "mock error" {
errNum1++
} else if err.Error() == "mock error2" {
errNum2++
} else {
c.Fail()
}
return nil
})
ctx2 := WithErrorHandler(ctx, func(err error) {
errNum2++
c.Assert(err.Error(), check.Equals, "mock error2")
ctx2 := WithErrorHandler(ctx, func(err error) error {
if err.Error() == "mock error2" {
errNum2++
return err
} else if err.Error() == "mock error3" {
errNum3++
} else {
c.Fail()
}
return nil
})
ctx2.Throw(errors.New("mock error2"))
ctx2.Throw(errors.New("mock error3"))
ctx.Throw(errors.New("mock error"))
c.Assert(errNum1, check.Equals, 1)
c.Assert(errNum2, check.Equals, 2)
c.Assert(errNum3, check.Equals, 1)
cancel()
<-ctx.Done()
}

func (s *contextSuite) TestThrowPanic(c *check.C) {
defer testleak.AfterTest(c)()
defer func() {
panicMsg := recover()
c.Assert(panicMsg, check.Equals, "an error has escaped, please report a bug{error 26 0 mock error}")
}()
stdCtx := context.Background()
ctx := NewContext(stdCtx, &Vars{})
ctx.Throw(nil)
ctx.Throw(errors.New("mock error"))
}
52 changes: 21 additions & 31 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,48 +34,47 @@ type Pipeline struct {
header headRunner
runners []runner
runnersWg sync.WaitGroup
errors []error
errorsMu sync.Mutex
closeMu sync.Mutex
isClosed bool
}

// NewPipeline creates a new pipeline
func NewPipeline(ctx context.Context, tickDuration time.Duration) (context.Context, *Pipeline) {
func NewPipeline(ctx context.Context, tickDuration time.Duration) *Pipeline {
header := make(headRunner, 4)
runners := make([]runner, 0, 16)
runners = append(runners, header)
p := &Pipeline{
header: header,
runners: runners,
}
ctx = context.WithErrorHandler(ctx, func(err error) {
p.addError(err)
p.close()
})
go func() {
var tickCh <-chan time.Time
if tickDuration > 0 {
ticker := time.NewTicker(tickDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.SendToFirstNode(TickMessage()) //nolint:errcheck
case <-ctx.Done():
p.close()
return
}
}
tickCh = ticker.C
} else {
<-ctx.Done()
p.close()
tickCh = make(chan time.Time)
}
for {
select {
case <-tickCh:
p.SendToFirstNode(TickMessage()) //nolint:errcheck
case <-ctx.Done():
p.close()
return
}
}
}()
return ctx, p
return p
}

// AppendNode appends the node to the pipeline
func (p *Pipeline) AppendNode(ctx context.Context, name string, node Node) {
ctx = context.WithErrorHandler(ctx, func(err error) error {
p.close()
return err
})
lastRunner := p.runners[len(p.runners)-1]
runner := newNodeRunner(name, node, lastRunner)
p.runners = append(p.runners, runner)
Expand All @@ -92,7 +91,7 @@ func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runne
}()
err := runner.run(ctx)
if err != nil {
p.addError(err)
ctx.Throw(err)
log.Error("found error when running the node", zap.String("name", runner.getName()), zap.Error(err))
}
}
Expand All @@ -118,16 +117,7 @@ func (p *Pipeline) close() {
}
}

func (p *Pipeline) addError(err error) {
p.errorsMu.Lock()
defer p.errorsMu.Unlock()
p.errors = append(p.errors, err)
}

// Wait all the nodes exited and return the errors found from nodes
func (p *Pipeline) Wait() []error {
// Wait all the nodes exited
func (p *Pipeline) Wait() {
p.runnersWg.Wait()
p.errorsMu.Lock()
defer p.errorsMu.Unlock()
return p.errors
}
Loading

0 comments on commit c8e5331

Please sign in to comment.