Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Improve multi node test coverage #185

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 15 additions & 134 deletions query/graphql/planner/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ package planner
import (
"errors"

"log"

"github.com/sourcenetwork/defradb/core"
)

Expand Down Expand Up @@ -85,17 +83,6 @@ type parallelNode struct { // serialNode?
doc map[string]interface{}
}

// func (n *selectTopNode) Init() error { return n.plan.Init() }
// func (n *selectTopNode) Start() error { return n.plan.Start() }
// func (n *selectTopNode) Next() (bool, error) { return n.plan.Next() }
// func (n *selectTopNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
// func (n *selectTopNode) Values() map[string]interface{} { return n.plan.Values() }
// func (n *selectTopNode) Close() {
// if n.plan != nil {
// n.plan.Close()
// }
// }

func (p *parallelNode) applyToPlans(fn func(n planNode) error) error {
for _, plan := range p.children {
if err := fn(plan); err != nil {
Expand All @@ -118,13 +105,10 @@ func (p *parallelNode) Start() error {
}

func (p *parallelNode) Spans(spans core.Spans) {
err := p.applyToPlans(func(n planNode) error {
_ = p.applyToPlans(func(n planNode) error {
n.Spans(spans)
return nil
})
if err != nil {
log.Print("applying spans to plans failed : ", err)
}
}

func (p *parallelNode) Close() error {
Expand Down Expand Up @@ -287,20 +271,6 @@ _version: commitSelectTopNode(append)
*/

func (p *parallelNode) Values() map[string]interface{} {
// result := make(map[string]interface{})
// for i, plan := range p.children {
// if doc := plan.Values(); doc != nil {
// switch plan.(type) {
// case mergeNode:
// for k, v := range doc {
// p.result[k] = v
// }
// case appendNode:
// p.result[p.childFields[i]] = doc
// }
// }
// }

return p.doc
}

Expand All @@ -316,10 +286,6 @@ func (p *parallelNode) AddChild(field string, node planNode) error {
return nil
}

func (p *parallelNode) SetMultiScanner(ms *multiScanNode) {
p.multiscan = ms
}

/*
user {
friends {
Expand Down Expand Up @@ -409,62 +375,8 @@ Select {
}
]}
}



*/

// document
// func (s *selectNode) addSubPlan(field string, plan planNode) error {
// var multinode MultiNode
// var multiscan *multiScanNode
// switch src := s.source.(type) {
// case MultiNode:
// multinode = src
// multiscan = multinode.Source().(*multiScanNode)
// case *scanNode:
// // we have a simple scanNode as our source
// // if the new sub plan is a MergePlan, then just replace the
// // source
// // if its an append plan, then we need to create MultiNode
// s.source = plan
// return nil
// default: // no existing multinode, and our current source is a complex non scanNode node.
// // get original scanNode
// origScan := s.p.walkAndFindPlanType(plan, &scanNode{}).(*scanNode)
// if origScan == nil {
// return errors.New("Failed to find original scan node in plan graph")
// }
// // create our new multiscanner
// multiscan = &multiScanNode{scanNode: origScan}
// multiscan.addReader()
// // create multinode
// multinode = &parallelNode{
// multiscan: multiscan,
// }
// // replace our current source internal scanNode with our new multiscanner
// if err := s.p.walkAndReplacePlan(src, origScan, multiscan); err != nil {
// return err
// }
// // add our newly updated source to the multinode
// if err := multinode.AddChild("", src); err != nil {
// return err
// }
// s.source = multinode
// }

// // if we've got here, then we have an instanciated multinode ready to add
// // our new plan to
// multiscan.addReader()
// scan := multiscan.Source()
// // replace our current source internal scanNode with our new multiscanner
// if err := s.p.walkAndReplacePlan(plan, scan, multiscan); err != nil {
// return err
// }
// // add our newly updated source to the multinode
// return multinode.AddChild("", plan)
// }

// @todo: Document AddSubPlan method
func (s *selectNode) addSubPlan(field string, plan planNode) error {
src := s.source
Expand Down Expand Up @@ -530,56 +442,25 @@ func (s *selectNode) addSubPlan(field string, plan planNode) error {
return err
}

// harder case. two possibilities:
// A) We have a internal multiscanNode on our MultiNode
// B) We don't. Which means we have a scanNode as a child of MultiNode, and we need
// to replace it with the updated MergeNode
// We have a internal multiscanNode on our MultiNode
case mergeNode:
if ms := node.Source(); s != nil { // yes, we have a multiscan node. Case A)
multiscan := ms.(*multiScanNode)
// replace our new node internal scanNode with our existing multiscanner
if err := s.p.walkAndReplacePlan(plan, multiscan.Source(), multiscan); err != nil {
return err
}
multiscan.addReader()
// add our newly updated plan to the multinode
if err := node.AddChild(field, plan); err != nil {
return err
}
} else { // no multiscan, case B)
children := node.Children()
// index 0 is always a scan node if there is no multiscanner
// origScan is going to match the internal MergeNode scanner
origScan := children[0].(*scanNode)

// create our new multiscanner
multiscan := &multiScanNode{scanNode: origScan}
node.SetMultiScanner(multiscan)
// replace the origal mergePlan scanner with the new multiscan
if err := s.p.walkAndReplacePlan(plan, multiscan.Source(), multiscan); err != nil {
return err
}
multiscan.addReader()
// replace our origina scanNode in the mulitnode with our new MergeNode
children[0] = plan
multiscan, sourceIsMultiscan := node.Source().(*multiScanNode)
if !sourceIsMultiscan {
return errors.New("Merge node source must be a multiScanNode")
}

// replace our new node internal scanNode with our existing multiscanner
if err := s.p.walkAndReplacePlan(plan, multiscan.Source(), multiscan); err != nil {
return err
}
multiscan.addReader()
// add our newly updated plan to the multinode
if err := node.AddChild(field, plan); err != nil {
return err
}
default:
return errors.New("Sub plan needs to be either a MergeNode or an AppendNode")
}
}
return nil
}

// func (n *selectNode) addSubMergePlan(plan mergePlan) error {
// return nil
// }

// func (n *selectNode) addSubAppendPlan(plan appendPlan) error {
// return nil
// }

// func (p *Planner) parallelNode() (*parallelNode, error) {
// mp := &parallelNode{
// children: nodes,
// }
// }