diff --git a/config/config.go b/config/config.go index 158346287..aa09c7a11 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,7 @@ type Config struct { Database struct { ServiceRelativePath string + InstanceRelativePath string ExecutionRelativePath string } } @@ -81,6 +82,7 @@ func New() (*Config, error) { c.Core.Name = "engine" c.Core.Path = filepath.Join(home, ".mesg") c.Core.Database.ServiceRelativePath = filepath.Join("database", "services", serviceDBVersion) + c.Core.Database.InstanceRelativePath = filepath.Join("database", "instances", serviceDBVersion) c.Core.Database.ExecutionRelativePath = filepath.Join("database", "executions", executionDBVersion) c.Docker.Core.Path = "/mesg" c.Docker.Socket = "/var/run/docker.sock" diff --git a/core/main.go b/core/main.go index 962b9a62b..a814eacbd 100644 --- a/core/main.go +++ b/core/main.go @@ -38,6 +38,12 @@ func initDependencies() (*dependencies, error) { return nil, err } + // init instance db. + instanceDB, err := database.NewInstanceDB(filepath.Join(config.Core.Path, config.Core.Database.InstanceRelativePath)) + if err != nil { + return nil, err + } + // init execution db. executionDB, err := database.NewExecutionDB(filepath.Join(config.Core.Path, config.Core.Database.ExecutionRelativePath)) if err != nil { @@ -54,7 +60,7 @@ func initDependencies() (*dependencies, error) { m := dockermanager.New(c) // init sdk. - sdk := sdk.New(m, c, serviceDB, executionDB) + sdk := sdk.New(m, c, serviceDB, instanceDB, executionDB) return &dependencies{ config: config, diff --git a/database/instance_db.go b/database/instance_db.go new file mode 100644 index 000000000..718b45475 --- /dev/null +++ b/database/instance_db.go @@ -0,0 +1,104 @@ +package database + +import ( + "encoding/json" + + "github.com/mesg-foundation/core/service" + "github.com/syndtr/goleveldb/leveldb" +) + +// InstanceDB describes the API of Instance database. +type InstanceDB interface { + // Get retrives instance by instance hash. + Get(hash string) (*service.Instance, error) + + // Save saves instance to database. + Save(i *service.Instance) error + + // Close closes underlying database connection. + Close() error +} + +// LevelDBInstanceDB is a database for storing services' instances. +type LevelDBInstanceDB struct { + db *leveldb.DB +} + +// NewInstanceDB returns the database which is located under given path. +func NewInstanceDB(path string) (*LevelDBInstanceDB, error) { + db, err := leveldb.OpenFile(path, nil) + if err != nil { + return nil, err + } + return &LevelDBInstanceDB{db: db}, nil +} + +// marshal returns the byte slice from service. +func (d *LevelDBInstanceDB) marshal(i *service.Instance) ([]byte, error) { + return json.Marshal(i) +} + +// unmarshal returns the service from byte slice. +func (d *LevelDBInstanceDB) unmarshal(id string, value []byte) (*service.Instance, error) { + var s service.Instance + if err := json.Unmarshal(value, &s); err != nil { + return nil, &DecodeError{ID: id} + } + return &s, nil +} + +// Get retrives instance by instance hash. +func (d *LevelDBInstanceDB) Get(hash string) (*service.Instance, error) { + tx, err := d.db.OpenTransaction() + if err != nil { + return nil, err + } + b, err := tx.Get([]byte(hash), nil) + if err != nil { + tx.Discard() + if err == leveldb.ErrNotFound { + return nil, &ErrNotFound{ID: hash} + } + return nil, err + } + i, err := d.unmarshal(hash, b) + if err != nil { + tx.Discard() + return nil, err + } + return i, tx.Commit() +} + +// Save saves instance to database. +func (d *LevelDBInstanceDB) Save(i *service.Instance) error { + // check service + if i.Hash == "" { + return errCannotSaveWithoutHash + } + + // open database transaction + tx, err := d.db.OpenTransaction() + if err != nil { + return err + } + + // encode service + b, err := d.marshal(i) + if err != nil { + tx.Discard() + return err + } + + // save instance with hash. + if err := tx.Put([]byte(i.Hash), b, nil); err != nil { + tx.Discard() + return err + } + + return tx.Commit() +} + +// Close closes database. +func (d *LevelDBInstanceDB) Close() error { + return d.db.Close() +} diff --git a/protobuf/api/instance.pb.go b/protobuf/api/instance.pb.go new file mode 100644 index 000000000..147ac169c --- /dev/null +++ b/protobuf/api/instance.pb.go @@ -0,0 +1,222 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protobuf/api/instance.proto + +package api + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type CreateInstanceRequest struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Env []string `protobuf:"bytes,2,rep,name=env,proto3" json:"env,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateInstanceRequest) Reset() { *m = CreateInstanceRequest{} } +func (m *CreateInstanceRequest) String() string { return proto.CompactTextString(m) } +func (*CreateInstanceRequest) ProtoMessage() {} +func (*CreateInstanceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_instance_50bf97d913e2b93f, []int{0} +} +func (m *CreateInstanceRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateInstanceRequest.Unmarshal(m, b) +} +func (m *CreateInstanceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateInstanceRequest.Marshal(b, m, deterministic) +} +func (dst *CreateInstanceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateInstanceRequest.Merge(dst, src) +} +func (m *CreateInstanceRequest) XXX_Size() int { + return xxx_messageInfo_CreateInstanceRequest.Size(m) +} +func (m *CreateInstanceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CreateInstanceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateInstanceRequest proto.InternalMessageInfo + +func (m *CreateInstanceRequest) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *CreateInstanceRequest) GetEnv() []string { + if m != nil { + return m.Env + } + return nil +} + +type CreateInstanceResponse struct { + Sid string `protobuf:"bytes,1,opt,name=sid,proto3" json:"sid,omitempty"` + Hash string `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"` + ServiceHash string `protobuf:"bytes,3,opt,name=serviceHash,proto3" json:"serviceHash,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateInstanceResponse) Reset() { *m = CreateInstanceResponse{} } +func (m *CreateInstanceResponse) String() string { return proto.CompactTextString(m) } +func (*CreateInstanceResponse) ProtoMessage() {} +func (*CreateInstanceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_instance_50bf97d913e2b93f, []int{1} +} +func (m *CreateInstanceResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateInstanceResponse.Unmarshal(m, b) +} +func (m *CreateInstanceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateInstanceResponse.Marshal(b, m, deterministic) +} +func (dst *CreateInstanceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateInstanceResponse.Merge(dst, src) +} +func (m *CreateInstanceResponse) XXX_Size() int { + return xxx_messageInfo_CreateInstanceResponse.Size(m) +} +func (m *CreateInstanceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_CreateInstanceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateInstanceResponse proto.InternalMessageInfo + +func (m *CreateInstanceResponse) GetSid() string { + if m != nil { + return m.Sid + } + return "" +} + +func (m *CreateInstanceResponse) GetHash() string { + if m != nil { + return m.Hash + } + return "" +} + +func (m *CreateInstanceResponse) GetServiceHash() string { + if m != nil { + return m.ServiceHash + } + return "" +} + +func init() { + proto.RegisterType((*CreateInstanceRequest)(nil), "api.CreateInstanceRequest") + proto.RegisterType((*CreateInstanceResponse)(nil), "api.CreateInstanceResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// InstanceClient is the client API for Instance service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type InstanceClient interface { + Create(ctx context.Context, in *CreateInstanceRequest, opts ...grpc.CallOption) (*CreateInstanceResponse, error) +} + +type instanceClient struct { + cc *grpc.ClientConn +} + +func NewInstanceClient(cc *grpc.ClientConn) InstanceClient { + return &instanceClient{cc} +} + +func (c *instanceClient) Create(ctx context.Context, in *CreateInstanceRequest, opts ...grpc.CallOption) (*CreateInstanceResponse, error) { + out := new(CreateInstanceResponse) + err := c.cc.Invoke(ctx, "/api.Instance/Create", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// InstanceServer is the server API for Instance service. +type InstanceServer interface { + Create(context.Context, *CreateInstanceRequest) (*CreateInstanceResponse, error) +} + +func RegisterInstanceServer(s *grpc.Server, srv InstanceServer) { + s.RegisterService(&_Instance_serviceDesc, srv) +} + +func _Instance_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateInstanceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InstanceServer).Create(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/api.Instance/Create", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InstanceServer).Create(ctx, req.(*CreateInstanceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Instance_serviceDesc = grpc.ServiceDesc{ + ServiceName: "api.Instance", + HandlerType: (*InstanceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Create", + Handler: _Instance_Create_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "protobuf/api/instance.proto", +} + +func init() { + proto.RegisterFile("protobuf/api/instance.proto", fileDescriptor_instance_50bf97d913e2b93f) +} + +var fileDescriptor_instance_50bf97d913e2b93f = []byte{ + // 193 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2e, 0x28, 0xca, 0x2f, + 0xc9, 0x4f, 0x2a, 0x4d, 0xd3, 0x4f, 0x2c, 0xc8, 0xd4, 0xcf, 0xcc, 0x2b, 0x2e, 0x49, 0xcc, 0x4b, + 0x4e, 0xd5, 0x03, 0x8b, 0x0a, 0x31, 0x27, 0x16, 0x64, 0x2a, 0x59, 0x72, 0x89, 0x3a, 0x17, 0xa5, + 0x26, 0x96, 0xa4, 0x7a, 0x42, 0x25, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0xf8, 0xb8, + 0x98, 0x32, 0x53, 0x24, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0x98, 0x32, 0x53, 0x84, 0x04, 0xb8, + 0x98, 0x53, 0xf3, 0xca, 0x24, 0x98, 0x14, 0x98, 0x35, 0x38, 0x83, 0x40, 0x4c, 0xa5, 0x04, 0x2e, + 0x31, 0x74, 0xad, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, 0x20, 0xb5, 0xc5, 0x70, 0xcd, 0x20, 0xa6, + 0x90, 0x10, 0x17, 0x4b, 0x46, 0x62, 0x71, 0x86, 0x04, 0x13, 0x58, 0x08, 0xcc, 0x16, 0x52, 0xe0, + 0xe2, 0x2e, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xf5, 0x00, 0x49, 0x31, 0x83, 0xa5, 0x90, 0x85, + 0x8c, 0xfc, 0xb9, 0x38, 0x60, 0x66, 0x0b, 0x39, 0x73, 0xb1, 0x41, 0x6c, 0x13, 0x92, 0xd2, 0x4b, + 0x2c, 0xc8, 0xd4, 0xc3, 0xea, 0x6a, 0x29, 0x69, 0xac, 0x72, 0x10, 0x67, 0x29, 0x31, 0x24, 0xb1, + 0x81, 0x7d, 0x6e, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xbc, 0xfc, 0x91, 0xfd, 0x18, 0x01, 0x00, + 0x00, +} diff --git a/protobuf/api/instance.proto b/protobuf/api/instance.proto new file mode 100644 index 000000000..91a017300 --- /dev/null +++ b/protobuf/api/instance.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package api; + +service Instance { + rpc Create (CreateInstanceRequest) returns (CreateInstanceResponse) {} +} + +message CreateInstanceRequest { + string id = 1; // Service's sid or hash. + repeated string env = 2; // Env vars to apply to service's instance on runtime. +} + +message CreateInstanceResponse { + string sid = 1; // Service's id. + string hash = 2; // Service's instance hash. + string serviceHash = 3; // Service's bare hash. +} diff --git a/scripts/build-proto.sh b/scripts/build-proto.sh index af6d2dd10..fd346721b 100755 --- a/scripts/build-proto.sh +++ b/scripts/build-proto.sh @@ -9,6 +9,7 @@ GRPC_PLUGIN="--go_out=plugins=grpc,paths=source_relative:." protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/definition/execution.proto protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/definition/service.proto protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/api/service.proto +protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/api/instance.proto protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/coreapi/api.proto protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/api/api.proto protoc $GRPC_PLUGIN --proto_path=$PROJECT $GRPC/serviceapi/api.proto diff --git a/sdk/instance/instance.go b/sdk/instance/instance.go new file mode 100644 index 000000000..e154d9562 --- /dev/null +++ b/sdk/instance/instance.go @@ -0,0 +1,117 @@ +package instancesdk + +import ( + "crypto/sha256" + "errors" + "io/ioutil" + "net/http" + "os" + + "github.com/docker/docker/pkg/archive" + "github.com/mesg-foundation/core/container" + "github.com/mesg-foundation/core/database" + "github.com/mesg-foundation/core/service" + "github.com/mesg-foundation/core/service/manager" + "github.com/mesg-foundation/core/x/xos" + "github.com/mr-tron/base58" +) + +// Instance exposes service instance APIs of MESG. +type Instance struct { + m manager.Manager + container container.Container + db database.ServiceDB + instanceDB database.InstanceDB +} + +// New creates a new Instance SDK with given options. +func New(m manager.Manager, c container.Container, db database.ServiceDB, instanceDB database.InstanceDB) *Instance { + return &Instance{ + m: m, + container: c, + db: db, + instanceDB: instanceDB, + } +} + +// Create creates a new service instance for service with id(sid/hash) and applies given env vars. +func (i *Instance) Create(id string, env []string) (*service.Instance, error) { + // get the service from service db. + srv, err := i.db.Get(id) + if err != nil { + return nil, err + } + + // download and untar service context into path. + path, err := ioutil.TempDir("", "mesg") + if err != nil { + return nil, err + } + defer os.RemoveAll(path) + + resp, err := http.Get("http://ipfs.app.mesg.com:8080/ipfs/" + srv.Source) + if err != nil { + return nil, err + } + if resp.StatusCode != 200 { + return nil, errors.New("service's source code is not reachable") + } + defer resp.Body.Close() + + if err := archive.Untar(resp.Body, path, nil); err != nil { + return nil, err + } + + // build service's Docker image and apply to service. + imageHash, err := i.container.Build(path) + if err != nil { + return nil, err + } + + // overwrite default env vars with user defined ones. + instanceEnv := xos.EnvMergeMaps(xos.EnvSliceToMap(srv.Configuration.Env), xos.EnvSliceToMap(env)) + + // calculate instance's hash. + h := sha256.New() + h.Write([]byte(srv.Hash)) + h.Write([]byte(xos.EnvMapToString(instanceEnv))) + instanceHash := base58.Encode(h.Sum(nil)) + + // check if instance is already running. + _, err = i.instanceDB.Get(instanceHash) + if err == nil { + return nil, errors.New("service's instance is already running") + } + if !database.IsErrNotFound(err) { + return nil, err + } + + // save & start instance. + o := &service.Instance{ + Sid: srv.Sid, + Hash: instanceHash, + ServiceHash: srv.Hash, + ImageHash: imageHash, + } + if err := i.instanceDB.Save(o); err != nil { + return nil, err + } + + srv.Hash = instanceHash + srv.Configuration.Image = imageHash + srv.Configuration.Env = xos.EnvMapToSlice(instanceEnv) + serviceIDs, networkID, err := i.m.Start(srv) + if err != nil { + return nil, err + } + + // TODO(ilgooz) this should be set by Manager.Start(), by the underlying + // container orchestration tool. + o.COType = "docker" + o.COInfo = struct { + ServiceIDs []string + NetworkID string + }{serviceIDs, networkID} + + return o, i.instanceDB.Save(o) +} diff --git a/sdk/sdk.go b/sdk/sdk.go index 280214d4d..f605c273f 100644 --- a/sdk/sdk.go +++ b/sdk/sdk.go @@ -9,6 +9,7 @@ import ( "github.com/mesg-foundation/core/database" "github.com/mesg-foundation/core/event" "github.com/mesg-foundation/core/execution" + instancesdk "github.com/mesg-foundation/core/sdk/instance" servicesdk "github.com/mesg-foundation/core/sdk/service" "github.com/mesg-foundation/core/service" "github.com/mesg-foundation/core/service/manager" @@ -18,7 +19,8 @@ import ( // SDK exposes all functionalities of MESG core. type SDK struct { - Service *servicesdk.Service + Service *servicesdk.Service + Instance *instancesdk.Instance ps *pubsub.PubSub @@ -29,9 +31,10 @@ type SDK struct { } // New creates a new SDK with given options. -func New(m manager.Manager, c container.Container, db database.ServiceDB, execDB database.ExecutionDB) *SDK { +func New(m manager.Manager, c container.Container, db database.ServiceDB, instanceDB database.InstanceDB, execDB database.ExecutionDB) *SDK { return &SDK{ Service: servicesdk.New(m, c, db, execDB), + Instance: instancesdk.New(m, c, db, instanceDB), ps: pubsub.New(0), m: m, container: c, @@ -61,7 +64,7 @@ func (sdk *SDK) StartService(serviceID string) error { if err != nil { return err } - _, err = sdk.m.Start(s) + _, _, err = sdk.m.Start(s) return err } diff --git a/sdk/sdk_test.go b/sdk/sdk_test.go index 123d8848e..a5f1aa2bb 100644 --- a/sdk/sdk_test.go +++ b/sdk/sdk_test.go @@ -15,13 +15,15 @@ import ( ) const ( - servicedbname = "service.db.test" - execdbname = "exec.db.test" + servicedbname = "service.db.test" + instancedbname = "instance.db.test" + execdbname = "exec.db.test" ) type apiTesting struct { *testing.T serviceDB *database.LevelDBServiceDB + instanceDB *database.LevelDBInstanceDB executionDB *database.LevelDBExecutionDB containerMock *mocks.Container } @@ -30,6 +32,7 @@ func (t *apiTesting) close() { require.NoError(t, t.serviceDB.Close()) require.NoError(t, t.executionDB.Close()) require.NoError(t, os.RemoveAll(servicedbname)) + require.NoError(t, os.RemoveAll(instancedbname)) require.NoError(t, os.RemoveAll(execdbname)) } @@ -40,14 +43,18 @@ func newTesting(t *testing.T) (*SDK, *apiTesting) { db, err := database.NewServiceDB(servicedbname) require.NoError(t, err) + instanceDB, err := database.NewInstanceDB(instancedbname) + require.NoError(t, err) + execDB, err := database.NewExecutionDB(execdbname) require.NoError(t, err) - a := New(m, containerMock, db, execDB) + a := New(m, containerMock, db, instanceDB, execDB) return a, &apiTesting{ T: t, serviceDB: db, + instanceDB: instanceDB, executionDB: execDB, containerMock: containerMock, } diff --git a/server/grpc/core/test_test.go b/server/grpc/core/test_test.go index 2dee59d4b..c3c65a3aa 100644 --- a/server/grpc/core/test_test.go +++ b/server/grpc/core/test_test.go @@ -22,8 +22,9 @@ var ( ) const ( - servicedbname = "service.db.test" - execdbname = "exec.db.test" + servicedbname = "service.db.test" + instancedbname = "instance.db.test" + execdbname = "exec.db.test" ) func newServerWithContainer(t *testing.T, c container.Container) (*Server, func()) { @@ -32,17 +33,22 @@ func newServerWithContainer(t *testing.T, c container.Container) (*Server, func( db, err := database.NewServiceDB(servicedbname) require.NoError(t, err) + instanceDB, err := database.NewInstanceDB(instancedbname) + require.NoError(t, err) + execDB, err := database.NewExecutionDB(execdbname) require.NoError(t, err) - a := sdk.New(m, c, db, execDB) + a := sdk.New(m, c, db, instanceDB, execDB) server := NewServer(a) closer := func() { db.Close() + instanceDB.Close() execDB.Close() os.RemoveAll(servicedbname) + os.RemoveAll(instancedbname) os.RemoveAll(execdbname) } return server, closer diff --git a/server/grpc/instance.go b/server/grpc/instance.go new file mode 100644 index 000000000..c2e682421 --- /dev/null +++ b/server/grpc/instance.go @@ -0,0 +1,31 @@ +package grpc + +import ( + "context" + + protobuf_api "github.com/mesg-foundation/core/protobuf/api" + "github.com/mesg-foundation/core/sdk" +) + +// InstanceServer is the type to aggregate all Instance APIs. +type InstanceServer struct { + sdk *sdk.SDK +} + +// NewInstanceServer creates a new ServiceServer. +func NewInstanceServer(sdk *sdk.SDK) *InstanceServer { + return &InstanceServer{sdk: sdk} +} + +// Create creates a new instance from service. +func (s *InstanceServer) Create(ctx context.Context, request *protobuf_api.CreateInstanceRequest) (*protobuf_api.CreateInstanceResponse, error) { + i, err := s.sdk.Instance.Create(request.Id, request.Env) + if err != nil { + return nil, err + } + return &protobuf_api.CreateInstanceResponse{ + Sid: i.Sid, + Hash: i.Hash, + ServiceHash: i.ServiceHash, + }, nil +} diff --git a/server/grpc/server.go b/server/grpc/server.go index 0be0376f8..b0799e703 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -104,11 +104,13 @@ func (s *Server) Close() { func (s *Server) register() error { coreServer := core.NewServer(s.sdk) coreServiceServer := NewServiceServer(s.sdk) + coreInstanceServer := NewInstanceServer(s.sdk) serviceServer := service.NewServer(s.sdk) serviceapi.RegisterServiceServer(s.instance, serviceServer) coreapi.RegisterCoreServer(s.instance, coreServer) protobuf_api.RegisterServiceXServer(s.instance, coreServiceServer) + protobuf_api.RegisterInstanceServer(s.instance, coreInstanceServer) reflection.Register(s.instance) return nil diff --git a/server/grpc/service/test_test.go b/server/grpc/service/test_test.go index d4880023e..5c4ea6932 100644 --- a/server/grpc/service/test_test.go +++ b/server/grpc/service/test_test.go @@ -21,14 +21,18 @@ var ( ) const ( - servicedbname = "service.db.test" - execdbname = "exec.db.test" + servicedbname = "service.db.test" + instancedbname = "instance.db.test" + execdbname = "exec.db.test" ) func newServer(t *testing.T) (*Server, func()) { db, err := database.NewServiceDB(servicedbname) require.NoError(t, err) + instanceDB, err := database.NewInstanceDB(instancedbname) + require.NoError(t, err) + execDB, err := database.NewExecutionDB(execdbname) require.NoError(t, err) @@ -37,13 +41,15 @@ func newServer(t *testing.T) (*Server, func()) { m := dockermanager.New(c) // TODO(ilgooz): create mocks from manager.Manager and use instead. - a := sdk.New(m, c, db, execDB) + a := sdk.New(m, c, db, instanceDB, execDB) server := NewServer(a) closer := func() { require.NoError(t, db.Close()) + require.NoError(t, instanceDB.Close()) require.NoError(t, execDB.Close()) require.NoError(t, os.RemoveAll(servicedbname)) + require.NoError(t, os.RemoveAll(instancedbname)) require.NoError(t, os.RemoveAll(execdbname)) } diff --git a/service/manager/dockermanager/start.go b/service/manager/dockermanager/start.go index 4dd893300..ffd38068f 100644 --- a/service/manager/dockermanager/start.go +++ b/service/manager/dockermanager/start.go @@ -14,29 +14,29 @@ import ( ) // Start starts the service. -func (m *DockerManager) Start(s *service.Service) (serviceIDs []string, err error) { +func (m *DockerManager) Start(s *service.Service) (serviceIDs []string, networkID string, err error) { status, err := m.Status(s) if err != nil || status == service.RUNNING { - return nil, err //TODO: if the service is already running, serviceIDs should be returned. + return nil, "", err //TODO: if the service is already running, serviceIDs should be returned. } // If there is one but not all services running stop to restart all if status == service.PARTIAL { if err := m.Stop(s); err != nil { - return nil, err + return nil, "", err } } sNamespace := serviceNamespace(s.Hash) - networkID, err := m.c.CreateNetwork(sNamespace) + networkID, err = m.c.CreateNetwork(sNamespace) if err != nil { - return nil, err + return nil, "", err } sharedNetworkID, err := m.c.SharedNetworkID() if err != nil { - return nil, err + return nil, "", err } conf, err := config.Global() if err != nil { - return nil, err + return nil, "", err } _, port, _ := xnet.SplitHostPort(conf.Server.Address) endpoint := conf.Core.Name + ":" + strconv.Itoa(port) @@ -51,7 +51,7 @@ func (m *DockerManager) Start(s *service.Service) (serviceIDs []string, err erro volumes := extractVolumes(s, d) volumesFrom, err := extractVolumesFrom(s, d) if err != nil { - return nil, err + return nil, "", err } serviceID, err := m.c.StartService(container.ServiceOptions{ Namespace: dependencyNamespace(sNamespace, d.Key), @@ -78,11 +78,11 @@ func (m *DockerManager) Start(s *service.Service) (serviceIDs []string, err erro }) if err != nil { m.Stop(s) - return nil, err + return nil, "", err } serviceIDs = append(serviceIDs, serviceID) } - return serviceIDs, nil + return serviceIDs, networkID, nil } func extractPorts(d *service.Dependency) []container.Port { diff --git a/service/manager/dockermanager/start_integration_test.go b/service/manager/dockermanager/start_integration_test.go index c15ad43e6..b2868599f 100644 --- a/service/manager/dockermanager/start_integration_test.go +++ b/service/manager/dockermanager/start_integration_test.go @@ -24,7 +24,7 @@ func TestIntegrationStartServiceIntegration(t *testing.T) { c = newIntegrationContainer(t) m = New(c) ) - dockerServices, err := m.Start(s) + dockerServices, _, err := m.Start(s) defer m.Stop(s) require.NoError(t, err) require.Equal(t, len(s.Dependencies), len(dockerServices)) @@ -51,7 +51,7 @@ func TestIntegrationStartWith2DependenciesIntegration(t *testing.T) { c = newIntegrationContainer(t) m = New(c) ) - servicesID, err := m.Start(service) + servicesID, _, err := m.Start(service) defer m.Stop(service) require.NoError(t, err) require.Equal(t, 2, len(servicesID)) @@ -82,7 +82,7 @@ func TestIntegrationStartAgainService(t *testing.T) { m.Start(s) defer m.Stop(s) - dockerServices, err := m.Start(s) + dockerServices, _, err := m.Start(s) require.NoError(t, err) require.Equal(t, len(dockerServices), 0) // 0 because already started so no new one to start status, _ := m.Status(s) @@ -163,7 +163,7 @@ func TestIntegrationStartStopStart(t *testing.T) { m.Start(s) m.Stop(s) - dockerServices, err := m.Start(s) + dockerServices, _, err := m.Start(s) defer m.Stop(s) require.NoError(t, err) require.Equal(t, len(dockerServices), 1) @@ -199,11 +199,11 @@ func TestIntegrationServiceDependenciesListensFromSamePort(t *testing.T) { m = New(c) ) - _, err := m.Start(s) + _, _, err := m.Start(s) require.NoError(t, err) defer m.Stop(s) - _, err = m.Start(s1) + _, _, err = m.Start(s1) require.NotZero(t, err) require.Contains(t, err.Error(), `port '80' is already in use`) } @@ -230,6 +230,6 @@ func TestStartWithSamePorts(t *testing.T) { m = New(c) ) - _, err := m.Start(service) + _, _, err := m.Start(service) require.Error(t, err) } diff --git a/service/manager/dockermanager/start_test.go b/service/manager/dockermanager/start_test.go index c41f57625..31ff2d731 100644 --- a/service/manager/dockermanager/start_test.go +++ b/service/manager/dockermanager/start_test.go @@ -109,7 +109,7 @@ func TestStartService(t *testing.T) { mc.On("SharedNetworkID").Once().Return(sharedNetworkID, nil) mockStartService(s, d, mc, networkID, sharedNetworkID, containerServiceID, nil) - serviceIDs, err := m.Start(s) + serviceIDs, _, err := m.Start(s) require.NoError(t, err) require.Len(t, serviceIDs, 1) require.Equal(t, containerServiceID, serviceIDs[0]) @@ -160,7 +160,7 @@ func TestStartWith2Dependencies(t *testing.T) { mockStartService(s, d, mc, networkID, sharedNetworkID, containerServiceIDs[i], nil) } - serviceIDs, err := m.Start(s) + serviceIDs, _, err := m.Start(s) require.NoError(t, err) require.Len(t, serviceIDs, len(s.Dependencies)) @@ -190,7 +190,7 @@ func TestStartServiceRunning(t *testing.T) { d, _ := s.GetDependency(dependencyKey) mc.On("Status", dependencyNamespace(serviceNamespace(s.Hash), d.Key)).Once().Return(container.RUNNING, nil) - dockerServices, err := m.Start(s) + dockerServices, _, err := m.Start(s) require.NoError(t, err) require.Len(t, dockerServices, 0) @@ -240,7 +240,7 @@ func TestPartiallyRunningService(t *testing.T) { mockStartService(s, d, mc, networkID, sharedNetworkID, containerServiceIDs[i], nil) } - serviceIDs, err := m.Start(s) + serviceIDs, _, err := m.Start(s) require.NoError(t, err) require.Len(t, serviceIDs, len(s.Dependencies)) @@ -279,7 +279,7 @@ func TestServiceStartError(t *testing.T) { mockStartService(s, d, mc, networkID, sharedNetworkID, "", startErr) mc.On("Status", dependencyNamespace(serviceNamespace(s.Hash), d.Key)).Once().Return(container.STOPPED, nil) - serviceIDs, err := m.Start(s) + serviceIDs, _, err := m.Start(s) require.Equal(t, startErr, err) require.Len(t, serviceIDs, 0) diff --git a/service/manager/dockermanager/status_integration_test.go b/service/manager/dockermanager/status_integration_test.go index 7d6b2f4e7..92c2fdb38 100644 --- a/service/manager/dockermanager/status_integration_test.go +++ b/service/manager/dockermanager/status_integration_test.go @@ -28,7 +28,7 @@ func TestIntegrationStatusService(t *testing.T) { status, err := m.Status(s) require.NoError(t, err) require.Equal(t, service.STOPPED, status) - dockerServices, err := m.Start(s) + dockerServices, _, err := m.Start(s) defer m.Stop(s) require.NoError(t, err) require.Equal(t, len(dockerServices), len(s.Dependencies)) diff --git a/service/manager/dockermanager/volume_integration_test.go b/service/manager/dockermanager/volume_integration_test.go index 59ec092a4..1ccbc7c28 100644 --- a/service/manager/dockermanager/volume_integration_test.go +++ b/service/manager/dockermanager/volume_integration_test.go @@ -38,7 +38,7 @@ func TestIntegrationDeleteVolumes(t *testing.T) { m = New(c) ) - _, err := m.Start(s) + _, _, err := m.Start(s) require.NoError(t, err) err = m.Stop(s) require.NoError(t, err) diff --git a/service/manager/manager.go b/service/manager/manager.go index 5a24a6c57..8398f0a6b 100644 --- a/service/manager/manager.go +++ b/service/manager/manager.go @@ -6,9 +6,11 @@ import ( // Manager is responsible for managing Docker Containers of MESG services. // it can be implemented for any container orchestration tool. +// TODO(ilgooz): discuss if these should accept service.Instance instead of service.Service. type Manager interface { - // Start starts service and returns service ids related to service. - Start(s *service.Service) (serviceIDs []string, err error) + // Start starts service and returns related info provided by the underlying container + // orchestration tool. + Start(s *service.Service) (serviceIDs []string, networkID string, err error) // Stop stops service. Stop(s *service.Service) error diff --git a/service/service.go b/service/service.go index d368f5b29..fce494bfe 100644 --- a/service/service.go +++ b/service/service.go @@ -60,6 +60,16 @@ type Service struct { DeployedAt time.Time `hash:"-"` } +// Instance represents a running instance of service. +type Instance struct { + Sid string + Hash string + ServiceHash string + ImageHash string + COType string + COInfo interface{} +} + // StatusType of the service. type StatusType uint