Skip to content

Commit

Permalink
Merge pull request #1109 from mesg-foundation/bug/instance-event-task
Browse files Browse the repository at this point in the history
Fix service connection with instance
  • Loading branch information
krhubert authored Jun 25, 2019
2 parents f7c63a4 + c372dec commit 5a3fa10
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
10 changes: 10 additions & 0 deletions server/grpc/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mesg-foundation/core/protobuf/coreapi"
"github.com/mesg-foundation/core/sdk"
eventsdk "github.com/mesg-foundation/core/sdk/event"
instancesdk "github.com/mesg-foundation/core/sdk/instance"
"github.com/mesg-foundation/core/version"
)

Expand Down Expand Up @@ -70,6 +71,15 @@ func (s *Server) ExecuteTask(ctx context.Context, request *coreapi.ExecuteTaskRe
if err != nil {
return nil, err
}

instances, err := s.sdk.Instance.List(&instancesdk.Filter{ServiceHash: hash})
if err != nil {
return nil, err
}
if len(instances) != 1 {
return nil, fmt.Errorf("you should have exactly one instance running to execute this service")
}

var inputs map[string]interface{}
if err := json.Unmarshal([]byte(request.InputData), &inputs); err != nil {
return nil, fmt.Errorf("cannot parse execution's inputs (JSON format): %s", err)
Expand Down
18 changes: 14 additions & 4 deletions server/grpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ func NewServer(sdk *sdk.SDK) *Server {

// EmitEvent permits to send and event to anyone who subscribed to it.
func (s *Server) EmitEvent(context context.Context, request *serviceapi.EmitEventRequest) (*serviceapi.EmitEventReply, error) {
serviceHash, err := hash.Decode(request.Token)
instanceHash, err := hash.Decode(request.Token)
if err != nil {
return nil, err
}

instance, err := s.sdk.Instance.Get(instanceHash)
if err != nil {
return nil, err
}
Expand All @@ -39,17 +44,22 @@ func (s *Server) EmitEvent(context context.Context, request *serviceapi.EmitEven
if err := json.Unmarshal([]byte(request.EventData), &data); err != nil {
return nil, err
}
return &serviceapi.EmitEventReply{}, s.sdk.Event.Emit(serviceHash, request.EventKey, data)
return &serviceapi.EmitEventReply{}, s.sdk.Event.Emit(instance.ServiceHash, request.EventKey, data)
}

// ListenTask creates a stream that will send data for every task to execute.
func (s *Server) ListenTask(request *serviceapi.ListenTaskRequest, stream serviceapi.Service_ListenTaskServer) error {
serviceHash, err := hash.Decode(request.Token)
instanceHash, err := hash.Decode(request.Token)
if err != nil {
return err
}

instance, err := s.sdk.Instance.Get(instanceHash)
if err != nil {
return err
}

ln, err := s.sdk.Execution.Listen(serviceHash, inProgressFilter)
ln, err := s.sdk.Execution.Listen(instance.ServiceHash, inProgressFilter)
if err != nil {
return err
}
Expand Down

0 comments on commit 5a3fa10

Please sign in to comment.