Skip to content

Commit

Permalink
Merge pull request #1456 from mesg-foundation/feature/runners
Browse files Browse the repository at this point in the history
Update Instance SDK and create Runner SDK
  • Loading branch information
antho1404 authored Nov 6, 2019
2 parents 32487c5 + abd59e9 commit 1d33ebe
Show file tree
Hide file tree
Showing 37 changed files with 2,009 additions and 714 deletions.
51 changes: 24 additions & 27 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/logger"
"github.com/mesg-foundation/engine/orchestrator"
"github.com/mesg-foundation/engine/protobuf/api"
enginesdk "github.com/mesg-foundation/engine/sdk"
instancesdk "github.com/mesg-foundation/engine/sdk/instance"
runnersdk "github.com/mesg-foundation/engine/sdk/runner"
"github.com/mesg-foundation/engine/server/grpc"
"github.com/mesg-foundation/engine/version"
"github.com/mesg-foundation/engine/x/xerrors"
Expand All @@ -26,42 +27,38 @@ import (
db "github.com/tendermint/tm-db"
)

func initDatabases(cfg *config.Config) (*database.LevelDBInstanceDB, *database.LevelDBExecutionDB, *database.LevelDBProcessDB, error) {
// init instance db.
instanceDB, err := database.NewInstanceDB(filepath.Join(cfg.Path, cfg.Database.InstanceRelativePath))
if err != nil {
return nil, nil, nil, err
}

func initDatabases(cfg *config.Config) (*database.LevelDBExecutionDB, *database.LevelDBProcessDB, error) {
// init execution db.
executionDB, err := database.NewExecutionDB(filepath.Join(cfg.Path, cfg.Database.ExecutionRelativePath))
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
// init process db.
processDB, err := database.NewProcessDB(filepath.Join(cfg.Path, cfg.Database.ProcessRelativePath))
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

return instanceDB, executionDB, processDB, nil
return executionDB, processDB, nil
}

func stopRunningServices(sdk *enginesdk.SDK) error {
instances, err := sdk.Instance.List(&instancesdk.Filter{})
func stopRunningServices(sdk *enginesdk.SDK, cfg *config.Config, address string) error {
runners, err := sdk.Runner.List(&runnersdk.Filter{Address: address})
if err != nil {
return err
}
var (
instancesLen = len(instances)
errC = make(chan error, instancesLen)
wg sync.WaitGroup
runnersLen = len(runners)
errC = make(chan error, runnersLen)
wg sync.WaitGroup
)
wg.Add(instancesLen)
for _, instance := range instances {
wg.Add(runnersLen)
for _, instance := range runners {
go func(hash hash.Hash) {
defer wg.Done()
err := sdk.Instance.Delete(hash, false)
err := sdk.Runner.Delete(&api.DeleteRunnerRequest{
Hash: hash,
DeleteData: false,
}, cfg.Account.Name, cfg.Account.Password)
if err != nil {
errC <- err
}
Expand All @@ -82,7 +79,7 @@ func loadOrGenConfigAccount(kb *cosmos.Keybase, cfg *config.Config) (keys.Info,
return nil, err
}
if exist {
return nil, nil
return kb.Get(cfg.Account.Name)
}
logrus.WithField("module", "main").Warn("Config account not found. Generating one for development...")
mnemonic, err := kb.NewMnemonic()
Expand Down Expand Up @@ -130,13 +127,13 @@ func main() {
logger.Init(cfg.Log.Format, cfg.Log.Level, cfg.Log.ForceColors)

// init databases
instanceDB, executionDB, processDB, err := initDatabases(cfg)
executionDB, processDB, err := initDatabases(cfg)
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
}

// init container.
c, err := container.New(cfg.Name)
container, err := container.New(cfg.Name)
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
}
Expand Down Expand Up @@ -168,7 +165,7 @@ func main() {
}

// gen config account
_, err = loadOrGenConfigAccount(kb, cfg)
acc, err := loadOrGenConfigAccount(kb, cfg)
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
}
Expand All @@ -189,7 +186,7 @@ func main() {
client := cosmos.NewClient(node, app.Cdc(), kb, genesis.ChainID)

// init sdk
sdk := enginesdk.New(client, app.Cdc(), kb, c, instanceDB, executionDB, processDB, cfg.Name, strconv.Itoa(port), cfg.IpfsEndpoint)
sdk := enginesdk.New(client, app.Cdc(), kb, executionDB, processDB, container, cfg.Name, strconv.Itoa(port), cfg.IpfsEndpoint)

// start tendermint node
logrus.WithField("module", "main").WithField("seeds", cfg.Tendermint.Config.P2P.Seeds).Info("starting tendermint node")
Expand Down Expand Up @@ -222,11 +219,11 @@ func main() {
}()

<-xsignal.WaitForInterrupt()
if err := stopRunningServices(sdk); err != nil {
if err := stopRunningServices(sdk, cfg, acc.GetAddress().String()); err != nil {
logrus.WithField("module", "main").Fatalln(err)
}

if err := c.Cleanup(); err != nil {
if err := container.Cleanup(); err != nil {
logrus.WithField("module", "main").Fatalln(err)
}
server.Close()
Expand Down
126 changes: 50 additions & 76 deletions database/instance_db.go
Original file line number Diff line number Diff line change
@@ -1,119 +1,93 @@
package database

import (
"encoding/json"
"errors"
"fmt"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/instance"
"github.com/syndtr/goleveldb/leveldb"
)

var (
errSaveInstanceWithoutHash = errors.New("database: can't save instance without hash")
errCannotSaveInstanceWithoutHash = errors.New("database: can't save instance without hash")
)

// InstanceDB describes the API of Instance database.
type InstanceDB interface {
// Exist check if instance with given hash exist.
Exist(hash hash.Hash) (bool, error)

// Get retrives instance by instance hash.
Get(hash hash.Hash) (*instance.Instance, error)

// GetAll retrieves all instances.
GetAll() ([]*instance.Instance, error)

// Save saves instance to database.
Save(i *instance.Instance) error

// Delete an instance by instance hash.
Delete(hash hash.Hash) error

// Close closes underlying database connection.
Close() error
}

// LevelDBInstanceDB is a database for storing services' instances.
type LevelDBInstanceDB struct {
db *leveldb.DB
// InstanceDB is a database for storing instance definition.
type InstanceDB struct {
s store.Store
cdc *codec.Codec
}

// NewInstanceDB returns the database which is located under given path.
func NewInstanceDB(path string) (*LevelDBInstanceDB, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
func NewInstanceDB(s store.Store, cdc *codec.Codec) *InstanceDB {
return &InstanceDB{
s: s,
cdc: cdc,
}
return &LevelDBInstanceDB{db: db}, nil
}

// marshal returns the byte slice from service.
func (d *LevelDBInstanceDB) marshal(i *instance.Instance) ([]byte, error) {
return json.Marshal(i)
}

// unmarshal returns the service from byte slice.
func (d *LevelDBInstanceDB) unmarshal(hash hash.Hash, value []byte) (*instance.Instance, error) {
// unmarshal returns the instance from byte slice.
func (d *InstanceDB) unmarshalInstance(hash hash.Hash, value []byte) (*instance.Instance, error) {
var s instance.Instance
if err := json.Unmarshal(value, &s); err != nil {
return nil, fmt.Errorf("database: could not decode instance %q: %s", hash, err)
if err := d.cdc.UnmarshalBinaryBare(value, &s); err != nil {
return nil, fmt.Errorf("database: could not decode instance %q: %w", hash.String(), err)
}
return &s, nil
}

// Exist check if instance with given hash exist.
func (d *LevelDBInstanceDB) Exist(hash hash.Hash) (bool, error) {
return d.db.Has(hash, nil)
}

// Get retrives instance by instance hash.
func (d *LevelDBInstanceDB) Get(hash hash.Hash) (*instance.Instance, error) {
b, err := d.db.Get(hash, nil)
if err != nil {
return nil, err
}
return d.unmarshal(hash, b)
}

// GetAll retrieves all instances.
func (d *LevelDBInstanceDB) GetAll() ([]*instance.Instance, error) {
instances := []*instance.Instance{}
iter := d.db.NewIterator(nil, nil)
// All returns every instance in database.
func (d *InstanceDB) All() ([]*instance.Instance, error) {
var (
instances []*instance.Instance
iter = d.s.NewIterator()
)
for iter.Next() {
i, err := d.unmarshal(iter.Key(), iter.Value())
hash := hash.Hash(iter.Key())
s, err := d.unmarshalInstance(hash, iter.Value())
if err != nil {
iter.Release()
return nil, err
}
instances = append(instances, i)
instances = append(instances, s)
}
iter.Release()
return instances, iter.Error()
}

// Save saves instance to database.
func (d *LevelDBInstanceDB) Save(i *instance.Instance) error {
if i.Hash.IsZero() {
return errSaveInstanceWithoutHash
// Save stores instance in database.
// If there is an another instance that uses the same sid, it'll be deleted.
func (d *InstanceDB) Save(r *instance.Instance) error {
if r.Hash.IsZero() {
return errCannotSaveInstanceWithoutHash
}

// encode service
b, err := d.marshal(i)
b, err := d.cdc.MarshalBinaryBare(r)
if err != nil {
return err
}

return d.db.Put(i.Hash, b, nil)
return d.s.Put(r.Hash, b)
}

// Close closes database.
func (d *LevelDBInstanceDB) Close() error {
return d.db.Close()
func (d *InstanceDB) Close() error {
return d.s.Close()
}

// Exist check if instance with given hash exist.
func (d *InstanceDB) Exist(hash hash.Hash) (bool, error) {
return d.s.Has(hash)
}

// Get retrives instance from database.
func (d *InstanceDB) Get(hash hash.Hash) (*instance.Instance, error) {
b, err := d.s.Get(hash)
if err != nil {
return nil, err
}
return d.unmarshalInstance(hash, b)
}

// Delete deletes an instance from database.
func (d *LevelDBInstanceDB) Delete(hash hash.Hash) error {
return d.db.Delete(hash, nil)
// Delete deletes instance from database.
func (d *InstanceDB) Delete(hash hash.Hash) error {
return d.s.Delete(hash)
}
Loading

0 comments on commit 1d33ebe

Please sign in to comment.