Skip to content

Commit

Permalink
Fix debug mode ignoring events
Browse files Browse the repository at this point in the history
Signed-off-by: Matej Pavlovic <[email protected]>
  • Loading branch information
matejpavlovic committed Jun 20, 2022
1 parent 7b9bb3f commit b065db5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
41 changes: 29 additions & 12 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,7 @@ func (n *Node) process(ctx context.Context) error { //nolint:gocyclo
})
selectReactions = append(selectReactions, func(newEventsVal reflect.Value) {
newEvents := newEventsVal.Interface().(*events.EventList)
if n.debugMode {
if n.debugOut != nil {
n.debugOut <- newEvents
}
} else if err := n.workItems.AddEvents(newEvents); err != nil {
if err := n.workItems.AddEvents(newEvents); err != nil {
n.workErrNotifier.Fail(err)
}
})
Expand Down Expand Up @@ -352,7 +348,13 @@ func (n *Node) startModules(ctx context.Context, wg *sync.WaitGroup) {
var err error

for continueProcessing {
continueProcessing, err = n.processModuleEvents(ctx, m, workChan)
if n.debugMode {
// In debug mode, all produced events are routed to the debug output.
continueProcessing, err = n.processModuleEvents(ctx, m, workChan, n.debugOut)
} else {
// During normal operation, feed all produced events back into the event loop.
continueProcessing, err = n.processModuleEvents(ctx, m, workChan, n.eventsIn)
}
if err != nil {
n.workErrNotifier.Fail(fmt.Errorf("could not process PassiveModule (%v) events: %w", mID, err))
return
Expand All @@ -371,33 +373,48 @@ func (n *Node) startModules(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
n.importEvents(ctx, m.EventsOut())
if n.debugMode {
// In debug mode, all produced events are routed to the debug output.
n.importEvents(ctx, m.EventsOut(), n.debugOut)
} else {
// During normal operation, feed all produced events back into the event loop.
n.importEvents(ctx, m.EventsOut(), n.eventsIn)
}
}()
default:
n.workErrNotifier.Fail(fmt.Errorf("unknown module type: %T", m))
}
}
}

// importEvents reads events from eventsIn and writes them to the Node's central input channel (n.eventsIn) until
// - eventsIn is closed or
// importEvents reads events from eventSource and writes them to the eventSink until
// - eventSource is closed or
// - ctx is canceled or
// - an error occurred in the Node and was announced through the Node's workErrorNotifier.
func (n *Node) importEvents(ctx context.Context, eventsIn <-chan *events.EventList) {
func (n *Node) importEvents(
ctx context.Context,
eventSource <-chan *events.EventList,
eventSink chan<- *events.EventList,
) {
for {

// First, try to read events from the input.
select {
case newEvents, ok := <-eventsIn:
case newEvents, ok := <-eventSource:

// Return if input channel has been closed
if !ok {
return
}

// Skip writing events if there is no event sink.
if eventSink == nil {
continue
}

// If input events have been read, try to write them to the Node's central input channel.
select {
case n.eventsIn <- newEvents:
case eventSink <- newEvents:
case <-ctx.Done():
return
case <-n.workErrNotifier.ExitC():
Expand Down
13 changes: 9 additions & 4 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func newWorkChans(modules modules.Modules) workChans {
// strips off all associated follow-up Events,
// and processes the bare content of the list using the passed PassiveModule.
// processModuleEvents writes all the stripped off follow-up events along with any Events generated by the processing
// to the workItemInput channel, so they will be added to the workItems buffer for further processing.
// to the eventSink channel if it is not nil.
//
// If the Node is configured to use an Interceptor, after having removed all follow-up Events,
// processModuleEvents passes the list of input Events to the Interceptor.
Expand All @@ -45,6 +45,7 @@ func (n *Node) processModuleEvents(
ctx context.Context,
module modules.Module,
eventSource <-chan *events.EventList,
eventSink chan<- *events.EventList,
) (bool, error) {
var eventsIn *events.EventList
var inputOpen bool
Expand Down Expand Up @@ -103,16 +104,20 @@ func (n *Node) processModuleEvents(
return true, nil
}

// Skip writing output if there is no channel to write it to.
if eventSink == nil {
return true, nil
}

// Write output.
select {
case n.eventsIn <- eventsOut:
case eventSink <- eventsOut:
return true, nil
case <-ctx.Done():
return false, nil
case <-n.workErrNotifier.ExitC():
return false, nil
}

return true, nil
}

func safelyApplyEventsPassive(
Expand Down

0 comments on commit b065db5

Please sign in to comment.