From b065db570fd479cba0bd9ddc0fc628d9b1a4c152 Mon Sep 17 00:00:00 2001 From: Matej Pavlovic Date: Mon, 20 Jun 2022 16:45:02 +0200 Subject: [PATCH] Fix debug mode ignoring events Signed-off-by: Matej Pavlovic --- node.go | 41 +++++++++++++++++++++++++++++------------ workers.go | 13 +++++++++---- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/node.go b/node.go index 8378fcf14..cf8966cff 100644 --- a/node.go +++ b/node.go @@ -266,11 +266,7 @@ func (n *Node) process(ctx context.Context) error { //nolint:gocyclo }) selectReactions = append(selectReactions, func(newEventsVal reflect.Value) { newEvents := newEventsVal.Interface().(*events.EventList) - if n.debugMode { - if n.debugOut != nil { - n.debugOut <- newEvents - } - } else if err := n.workItems.AddEvents(newEvents); err != nil { + if err := n.workItems.AddEvents(newEvents); err != nil { n.workErrNotifier.Fail(err) } }) @@ -352,7 +348,13 @@ func (n *Node) startModules(ctx context.Context, wg *sync.WaitGroup) { var err error for continueProcessing { - continueProcessing, err = n.processModuleEvents(ctx, m, workChan) + if n.debugMode { + // In debug mode, all produced events are routed to the debug output. + continueProcessing, err = n.processModuleEvents(ctx, m, workChan, n.debugOut) + } else { + // During normal operation, feed all produced events back into the event loop. + continueProcessing, err = n.processModuleEvents(ctx, m, workChan, n.eventsIn) + } if err != nil { n.workErrNotifier.Fail(fmt.Errorf("could not process PassiveModule (%v) events: %w", mID, err)) return @@ -371,7 +373,13 @@ func (n *Node) startModules(ctx context.Context, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() - n.importEvents(ctx, m.EventsOut()) + if n.debugMode { + // In debug mode, all produced events are routed to the debug output. + n.importEvents(ctx, m.EventsOut(), n.debugOut) + } else { + // During normal operation, feed all produced events back into the event loop. + n.importEvents(ctx, m.EventsOut(), n.eventsIn) + } }() default: n.workErrNotifier.Fail(fmt.Errorf("unknown module type: %T", m)) @@ -379,25 +387,34 @@ func (n *Node) startModules(ctx context.Context, wg *sync.WaitGroup) { } } -// importEvents reads events from eventsIn and writes them to the Node's central input channel (n.eventsIn) until -// - eventsIn is closed or +// importEvents reads events from eventSource and writes them to the eventSink until +// - eventSource is closed or // - ctx is canceled or // - an error occurred in the Node and was announced through the Node's workErrorNotifier. -func (n *Node) importEvents(ctx context.Context, eventsIn <-chan *events.EventList) { +func (n *Node) importEvents( + ctx context.Context, + eventSource <-chan *events.EventList, + eventSink chan<- *events.EventList, +) { for { // First, try to read events from the input. select { - case newEvents, ok := <-eventsIn: + case newEvents, ok := <-eventSource: // Return if input channel has been closed if !ok { return } + // Skip writing events if there is no event sink. + if eventSink == nil { + continue + } + // If input events have been read, try to write them to the Node's central input channel. select { - case n.eventsIn <- newEvents: + case eventSink <- newEvents: case <-ctx.Done(): return case <-n.workErrNotifier.ExitC(): diff --git a/workers.go b/workers.go index 96bbebcf1..2bc4854b8 100644 --- a/workers.go +++ b/workers.go @@ -29,7 +29,7 @@ func newWorkChans(modules modules.Modules) workChans { // strips off all associated follow-up Events, // and processes the bare content of the list using the passed PassiveModule. // processModuleEvents writes all the stripped off follow-up events along with any Events generated by the processing -// to the workItemInput channel, so they will be added to the workItems buffer for further processing. +// to the eventSink channel if it is not nil. // // If the Node is configured to use an Interceptor, after having removed all follow-up Events, // processModuleEvents passes the list of input Events to the Interceptor. @@ -45,6 +45,7 @@ func (n *Node) processModuleEvents( ctx context.Context, module modules.Module, eventSource <-chan *events.EventList, + eventSink chan<- *events.EventList, ) (bool, error) { var eventsIn *events.EventList var inputOpen bool @@ -103,16 +104,20 @@ func (n *Node) processModuleEvents( return true, nil } + // Skip writing output if there is no channel to write it to. + if eventSink == nil { + return true, nil + } + // Write output. select { - case n.eventsIn <- eventsOut: + case eventSink <- eventsOut: + return true, nil case <-ctx.Done(): return false, nil case <-n.workErrNotifier.ExitC(): return false, nil } - - return true, nil } func safelyApplyEventsPassive(