Skip to content

Commit

Permalink
Add array and map data type to process map output
Browse files Browse the repository at this point in the history
  • Loading branch information
krhubert committed Dec 2, 2019
1 parent 7b8d255 commit 8c6b914
Show file tree
Hide file tree
Showing 4 changed files with 531 additions and 116 deletions.
70 changes: 53 additions & 17 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orchestrator

import (
"context"
"errors"
"fmt"
"math/rand"

Expand Down Expand Up @@ -129,7 +130,7 @@ func (s *Orchestrator) executeNode(wf *process.Process, n *process.Process_Node,
return s.processTask(n.Key, task, wf, exec, event, data)
} else if m := n.GetMap(); m != nil {
var err error
data, err = s.processMap(m, wf, exec, data)
data, err = s.processMap(m.Outputs, wf, exec, data)
if err != nil {
return err
}
Expand All @@ -153,30 +154,65 @@ func (s *Orchestrator) executeNode(wf *process.Process, n *process.Process_Node,
return nil
}

func (s *Orchestrator) processMap(mapping *process.Process_Node_Map, wf *process.Process, exec *execution.Execution, data *types.Struct) (*types.Struct, error) {
func (s *Orchestrator) processMap(outputs map[string]*process.Process_Node_Map_Output, wf *process.Process, exec *execution.Execution, data *types.Struct) (*types.Struct, error) {
result := &types.Struct{
Fields: make(map[string]*types.Value),
}
for _, output := range mapping.Outputs {
if ref := output.GetRef(); ref != nil {
node, err := wf.FindNode(ref.NodeKey)
for key, output := range outputs {
value, err := s.outputToValue(output, wf, exec, data)
if err != nil {
return nil, err
}
result.Fields[key] = value
}
return result, nil
}

func (s *Orchestrator) outputToValue(output *process.Process_Node_Map_Output, wf *process.Process, exec *execution.Execution, data *types.Struct) (*types.Value, error) {
switch v := output.GetValue().(type) {
case *process.Process_Node_Map_Output_Null_:
return &types.Value{Kind: &types.Value_NullValue{NullValue: types.NullValue_NULL_VALUE}}, nil
case *process.Process_Node_Map_Output_StringConst:
return &types.Value{Kind: &types.Value_StringValue{StringValue: v.StringConst}}, nil
case *process.Process_Node_Map_Output_DoubleConst:
return &types.Value{Kind: &types.Value_NumberValue{NumberValue: v.DoubleConst}}, nil
case *process.Process_Node_Map_Output_BoolConst:
return &types.Value{Kind: &types.Value_BoolValue{BoolValue: v.BoolConst}}, nil
case *process.Process_Node_Map_Output_Map_:
out, err := s.processMap(v.Map.Outputs, wf, exec, data)
if err != nil {
return nil, err
}
return &types.Value{Kind: &types.Value_StructValue{StructValue: out}}, nil
case *process.Process_Node_Map_Output_List_:
var values []*types.Value
for i := range v.List.Outputs {
value, err := s.outputToValue(v.List.Outputs[i], wf, exec, data)
if err != nil {
return nil, err
}
if node.GetTask() != nil {
value, err := s.resolveInput(wf.Hash, exec, ref.NodeKey, ref.Key)
if err != nil {
return nil, err
}
result.Fields[output.Key] = value
} else {
result.Fields[output.Key] = data.Fields[ref.Key]
}
} else if constant := output.GetConstant(); constant != nil {
result.Fields[output.Key] = constant

values = append(values, value)
}
return &types.Value{
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{
Values: values,
},
},
}, nil
case *process.Process_Node_Map_Output_Ref:
node, err := wf.FindNode(v.Ref.NodeKey)
if err != nil {
return nil, err
}
if node.GetTask() != nil {
return s.resolveInput(wf.Hash, exec, v.Ref.NodeKey, v.Ref.Key)
}
return data.Fields[v.Ref.Key], nil
default:
return nil, errors.New("unknown output")
}
return result, nil
}

func (s *Orchestrator) resolveInput(wfHash hash.Hash, exec *execution.Execution, nodeKey string, outputKey string) (*types.Value, error) {
Expand Down
Loading

0 comments on commit 8c6b914

Please sign in to comment.