Skip to content

Commit

Permalink
pkg/workerpool(cdc): fix race between handling error and unregisterin…
Browse files Browse the repository at this point in the history
…g handler (#4605)

close #4447
  • Loading branch information
liuzix authored Feb 24, 2022
1 parent c019e9a commit 79be937
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio

func (h *defaultEventHandle) Unregister() {
if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) {
// call synchronize so that the returning of Unregister cannot race
// with the calling of the errorHandler, if an error is already being processed.
h.worker.synchronize()
// already cancelled
return
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/workerpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,36 @@ func TestCancelTimer(t *testing.T) {
require.Regexp(t, "context canceled", err)
}

func TestErrorAndCancelRace(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

pool := newDefaultPoolImpl(&defaultHasher{}, 4)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
return pool.Run(ctx)
})

var racedVar int
handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error {
return errors.New("fake")
}).OnExit(func(err error) {
time.Sleep(100 * time.Millisecond)
racedVar++
})

err := handle.AddEvent(ctx, 0)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond)
handle.Unregister()
racedVar++

cancel()
err = errg.Wait()
require.Regexp(t, "context canceled", err)
}

func TestTimer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down

0 comments on commit 79be937

Please sign in to comment.