diff --git a/lang/funcs/core/iter/map_func.go b/lang/funcs/core/iter/map_func.go
index d2da865e3c..3b106c703d 100644
--- a/lang/funcs/core/iter/map_func.go
+++ b/lang/funcs/core/iter/map_func.go
@@ -625,12 +625,20 @@ func (obj *MapFunc) Stream(ctx context.Context) error {
 				continue
 			}
 
-			newFuncValue, ok := input.Struct()[mapArgNameFunction].(*fancyfunc.FuncValue)
+			value, exists := input.Struct()[mapArgNameFunction]
+			if !exists {
+				return fmt.Errorf("programming error, can't find edge")
+			}
+
+			newFuncValue, ok := value.(*fancyfunc.FuncValue)
 			if !ok {
 				return fmt.Errorf("programming error, can't convert to *FuncValue")
 			}
 
-			newInputList := input.Struct()[mapArgNameInputs]
+			newInputList, exists := input.Struct()[mapArgNameInputs]
+			if !exists {
+				return fmt.Errorf("programming error, can't find edge")
+			}
 
 			// If we have a new function or the length of the input
 			// list has changed, then we need to replace the
diff --git a/lang/funcs/dage/dage.go b/lang/funcs/dage/dage.go
index 9bb6ef427a..7627617779 100644
--- a/lang/funcs/dage/dage.go
+++ b/lang/funcs/dage/dage.go
@@ -50,9 +50,6 @@ type Engine struct {
 	Debug bool
 	Logf  func(format string, v ...interface{})
 
-	// Glitch: https://en.wikipedia.org/wiki/Reactive_programming#Glitches
-	Glitch bool // allow glitching? (more responsive, but less accurate)
-
 	// Callback can be specified as an alternative to using the Stream
 	// method to get events. If the context on it is cancelled, then it must
 	// shutdown quickly, because this means we are closing and want to
@@ -91,6 +88,7 @@ type Engine struct {
 	// nodeWaitMutex wraps access to the nodeWaitFns list.
 	nodeWaitMutex *sync.Mutex
 
+	// streamChan is used to send notifications to the outside world.
 	streamChan chan error
 
 	loaded bool // are all of the funcs loaded?
@@ -101,6 +99,20 @@ type Engine struct {
 	// wakeChan contains a message when someone has asked for us to wake up.
 	wakeChan chan struct{}
 
+	// ag is the aggregation channel which cues up outgoing events.
+	ag chan error
+
+	// leafSend specifies if we should do an ag send because we have
+	// activity at a leaf.
+	leafSend bool
+
+	// activity tracks nodes that are ready to send to ag. The main process
+	// loop decides if we have the correct set to do so.
+	activity map[*state]struct{}
+
+	// activityMutex wraps access to the activity map.
+	activityMutex *sync.Mutex
+
 	// stats holds some statistics and other debugging information.
 	stats *stats // guarded by statsMutex
@@ -151,6 +163,11 @@ func (obj *Engine) Setup() error {
 
 	obj.wakeChan = make(chan struct{}, 1) // hold up to one message
 
+	obj.ag = make(chan error)
+
+	obj.activity = make(map[*state]struct{})
+	obj.activityMutex = &sync.Mutex{}
+
 	obj.stats = &stats{
 		runningList: make(map[*state]struct{}),
 		loadedList:  make(map[*state]bool),
@@ -224,10 +241,6 @@ func (obj *Engine) addVertex(f interfaces.Func) error {
 		rwmutex: &sync.RWMutex{},
 	}
 
-	if len(sig.Ord) > 0 {
-		// since we accept input, better get our notification chan built
-		node.notify = make(chan struct{})
-	}
 
 	init := &interfaces.Init{
 		Hostname: obj.Hostname,
@@ -559,7 +572,8 @@ func (obj *Engine) runNodeWaitFns() {
 // but that is the price we pay for being always available to unblock.
 // Importantly, re-running this resumes work in progress even if there was
 // caching, and that if interrupted, it'll be queued again so as to not drop a
-// wakeChan notification!
+// wakeChan notification! We know we've read all the pending incoming values,
+// because the Stream reader calls wake().
 func (obj *Engine) process(ctx context.Context) (reterr error) {
 	defer func() { // catch programming errors
@@ -593,25 +607,20 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
 		if !exists {
 			panic(fmt.Sprintf("missing out degree in iterate: %s", f))
 		}
+		//outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex
+		//node.isLeaf = len(outgoing) == 0
+		node.isLeaf = out == 0 // store
 
-		// XXX: this nodeLoaded stuff is crap
+		// TODO: the obj.loaded stuff isn't really consumed currently
 		node.rwmutex.RLock()
-		nodeLoaded := node.loaded
-		node.rwmutex.RUnlock()
-		if !nodeLoaded {
+		if !node.loaded {
 			loaded = false // we were wrong
 		}
+		node.rwmutex.RUnlock()
 
-		// XXX: memoize this which we can do easily now since graph shape doesn't change in this loop!
+		// TODO: memoize since graph shape doesn't change in this loop!
 		incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex
-		//outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex
-		//node.isLeaf = len(outgoing) == 0
-
-		node.rwmutex.Lock()
-		node.isLeaf = out == 0
-		node.rwmutex.Unlock()
-
 		// no incoming edges, so no incoming data
 		if len(incoming) == 0 { // we do this below
 			if !node.closed {
@@ -689,8 +698,7 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
 		}
 
 		if !ready || len(need) != 0 {
-			//return nil // not ready yet
-			break
+			continue // definitely continue, don't break here
 		}
 
 		// previously it was closed, skip sending
@@ -698,7 +706,7 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
 			continue
 		}
 
-		// XXX: respect the info.Pure and info.Memo fields somewhere here...
+		// XXX: respect the info.Pure and info.Memo fields somewhere...
 
 		// XXX: keep track of some state about who i sent to last before
 		// being interrupted so that I can avoid resending to some nodes
@@ -747,6 +755,50 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
 	// It's okay if this section gets preempted and we re-run this bit here.
 	obj.loaded = loaded // this gets reset when graph adds new nodes
 
+	if !loaded {
+		return nil
+	}
+
+	// Check each leaf and make sure they're all ready to send, before we
+	// send anything to the ag channel. In addition, we need at least one
+	// send message from any of the valid isLeaf nodes. Since this only
+	// runs if everyone is loaded, we just need to check for active leaf
+	// nodes.
+	obj.activityMutex.Lock()
+	for node := range obj.activity {
+		if obj.leafSend {
+			break // early
+		}
+
+		if node.isLeaf { // calculated above in the previous loop
+			obj.leafSend = true
+			break
+		}
+	}
+	obj.activity = make(map[*state]struct{}) // clear
+	//clear(obj.activity) // new clear
+	obj.activityMutex.Unlock()
+
+	if !obj.leafSend {
+		return nil
+	}
+
+	select {
+	case obj.ag <- nil: // send to aggregate channel if we have events
+		obj.Logf("aggregated send")
+		obj.leafSend = false // reset
+
+	case <-ctx.Done():
+		obj.leafSend = true // since we skipped the ag send!
+		obj.wake()          // interrupted, so queue again
+		return ctx.Err()
+
+	default:
+		// XXX: should we even allow this default case?
+		// exit if we're not ready to send to ag
+		obj.leafSend = true // since we skipped the ag send!
+		obj.wake()          // interrupted, so queue again
+	}
+
 	return nil
 }
@@ -797,22 +849,6 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 		}()
 	}
 
-	// Aggregation channel and wait group.
-	ag := make(chan error)
-	wgAg := &sync.WaitGroup{}
-
-	// close the aggregate channel when everyone is done with it...
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		select {
-		case <-ctx.Done():
-		}
-		// don't wait and close ag before we're really done with Run()
-		wgAg.Wait() // wait for last ag user to close
-		close(ag)   // last one closes the ag channel
-	}()
-
 	// aggregate events channel
 	wg.Add(1)
 	go func() {
@@ -822,13 +858,13 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 			var err error
 			var ok bool
 			select {
-			case err, ok = <-ag: // aggregated channel
+			case err, ok = <-obj.ag: // aggregated channel
 				if !ok {
 					return // channel shutdown
 				}
 			}
 
-			// XXX: check obj.loaded first?
+			// TODO: check obj.loaded first?
 
 			// now send event...
 			if obj.Callback != nil {
@@ -850,6 +886,12 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 		}
 	}()
 
+	// wgAg is a wait group that waits for all senders to the ag chan.
+	// Exceptionally, we don't close the ag channel until wgFor has also
+	// closed, because it can send to ag in process().
+	wgAg := &sync.WaitGroup{}
+	wgFor := &sync.WaitGroup{}
+
 	// We need to keep the main loop running until everyone else has shut
 	// down. When the top context closes, we wait for everyone to finish,
 	// and then we shut down this main context.
@@ -857,6 +899,7 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 	mainCtx, mainCancel := context.WithCancel(context.Background()) // DON'T wrap parent, close on your own terms
 	defer mainCancel()
 
+	// close the aggregate channel when everyone is done with it...
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -864,25 +907,21 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 		case <-ctx.Done():
 		}
 
-		// XXX RENAME wgAg because of its use here
-		wgAg.Wait() // wait until all the routines have closed
-		mainCancel()
+		// don't wait and close ag before we're really done with Run()
+		wgAg.Wait()   // wait for last ag user to close
+		mainCancel()  // only cancel after wgAg goroutines are done
+		wgFor.Wait()  // wait for process loop to close before closing
+		close(obj.ag) // last one closes the ag channel
 	}()
 
 	wgFn := &sync.WaitGroup{} // wg for process function runner
 	defer wgFn.Wait()         // extra safety
 
-	// XXX XXX XXX
-	//go func() { // XXX: debugging to make sure we didn't forget to wake someone...
-	//	for {
-	//		obj.wake() // new value, so send wake up
-	//		time.Sleep(3 * time.Second)
-	//	}
-	//}()
-	// XXX XXX XXX
-
 	defer obj.runNodeWaitFns() // just in case
 
+	wgFor.Add(1) // make sure we wait for the below process loop to exit...
+	defer wgFor.Done()
+
 	// we start off "running", but we'll have an empty graph initially...
 	for {
@@ -1025,6 +1064,11 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 			obj.loaded = false // reset this
 			node.running = true
 
+			obj.statsMutex.Lock()
+			val, _ := obj.stats.inputList[node] // val is # or zero
+			obj.stats.inputList[node] = val     // initialize to zero
+			obj.statsMutex.Unlock()
+
 			innerCtx, innerCancel := context.WithCancel(ctx) // wrap parent (not mainCtx)
 			// we defer innerCancel() in the goroutine to cleanup!
 			node.ctx = innerCtx
@@ -1063,11 +1107,12 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 					obj.statsMutex.Unlock()
 				}
 				if runErr != nil {
+					obj.Logf("Erroring func `%s`: %+v", node, runErr)
 					// send to a aggregate channel
 					// the first to error will cause ag to
 					// shutdown, so make sure we can exit...
 					select {
-					case ag <- runErr: // send to aggregate channel
+					case obj.ag <- runErr: // send to aggregate channel
 					case <-node.ctx.Done():
 					}
 				}
@@ -1111,30 +1156,21 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 					node.loaded = true // set *after* value is in :)
 					//obj.Logf("func `%s` changed", node)
 					node.rwmutex.Unlock()
-					obj.wake() // new value, so send wake up
 
 					obj.statsMutex.Lock()
 					obj.stats.loadedList[node] = true
 					obj.statsMutex.Unlock()
 
-					// XXX: I think we need this read lock
-					// because we don't want to be adding a
-					// new vertex here but then missing to
-					// send an event to it because it
-					// started after we did the range...
-
-					node.rwmutex.RLock()
-					isLeaf := node.isLeaf
-					node.rwmutex.RUnlock()
-
-					// TODO: if shutdown, did we still want to do this?
-					if obj.Glitch || isLeaf {
-						select {
-						case ag <- nil: // send to aggregate channel
-						case <-node.ctx.Done():
-							//return
-						}
-					}
+					// Record our activity so the ag logic
+					// knows that it might need to send an
+					// aggregated message here. It should
+					// check if we are a leaf and if we
+					// glitch or not... Do this before wake.
+					obj.activityMutex.Lock()
+					obj.activity[node] = struct{}{}
+					obj.activityMutex.Unlock()
+
+					obj.wake() // new value, so send wake up
 				} // end for
@@ -1147,7 +1183,7 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 				// nodes that never loaded will cause the engine to hang
 				if !node.loaded {
 					select {
-					case ag <- fmt.Errorf("func `%s` stopped before it was loaded", node):
+					case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node):
 					case <-node.ctx.Done():
 						return
 					}
 				}
@@ -1160,7 +1196,6 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
 	// Send new notifications in case any new edges are sending away
 	// to these... They might have already missed the notifications!
 	for k := range obj.resend { // resend TO these!
-		//obj.Graphviz("") // XXX DEBUG
 		node, exists := obj.state[k]
 		if !exists {
 			continue
 		}
@@ -1332,8 +1367,6 @@ type state struct {
 	Func interfaces.Func
 	name string // cache a name here for safer concurrency
 
-	notify chan struct{} // ping here when new input values exist
-
 	input  chan types.Value // the top level type must be a struct
 	output chan types.Value
 	txn    interfaces.Txn // API of graphTxn struct to pass to each function
@@ -1375,6 +1408,8 @@ type stats struct {
 	inputList map[*state]int64
 }
 
+// String implements the fmt.Stringer interface for printing out our collected
+// statistics!
 func (obj *stats) String() string {
 	// XXX: just build the lock into *stats instead of into our dage obj
 	s := "stats:\n"
diff --git a/lang/funcs/simple/channel_based_sink_func.go b/lang/funcs/simple/channel_based_sink_func.go
index ec5c8a1b0a..9155e2d315 100644
--- a/lang/funcs/simple/channel_based_sink_func.go
+++ b/lang/funcs/simple/channel_based_sink_func.go
@@ -94,11 +94,15 @@ func (obj *ChannelBasedSinkFunc) Stream(ctx context.Context) error {
 				return nil // can't output any more
 			}
 
-			inputValue := input.Struct()[obj.EdgeName]
-			//if obj.last != nil && inputValue.Cmp(obj.last) == nil {
-			//	continue // value didn't change, skip it
-			//}
-			obj.last = inputValue // store so we can send after this select
+			value, exists := input.Struct()[obj.EdgeName]
+			if !exists {
+				return fmt.Errorf("programming error, can't find edge")
+			}
+
+			if obj.last != nil && value.Cmp(obj.last) == nil {
+				continue // value didn't change, skip it
+			}
+			obj.last = value // store so we can send after this select
 
 		case <-ctx.Done():
 			return nil
diff --git a/lang/funcs/simple/structs_call.go b/lang/funcs/simple/structs_call.go
index 9f27bf278a..2d1c7acd2f 100644
--- a/lang/funcs/simple/structs_call.go
+++ b/lang/funcs/simple/structs_call.go
@@ -143,7 +143,12 @@ func (obj *CallFunc) Stream(ctx context.Context) error {
 
 			fmt.Printf("XXX XXX XXX CALLFUNC(%p) GOT INPUT: %+v\n", obj, input)
 
-			newFuncValue, ok := input.Struct()[obj.EdgeName].(*fancyfunc.FuncValue)
+			value, exists := input.Struct()[obj.EdgeName]
+			if !exists {
+				return fmt.Errorf("programming error, can't find edge")
+			}
+
+			newFuncValue, ok := value.(*fancyfunc.FuncValue)
 			if !ok {
 				return fmt.Errorf("programming error, can't convert to *FuncValue")
 			}
diff --git a/lang/interpret_test.go b/lang/interpret_test.go
index b69156acf9..bf9979996e 100644
--- a/lang/interpret_test.go
+++ b/lang/interpret_test.go
@@ -1493,7 +1493,6 @@ func TestAstFunc2(t *testing.T) {
 			Logf: func(format string, v ...interface{}) {
 				logf("funcs: "+format, v...)
 			},
-			Glitch: false, // FIXME: verify this functionality is perfect!
 		}
 
 		logf("function engine initializing...")
@@ -1592,7 +1591,7 @@ func TestAstFunc2(t *testing.T) {
 		// sometimes the <-stream seems to constantly (or for a
 		// long time?) win the races against the <-time.After(),
 		// so add some limit to how many times we need to stream
-		max := 10 // XXX !!!
+		max := 1
 	Loop:
 		for {
 			select {
@@ -1613,7 +1612,7 @@ func TestAstFunc2(t *testing.T) {
 					break Loop
 				}
 
-			case <-time.After(7 * time.Second): // blocked functions
+			case <-time.After(10 * time.Second): // blocked functions XXX !!!
 				//t.Errorf("test #%d: unblocking because no event was sent by the function engine for a while", index)
 				t.Logf("test #%d: unblocking because no event was sent by the function engine for a while", index)
 				break Loop
diff --git a/lang/lang.go b/lang/lang.go
index 350dea89c9..7d93d3ed90 100644
--- a/lang/lang.go
+++ b/lang/lang.go
@@ -263,7 +263,6 @@ func (obj *Lang) Init() error {
 		Logf: func(format string, v ...interface{}) {
 			obj.Logf("funcs: "+format, v...)
 		},
-		Glitch: false, // FIXME: verify this functionality is perfect!
 	}
 
 	obj.Logf("function engine initializing...")
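Note: the Stream() hunks in map_func.go, channel_based_sink_func.go and structs_call.go all switch to the same defensive lookup: index the struct map, check that the key actually exists, and only then type assert, so a missing edge becomes an explicit error instead of a nil panic. The following is a minimal, standalone sketch of that pattern; the Value, FuncValue and lookupFunc names and the "fn" edge are illustrative stand-ins, not mgmt's real types package API.

package main

import "fmt"

// Value stands in for a generic struct-field value.
type Value interface{}

// FuncValue stands in for the function-valued payload we expect on the edge.
type FuncValue struct{ Name string }

// lookupFunc pulls the function value off a struct-shaped input: first check
// the edge exists, then type assert, returning an error at each step.
func lookupFunc(structFields map[string]Value, edgeName string) (*FuncValue, error) {
	value, exists := structFields[edgeName]
	if !exists {
		return nil, fmt.Errorf("programming error, can't find edge %q", edgeName)
	}

	fv, ok := value.(*FuncValue)
	if !ok {
		return nil, fmt.Errorf("programming error, can't convert to *FuncValue")
	}
	return fv, nil
}

func main() {
	input := map[string]Value{"fn": &FuncValue{Name: "add1"}}

	if fv, err := lookupFunc(input, "fn"); err == nil {
		fmt.Println("got:", fv.Name) // got: add1
	}
	if _, err := lookupFunc(input, "inputs"); err != nil {
		fmt.Println(err) // programming error, can't find edge "inputs"
	}
}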
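Note: the dage.go hunks replace the per-node Glitch/isLeaf send with an activity map that the process loop drains: node goroutines record activity under a mutex, and only activity at a leaf triggers a single aggregated send on the ag channel. Below is a minimal, self-contained sketch of that pattern; the engine, node, record and process names are illustrative, not the real dage API, and the unbuffered select/wake handling from the patch is omitted for brevity.

package main

import (
	"fmt"
	"sync"
)

type node struct {
	name   string
	isLeaf bool
}

type engine struct {
	ag            chan error         // aggregated outgoing events
	leafSend      bool               // do we owe the ag channel a send?
	activity      map[*node]struct{} // nodes with new values since the last check
	activityMutex sync.Mutex
}

// record is what a node goroutine would call after producing a new value.
func (e *engine) record(n *node) {
	e.activityMutex.Lock()
	defer e.activityMutex.Unlock()
	e.activity[n] = struct{}{}
}

// process decides whether the accumulated activity warrants one ag send.
func (e *engine) process() {
	e.activityMutex.Lock()
	for n := range e.activity {
		if n.isLeaf { // only leaf activity triggers an outgoing event
			e.leafSend = true
			break
		}
	}
	e.activity = make(map[*node]struct{}) // clear for the next round
	e.activityMutex.Unlock()

	if !e.leafSend {
		return
	}
	e.ag <- nil        // one aggregated "something changed" event
	e.leafSend = false // reset after a successful send
}

func main() {
	e := &engine{ag: make(chan error, 1), activity: make(map[*node]struct{})}
	mid, leaf := &node{name: "mid"}, &node{name: "leaf", isLeaf: true}

	e.record(mid)
	e.process() // no send: only a non-leaf was active

	e.record(leaf)
	e.process()                              // one send: a leaf was active
	fmt.Println("events queued:", len(e.ag)) // events queued: 1
}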
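Note: the Run() hunks also reorder shutdown: wait for all ag senders (wgAg), cancel the main context, wait for the process loop (wgFor) since it can send on ag too, and only then close ag so the reader drains cleanly. A rough, self-contained sketch of that ordering follows; the fake sender, the trivial process loop and the sleep are illustrative assumptions, not code from this patch.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	mainCtx, mainCancel := context.WithCancel(context.Background())

	ag := make(chan error)
	wgAg := &sync.WaitGroup{}  // senders to ag (node goroutines)
	wgFor := &sync.WaitGroup{} // the process loop, which may also send

	// reader: drains ag until it is closed
	done := make(chan struct{})
	go func() {
		defer close(done)
		for err := range ag {
			fmt.Println("event:", err)
		}
	}()

	// a node goroutine sending into ag
	wgAg.Add(1)
	go func() {
		defer wgAg.Done()
		select {
		case ag <- nil:
		case <-ctx.Done():
		}
	}()

	// the process loop, kept alive until mainCtx closes
	wgFor.Add(1)
	go func() {
		defer wgFor.Done()
		<-mainCtx.Done()
	}()

	// the closer goroutine: enforce the ordering described above
	go func() {
		<-ctx.Done()
		wgAg.Wait()  // 1) all node senders are finished
		mainCancel() // 2) now the process loop may exit
		wgFor.Wait() // 3) process loop is finished (it could send too)
		close(ag)    // 4) safe: nobody can send anymore
	}()

	time.Sleep(10 * time.Millisecond) // let the node send its one event
	cancel()                          // begin shutdown
	<-done                            // reader saw the channel close
	fmt.Println("clean shutdown")
}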