Skip to content

Commit

Permalink
pkg/errctx(engine): avoid creating goroutine in errctx (#6061)
Browse files Browse the repository at this point in the history
ref #6013
  • Loading branch information
sleepymole authored Jun 30, 2022
1 parent ef49f2a commit 2c0d06e
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 91 deletions.
3 changes: 2 additions & 1 deletion engine/executor/cvs/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func newCvsTask(ctx *dcontext.Context, _workerID frameModel.WorkerID, masterID f
func (task *cvsTask) InitImpl(ctx context.Context) error {
log.L().Info("init the task ", zap.Any("task id :", task.ID()))
task.setStatusCode(frameModel.WorkerStatusNormal)
ctx, task.cancelFn = context.WithCancel(ctx)
// Don't use the ctx from the caller. Caller may cancel the ctx after InitImpl returns.
ctx, task.cancelFn = context.WithCancel(context.Background())
go func() {
err := task.Receive(ctx)
if err != nil {
Expand Down
22 changes: 14 additions & 8 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"time"

"github.com/gin-gonic/gin"
"go.uber.org/zap"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

runtime "github.com/pingcap/tiflow/engine/executor/worker"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/model"
Expand Down Expand Up @@ -158,7 +158,8 @@ func (d *DefaultBaseJobMaster) Logger() *zap.Logger {

// Init implements BaseJobMaster.Init
func (d *DefaultBaseJobMaster) Init(ctx context.Context) error {
ctx = d.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

if err := d.worker.doPreInit(ctx); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -191,7 +192,8 @@ func (d *DefaultBaseJobMaster) Init(ctx context.Context) error {

// Poll implements BaseJobMaster.Poll
func (d *DefaultBaseJobMaster) Poll(ctx context.Context) error {
ctx = d.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

if err := d.master.doPoll(ctx); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -262,7 +264,8 @@ func (d *DefaultBaseJobMaster) CreateWorker(workerType WorkerType, config Worker

// UpdateStatus delegates the UpdateStatus of inner worker
func (d *DefaultBaseJobMaster) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error {
ctx = d.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

return d.worker.UpdateStatus(ctx, status)
}
Expand All @@ -284,7 +287,8 @@ func (d *DefaultBaseJobMaster) JobMasterID() frameModel.MasterID {

// UpdateJobStatus implements BaseJobMaster.UpdateJobStatus
func (d *DefaultBaseJobMaster) UpdateJobStatus(ctx context.Context, status frameModel.WorkerStatus) error {
ctx = d.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

return d.worker.UpdateStatus(ctx, status)
}
Expand All @@ -300,7 +304,8 @@ func (d *DefaultBaseJobMaster) IsBaseJobMaster() {

// SendMessage delegates the SendMessage of inner worker
func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error {
ctx = d.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

// master will use WorkerHandle to send message
return d.worker.SendMessage(ctx, topic, message, nonblocking)
Expand All @@ -313,7 +318,8 @@ func (d *DefaultBaseJobMaster) IsMasterReady() bool {

// Exit implements BaseJobMaster.Exit
func (d *DefaultBaseJobMaster) Exit(ctx context.Context, status frameModel.WorkerStatus, err error) error {
ctx = d.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

var err1 error
switch status.Code {
Expand Down
30 changes: 19 additions & 11 deletions engine/framework/fake/fake_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
framework.BaseWorker

init bool
cancel context.CancelFunc
closed int32
status *dummyWorkerStatus
config *WorkerConfig
Expand All @@ -77,44 +78,47 @@ type (
)

type dummyWorkerStatus struct {
sync.RWMutex
rwm sync.RWMutex
BusinessID int `json:"business-id"`
Tick int64 `json:"tick"`
Checkpoint *workerCheckpoint `json:"checkpoint"`
}

func (s *dummyWorkerStatus) tick() {
s.Lock()
defer s.Unlock()
s.rwm.Lock()
defer s.rwm.Unlock()
s.Tick++
}

func (s *dummyWorkerStatus) getEtcdCheckpoint() workerCheckpoint {
s.RLock()
defer s.RUnlock()
s.rwm.RLock()
defer s.rwm.RUnlock()
return *s.Checkpoint
}

func (s *dummyWorkerStatus) setEtcdCheckpoint(ckpt *workerCheckpoint) {
s.Lock()
defer s.Unlock()
s.rwm.Lock()
defer s.rwm.Unlock()
s.Checkpoint = ckpt
}

func (s *dummyWorkerStatus) Marshal() ([]byte, error) {
s.RLock()
defer s.RUnlock()
s.rwm.RLock()
defer s.rwm.RUnlock()
return json.Marshal(s)
}

// Unmarshal decodes the JSON-encoded data into s and returns any decoding
// error reported by encoding/json.
// NOTE(review): unlike Marshal, this method does not acquire the status
// lock — confirm it is never called concurrently with the other accessors.
func (s *dummyWorkerStatus) Unmarshal(data []byte) error {
	if err := json.Unmarshal(data, s); err != nil {
		return err
	}
	return nil
}

func (d *dummyWorker) InitImpl(ctx context.Context) error {
func (d *dummyWorker) InitImpl(_ context.Context) error {
if !d.init {
if d.config.EtcdWatchEnable {
// Don't use the ctx from the caller, because it may be cancelled by the caller after InitImpl() returns.
ctx, cancel := context.WithCancel(context.Background())
d.bgRunEtcdWatcher(ctx)
d.cancel = cancel
}
d.init = true
d.setStatusCode(frameModel.WorkerStatusNormal)
Expand Down Expand Up @@ -205,7 +209,11 @@ func (d *dummyWorker) OnMasterMessage(topic p2p.Topic, message p2p.MessageValue)
}

func (d *dummyWorker) CloseImpl(ctx context.Context) error {
atomic.StoreInt32(&d.closed, 1)
if atomic.CompareAndSwapInt32(&d.closed, 0, 1) {
if d.cancel != nil {
d.cancel()
}
}
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions engine/framework/internal/master/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func (m *WorkerManager) InitAfterRecover(ctx context.Context) (retErr error) {
}
}()

ctx = m.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := m.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

m.mu.Lock()
if m.state != workerManagerLoadingMeta {
Expand Down Expand Up @@ -297,7 +298,8 @@ func (m *WorkerManager) Tick(ctx context.Context) error {

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
ctx = m.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel = m.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

for {
var event *masterEvent
Expand Down
17 changes: 11 additions & 6 deletions engine/framework/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/atomic"
"go.uber.org/dig"
"go.uber.org/zap"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/engine/client"
pb "github.com/pingcap/tiflow/engine/enginepb"
"github.com/pingcap/tiflow/engine/framework/config"
Expand Down Expand Up @@ -286,7 +286,8 @@ func (m *DefaultBaseMaster) Logger() *zap.Logger {

// Init implements BaseMaster.Init
func (m *DefaultBaseMaster) Init(ctx context.Context) error {
ctx = m.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := m.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

isInit, err := m.doInit(ctx)
if err != nil {
Expand Down Expand Up @@ -408,7 +409,8 @@ func (m *DefaultBaseMaster) registerMessageHandlers(ctx context.Context) error {

// Poll implements BaseMaster.Poll
func (m *DefaultBaseMaster) Poll(ctx context.Context) error {
ctx = m.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := m.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

if err := m.doPoll(ctx); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -578,8 +580,9 @@ func (m *DefaultBaseMaster) CreateWorker(
zap.Any("resources", resources),
zap.String("master-id", m.id))

ctx := m.errCenter.WithCancelOnFirstError(context.Background())
quotaCtx, cancel := context.WithTimeout(ctx, createWorkerWaitQuotaTimeout)
errCtx, cancel := m.errCenter.WithCancelOnFirstError(context.Background())
defer cancel()
quotaCtx, cancel := context.WithTimeout(errCtx, createWorkerWaitQuotaTimeout)
defer cancel()
if err := m.createWorkerQuota.Consume(quotaCtx); err != nil {
return "", derror.WrapError(derror.ErrMasterConcurrencyExceeded, err)
Expand All @@ -595,7 +598,9 @@ func (m *DefaultBaseMaster) CreateWorker(
m.createWorkerQuota.Release()
}()

requestCtx, cancel := context.WithTimeout(ctx, createWorkerTimeout)
errCtx, cancel := m.errCenter.WithCancelOnFirstError(context.Background())
defer cancel()
requestCtx, cancel := context.WithTimeout(errCtx, createWorkerTimeout)
defer cancel()

resp, err := m.serverMasterClient.ScheduleTask(requestCtx, &pb.ScheduleTaskRequest{
Expand Down
19 changes: 12 additions & 7 deletions engine/framework/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/dig"
"go.uber.org/zap"

"github.com/pingcap/errors"
"github.com/pingcap/log"
runtime "github.com/pingcap/tiflow/engine/executor/worker"
"github.com/pingcap/tiflow/engine/framework/config"
frameErrors "github.com/pingcap/tiflow/engine/framework/internal/errors"
Expand Down Expand Up @@ -212,7 +212,8 @@ func (w *DefaultBaseWorker) Workload() model.RescUnit {

// Init implements BaseWorker.Init
func (w *DefaultBaseWorker) Init(ctx context.Context) error {
ctx = w.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

if err := w.doPreInit(ctx); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -341,7 +342,8 @@ func (w *DefaultBaseWorker) doPoll(ctx context.Context) error {

// Poll implements BaseWorker.Poll
func (w *DefaultBaseWorker) Poll(ctx context.Context) error {
ctx = w.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

if err := w.doPoll(ctx); err != nil {
return err
Expand Down Expand Up @@ -436,7 +438,8 @@ func (w *DefaultBaseWorker) Logger() *zap.Logger {
// Note that if the master cannot handle the notifications fast enough, notifications
// can be lost.
func (w *DefaultBaseWorker) UpdateStatus(ctx context.Context, status frameModel.WorkerStatus) error {
ctx = w.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

w.workerStatus.Code = status.Code
w.workerStatus.ErrorMessage = status.ErrorMessage
Expand All @@ -456,7 +459,8 @@ func (w *DefaultBaseWorker) SendMessage(
nonblocking bool,
) error {
var err error
ctx = w.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx)
defer cancel()
if nonblocking {
_, err = w.messageSender.SendToNode(ctx, w.masterClient.MasterNode(), topic, message)
} else {
Expand All @@ -467,7 +471,8 @@ func (w *DefaultBaseWorker) SendMessage(

// OpenStorage implements BaseWorker.OpenStorage
func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resourcemeta.ResourceID) (broker.Handle, error) {
ctx = w.errCenter.WithCancelOnFirstError(ctx)
ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx)
defer cancel()
return w.resourceBroker.OpenStorage(ctx, w.projectInfo, w.id, w.masterID, resourcePath)
}

Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/example/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (w *exampleWorker) Tick(ctx context.Context) error {
count := w.work.tickCount
w.work.mu.Unlock()

storage, err := w.OpenStorage(nil, "/local/example")
storage, err := w.OpenStorage(ctx, "/local/example")
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 2c0d06e

Please sign in to comment.