Skip to content

Commit

Permalink
Merge pull request #5159 from iand/fix/event-hang
Browse files Browse the repository at this point in the history
fix(events): avoid potential hang when starting event listener
  • Loading branch information
magik6k authored Dec 9, 2020
2 parents 2ce5a29 + 696469a commit e1be89b
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
var log = logging.Logger("events")

// HeightHandler `curH`-`ts.Height` = `confidence`
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
type (
HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)

type heightHandler struct {
confidence int
Expand All @@ -48,7 +50,7 @@ type Events struct {
tsc *tipSetCache
lk sync.Mutex

ready sync.WaitGroup
ready chan struct{}
readyOnce sync.Once

heightEvents
Expand Down Expand Up @@ -76,15 +78,16 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
}

e.ready.Add(1)

go e.listenHeadChanges(ctx)

e.ready.Wait()

// TODO: cleanup/gc goroutine
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}

return e
}
Expand All @@ -111,13 +114,21 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {

notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}

cur, ok := <-notifs // TODO: timeout?
if !ok {
return xerrors.Errorf("notification channel closed")
var cur []*api.HeadChange
var ok bool

// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}

if len(cur) != 1 {
Expand All @@ -134,8 +145,8 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {

e.readyOnce.Do(func() {
e.lastTs = cur[0].Val

e.ready.Done()
// Signal that we have seen first tipset
close(e.ready)
})

for notif := range notifs {
Expand Down

0 comments on commit e1be89b

Please sign in to comment.