Skip to content

Commit

Permalink
XXX: attempted fix for #72
Browse files Browse the repository at this point in the history
makes everything much slower...

i suspect there's an underlying issue with:

const exited before it loaded stuff...
  • Loading branch information
purpleidea committed Aug 7, 2023
1 parent 1da3112 commit 021bce7
Showing 1 changed file with 53 additions and 45 deletions.
98 changes: 53 additions & 45 deletions lang/funcs/dage/dage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,52 +1266,60 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
defer node.wg.Done()
defer wgAg.Done()

for value := range node.output { // read from channel
if value == nil {
// bug!
obj.Logf("func `%s` got nil value", node)
panic("got nil value")
}

obj.tableMutex.RLock()
cached, exists := obj.table[f]
obj.tableMutex.RUnlock()
if !exists { // first value received
// RACE: do this AFTER value is present!
//node.loaded = true // not yet please
obj.Logf("func `%s` started", node)
} else if value.Cmp(cached) == nil {
// skip if new value is same as previous
// if this happens often, it *might* be
// a bug in the function implementation
// FIXME: do we need to disable engine
// caching when using hysteresis?
obj.Logf("func `%s` skipped", node)
continue
}
obj.tableMutex.Lock()
obj.table[f] = value // save the latest
obj.tableMutex.Unlock()
node.rwmutex.Lock()
node.loaded = true // set *after* value is in :)
//obj.Logf("func `%s` changed", node)
node.rwmutex.Unlock()

obj.statsMutex.Lock()
obj.stats.loadedList[node] = true
obj.statsMutex.Unlock()

// Send a message to tell our ag channel
// that we might have sent an aggregated
// message here. They should check if we
// are a leaf and if we glitch or not...
// Make sure we do this before the wake.
obj.activityMutex.Lock()
obj.activity[node] = struct{}{}
obj.activityMutex.Unlock()

obj.wake() // new value, so send wake up
//for value := range node.output { // read from channel
for {
select {
case value, ok := <-node.output:
if !ok {
break
}
if value == nil {
// bug!
obj.Logf("func `%s` got nil value", node)
panic("got nil value")
}

obj.tableMutex.RLock()
cached, exists := obj.table[f]
obj.tableMutex.RUnlock()
if !exists { // first value received
// RACE: do this AFTER value is present!
//node.loaded = true // not yet please
obj.Logf("func `%s` started", node)
} else if value.Cmp(cached) == nil {
// skip if new value is same as previous
// if this happens often, it *might* be
// a bug in the function implementation
// FIXME: do we need to disable engine
// caching when using hysteresis?
obj.Logf("func `%s` skipped", node)
continue
}
obj.tableMutex.Lock()
obj.table[f] = value // save the latest
obj.tableMutex.Unlock()
node.rwmutex.Lock()
node.loaded = true // set *after* value is in :)
//obj.Logf("func `%s` changed", node)
node.rwmutex.Unlock()

obj.statsMutex.Lock()
obj.stats.loadedList[node] = true
obj.statsMutex.Unlock()

// Send a message to tell our ag channel
// that we might have sent an aggregated
// message here. They should check if we
// are a leaf and if we glitch or not...
// Make sure we do this before the wake.
obj.activityMutex.Lock()
obj.activity[node] = struct{}{}
obj.activityMutex.Unlock()

obj.wake("new value") // new value, so send wake up
case <-ctx.Done(): // XXX ???
return
} // end select
} // end for

// no more output values are coming...
Expand Down

0 comments on commit 021bce7

Please sign in to comment.