Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4605
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
liuzix authored and ti-chi-bot committed Feb 24, 2022
1 parent 75dfaa2 commit 9dfbfd6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,14 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio
}

func (h *defaultEventHandle) Unregister() {
<<<<<<< HEAD
if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) {
=======
if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) {
// call synchronize so that the returning of Unregister cannot race
// with the calling of the errorHandler, if an error is already being processed.
h.worker.synchronize()
>>>>>>> 79be93739 (pkg/workerpool(cdc): fix race between handling error and unregistering handler (#4605))
// already cancelled
return
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/workerpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,42 @@ func (s *workerPoolSuite) TestCancelTimer(c *check.C) {
c.Assert(err, check.ErrorMatches, "context canceled")
}

<<<<<<< HEAD
func (s *workerPoolSuite) TestTimer(c *check.C) {
defer testleak.AfterTest(c)()
=======
func TestErrorAndCancelRace(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

pool := newDefaultPoolImpl(&defaultHasher{}, 4)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
return pool.Run(ctx)
})

var racedVar int
handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
return errors.New("fake")
}).OnExit(func(err error) {
time.Sleep(100 * time.Millisecond)
racedVar++
})

err := handle.AddEvent(ctx, 0)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond)
handle.Unregister()
racedVar++

cancel()
err = errg.Wait()
require.Regexp(t, "context canceled", err)
}

func TestTimer(t *testing.T) {
>>>>>>> 79be93739 (pkg/workerpool(cdc): fix race between handling error and unregistering handler (#4605))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand Down

0 comments on commit 9dfbfd6

Please sign in to comment.