Skip to content

Commit

Permalink
Merge pull request #1106 from mesg-foundation/fix/execution-servicehash
Browse files Browse the repository at this point in the history
Replace execution serviceHash by instanceHash
  • Loading branch information
antho1404 authored Jun 26, 2019
2 parents 43a4a42 + 3716708 commit e794aa8
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 171 deletions.
18 changes: 0 additions & 18 deletions database/instance_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ type InstanceDB interface {
// GetAll retrieves all instances.
GetAll() ([]*instance.Instance, error)

// GetAllByService retrieves all instances of service by service's hash.
GetAllByService(serviceHash hash.Hash) ([]*instance.Instance, error)

// Save saves instance to database.
Save(i *instance.Instance) error

Expand Down Expand Up @@ -85,21 +82,6 @@ func (d *LevelDBInstanceDB) GetAll() ([]*instance.Instance, error) {
return instances, iter.Error()
}

// GetAllByService retrieves all instances of service by service's hash.
func (d *LevelDBInstanceDB) GetAllByService(serviceHash hash.Hash) ([]*instance.Instance, error) {
instances, err := d.GetAll()
if err != nil {
return nil, err
}
someInstances := []*instance.Instance{}
for _, instance := range instances {
if instance.ServiceHash.Equal(serviceHash) {
someInstances = append(someInstances, instance)
}
}
return someInstances, nil
}

// Save saves instance to database.
func (d *LevelDBInstanceDB) Save(i *instance.Instance) error {
if i.Hash.IsZero() {
Expand Down
36 changes: 18 additions & 18 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,28 @@ func (s Status) String() (r string) {

// Execution stores all information about executions.
type Execution struct {
Hash hash.Hash `hash:"-"`
ParentHash hash.Hash `hash:"name:parentHash"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
ServiceHash hash.Hash `hash:"name:serviceHash"`
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
Hash hash.Hash `hash:"-"`
ParentHash hash.Hash `hash:"name:parentHash"`
EventID string `hash:"name:eventID"`
Status Status `hash:"-"`
InstanceHash hash.Hash `hash:"name:instanceHash"`
TaskKey string `hash:"name:taskKey"`
Tags []string `hash:"name:tags"`
Inputs map[string]interface{} `hash:"name:inputs"`
Outputs map[string]interface{} `hash:"-"`
Error string `hash:"-"`
}

// New returns a new execution. It returns an error if inputs are invalid.
func New(serviceHash, parentHash hash.Hash, eventID, taskKey string, inputs map[string]interface{}, tags []string) *Execution {
func New(instanceHash, parentHash hash.Hash, eventID, taskKey string, inputs map[string]interface{}, tags []string) *Execution {
exec := &Execution{
EventID: eventID,
ServiceHash: serviceHash,
ParentHash: parentHash,
Inputs: inputs,
TaskKey: taskKey,
Tags: tags,
Status: Created,
EventID: eventID,
InstanceHash: instanceHash,
ParentHash: parentHash,
Inputs: inputs,
TaskKey: taskKey,
Tags: tags,
Status: Created,
}
exec.Hash = hash.Dump(exec)
return exec
Expand Down
6 changes: 3 additions & 3 deletions execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestNewFromService(t *testing.T) {

execution := New(hash, parentHash, eventID, taskKey, nil, tags)
require.NotNil(t, execution)
require.Equal(t, hash, execution.ServiceHash)
require.Equal(t, hash, execution.InstanceHash)
require.Equal(t, parentHash, execution.ParentHash)
require.Equal(t, eventID, execution.EventID)
require.Equal(t, taskKey, execution.TaskKey)
Expand Down Expand Up @@ -67,8 +67,8 @@ func TestStatus(t *testing.T) {
func TestExecutionHash(t *testing.T) {
ids := make(map[string]bool)

f := func(serviceHash, parentHash []byte, eventID, taskKey, input string, tags []string) bool {
e := New(serviceHash, parentHash, eventID, taskKey, map[string]interface{}{"input": input}, tags)
f := func(instanceHash, parentHash []byte, eventID, taskKey, input string, tags []string) bool {
e := New(instanceHash, parentHash, eventID, taskKey, map[string]interface{}{"input": input}, tags)
if ids[string(e.Hash)] {
return false
}
Expand Down
50 changes: 25 additions & 25 deletions protobuf/api/execution.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions protobuf/api/execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ message StreamExecutionRequest{
// Status to filter executions.
definition.Status status = 1;

// Service's hash to filter executions.
string serviceHash = 2;
// Instance's hash to filter executions.
string instanceHash = 2;
}

// Filter used to filter a stream of executions.
Expand Down
51 changes: 25 additions & 26 deletions protobuf/definition/execution.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions protobuf/definition/execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ message Execution {
// Status is the current status of execution.
Status status = 4;

// serviceHash is hash of service that can proceed an execution
string serviceHash = 5;
// instanceHash is hash of the instance that can proceed an execution
string instanceHash = 5;

// taskKey is the key of the task of this execution.
string taskKey = 6;
Expand Down
49 changes: 21 additions & 28 deletions sdk/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (e *Execution) Update(executionHash hash.Hash, outputs []byte, reterr error
}

go e.ps.Pub(exec, streamTopic)
go e.ps.Pub(exec, subTopic(exec.ServiceHash))
go e.ps.Pub(exec, subTopic(exec.InstanceHash))
return nil
}

Expand All @@ -80,7 +80,7 @@ func (e *Execution) processExecution(executionHash hash.Hash, outputData []byte,
return nil, err
}
} else {
o, err := e.validateExecutionOutput(exec.ServiceHash, exec.TaskKey, outputData)
o, err := e.validateExecutionOutput(exec.InstanceHash, exec.TaskKey, outputData)
if err != nil {
tx.Discard()
return nil, err
Expand All @@ -105,13 +105,13 @@ func (e *Execution) processExecution(executionHash hash.Hash, outputData []byte,
return exec, nil
}

func (e *Execution) validateExecutionOutput(serviceHash hash.Hash, taskKey string, jsonout []byte) (map[string]interface{}, error) {
func (e *Execution) validateExecutionOutput(instanceHash hash.Hash, taskKey string, jsonout []byte) (map[string]interface{}, error) {
var output map[string]interface{}
if err := json.Unmarshal(jsonout, &output); err != nil {
return nil, fmt.Errorf("invalid output: %s", err)
}

s, err := e.service.Get(serviceHash)
s, err := e.service.Get(instanceHash)
if err != nil {
return nil, err
}
Expand All @@ -123,19 +123,17 @@ func (e *Execution) validateExecutionOutput(serviceHash hash.Hash, taskKey strin
}

// Execute executes a task tasKey with inputData and tags for service serviceID.
func (e *Execution) Execute(serviceHash hash.Hash, taskKey string, inputData map[string]interface{}, tags []string) (executionHash hash.Hash, err error) {
s, err := e.service.Get(serviceHash)
func (e *Execution) Execute(instanceHash hash.Hash, taskKey string, inputData map[string]interface{}, tags []string) (executionHash hash.Hash, err error) {
// a task should be executed only if task's service is running.
instance, err := e.instance.Get(instanceHash)
if err != nil {
return nil, err
}
// a task should be executed only if task's service is running.
instances, err := e.instance.List(&instancesdk.Filter{ServiceHash: s.Hash})

s, err := e.service.Get(instance.ServiceHash)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, &NotRunningServiceError{ServiceID: s.Hash.String()}
}

if err := s.RequireTaskInputs(taskKey, inputData); err != nil {
return nil, err
Expand All @@ -156,35 +154,30 @@ func (e *Execution) Execute(serviceHash hash.Hash, taskKey string, inputData map
return exec.Hash, nil
}

// Listen listens executions on service.
func (e *Execution) Listen(serviceHash hash.Hash, f *Filter) (*Listener, error) {
s, err := e.service.Get(serviceHash)
// Listen listens executions on instance.
func (e *Execution) Listen(instanceHash hash.Hash, f *Filter) (*Listener, error) {
inst, err := e.instance.Get(instanceHash)
if err != nil {
return nil, err
}

service, err := e.service.Get(inst.ServiceHash)
if err != nil {
return nil, err
}

if f != nil && f.HasTaskKey() {
if _, err := s.GetTask(f.TaskKey); err != nil {
if _, err := service.GetTask(f.TaskKey); err != nil {
return nil, err
}
}

l := NewListener(e.ps, subTopic(s.Hash), f)
l := NewListener(e.ps, subTopic(inst.Hash), f)
go l.Listen()
return l, nil
}

// subTopic returns the topic to listen for tasks from this service.
func subTopic(serviceHash hash.Hash) string {
return serviceHash.String() + "." + topic
}

// NotRunningServiceError is an error returned when the service is not running that
// a task needed to be executed on.
type NotRunningServiceError struct {
ServiceID string
}

func (e *NotRunningServiceError) Error() string {
return fmt.Sprintf("Service %q is not running", e.ServiceID)
func subTopic(instanceHash hash.Hash) string {
return instanceHash.String() + "." + topic
}
10 changes: 5 additions & 5 deletions sdk/execution/execution_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

// Filter store fileds for matching executions.
type Filter struct {
Statuses []execution.Status
ServiceHash hash.Hash
TaskKey string
Tags []string
Statuses []execution.Status
InstanceHash hash.Hash
TaskKey string
Tags []string
}

// Match matches execution.
Expand All @@ -21,7 +21,7 @@ func (f *Filter) Match(e *execution.Execution) bool {
return true
}

if !f.ServiceHash.IsZero() && !f.ServiceHash.Equal(e.ServiceHash) {
if !f.InstanceHash.IsZero() && !f.InstanceHash.Equal(e.InstanceHash) {
return false
}

Expand Down
Loading

0 comments on commit e794aa8

Please sign in to comment.