diff --git a/e2e/api_test.go b/e2e/api_test.go index 304121e15..0e697df9f 100644 --- a/e2e/api_test.go +++ b/e2e/api_test.go @@ -4,13 +4,16 @@ import ( "context" "os" "path/filepath" + "strconv" "testing" "github.com/cosmos/cosmos-sdk/crypto/keys" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/mesg-foundation/engine/app" "github.com/mesg-foundation/engine/config" + "github.com/mesg-foundation/engine/container" "github.com/mesg-foundation/engine/cosmos" + "github.com/mesg-foundation/engine/ext/xnet" pb "github.com/mesg-foundation/engine/protobuf/api" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -18,7 +21,6 @@ import ( type apiclient struct { pb.EventClient - pb.RunnerClient } var ( @@ -29,6 +31,10 @@ var ( kb *cosmos.Keybase cfg *config.Config engineAddress sdk.AccAddress + cont container.Container + ipfsEndpoint string + engineName string + enginePort string ) func TestAPI(t *testing.T) { @@ -58,13 +64,20 @@ func TestAPI(t *testing.T) { engineAddress = acc.GetAddress() } + // init runner builder + cont, err = container.New(cfg.Name) + require.NoError(t, err) + _, port, _ := xnet.SplitHostPort(cfg.Server.Address) + enginePort = strconv.Itoa(port) + engineName = cfg.Name + ipfsEndpoint = cfg.IpfsEndpoint + // init gRPC client conn, err := grpc.DialContext(context.Background(), "localhost:50052", grpc.WithInsecure()) require.NoError(t, err) client = apiclient{ pb.NewEventClient(conn), - pb.NewRunnerClient(conn), } // run tests diff --git a/e2e/complex_service_test.go b/e2e/complex_service_test.go index 4e65c3b3c..87ac303b9 100644 --- a/e2e/complex_service_test.go +++ b/e2e/complex_service_test.go @@ -4,48 +4,81 @@ import ( "context" "testing" + "github.com/mesg-foundation/engine/ext/xos" "github.com/mesg-foundation/engine/hash" "github.com/mesg-foundation/engine/protobuf/acknowledgement" pb "github.com/mesg-foundation/engine/protobuf/api" + "github.com/mesg-foundation/engine/runner/builder" + "github.com/mesg-foundation/engine/service" + runnermodule "github.com/mesg-foundation/engine/x/runner" + runnerrest "github.com/mesg-foundation/engine/x/runner/client/rest" "github.com/stretchr/testify/require" ) func testComplexService(t *testing.T) { var ( - testServiceHash hash.Hash - testRunnerHashC hash.Hash - testInstanceHash hash.Hash + testServiceComplexHash hash.Hash + testRunnerComplexHash hash.Hash + testInstanceComplexHash hash.Hash + testInstanceComplexEnvHash hash.Hash + testServiceComplexStruct *service.Service + testServiceComplexImageHash string + testInstanceComplexEnv []string ) - t.Run("create", func(t *testing.T) { + t.Run("create service", func(t *testing.T) { testComplexCreateServiceMsg.Owner = engineAddress - testServiceHash = lcdBroadcastMsg(testComplexCreateServiceMsg) + testServiceComplexHash = lcdBroadcastMsg(testComplexCreateServiceMsg) }) - stream, err := client.EventClient.Stream(context.Background(), &pb.StreamEventRequest{}) - require.NoError(t, err) - acknowledgement.WaitForStreamToBeReady(stream) + t.Run("get service", func(t *testing.T) { + lcdGet("service/get/"+testServiceComplexHash.String(), &testServiceComplexStruct) + require.Equal(t, testServiceComplexHash, testServiceComplexStruct.Hash) + }) + testInstanceComplexEnv = xos.EnvMergeSlices(testServiceComplexStruct.Configuration.Env, []string{"ENVB=is_override"}) - t.Run("run", func(t *testing.T) { - resp, err := client.RunnerClient.Create(context.Background(), &pb.CreateRunnerRequest{ - ServiceHash: testServiceHash, - Env: []string{"ENVB=is_override"}, - }) - require.NoError(t, err) - testRunnerHashC = resp.Hash + t.Run("get runner hashes", func(t *testing.T) { + var res runnerrest.HashResponse + lcdPost("runner/hash", &runnerrest.HashRequest{ + ServiceHash: testServiceComplexHash, + Address: engineAddress.String(), + Env: testInstanceComplexEnv, + }, &res) + testRunnerComplexHash = res.RunnerHash + testInstanceComplexHash = res.InstanceHash + testInstanceComplexEnvHash = res.EnvHash + }) - resp1, err := client.RunnerClient.Get(context.Background(), &pb.GetRunnerRequest{Hash: testRunnerHashC}) + t.Run("build service image", func(t *testing.T) { + var err error + testServiceComplexImageHash, err = builder.Build(cont, testServiceComplexStruct, ipfsEndpoint) require.NoError(t, err) - testInstanceHash = resp1.InstanceHash }) + t.Run("start runner", func(t *testing.T) { + require.NoError(t, builder.Start(cont, testServiceComplexStruct, testInstanceComplexHash, testRunnerComplexHash, testServiceComplexImageHash, testInstanceComplexEnv, engineName, enginePort)) + }) + + t.Run("register runner", func(t *testing.T) { + msg := runnermodule.MsgCreate{ + Owner: engineAddress, + ServiceHash: testServiceComplexHash, + EnvHash: testInstanceComplexEnvHash, + } + require.True(t, testRunnerComplexHash.Equal(lcdBroadcastMsg(msg))) + }) + + stream, err := client.EventClient.Stream(context.Background(), &pb.StreamEventRequest{}) + require.NoError(t, err) + acknowledgement.WaitForStreamToBeReady(stream) + t.Run("check events", func(t *testing.T) { okEventsNo := 6 for i := 0; i < okEventsNo; { ev, err := stream.Recv() require.NoError(t, err) - if !ev.InstanceHash.Equal(testInstanceHash) { + if !ev.InstanceHash.Equal(testInstanceComplexHash) { continue } i++ @@ -60,8 +93,14 @@ func testComplexService(t *testing.T) { }) t.Run("delete", func(t *testing.T) { - t.Skip("FIXME: this call never get trough. some issue with the service's dependency") - _, err := client.RunnerClient.Delete(context.Background(), &pb.DeleteRunnerRequest{Hash: testRunnerHashC}) - require.NoError(t, err) + t.Skip("FIXME: this test timeout on CIRCLE CI. works well on local computer") + msg := runnermodule.MsgDelete{ + Owner: engineAddress, + Hash: testRunnerComplexHash, + } + + lcdBroadcastMsg(msg) + + require.NoError(t, builder.Stop(cont, testRunnerComplexHash, testServiceComplexStruct.Dependencies)) }) } diff --git a/e2e/definition_test.go b/e2e/definition_test.go index 4da352470..4af8b0d8c 100644 --- a/e2e/definition_test.go +++ b/e2e/definition_test.go @@ -2,10 +2,10 @@ package main import ( "github.com/mesg-foundation/engine/service" - serviceModule "github.com/mesg-foundation/engine/x/service" + servicemodule "github.com/mesg-foundation/engine/x/service" ) -var testComplexCreateServiceMsg = &serviceModule.MsgCreate{ +var testComplexCreateServiceMsg = &servicemodule.MsgCreate{ Sid: "test-complex-service", Name: "test-complex-service", Dependencies: []*service.Service_Dependency{ @@ -39,7 +39,7 @@ var testComplexCreateServiceMsg = &serviceModule.MsgCreate{ Source: "QmSuVcdic2dhS5QKQGWp66SJQUkDRqAqCHpU6Sx9uXJcdc", } -var testCreateServiceMsg = &serviceModule.MsgCreate{ +var testCreateServiceMsg = &servicemodule.MsgCreate{ Sid: "test-service", Name: "test-service", Configuration: service.Service_Configuration{ diff --git a/e2e/runner_test.go b/e2e/runner_test.go index b6561ed5f..07e62c6f6 100644 --- a/e2e/runner_test.go +++ b/e2e/runner_test.go @@ -5,19 +5,51 @@ import ( "testing" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/mesg-foundation/engine/ext/xos" "github.com/mesg-foundation/engine/hash" "github.com/mesg-foundation/engine/protobuf/acknowledgement" pb "github.com/mesg-foundation/engine/protobuf/api" "github.com/mesg-foundation/engine/runner" + "github.com/mesg-foundation/engine/runner/builder" + runnermodule "github.com/mesg-foundation/engine/x/runner" runnerrest "github.com/mesg-foundation/engine/x/runner/client/rest" "github.com/stretchr/testify/require" ) -var testRunnerHash hash.Hash -var testRunnerAddress sdk.AccAddress +var ( + testRunnerHash hash.Hash + testInstanceEnvHash hash.Hash + testRunnerAddress sdk.AccAddress + testServiceImageHash string +) func testRunner(t *testing.T) { - t.Run("create", func(t *testing.T) { + var ( + testInstanceEnv = xos.EnvMergeSlices(testServiceStruct.Configuration.Env, []string{"BAR=3", "REQUIRED=4"}) + ) + t.Run("hash", func(t *testing.T) { + var res runnerrest.HashResponse + lcdPost("runner/hash", &runnerrest.HashRequest{ + ServiceHash: testServiceHash, + Address: engineAddress.String(), + Env: testInstanceEnv, + }, &res) + testRunnerHash = res.RunnerHash + testInstanceHash = res.InstanceHash + testInstanceEnvHash = res.EnvHash + }) + + t.Run("build service image", func(t *testing.T) { + var err error + testServiceImageHash, err = builder.Build(cont, testServiceStruct, ipfsEndpoint) + require.NoError(t, err) + }) + + t.Run("start", func(t *testing.T) { + require.NoError(t, builder.Start(cont, testServiceStruct, testInstanceHash, testRunnerHash, testServiceImageHash, testInstanceEnv, engineName, enginePort)) + }) + + t.Run("register", func(t *testing.T) { stream, err := client.EventClient.Stream(context.Background(), &pb.StreamEventRequest{ Filter: &pb.StreamEventRequest_Filter{ Key: "test_service_ready", @@ -26,88 +58,49 @@ func testRunner(t *testing.T) { require.NoError(t, err) acknowledgement.WaitForStreamToBeReady(stream) - resp, err := client.RunnerClient.Create(context.Background(), &pb.CreateRunnerRequest{ + msg := runnermodule.MsgCreate{ + Owner: engineAddress, ServiceHash: testServiceHash, - Env: []string{"BAR=3", "REQUIRED=4"}, - }) - require.NoError(t, err) - testRunnerHash = resp.Hash + EnvHash: testInstanceEnvHash, + } + require.True(t, testRunnerHash.Equal(lcdBroadcastMsg(msg))) // wait for service to be ready _, err = stream.Recv() require.NoError(t, err) }) - t.Run("recreate", func(t *testing.T) { - _, err := client.RunnerClient.Delete(context.Background(), &pb.DeleteRunnerRequest{Hash: testRunnerHash}) - require.NoError(t, err) - resp, err := client.RunnerClient.Create(context.Background(), &pb.CreateRunnerRequest{ - ServiceHash: testServiceHash, - Env: []string{"BAR=3", "REQUIRED=4"}, - }) - require.NoError(t, err) - testRunnerHash = resp.Hash - }) - t.Run("get", func(t *testing.T) { - t.Run("grpc", func(t *testing.T) { - resp, err := client.RunnerClient.Get(context.Background(), &pb.GetRunnerRequest{Hash: testRunnerHash}) - require.NoError(t, err) - require.Equal(t, testRunnerHash, resp.Hash) - testInstanceHash = resp.InstanceHash - }) - t.Run("lcd", func(t *testing.T) { - var r *runner.Runner - lcdGet("runner/get/"+testRunnerHash.String(), &r) - require.Equal(t, testRunnerHash, r.Hash) - testRunnerAddress = r.Address - }) + var run *runner.Runner + lcdGet("runner/get/"+testRunnerHash.String(), &run) + require.Equal(t, testRunnerHash, run.Hash) + testRunnerAddress = run.Address }) - // TODO: need to test the filters t.Run("list", func(t *testing.T) { - t.Run("grpc", func(t *testing.T) { - resp, err := client.RunnerClient.List(context.Background(), &pb.ListRunnerRequest{}) - require.NoError(t, err) - require.Len(t, resp.Runners, 1) - require.Equal(t, testInstanceHash, resp.Runners[0].InstanceHash) - require.Equal(t, testRunnerHash, resp.Runners[0].Hash) - }) - t.Run("lcd", func(t *testing.T) { - rs := make([]*runner.Runner, 0) - lcdGet("runner/list", &rs) - require.Len(t, rs, 1) - require.Equal(t, testInstanceHash, rs[0].InstanceHash) - require.Equal(t, testRunnerHash, rs[0].Hash) - }) - }) - - t.Run("hash", func(t *testing.T) { - var res runnerrest.HashResponse - lcdPost("runner/hash", &runnerrest.HashRequest{ - ServiceHash: testServiceHash, - Address: engineAddress.String(), - Env: []string{"BAR=3", "REQUIRED=4"}, - }, &res) - require.Equal(t, testRunnerHash, res.RunnerHash) - require.Equal(t, testInstanceHash, res.InstanceHash) + rs := make([]*runner.Runner, 0) + lcdGet("runner/list", &rs) + require.Len(t, rs, 1) + require.Equal(t, testInstanceHash, rs[0].InstanceHash) + require.Equal(t, testRunnerHash, rs[0].Hash) }) } func testDeleteRunner(t *testing.T) { - _, err := client.RunnerClient.Delete(context.Background(), &pb.DeleteRunnerRequest{Hash: testRunnerHash}) - require.NoError(t, err) + msg := runnermodule.MsgDelete{ + Owner: engineAddress, + Hash: testRunnerHash, + } + lcdBroadcastMsg(msg) - t.Run("grpc", func(t *testing.T) { - resp, err := client.RunnerClient.List(context.Background(), &pb.ListRunnerRequest{}) - require.NoError(t, err) - require.Len(t, resp.Runners, 0) - }) - t.Run("lcd", func(t *testing.T) { + require.NoError(t, builder.Stop(cont, testRunnerHash, testServiceStruct.Dependencies)) + + t.Run("check deletion", func(t *testing.T) { rs := make([]*runner.Runner, 0) lcdGet("runner/list", &rs) require.Len(t, rs, 0) }) + t.Run("check coins on runner", func(t *testing.T) { var coins sdk.Coins lcdGet("bank/balances/"+testRunnerAddress.String(), &coins) diff --git a/e2e/service_test.go b/e2e/service_test.go index 6070b5927..0a4930c31 100644 --- a/e2e/service_test.go +++ b/e2e/service_test.go @@ -14,6 +14,7 @@ import ( var ( testServiceHash hash.Hash testServiceAddress sdk.AccAddress + testServiceStruct *service.Service ) func testService(t *testing.T) { @@ -23,10 +24,9 @@ func testService(t *testing.T) { }) t.Run("get", func(t *testing.T) { - var s *service.Service - lcdGet("service/get/"+testServiceHash.String(), &s) - require.Equal(t, testServiceHash, s.Hash) - testServiceAddress = s.Address + lcdGet("service/get/"+testServiceHash.String(), &testServiceStruct) + require.Equal(t, testServiceHash, testServiceStruct.Hash) + testServiceAddress = testServiceStruct.Address }) t.Run("list", func(t *testing.T) { diff --git a/runner/builder/builder.go b/runner/builder/builder.go index dde206e04..9c5782ad0 100644 --- a/runner/builder/builder.go +++ b/runner/builder/builder.go @@ -64,23 +64,22 @@ func (b *Builder) Create(req *api.CreateRunnerRequest) (*runnerpb.Runner, error) } // start the container - imageHash, err := build(b.container, srv, b.ipfsEndpoint) + imageHash, err := Build(b.container, srv, b.ipfsEndpoint) if err != nil { return nil, err } - _, err = start(b.container, srv, inst.Hash, expRunnerHash, imageHash, instanceEnv, b.engineName, b.port) - if err != nil { + if err = Start(b.container, srv, inst.Hash, expRunnerHash, imageHash, instanceEnv, b.engineName, b.port); err != nil { return nil, err } run, err := b.mc.CreateRunner(req) if err != nil { - stop(b.container, expRunnerHash, srv.Dependencies) + Stop(b.container, expRunnerHash, srv.Dependencies) return nil, err } if !run.Hash.Equal(expRunnerHash) { - stop(b.container, expRunnerHash, srv.Dependencies) + Stop(b.container, expRunnerHash, srv.Dependencies) return nil, errors.New("calculated runner hash is not the same") } return run, nil @@ -109,7 +108,7 @@ func (b *Builder) Delete(req *api.DeleteRunnerRequest) error { } // stop the local running service - if err := stop(b.container, r.Hash, srv.Dependencies); err != nil { + if err := Stop(b.container, r.Hash, srv.Dependencies); err != nil { return err } diff --git a/runner/builder/container.go b/runner/builder/container.go index 459339a24..b0263459e 100644 --- a/runner/builder/container.go +++ b/runner/builder/container.go @@ -12,6 +12,7 @@ import ( "github.com/docker/docker/client" "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/idtools" "github.com/mesg-foundation/engine/container" "github.com/mesg-foundation/engine/ext/xerrors" "github.com/mesg-foundation/engine/ext/xos" @@ -20,7 +21,7 @@ import ( ) // Build the imge of the container -func build(cont container.Container, srv *service.Service, ipfsEndpoint string) (string, error) { +func Build(cont container.Container, srv *service.Service, ipfsEndpoint string) (string, error) { // download and untar service context into path. path, err := ioutil.TempDir("", "mesg") if err != nil { @@ -37,7 +38,10 @@ func build(cont container.Container, srv *service.Service, ipfsEndpoint string) } defer resp.Body.Close() - if err := archive.Untar(resp.Body, path, nil); err != nil { + if err := archive.Untar(resp.Body, path, &archive.TarOptions{ChownOpts: &idtools.Identity{ + UID: os.Geteuid(), + GID: os.Getegid()}, + }); err != nil { return "", err } @@ -51,12 +55,12 @@ func build(cont container.Container, srv *service.Service, ipfsEndpoint string) } // Start starts the service. -func start(cont container.Container, srv *service.Service, instanceHash hash.Hash, runnerHash hash.Hash, imageHash string, env []string, engineName, port string) (serviceIDs []string, err error) { +func Start(cont container.Container, srv *service.Service, instanceHash hash.Hash, runnerHash hash.Hash, imageHash string, env []string, engineName, port string) (err error) { endpoint := net.JoinHostPort(engineName, port) namespace := namespace(runnerHash) networkID, err := cont.CreateNetwork(namespace) if err != nil { - return nil, err + return err } sharedNetworkID := cont.SharedNetworkID() // BUG: https://github.com/mesg-foundation/engine/issues/382 @@ -68,7 +72,7 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has volumes := convertVolumes(srv, d.Volumes, d.Key) volumesFrom, err := convertVolumesFrom(srv, d.VolumesFrom) if err != nil { - return nil, err + return err } configs = append(configs, container.ServiceOptions{ Namespace: dependencyNamespace(namespace, d.Key), @@ -97,7 +101,7 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has volumes := convertVolumes(srv, srv.Configuration.Volumes, service.MainServiceKey) volumesFrom, err := convertVolumesFrom(srv, srv.Configuration.VolumesFrom) if err != nil { - return nil, err + return err } configs = append(configs, container.ServiceOptions{ Namespace: dependencyNamespace(namespace, service.MainServiceKey), @@ -126,21 +130,19 @@ func start(cont container.Container, srv *service.Service, instanceHash hash.Has }) // Start - serviceIDs = make([]string, 0) for _, c := range configs { - serviceID, err := cont.StartService(c) + _, err := cont.StartService(c) if err != nil { - stop(cont, runnerHash, srv.Dependencies) - return nil, err + Stop(cont, runnerHash, srv.Dependencies) + return err } - serviceIDs = append(serviceIDs, serviceID) } - return serviceIDs, nil + return nil } // Stop stops an instance. -func stop(cont container.Container, runnerHash hash.Hash, dependencies []*service.Service_Dependency) error { +func Stop(cont container.Container, runnerHash hash.Hash, dependencies []*service.Service_Dependency) error { var ( wg sync.WaitGroup errs xerrors.SyncErrors