Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement nested data map in process and change process map structure #1540

Merged
merged 9 commits into from
Dec 11, 2019
2 changes: 2 additions & 0 deletions e2e/orchestrator_event_filter_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func testOrchestratorEventFilterTask(executionStream pb.Execution_StreamClient,
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "n2", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.Equal(t, "shouldMatch", exec.Inputs.Fields["msg"].GetStringValue())
Expand All @@ -97,6 +98,7 @@ func testOrchestratorEventFilterTask(executionStream pb.Execution_StreamClient,
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "n2", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "shouldMatch", exec.Outputs.Fields["msg"].GetStringValue())
Expand Down
152 changes: 152 additions & 0 deletions e2e/orchestrator_event_map_task_map_task_omplex_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package main

import (
"context"
"testing"

"github.com/mesg-foundation/engine/execution"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/process"
pb "github.com/mesg-foundation/engine/protobuf/api"
"github.com/mesg-foundation/engine/protobuf/types"
"github.com/stretchr/testify/require"
)

func testOrchestratorEventMapTaskMapTaskComplexData(executionStream pb.Execution_StreamClient, instanceHash hash.Hash) func(t *testing.T) {
return func(t *testing.T) {
var (
processHash hash.Hash
dataEvent = &types.Struct{
Fields: map[string]*types.Value{
"msg": {
Kind: &types.Value_StructValue{
StructValue: &types.Struct{
Fields: map[string]*types.Value{
"msg": {
Kind: &types.Value_StringValue{
StringValue: "complex",
},
},
"timestamp": {
Kind: &types.Value_NumberValue{
NumberValue: 101,
},
},
"array": {
Kind: &types.Value_ListValue{
ListValue: &types.ListValue{Values: []*types.Value{
{Kind: &types.Value_StringValue{StringValue: "first"}},
{Kind: &types.Value_StringValue{StringValue: "second"}},
{Kind: &types.Value_StringValue{StringValue: "third"}},
}},
},
},
},
},
},
},
},
}
)
t.Run("create process", func(t *testing.T) {
respProc, err := client.ProcessClient.Create(context.Background(), &pb.CreateProcessRequest{
Name: "event-map-task-map-task-complex-data-process",
Nodes: []*process.Process_Node{
{
Key: "n0",
Type: &process.Process_Node_Event_{
Event: &process.Process_Node_Event{
InstanceHash: instanceHash,
EventKey: "test_event_complex",
},
},
},
{
Key: "n1",
Type: &process.Process_Node_Map_{
Map: &process.Process_Node_Map{
Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {
Value: &process.Process_Node_Map_Output_Map_{
Map: &process.Process_Node_Map_Output_Map{Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {Value: &process.Process_Node_Map_Output_StringConst{
StringConst: "isAConstant",
}},
"array": {Value: &process.Process_Node_Map_Output_List_{
List: &process.Process_Node_Map_Output_List{Outputs: []*process.Process_Node_Map_Output{
{Value: &process.Process_Node_Map_Output_StringConst{StringConst: "first-constant"}},
{Value: &process.Process_Node_Map_Output_StringConst{StringConst: "second-constant"}},
{Value: &process.Process_Node_Map_Output_StringConst{StringConst: "third-constant"}},
{Value: &process.Process_Node_Map_Output_StringConst{StringConst: "fourth-constant"}},
}},
}},
}},
},
},
},
},
},
},
{
Key: "n2",
Type: &process.Process_Node_Task_{
Task: &process.Process_Node_Task{
InstanceHash: instanceHash,
TaskKey: "task_complex",
},
},
},
},
Edges: []*process.Process_Edge{
{Src: "n0", Dst: "n1"},
{Src: "n1", Dst: "n2"},
},
})
require.NoError(t, err)
processHash = respProc.Hash
})
t.Run("trigger process", func(t *testing.T) {
_, err := client.EventClient.Create(context.Background(), &pb.CreateEventRequest{
InstanceHash: instanceHash,
Key: "test_event_complex",
Data: dataEvent,
})
require.NoError(t, err)
})
t.Run("first task", func(t *testing.T) {
t.Run("check in progress execution", func(t *testing.T) {
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task_complex", exec.TaskKey)
require.Equal(t, "n2", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.Equal(t, "isAConstant", exec.Inputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue())
require.Len(t, exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values, 4)
require.Equal(t, "first-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[0].GetStringValue())
require.Equal(t, "second-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[1].GetStringValue())
require.Equal(t, "third-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[2].GetStringValue())
require.Equal(t, "fourth-constant", exec.Inputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[3].GetStringValue())
})
t.Run("check completed execution", func(t *testing.T) {
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task_complex", exec.TaskKey)
require.Equal(t, "n2", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "isAConstant", exec.Outputs.Fields["msg"].GetStructValue().Fields["msg"].GetStringValue())
require.Len(t, exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values, 4)
require.Equal(t, "first-constant", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[0].GetStringValue())
require.Equal(t, "second-constant", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[1].GetStringValue())
require.Equal(t, "third-constant", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[2].GetStringValue())
require.Equal(t, "fourth-constant", exec.Outputs.Fields["msg"].GetStructValue().Fields["array"].GetListValue().Values[3].GetStringValue())
require.NotEmpty(t, exec.Outputs.Fields["msg"].GetStructValue().Fields["timestamp"].GetNumberValue())
})
})
t.Run("delete process", func(t *testing.T) {
_, err := client.ProcessClient.Delete(context.Background(), &pb.DeleteProcessRequest{Hash: processHash})
require.NoError(t, err)
})
}
}
14 changes: 6 additions & 8 deletions e2e/orchestrator_event_map_task_map_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ func testOrchestratorEventMapTaskMapTask(executionStream pb.Execution_StreamClie
Key: "n1",
Type: &process.Process_Node_Map_{
Map: &process.Process_Node_Map{
Outputs: []*process.Process_Node_Map_Output{
{
Key: "msg",
Value: &process.Process_Node_Map_Output_Constant{
Constant: &types.Value{Kind: &types.Value_StringValue{StringValue: "itsAConstant"}},
Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {
Value: &process.Process_Node_Map_Output_StringConst{
StringConst: "itsAConstant",
},
},
},
Expand All @@ -58,9 +57,8 @@ func testOrchestratorEventMapTaskMapTask(executionStream pb.Execution_StreamClie
Key: "n3",
Type: &process.Process_Node_Map_{
Map: &process.Process_Node_Map{
Outputs: []*process.Process_Node_Map_Output{
{
Key: "msg",
Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {
Value: &process.Process_Node_Map_Output_Ref{
Ref: &process.Process_Node_Map_Output_Reference{
NodeKey: "n0",
Expand Down
11 changes: 6 additions & 5 deletions e2e/orchestrator_event_map_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ func testOrchestratorEventMapTask(executionStream pb.Execution_StreamClient, ins
Key: "n1",
Type: &process.Process_Node_Map_{
Map: &process.Process_Node_Map{
Outputs: []*process.Process_Node_Map_Output{
{
Key: "msg",
Value: &process.Process_Node_Map_Output_Constant{
Constant: &types.Value{Kind: &types.Value_StringValue{StringValue: "itsAConstant"}},
Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {
Value: &process.Process_Node_Map_Output_StringConst{
StringConst: "itsAConstant",
},
},
},
Expand Down Expand Up @@ -88,6 +87,7 @@ func testOrchestratorEventMapTask(executionStream pb.Execution_StreamClient, ins
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "n2", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.Equal(t, "itsAConstant", exec.Inputs.Fields["msg"].GetStringValue())
Expand All @@ -96,6 +96,7 @@ func testOrchestratorEventMapTask(executionStream pb.Execution_StreamClient, ins
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "n2", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "itsAConstant", exec.Outputs.Fields["msg"].GetStringValue())
Expand Down
2 changes: 2 additions & 0 deletions e2e/orchestrator_event_task_complex_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func testOrchestratorEventTaskComplexData(executionStream pb.Execution_StreamCli
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task_complex", exec.TaskKey)
require.Equal(t, "n1", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.True(t, data.Equal(exec.Inputs))
Expand All @@ -98,6 +99,7 @@ func testOrchestratorEventTaskComplexData(executionStream pb.Execution_StreamCli
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task_complex", exec.TaskKey)
require.Equal(t, "n1", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.True(t, data.Equal(exec.Inputs))
Expand Down
2 changes: 2 additions & 0 deletions e2e/orchestrator_event_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func testOrchestratorEventTask(executionStream pb.Execution_StreamClient, instan
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "n1", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.Equal(t, "foo_1", exec.Inputs.Fields["msg"].GetStringValue())
Expand All @@ -80,6 +81,7 @@ func testOrchestratorEventTask(executionStream pb.Execution_StreamClient, instan
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "n1", exec.NodeKey)
require.True(t, processHash.Equal(exec.ProcessHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "foo_1", exec.Outputs.Fields["msg"].GetStringValue())
Expand Down
16 changes: 8 additions & 8 deletions e2e/orchestrator_result_map_task_map_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ func testOrchestratorResultMapTaskMapTask(executionStream pb.Execution_StreamCli
Key: "n1",
Type: &process.Process_Node_Map_{
Map: &process.Process_Node_Map{
Outputs: []*process.Process_Node_Map_Output{
{
Key: "msg",
Value: &process.Process_Node_Map_Output_Constant{
Constant: &types.Value{Kind: &types.Value_StringValue{StringValue: "itsAConstant"}},
Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {
Value: &process.Process_Node_Map_Output_StringConst{
StringConst: "itsAConstant",
},
},
},
Expand All @@ -57,9 +56,8 @@ func testOrchestratorResultMapTaskMapTask(executionStream pb.Execution_StreamCli
Key: "n3",
Type: &process.Process_Node_Map_{
Map: &process.Process_Node_Map{
Outputs: []*process.Process_Node_Map_Output{
{
Key: "msg",
Outputs: map[string]*process.Process_Node_Map_Output{
"msg": {
Value: &process.Process_Node_Map_Output_Ref{
Ref: &process.Process_Node_Map_Output_Reference{
NodeKey: "n0",
Expand Down Expand Up @@ -113,6 +111,7 @@ func testOrchestratorResultMapTaskMapTask(executionStream pb.Execution_StreamCli
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task2", exec.TaskKey)
require.Equal(t, "", exec.NodeKey)
require.True(t, hash.Int(11010101011).Equal(exec.EventHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.True(t, exec.Inputs.Equal(&types.Struct{
Expand All @@ -129,6 +128,7 @@ func testOrchestratorResultMapTaskMapTask(executionStream pb.Execution_StreamCli
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task2", exec.TaskKey)
require.Equal(t, "", exec.NodeKey)
require.True(t, hash.Int(11010101011).Equal(exec.EventHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "foo_result", exec.Outputs.Fields["msg"].GetStringValue())
Expand Down
4 changes: 4 additions & 0 deletions e2e/orchestrator_result_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func testOrchestratorResultTask(executionStream pb.Execution_StreamClient, runne
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "", exec.NodeKey)
require.True(t, hash.Int(11010101011).Equal(exec.EventHash))
require.Equal(t, execution.Status_InProgress, exec.Status)
require.True(t, exec.Inputs.Equal(&types.Struct{
Expand All @@ -84,6 +85,7 @@ func testOrchestratorResultTask(executionStream pb.Execution_StreamClient, runne
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task1", exec.TaskKey)
require.Equal(t, "", exec.NodeKey)
require.True(t, hash.Int(11010101011).Equal(exec.EventHash))
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "foo_2", exec.Outputs.Fields["msg"].GetStringValue())
Expand All @@ -94,6 +96,7 @@ func testOrchestratorResultTask(executionStream pb.Execution_StreamClient, runne
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task2", exec.TaskKey)
require.Equal(t, "n1", exec.NodeKey)
require.Equal(t, processHash, exec.ProcessHash)
require.Equal(t, execution.Status_InProgress, exec.Status)
require.Equal(t, "foo_2", exec.Inputs.Fields["msg"].GetStringValue())
Expand All @@ -102,6 +105,7 @@ func testOrchestratorResultTask(executionStream pb.Execution_StreamClient, runne
exec, err := executionStream.Recv()
require.NoError(t, err)
require.Equal(t, "task2", exec.TaskKey)
require.Equal(t, "n1", exec.NodeKey)
require.Equal(t, processHash, exec.ProcessHash)
require.Equal(t, execution.Status_Completed, exec.Status)
require.Equal(t, "foo_2", exec.Outputs.Fields["msg"].GetStringValue())
Expand Down
3 changes: 3 additions & 0 deletions e2e/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ func testOrchestrator(t *testing.T) {
t.Run("result map task map task", testOrchestratorResultMapTaskMapTask(executionStream, testRunnerHash, testInstanceHash))
t.Run("event map task map task", testOrchestratorEventMapTaskMapTask(executionStream, testInstanceHash))
t.Run("event task complex data", testOrchestratorEventTaskComplexData(executionStream, testInstanceHash))
t.Run("event map task map task complex data", testOrchestratorEventMapTaskMapTaskComplexData(executionStream, testInstanceHash))

// to execute last because of go routine leak. See fixme in following function
t.Run("event filter task", testOrchestratorEventFilterTask(executionStream, testInstanceHash))
}
Loading