Skip to content

Commit

Permalink
fix: avoid potential hang when starting event listener
Browse files Browse the repository at this point in the history
It was possible for NewEvents to never return, blocked on waiting for a WaitGroup to be done.
The call to Done was in a goroutine that could exit before reaching the Done call.

Replace the WaitGroup with a channel that is closed to signal that initialisation is complete.
Also, while we are waiting on the channel, wait on the context so we can exit clealy if the
context is canceled.
  • Loading branch information
iand committed Dec 9, 2020
1 parent a999e41 commit 696469a
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
var log = logging.Logger("events")

// HeightHandler `curH`-`ts.Height` = `confidence`
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
type (
HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)

type heightHandler struct {
confidence int
Expand All @@ -48,7 +50,7 @@ type Events struct {
tsc *tipSetCache
lk sync.Mutex

ready sync.WaitGroup
ready chan struct{}
readyOnce sync.Once

heightEvents
Expand Down Expand Up @@ -76,15 +78,16 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
}

e.ready.Add(1)

go e.listenHeadChanges(ctx)

e.ready.Wait()

// TODO: cleanup/gc goroutine
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}

return e
}
Expand All @@ -111,13 +114,21 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {

notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}

cur, ok := <-notifs // TODO: timeout?
if !ok {
return xerrors.Errorf("notification channel closed")
var cur []*api.HeadChange
var ok bool

// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}

if len(cur) != 1 {
Expand All @@ -134,8 +145,8 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {

e.readyOnce.Do(func() {
e.lastTs = cur[0].Val

e.ready.Done()
// Signal that we have seen first tipset
close(e.ready)
})

for notif := range notifs {
Expand Down

0 comments on commit 696469a

Please sign in to comment.