Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename workflow to process #1278

Merged
merged 5 commits into from
Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
serviceDBVersion = "v3"
executionDBVersion = "v2"
instanceDBVersion = "v1"
workflowDBVersion = "v1"
processDBVersion = "v1"
)

var (
Expand All @@ -49,7 +49,7 @@ type Config struct {
ServiceRelativePath string
InstanceRelativePath string
ExecutionRelativePath string
WorkflowRelativePath string
ProcessRelativePath string
}

Tendermint struct {
Expand Down Expand Up @@ -95,7 +95,7 @@ func New() (*Config, error) {
c.Database.ServiceRelativePath = filepath.Join("database", "services", serviceDBVersion)
c.Database.InstanceRelativePath = filepath.Join("database", "instance", instanceDBVersion)
c.Database.ExecutionRelativePath = filepath.Join("database", "executions", executionDBVersion)
c.Database.WorkflowRelativePath = filepath.Join("database", "workflows", workflowDBVersion)
c.Database.ProcessRelativePath = filepath.Join("database", "processes", processDBVersion)

c.Tendermint.Config = tmconfig.DefaultConfig()
c.Tendermint.Config.P2P.AddrBookStrict = false
Expand Down
20 changes: 10 additions & 10 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/mesg-foundation/engine/database/store"
"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/logger"
"github.com/mesg-foundation/engine/scheduler"
"github.com/mesg-foundation/engine/orchestrator"
enginesdk "github.com/mesg-foundation/engine/sdk"
instancesdk "github.com/mesg-foundation/engine/sdk/instance"
servicesdk "github.com/mesg-foundation/engine/sdk/service"
Expand All @@ -29,7 +29,7 @@ import (

var network = flag.Bool("experimental-network", false, "start the engine with the network")

func initDatabases(cfg *config.Config) (*database.ServiceDB, *database.LevelDBInstanceDB, *database.LevelDBExecutionDB, *database.LevelDBWorkflowDB, error) {
func initDatabases(cfg *config.Config) (*database.ServiceDB, *database.LevelDBInstanceDB, *database.LevelDBExecutionDB, *database.LevelDBProcessDB, error) {
// init services db.
serviceStore, err := store.NewLevelDBStore(filepath.Join(cfg.Path, cfg.Database.ServiceRelativePath))
if err != nil {
Expand All @@ -49,13 +49,13 @@ func initDatabases(cfg *config.Config) (*database.ServiceDB, *database.LevelDBIn
return nil, nil, nil, nil, err
}

// init workflow db.
workflowDB, err := database.NewWorkflowDB(filepath.Join(cfg.Path, cfg.Database.WorkflowRelativePath))
// init process db.
processDB, err := database.NewProcessDB(filepath.Join(cfg.Path, cfg.Database.ProcessRelativePath))
if err != nil {
return nil, nil, nil, nil, err
}

return serviceDB, instanceDB, executionDB, workflowDB, nil
return serviceDB, instanceDB, executionDB, processDB, nil
}

func deployCoreServices(cfg *config.Config, sdk *enginesdk.SDK) error {
Expand Down Expand Up @@ -132,7 +132,7 @@ func main() {
logger.Init(cfg.Log.Format, cfg.Log.Level, cfg.Log.ForceColors)

// init databases
serviceDB, instanceDB, executionDB, workflowDB, err := initDatabases(cfg)
serviceDB, instanceDB, executionDB, processDB, err := initDatabases(cfg)
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
}
Expand All @@ -155,7 +155,7 @@ func main() {
app := cosmos.NewApp(logger.TendermintLogger(), db)

// init sdk.
sdk, err = enginesdk.New(app, c, instanceDB, executionDB, workflowDB, cfg.Name, strconv.Itoa(port))
sdk, err = enginesdk.New(app, c, instanceDB, executionDB, processDB, cfg.Name, strconv.Itoa(port))
if err != nil {
logrus.WithField("module", "main").Fatalln(err)
}
Expand All @@ -172,7 +172,7 @@ func main() {
logrus.WithField("module", "main").Fatalln(err)
}
} else {
sdk = enginesdk.NewDeprecated(c, serviceDB, instanceDB, executionDB, workflowDB, cfg.Name, strconv.Itoa(port))
sdk = enginesdk.NewDeprecated(c, serviceDB, instanceDB, executionDB, processDB, cfg.Name, strconv.Itoa(port))

// init system services.
if err := deployCoreServices(cfg, sdk); err != nil {
Expand All @@ -191,8 +191,8 @@ func main() {
}
}()

logrus.WithField("module", "main").Info("starting workflow engine")
s := scheduler.New(sdk.Event, sdk.Execution, sdk.Workflow)
logrus.WithField("module", "main").Info("starting process engine")
s := orchestrator.New(sdk.Event, sdk.Execution, sdk.Process)
go func() {
if err := s.Start(); err != nil {
logrus.WithField("module", "main").Fatalln(err)
Expand Down
124 changes: 124 additions & 0 deletions database/process_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package database

import (
"encoding/json"
"fmt"

"github.com/mesg-foundation/engine/hash"
"github.com/mesg-foundation/engine/process"
"github.com/sirupsen/logrus"
"github.com/syndtr/goleveldb/leveldb"
)

// ProcessDB describes the API of database package.
type ProcessDB interface {
// Save saves a process to database.
Save(s *process.Process) error

// Get gets a process from database by its unique hash.
Get(hash hash.Hash) (*process.Process, error)

// Delete deletes a process from database by its unique hash.
Delete(hash hash.Hash) error

// All returns all processes from database.
All() ([]*process.Process, error)

// Close closes underlying database connection.
Close() error
}

// LevelDBProcessDB is a database for storing processes definition.
type LevelDBProcessDB struct {
db *leveldb.DB
}

// NewProcessDB returns the database which is located under given path.
func NewProcessDB(path string) (*LevelDBProcessDB, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}
return &LevelDBProcessDB{db: db}, nil
}

// marshal returns the byte slice from process.
func (d *LevelDBProcessDB) marshal(s *process.Process) ([]byte, error) {
return json.Marshal(s)
}

// unmarshal returns the process from byte slice.
func (d *LevelDBProcessDB) unmarshal(hash hash.Hash, value []byte) (*process.Process, error) {
var s process.Process
if err := json.Unmarshal(value, &s); err != nil {
return nil, fmt.Errorf("database: could not decode process %q: %s", hash, err)
}
return &s, nil
}

// All returns every process in database.
func (d *LevelDBProcessDB) All() ([]*process.Process, error) {
var (
processes []*process.Process
iter = d.db.NewIterator(nil, nil)
)
for iter.Next() {
hash := hash.Hash(iter.Key())
s, err := d.unmarshal(hash, iter.Value())
if err != nil {
// NOTE: Ignore all decode errors (possibly due to a process
// structure change or database corruption)
logrus.WithField("process", hash.String()).Warning(err.Error())
continue
}
processes = append(processes, s)
}
iter.Release()
return processes, iter.Error()
}

// Delete deletes process from database.
func (d *LevelDBProcessDB) Delete(hash hash.Hash) error {
tx, err := d.db.OpenTransaction()
if err != nil {
return err
}
if _, err := tx.Get(hash, nil); err != nil {
tx.Discard()
return err
}
if err := tx.Delete(hash, nil); err != nil {
tx.Discard()
return err
}
return tx.Commit()

}

// Get retrives process from database.
func (d *LevelDBProcessDB) Get(hash hash.Hash) (*process.Process, error) {
b, err := d.db.Get(hash, nil)
if err != nil {
return nil, err
}
return d.unmarshal(hash, b)
}

// Save stores process in database.
// If there is an another process that uses the same sid, it'll be deleted.
func (d *LevelDBProcessDB) Save(s *process.Process) error {
if s.Hash.IsZero() {
return errCannotSaveWithoutHash
}

b, err := d.marshal(s)
if err != nil {
return err
}
return d.db.Put(s.Hash, b, nil)
}

// Close closes database.
func (d *LevelDBProcessDB) Close() error {
return d.db.Close()
}
124 changes: 0 additions & 124 deletions database/workflow_db.go

This file was deleted.

6 changes: 3 additions & 3 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s Status) String() (r string) {
// Execution stores all information about executions.
type Execution struct {
Hash hash.Hash `hash:"-"`
WorkflowHash hash.Hash `hash:"name:workflowHash"`
ProcessHash hash.Hash `hash:"name:processHash"`
ParentHash hash.Hash `hash:"name:parentHash"`
EventHash hash.Hash `hash:"name:eventHash"`
Status Status `hash:"-"`
Expand All @@ -49,9 +49,9 @@ type Execution struct {
}

// New returns a new execution. It returns an error if inputs are invalid.
func New(workflowHash, instanceHash, parentHash, eventHash hash.Hash, stepID string, taskKey string, inputs map[string]interface{}, tags []string) *Execution {
func New(processHash, instanceHash, parentHash, eventHash hash.Hash, stepID string, taskKey string, inputs map[string]interface{}, tags []string) *Execution {
exec := &Execution{
WorkflowHash: workflowHash,
ProcessHash: processHash,
EventHash: eventHash,
InstanceHash: instanceHash,
ParentHash: parentHash,
Expand Down
4 changes: 2 additions & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package filter

// Predicate is the type of conditions that can be applied in a filter of a workflow trigger
// Predicate is the type of conditions that can be applied in a filter of a process trigger
type Predicate uint

// List of possible conditions for workflow's filter
// List of possible conditions for process's filter
const (
EQ Predicate = iota + 1
)
Expand Down
Loading