Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update process structure: rename process's key to name and move node.*.key into node #1536

Merged
merged 4 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 0 additions & 97 deletions database/process_db_test.go

This file was deleted.

8 changes: 4 additions & 4 deletions e2e/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ var testProcessHash hash.Hash

func testProcess(t *testing.T) {
req := &pb.CreateProcessRequest{
Key: "test-process",
Name: "test-process",
Nodes: []*process.Process_Node{
{
Key: "n0",
Type: &process.Process_Node_Event_{
Event: &process.Process_Node_Event{
Key: "n0",
InstanceHash: testInstanceHash,
EventKey: "test_service_ready",
},
},
},
{
Key: "n1",
Type: &process.Process_Node_Task_{
Task: &process.Process_Node_Task{
Key: "n1",
InstanceHash: testInstanceHash,
TaskKey: "test_service_ready",
},
Expand All @@ -53,7 +53,7 @@ func testProcess(t *testing.T) {
require.NoError(t, err)
require.True(t, p.Equal(&process.Process{
Hash: p.Hash,
Key: req.Key,
Name: req.Name,
Nodes: req.Nodes,
Edges: req.Edges,
}))
Expand Down
4 changes: 2 additions & 2 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
)

// New returns a new execution. It returns an error if inputs are invalid.
func New(processHash, instanceHash, parentHash, eventHash hash.Hash, stepID string, taskKey string, inputs *types.Struct, tags []string, executorHash hash.Hash) *Execution {
func New(processHash, instanceHash, parentHash, eventHash hash.Hash, nodeKey, taskKey string, inputs *types.Struct, tags []string, executorHash hash.Hash) *Execution {
exec := &Execution{
ProcessHash: processHash,
EventHash: eventHash,
InstanceHash: instanceHash,
ParentHash: parentHash,
Inputs: inputs,
TaskKey: taskKey,
StepID: stepID,
NodeKey: nodeKey,
Tags: tags,
Status: Status_Created,
ExecutorHash: executorHash,
Expand Down
76 changes: 38 additions & 38 deletions execution/execution.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func (s *Orchestrator) dependencyFilter(exec *execution.Execution) func(wf *proc
if !exec.ProcessHash.Equal(wf.Hash) {
return false, nil
}
parents := wf.ParentIDs(node.ID())
parents := wf.ParentKeys(node.Key)
if len(parents) == 0 {
return false, nil
}
if len(parents) > 1 {
return false, fmt.Errorf("multi parents not supported")
}
return parents[0] == exec.StepID, nil
return parents[0] == exec.NodeKey, nil
}
}

Expand Down Expand Up @@ -121,12 +121,12 @@ func (s *Orchestrator) execute(filter func(wf *process.Process, node *process.Pr

func (s *Orchestrator) executeNode(wf *process.Process, n *process.Process_Node, exec *execution.Execution, event *event.Event, data *types.Struct) error {
logrus.WithField("module", "orchestrator").
WithField("nodeID", n.ID()).
WithField("node.key", n.Key).
WithField("type", fmt.Sprintf("%T", n)).Debug("process process")
if task := n.GetTask(); task != nil {
// This returns directly because a task cannot process its children.
// Children will be processed only when the execution is done and the dependencies are resolved
return s.processTask(task, wf, exec, event, data)
return s.processTask(n.Key, task, wf, exec, event, data)
} else if m := n.GetMap(); m != nil {
var err error
data, err = s.processMap(m, wf, exec, data)
Expand All @@ -138,7 +138,7 @@ func (s *Orchestrator) executeNode(wf *process.Process, n *process.Process_Node,
return nil
}
}
for _, childrenID := range wf.ChildrenIDs(n.ID()) {
for _, childrenID := range wf.ChildrenKeys(n.Key) {
children, err := wf.FindNode(childrenID)
if err != nil {
// does not return an error to continue to process other tasks if needed
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *Orchestrator) resolveInput(wfHash hash.Hash, exec *execution.Execution,
if !wfHash.Equal(exec.ProcessHash) {
return nil, fmt.Errorf("reference's nodeKey not found")
}
if exec.StepID != nodeKey {
if exec.NodeKey != nodeKey {
parent, err := s.execution.Get(exec.ParentHash)
if err != nil {
return nil, err
Expand All @@ -193,7 +193,7 @@ func (s *Orchestrator) resolveInput(wfHash hash.Hash, exec *execution.Execution,
return exec.Outputs.Fields[outputKey], nil
}

func (s *Orchestrator) processTask(task *process.Process_Node_Task, wf *process.Process, exec *execution.Execution, event *event.Event, data *types.Struct) error {
func (s *Orchestrator) processTask(nodeKey string, task *process.Process_Node_Task, wf *process.Process, exec *execution.Execution, event *event.Event, data *types.Struct) error {
var eventHash, execHash hash.Hash
if event != nil {
eventHash = event.Hash
Expand All @@ -215,7 +215,7 @@ func (s *Orchestrator) processTask(task *process.Process_Node_Task, wf *process.
ProcessHash: wf.Hash,
EventHash: eventHash,
ParentHash: execHash,
StepID: task.Key,
NodeKey: nodeKey,
TaskKey: task.TaskKey,
Inputs: data,
ExecutorHash: executor.Hash,
Expand Down
Loading