
notifier(engine): fix Flush deadloop in closed notifier (#7509)
close #7506
amyangfei authored Nov 2, 2022
1 parent 500ff82 commit de6ea32
Showing 6 changed files with 61 additions and 80 deletions.
2 changes: 2 additions & 0 deletions engine/pkg/notifier/notifier.go
@@ -137,6 +137,8 @@ func (n *Notifier[T]) Flush(ctx context.Context) error {
 	select {
 	case <-ctx.Done():
 		return errors.Trace(ctx.Err())
+	case <-n.closeCh:
+		return nil
 	case <-n.synchronizeCh:
 		// Checks the queue size after each iteration
 		// of run().
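
The hunk above is the whole fix: Flush's select gains a closeCh case, so flushing a notifier that has already been closed returns immediately instead of looping forever. A rough sketch of the failure mode, reusing the field names from the hunk but otherwise simplified (an illustrative model, not the real Notifier implementation): once Close() stops the internal run() goroutine, the pending queue is never drained again, so a Flush loop that only waits on synchronizeCh can never observe an empty queue and never returns.

package main

import (
	"context"
	"fmt"
)

// flush is a simplified stand-in for Notifier.Flush: it waits for the run()
// goroutine to signal progress on synchronizeCh, then re-checks the pending
// queue length. This models the bug; it is not the real implementation.
func flush(ctx context.Context, closeCh, synchronizeCh <-chan struct{}, pendingLen func() int) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-closeCh:
			// The fix: a closed notifier will never drain its queue, so return.
			return nil
		case <-synchronizeCh:
			// Checks the queue size after each iteration of run().
			if pendingLen() == 0 {
				return nil
			}
		}
	}
}

func main() {
	closeCh := make(chan struct{})
	synchronizeCh := make(chan struct{})
	close(closeCh)       // the notifier is already closed: run() has exited
	close(synchronizeCh) // a closed channel is always ready to receive

	// Without the closeCh case, every iteration would take the synchronizeCh
	// branch while pendingLen() stays at 1 forever, which is the reported deadloop.
	err := flush(context.Background(), closeCh, synchronizeCh, func() int { return 1 })
	fmt.Println("flush returned:", err)
}

The regression test added in the next file exercises exactly this path: Notify, Close, then Flush on the closed notifier must return nil promptly.
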
10 changes: 10 additions & 0 deletions engine/pkg/notifier/notifier_test.go
@@ -120,3 +120,13 @@ func TestReceiverClose(t *testing.T) {
 	}
 	n.Close()
 }
+
+func TestFlushWithClosedNotifier(t *testing.T) {
+	t.Parallel()
+
+	n := NewNotifier[int]()
+	n.Notify(1)
+	n.Close()
+	err := n.Flush(context.Background())
+	require.Nil(t, err)
+}
49 changes: 18 additions & 31 deletions engine/servermaster/executor_manager.go
@@ -42,8 +42,7 @@ type ExecutorManager interface {
 	HasExecutor(executorID string) bool
 	ListExecutors() []*ormModel.Executor
 	GetAddr(executorID model.ExecutorID) (string, bool)
-	Start(ctx context.Context) error
-	Stop()
+	Run(ctx context.Context) error
 
 	// WatchExecutors returns a snapshot of all online executors plus
 	// a stream of events describing changes that happen to the executors
@@ -282,41 +281,29 @@ func (e *Executor) statusEqual(status model.ExecutorStatus) bool {
 	return e.status == status
 }
 
-// Start implements ExecutorManager.Start. It starts a background goroutine to
-// check whether all executors are alive periodically.
-func (e *ExecutorManagerImpl) Start(ctx context.Context) error {
+// Run implements ExecutorManager.Run
+func (e *ExecutorManagerImpl) Run(ctx context.Context) error {
 	if err := e.resetExecutors(ctx); err != nil {
 		return perrors.Errorf("failed to reset executors: %v", err)
 	}
 
-	e.wg.Add(1)
-	go func() {
-		defer e.wg.Done()
-		ticker := time.NewTicker(e.keepAliveInterval)
-		defer func() {
-			ticker.Stop()
-			log.Info("check executor alive finished")
-		}()
-		for {
-			select {
-			case <-ticker.C:
-				err := e.checkAliveImpl()
-				if err != nil {
-					log.Info("check alive meet error", zap.Error(err))
-				}
-			case <-ctx.Done():
-				return
+	ticker := time.NewTicker(e.keepAliveInterval)
+	defer func() {
+		ticker.Stop()
+		e.notifier.Close()
+		log.Info("executor manager exited")
+	}()
+	for {
+		select {
+		case <-ctx.Done():
+			return perrors.Trace(ctx.Err())
+		case <-ticker.C:
+			err := e.checkAliveImpl()
+			if err != nil {
+				log.Info("check alive meet error", zap.Error(err))
 			}
 		}
-	}()
-
-	return nil
-}
-
-// Stop implements ExecutorManager.Stop
-func (e *ExecutorManagerImpl) Stop() {
-	e.wg.Wait()
-	e.notifier.Close()
+	}
 }
 
 func (e *ExecutorManagerImpl) checkAliveImpl() error {
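
The change above folds the old Start/Stop pair (a background goroutine tracked by a WaitGroup) into a single blocking Run(ctx) whose cleanup, including closing the notifier, lives in a defer. A minimal sketch of the resulting lifecycle from the caller's side follows; the keepAliveLoop name and the exact wiring are illustrative, not code from the repository:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// keepAliveLoop mirrors the shape of the new ExecutorManagerImpl.Run: it
// blocks, owns its ticker, and releases resources in a defer once the
// context is canceled.
func keepAliveLoop(ctx context.Context, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer func() {
		ticker.Stop()
		fmt.Println("keep-alive loop exited")
	}()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// a periodic liveness check would run here
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error { return keepAliveLoop(gctx, 50*time.Millisecond) })

	time.Sleep(200 * time.Millisecond)
	cancel()     // stopping is now just canceling the context
	_ = g.Wait() // and waiting for the Run-style goroutine to return
}

This is also why the tests below replace mgr.Start(ctx) plus mgr.Stop() with a goroutine that calls mgr.Run(ctx), followed by cancel() and a WaitGroup wait.
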
20 changes: 15 additions & 5 deletions engine/servermaster/executor_manager_test.go
@@ -82,7 +82,12 @@ func TestExecutorManager(t *testing.T) {
 	metaClient.EXPECT().QueryExecutors(gomock.Any()).Times(1).Return([]*ormModel.Executor{}, nil)
 	metaClient.EXPECT().DeleteExecutor(gomock.Any(), executor.ID).Times(1).Return(nil)
 
-	mgr.Start(ctx)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		mgr.Run(ctx)
+	}()
 
 	require.Eventually(t, func() bool {
 		return mgr.ExecutorCount(model.Running) == 0
@@ -94,7 +99,7 @@ func TestExecutorManager(t *testing.T) {
 	require.True(t, ErrUnknownExecutor.Is(err))
 
 	cancel()
-	mgr.Stop()
+	wg.Wait()
 }
 
 func TestExecutorManagerWatch(t *testing.T) {
@@ -204,7 +209,12 @@ func TestExecutorManagerWatch(t *testing.T) {
 	metaClient.EXPECT().DeleteExecutor(gomock.Any(), executorID1).Times(1).Return(nil)
 	metaClient.EXPECT().DeleteExecutor(gomock.Any(), executorID2).Times(1).Return(nil)
 
-	mgr.Start(ctx)
+	var mgrWg sync.WaitGroup
+	mgrWg.Add(1)
+	go func() {
+		defer mgrWg.Done()
+		mgr.Run(ctx)
+	}()
 
 	// mgr.Start will reset executors first, so there will be two online events.
 	event = <-stream.C
@@ -220,8 +230,8 @@ func TestExecutorManagerWatch(t *testing.T) {
 		Addr: "127.0.0.1:10002",
 	}, event)
 
-	require.Equal(t, 0, mgr.ExecutorCount(model.Running))
 	var wg sync.WaitGroup
+	require.Equal(t, 0, mgr.ExecutorCount(model.Running))
 	cancel1 := bgExecutorHeartbeat(ctx, &wg, executorID1)
 	cancel2 := bgExecutorHeartbeat(ctx, &wg, executorID2)
 	require.Equal(t, 2, mgr.ExecutorCount(model.Running))
@@ -250,5 +260,5 @@ func TestExecutorManagerWatch(t *testing.T) {
 	}, event)
 
 	cancel()
-	mgr.Stop()
+	mgrWg.Wait()
 }
55 changes: 16 additions & 39 deletions engine/servermaster/server.go
@@ -424,30 +424,6 @@ func (s *Server) ReportExecutorWorkload(
 	return &pb.ExecWorkloadResponse{}, nil
 }
 
-// Stop and clean resources.
-// TODO: implement stop gracefully.
-func (s *Server) Stop() {
-	if s.mockGrpcServer != nil {
-		s.mockGrpcServer.Stop()
-	}
-	// in some tests this fields is not initialized
-	if s.masterCli != nil {
-		s.masterCli.Close()
-	}
-	if s.frameMetaClient != nil {
-		s.frameMetaClient.Close()
-	}
-	if s.frameworkClientConn != nil {
-		s.frameworkClientConn.Close()
-	}
-	if s.businessClientConn != nil {
-		s.businessClientConn.Close()
-	}
-	if s.executorManager != nil {
-		s.executorManager.Stop()
-	}
-}
-
 // Run the server master.
 func (s *Server) Run(ctx context.Context) error {
 	err := s.registerMetaStore(ctx)
@@ -460,20 +436,13 @@ func (s *Server) Run(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
+	// resourceManagerService relies on meta store
+	s.initResourceManagerService()
+
 	if err := broker.PreCheckConfig(s.cfg.Storage); err != nil {
 		return err
 	}
 
-	// executorMetaClient needs to be initialized after frameMetaClient is initialized.
-	s.executorManager = NewExecutorManagerImpl(s.frameMetaClient, s.cfg.KeepAliveTTL, s.cfg.KeepAliveInterval)
-
-	// ResourceManagerService should be initialized after registerMetaStore.
-	// FIXME: We should do these work inside NewServer.
-	s.initResourceManagerService()
-	s.scheduler = scheduler.NewScheduler(
-		s.executorManager,
-		s.resourceManagerService)
-
 	wg, ctx := errgroup.WithContext(ctx)
 
 	wg.Go(func() error {
@@ -833,6 +802,18 @@ func (s *Server) runLeaderService(ctx context.Context) (err error) {
 		log.Info("job manager exited")
 	}()
 
+	// The following member variables are used in leader only and released after
+	// the leader is resigned, so initialize these variables in this function,
+	// instead of initializing them in the NewServer or Server.Run
+
+	// executorMetaClient needs to be initialized after frameMetaClient is initialized.
+	s.executorManager = NewExecutorManagerImpl(s.frameMetaClient, s.cfg.KeepAliveTTL, s.cfg.KeepAliveInterval)
+
+	// ResourceManagerService should be initialized after registerMetaStore.
+	s.scheduler = scheduler.NewScheduler(
+		s.executorManager,
+		s.resourceManagerService)
+
 	s.gcRunner = externRescManager.NewGCRunner(s.frameMetaClient, executorClients, &s.cfg.Storage)
 	s.gcCoordinator = externRescManager.NewGCCoordinator(s.executorManager, s.jobManager, s.frameMetaClient, s.gcRunner)
 
@@ -847,11 +828,7 @@ func (s *Server) runLeaderService(ctx context.Context) (err error) {
 	})
 
 	errg.Go(func() error {
-		defer func() {
-			s.executorManager.Stop()
-			log.Info("executor manager exited")
-		}()
-		return s.executorManager.Start(errgCtx)
+		return s.executorManager.Run(errgCtx)
 	})
 
 	errg.Go(func() error {
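
The new comment block in runLeaderService spells out the design: executorManager and scheduler are leader-only members, so they are now constructed inside runLeaderService and torn down when the leader's errgroup returns, rather than being created once in NewServer or Server.Run. A compressed, hypothetical sketch of that scoping (stand-in types and names, not the actual server code):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// manager is a stand-in for a leader-only member such as executorManager.
type manager struct{ term int }

// Run blocks until the leader context is canceled, then releases resources.
func (m *manager) Run(ctx context.Context) error {
	<-ctx.Done()
	fmt.Printf("manager for term %d released\n", m.term)
	return ctx.Err()
}

// runLeaderService builds leader-scoped state for each term and tears it
// down when leadership is lost, mirroring the structure of the diff above.
func runLeaderService(ctx context.Context, term int) error {
	m := &manager{term: term} // constructed per term, not in NewServer or Run
	errg, errgCtx := errgroup.WithContext(ctx)
	errg.Go(func() error { return m.Run(errgCtx) })
	return errg.Wait()
}

func main() {
	for term := 1; term <= 2; term++ {
		ctx, cancel := context.WithCancel(context.Background())
		time.AfterFunc(20*time.Millisecond, cancel) // simulate losing leadership
		_ = runLeaderService(ctx, term)
	}
}

With this scoping there is nothing left for a Server.Stop to tear down, which is why the method is deleted above and the tests below drop their s.Stop() calls.
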
5 changes: 0 additions & 5 deletions engine/servermaster/server_test.go
@@ -176,9 +176,6 @@ type mockExecutorManager struct {
 	count map[model.ExecutorStatus]int
 }
 
-func (m *mockExecutorManager) Stop() {
-}
-
 func (m *mockExecutorManager) ExecutorCount(status model.ExecutorStatus) int {
 	m.executorMu.RLock()
 	defer m.executorMu.RUnlock()
@@ -232,7 +229,6 @@ func TestCollectMetric(t *testing.T) {

 	cancel()
 	wg.Wait()
-	s.Stop()
 }
 
 func testCustomedPrometheusMetrics(t *testing.T, addr string) {
@@ -322,5 +318,4 @@ func TestHTTPErrorHandler(t *testing.T) {

 	cancel()
 	wg.Wait()
-	s.Stop()
 }
