XXX basic graph engine works fast!
wooo huge milestone
purpleidea committed Jul 27, 2023
1 parent 488bebd commit 0a21e5d
Showing 6 changed files with 139 additions and 89 deletions.
12 changes: 10 additions & 2 deletions lang/funcs/core/iter/map_func.go
@@ -625,12 +625,20 @@ func (obj *MapFunc) Stream(ctx context.Context) error {
continue
}

newFuncValue, ok := input.Struct()[mapArgNameFunction].(*fancyfunc.FuncValue)
value, exists := input.Struct()[mapArgNameFunction]
if !exists {
return fmt.Errorf("programming error, can't find edge")
}

newFuncValue, ok := value.(*fancyfunc.FuncValue)
if !ok {
return fmt.Errorf("programming error, can't convert to *FuncValue")
}

newInputList := input.Struct()[mapArgNameInputs]
newInputList, exists := input.Struct()[mapArgNameInputs]
if !exists {
return fmt.Errorf("programming error, can't find edge")
}

// If we have a new function or the length of the input
// list has changed, then we need to replace the
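The change above replaces a one-step type assertion with an explicit existence check on the struct lookup, so a missing edge is reported separately from a value of the wrong type. Here is a small standalone Go sketch of that pattern (illustrative only: the FuncValue type, field names, and error text are stand-ins, not the project's real API):

package main

import "fmt"

type FuncValue struct{ Name string }

// lookupFunc distinguishes "field missing" from "field has the wrong type",
// mirroring the two error paths introduced in the diff above.
func lookupFunc(fields map[string]interface{}, key string) (*FuncValue, error) {
	value, exists := fields[key]
	if !exists {
		return nil, fmt.Errorf("programming error, can't find edge %q", key)
	}
	fv, ok := value.(*FuncValue)
	if !ok {
		return nil, fmt.Errorf("programming error, can't convert %q to *FuncValue", key)
	}
	return fv, nil
}

func main() {
	fields := map[string]interface{}{"function": &FuncValue{Name: "f"}}
	fv, err := lookupFunc(fields, "function")
	fmt.Println(fv.Name, err) // f <nil>
	_, err = lookupFunc(fields, "inputs") // missing key now reports a clear error
	fmt.Println(err)
}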
189 changes: 112 additions & 77 deletions lang/funcs/dage/dage.go
@@ -50,9 +50,6 @@ type Engine struct {
Debug bool
Logf func(format string, v ...interface{})

// Glitch: https://en.wikipedia.org/wiki/Reactive_programming#Glitches
Glitch bool // allow glitching? (more responsive, but less accurate)

// Callback can be specified as an alternative to using the Stream
// method to get events. If the context on it is cancelled, then it must
// shutdown quickly, because this means we are closing and want to
@@ -91,6 +88,7 @@ type Engine struct {
// nodeWaitMutex wraps access to the nodeWaitFns list.
nodeWaitMutex *sync.Mutex

// streamChan is used to send notifications to the outside world.
streamChan chan error

loaded bool // are all of the funcs loaded?
@@ -101,6 +99,20 @@ type Engine struct {
// wakeChan contains a message when someone has asked for us to wake up.
wakeChan chan struct{}

// ag is the aggregation channel which cues up outgoing events.
ag chan error

// leafSend specifies if we should do an ag send because we have
// activity at a leaf.
leafSend bool

// activity tracks nodes that are ready to send to ag. The main process
// loop decides if we have the correct set to do so.
activity map[*state]struct{}

// activityMutex wraps access to the activity map.
activityMutex *sync.Mutex

// stats holds some statistics and other debugging information.
stats *stats // guarded by statsMutex

@@ -151,6 +163,11 @@ func (obj *Engine) Setup() error {

obj.wakeChan = make(chan struct{}, 1) // hold up to one message

obj.ag = make(chan error)

obj.activity = make(map[*state]struct{})
obj.activityMutex = &sync.Mutex{}

obj.stats = &stats{
runningList: make(map[*state]struct{}),
loadedList: make(map[*state]bool),
@@ -224,10 +241,6 @@ func (obj *Engine) addVertex(f interfaces.Func) error {

rwmutex: &sync.RWMutex{},
}
if len(sig.Ord) > 0 {
// since we accept input, better get our notification chan built
node.notify = make(chan struct{})
}

init := &interfaces.Init{
Hostname: obj.Hostname,
@@ -559,7 +572,8 @@ func (obj *Engine) runNodeWaitFns() {
// but that is the price we pay for being always available to unblock.
// Importantly, re-running this resumes work in progress even if there was
// caching, and that if interrupted, it'll be queued again so as to not drop a
// wakeChan notification!
// wakeChan notification! We know we've read all the pending incoming values,
// because the Stream reader calls wake().
func (obj *Engine) process(ctx context.Context) (reterr error) {
defer func() {
// catch programming errors
Expand Down Expand Up @@ -593,25 +607,20 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
if !exists {
panic(fmt.Sprintf("missing out degree in iterate: %s", f))
}
//outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex
//node.isLeaf = len(outgoing) == 0
node.isLeaf = out == 0 // store

// XXX: this nodeLoaded stuff is crap
// TODO: the obj.loaded stuff isn't really consumed currently
node.rwmutex.RLock()
nodeLoaded := node.loaded
node.rwmutex.RUnlock()
if !nodeLoaded {
if !node.loaded {
loaded = false // we were wrong
}
node.rwmutex.RUnlock()

// XXX: memoize this which we can do easily now since graph shape doesn't change in this loop!
// TODO: memoize since graph shape doesn't change in this loop!
incoming := obj.graph.IncomingGraphVertices(f) // []pgraph.Vertex

//outgoing := obj.graph.OutgoingGraphVertices(f) // []pgraph.Vertex
//node.isLeaf = len(outgoing) == 0

node.rwmutex.Lock()
node.isLeaf = out == 0
node.rwmutex.Unlock()

// no incoming edges, so no incoming data
if len(incoming) == 0 { // we do this below
if !node.closed {
@@ -689,16 +698,15 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
}

if !ready || len(need) != 0 {
//return nil // not ready yet
break
continue // definitely continue, don't break here
}

// previously it was closed, skip sending
if node.closed {
continue
}

// XXX: respect the info.Pure and info.Memo fields somewhere here...
// XXX: respect the info.Pure and info.Memo fields somewhere...

// XXX: keep track of some state about who i sent to last before
// being interrupted so that I can avoid resending to some nodes
@@ -747,6 +755,50 @@ func (obj *Engine) process(ctx context.Context) (reterr error) {
// It's okay if this section gets preempted and we re-run this bit here.
obj.loaded = loaded // this gets reset when graph adds new nodes

if !loaded {
return nil
}

// Check each leaf and make sure they're all ready to send before we
// send anything to the ag channel. In addition, we need at least one
// send message from any of the valid isLeaf nodes. Since this only
// runs if everyone is loaded, we just need to check for active leaf
// nodes.
obj.activityMutex.Lock()
for node := range obj.activity {
if obj.leafSend {
break // early
}

if node.isLeaf { // calculated above in the previous loop
obj.leafSend = true
break
}
}
obj.activity = make(map[*state]struct{}) // clear
//clear(obj.activity) // new clear
obj.activityMutex.Unlock()

if !obj.leafSend {
return nil
}

select {
case obj.ag <- nil: // send to aggregate channel if we have events
obj.Logf("aggregated send")
obj.leafSend = false // reset

case <-ctx.Done():
obj.leafSend = true // since we skipped the ag send!
obj.wake() // interrupted, so queue again
return ctx.Err()

default:
// XXX: should we even allow this default case?
// exit if we're not ready to send to ag
obj.leafSend = true // since we skipped the ag send!
obj.wake() // interrupted, so queue again
}

return nil
}
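To make the new aggregation step easier to follow, here is a simplified, self-contained Go sketch of the idea (an illustration under assumptions, with made-up type names, not the engine's real code): node goroutines record activity under a mutex, and the single process step only emits one event on the ag channel once at least one active node is a leaf.

package main

import (
	"fmt"
	"sync"
)

type node struct {
	name   string
	isLeaf bool
}

type engine struct {
	ag            chan error
	activity      map[*node]struct{}
	activityMutex sync.Mutex
	leafSend      bool
}

// markActive is what a node would call after producing a new output value,
// before it wakes the main loop.
func (e *engine) markActive(n *node) {
	e.activityMutex.Lock()
	defer e.activityMutex.Unlock()
	e.activity[n] = struct{}{}
}

// process drains the activity set and decides whether to send one event.
func (e *engine) process() {
	e.activityMutex.Lock()
	for n := range e.activity {
		if n.isLeaf { // only leaf activity triggers an outgoing event
			e.leafSend = true
			break
		}
	}
	e.activity = make(map[*node]struct{}) // clear
	e.activityMutex.Unlock()

	if !e.leafSend {
		return
	}
	select {
	case e.ag <- nil: // one aggregated event for possibly many node updates
		e.leafSend = false // reset
	default:
		// receiver busy: keep leafSend set so a later process() retries
	}
}

func main() {
	e := &engine{ag: make(chan error, 1), activity: make(map[*node]struct{})}
	mid, leaf := &node{name: "mid"}, &node{name: "leaf", isLeaf: true}
	e.markActive(mid)
	e.process() // no leaf activity yet, so nothing is sent
	e.markActive(leaf)
	e.process() // leaf activity, so one event is queued
	fmt.Println("events queued:", len(e.ag)) // 1
}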

@@ -797,22 +849,6 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
}()
}

// Aggregation channel and wait group.
ag := make(chan error)
wgAg := &sync.WaitGroup{}

// close the aggregate channel when everyone is done with it...
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
}
// don't wait and close ag before we're really done with Run()
wgAg.Wait() // wait for last ag user to close
close(ag) // last one closes the ag channel
}()

// aggregate events channel
wg.Add(1)
go func() {
@@ -822,13 +858,13 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
var err error
var ok bool
select {
case err, ok = <-ag: // aggregated channel
case err, ok = <-obj.ag: // aggregated channel
if !ok {
return // channel shutdown
}
}

// XXX: check obj.loaded first?
// TODO: check obj.loaded first?

// now send event...
if obj.Callback != nil {
@@ -850,39 +886,42 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
}
}()

// wgAg is a wait group that waits for all senders to the ag chan.
// Exceptionally, we don't close the ag channel until wgFor has also
// closed, because it can send to ag in process().
wgAg := &sync.WaitGroup{}
wgFor := &sync.WaitGroup{}

// We need to keep the main loop running until everyone else has shut
// down. When the top context closes, we wait for everyone to finish,
// and then we shut down this main context.
//mainCtx, mainCancel := context.WithCancel(ctx) // wrap parent
mainCtx, mainCancel := context.WithCancel(context.Background()) // DON'T wrap parent, close on your own terms
defer mainCancel()

// close the aggregate channel when everyone is done with it...
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
}

// XXX RENAME wgAg because of its use here
wgAg.Wait() // wait until all the routines have closed
mainCancel()
// don't wait and close ag before we're really done with Run()
wgAg.Wait() // wait for last ag user to close
mainCancel() // only cancel after wgAg goroutines are done
wgFor.Wait() // wait for process loop to close before closing
close(obj.ag) // last one closes the ag channel
}()
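The ordering in the goroutine above is the important part: a channel may only be closed after every goroutine that can still send on it is done, which is why wgAg.Wait() comes before mainCancel(), and wgFor.Wait() (the process loop can also send on ag) comes before close(obj.ag). A generic Go sketch of that shutdown sequence, under simplified assumptions rather than the engine's real structure:

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// mainCtx is deliberately not derived from ctx: it is cancelled on our
	// own terms, only after the ag senders have finished.
	mainCtx, mainCancel := context.WithCancel(context.Background())

	ag := make(chan error)
	wgAg := &sync.WaitGroup{}  // goroutines that send on ag
	wgFor := &sync.WaitGroup{} // the main loop, which can also send on ag

	wgAg.Add(1)
	go func() { // a sender
		defer wgAg.Done()
		select {
		case ag <- nil:
		case <-mainCtx.Done():
		}
	}()

	wgFor.Add(1)
	go func() { // stand-in for the main for-loop
		defer wgFor.Done()
		<-mainCtx.Done()
	}()

	recvDone := make(chan struct{})
	go func() { // receiver: drains ag until it is closed
		defer close(recvDone)
		for range ag {
		}
	}()

	cancel()     // external shutdown request
	<-ctx.Done() // the `select { case <-ctx.Done(): }` in the code above
	wgAg.Wait()  // 1. all ag senders are done
	mainCancel() // 2. now stop the main loop
	wgFor.Wait() // 3. wait for the loop, since it can send on ag too
	close(ag)    // 4. finally safe to close the channel
	<-recvDone
	fmt.Println("shutdown without sending on a closed channel")
}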

wgFn := &sync.WaitGroup{} // wg for process function runner
defer wgFn.Wait() // extra safety

// XXX XXX XXX
//go func() { // XXX: debugging to make sure we didn't forget to wake someone...
// for {
// obj.wake() // new value, so send wake up
// time.Sleep(3 * time.Second)
// }
//}()
// XXX XXX XXX

defer obj.runNodeWaitFns() // just in case

wgFor.Add(1) // make sure we wait for the below process loop to exit...
defer wgFor.Done()

// we start off "running", but we'll have an empty graph initially...
for {

@@ -1025,6 +1064,11 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
obj.loaded = false // reset this
node.running = true

obj.statsMutex.Lock()
val, _ := obj.stats.inputList[node] // val is # or zero
obj.stats.inputList[node] = val // initialize to zero
obj.statsMutex.Unlock()

innerCtx, innerCancel := context.WithCancel(ctx) // wrap parent (not mainCtx)
// we defer innerCancel() in the goroutine to cleanup!
node.ctx = innerCtx
@@ -1063,11 +1107,12 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
obj.statsMutex.Unlock()
}
if runErr != nil {
obj.Logf("Erroring func `%s`: %+v", node, err)
// send to an aggregate channel
// the first to error will cause ag to
// shut down, so make sure we can exit...
select {
case ag <- runErr: // send to aggregate channel
case obj.ag <- runErr: // send to aggregate channel
case <-node.ctx.Done():
}
}
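As a general pattern, the select above pairs the error send with a Done() case so that a failing node can never block forever if the aggregate receiver has already shut down. A minimal hedged sketch of that send-or-cancel idiom (hypothetical names, not the commit's code):

package main

import (
	"context"
	"errors"
	"fmt"
)

// report tries to deliver err on ag, but gives up if ctx is cancelled, so the
// caller can always exit even when nobody is listening anymore.
func report(ctx context.Context, ag chan<- error, err error) bool {
	select {
	case ag <- err: // receiver is still listening
		return true
	case <-ctx.Done(): // receiver has gone away; drop the error and exit
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ag := make(chan error, 1)

	fmt.Println(report(ctx, ag, errors.New("boom"))) // true: the send succeeds

	cancel() // simulate shutdown with no receiver left
	fmt.Println(report(ctx, make(chan error), errors.New("boom"))) // false
}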
@@ -1111,30 +1156,21 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
node.loaded = true // set *after* value is in :)
//obj.Logf("func `%s` changed", node)
node.rwmutex.Unlock()
obj.wake() // new value, so send wake up

obj.statsMutex.Lock()
obj.stats.loadedList[node] = true
obj.statsMutex.Unlock()

// XXX: I think we need this read lock
// because we don't want to be adding a
// new vertex here but then missing to
// send an event to it because it
// started after we did the range...

node.rwmutex.RLock()
isLeaf := node.isLeaf
node.rwmutex.RUnlock()

// TODO: if shutdown, did we still want to do this?
if obj.Glitch || isLeaf {
select {
case ag <- nil: // send to aggregate channel
case <-node.ctx.Done():
//return
}
}
// Record activity to tell the main loop
// that we might need an aggregated send
// here. It should check if we are a leaf
// and if we glitch or not...
// Make sure we do this before the wake.
obj.activityMutex.Lock()
obj.activity[node] = struct{}{}
obj.activityMutex.Unlock()

obj.wake() // new value, so send wake up

} // end for

@@ -1147,7 +1183,7 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
// nodes that never loaded will cause the engine to hang
if !node.loaded {
select {
case ag <- fmt.Errorf("func `%s` stopped before it was loaded", node):
case obj.ag <- fmt.Errorf("func `%s` stopped before it was loaded", node):
case <-node.ctx.Done():
return
}
@@ -1160,7 +1196,6 @@ func (obj *Engine) Run(ctx context.Context) (reterr error) {
// Send new notifications in case any new edges are sending away
// to these... They might have already missed the notifications!
for k := range obj.resend { // resend TO these!
//obj.Graphviz("") // XXX DEBUG
node, exists := obj.state[k]
if !exists {
continue
@@ -1332,8 +1367,6 @@ type state struct {
Func interfaces.Func
name string // cache a name here for safer concurrency

notify chan struct{} // ping here when new input values exist

input chan types.Value // the top level type must be a struct
output chan types.Value
txn interfaces.Txn // API of graphTxn struct to pass to each function
@@ -1375,6 +1408,8 @@ type stats struct {
inputList map[*state]int64
}

// String implements the fmt.Stringer interface for printing out our collected
// statistics!
func (obj *stats) String() string {
// XXX: just build the lock into *stats instead of into our dage obj
s := "stats:\n"
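Several of the pieces above hinge on obj.wakeChan, which Setup() creates with a buffer of one ("hold up to one message"); paired with a non-blocking send in wake() (assumed here, since the wake() body is not part of this diff), repeated wake-ups coalesce while the process loop is still guaranteed to run again afterwards. A small standalone Go sketch of that coalescing pattern:

package main

import (
	"fmt"
	"sync"
)

type worker struct {
	wakeChan chan struct{} // hold up to one message
}

// wake queues at most one pending wakeup; extra wakes coalesce into it.
func (w *worker) wake() {
	select {
	case w.wakeChan <- struct{}{}:
	default: // a wakeup is already pending, nothing more to do
	}
}

func main() {
	w := &worker{wakeChan: make(chan struct{}, 1)}

	var processed int
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range w.wakeChan {
			processed++ // stand-in for one process() pass
		}
	}()

	for i := 0; i < 5; i++ {
		w.wake() // many wakes may coalesce into fewer process() passes
	}
	close(w.wakeChan) // only safe here because no more wake() calls follow
	wg.Wait()
	fmt.Println("process ran at least once:", processed >= 1) // true
}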
(The remaining 4 changed files are not shown.)