diff --git a/pkg/workerpool/pool_impl.go b/pkg/workerpool/pool_impl.go index 97af0aaec33..254a2df9f1f 100644 --- a/pkg/workerpool/pool_impl.go +++ b/pkg/workerpool/pool_impl.go @@ -158,7 +158,14 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio } func (h *defaultEventHandle) Unregister() { +<<<<<<< HEAD if !atomic.CompareAndSwapInt32(&h.isCancelled, 0, 1) { +======= + if !atomic.CompareAndSwapInt32(&h.status, handleRunning, handleCancelled) { + // call synchronize so that the returning of Unregister cannot race + // with the calling of the errorHandler, if an error is already being processed. + h.worker.synchronize() +>>>>>>> 79be93739 (pkg/workerpool(cdc): fix race between handling error and unregistering handler (#4605)) // already cancelled return } diff --git a/pkg/workerpool/pool_test.go b/pkg/workerpool/pool_test.go index 729c20d7a31..cd5678bda79 100644 --- a/pkg/workerpool/pool_test.go +++ b/pkg/workerpool/pool_test.go @@ -271,8 +271,42 @@ func (s *workerPoolSuite) TestCancelTimer(c *check.C) { c.Assert(err, check.ErrorMatches, "context canceled") } +<<<<<<< HEAD func (s *workerPoolSuite) TestTimer(c *check.C) { defer testleak.AfterTest(c)() +======= +func TestErrorAndCancelRace(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + pool := newDefaultPoolImpl(&defaultHasher{}, 4) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return pool.Run(ctx) + }) + + var racedVar int + handle := pool.RegisterEvent(func(ctx context.Context, event interface{}) error { + return errors.New("fake") + }).OnExit(func(err error) { + time.Sleep(100 * time.Millisecond) + racedVar++ + }) + + err := handle.AddEvent(ctx, 0) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) + handle.Unregister() + racedVar++ + + cancel() + err = errg.Wait() + require.Regexp(t, "context canceled", err) +} + +func TestTimer(t *testing.T) { +>>>>>>> 79be93739 (pkg/workerpool(cdc): fix race between handling error and unregistering handler (#4605)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel()