Skip to content

Commit

Permalink
Merge pull request #1122 from mesg-foundation/fix/event-use-instance-…
Browse files Browse the repository at this point in the history
…hash

Use instance hash in event
  • Loading branch information
Nicolas Mahé authored Jun 26, 2019
2 parents 40a1278 + 12a57af commit e00cc7a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 100 deletions.
25 changes: 9 additions & 16 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
package event

import (
"time"

"github.com/mesg-foundation/core/service"
"github.com/mesg-foundation/core/hash"
)

// Event stores all informations about Events.
type Event struct {
Service *service.Service
Key string
Data map[string]interface{}
CreatedAt time.Time
InstanceHash hash.Hash
Key string
Data map[string]interface{}
}

// Create creates an event eventKey with eventData for service s.
func Create(s *service.Service, eventKey string, eventData map[string]interface{}) (*Event, error) {
if err := s.RequireEventData(eventKey, eventData); err != nil {
return nil, err
}
func Create(instanceHash hash.Hash, eventKey string, eventData map[string]interface{}) *Event {
return &Event{
Service: s,
Key: eventKey,
Data: eventData,
CreatedAt: time.Now(),
}, nil
InstanceHash: instanceHash,
Key: eventKey,
Data: eventData,
}
}
72 changes: 0 additions & 72 deletions event/event_test.go

This file was deleted.

30 changes: 20 additions & 10 deletions sdk/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/cskr/pubsub"
"github.com/mesg-foundation/core/event"
"github.com/mesg-foundation/core/hash"
instancesdk "github.com/mesg-foundation/core/sdk/instance"
servicesdk "github.com/mesg-foundation/core/sdk/service"
)

Expand All @@ -15,31 +16,40 @@ const (

// Event exposes event APIs of MESG.
type Event struct {
ps *pubsub.PubSub
service *servicesdk.Service
ps *pubsub.PubSub
instance *instancesdk.Instance
service *servicesdk.Service
}

// New creates a new Event SDK with given options.
func New(ps *pubsub.PubSub, service *servicesdk.Service) *Event {
func New(ps *pubsub.PubSub, service *servicesdk.Service, instance *instancesdk.Instance) *Event {
return &Event{
ps: ps,
service: service,
ps: ps,
service: service,
instance: instance,
}
}

// Emit emits a MESG event eventKey with eventData for service token.
func (e *Event) Emit(serviceHash hash.Hash, eventKey string, eventData map[string]interface{}) error {
s, err := e.service.Get(serviceHash)
func (e *Event) Emit(instanceHash hash.Hash, eventKey string, eventData map[string]interface{}) error {
instance, err := e.instance.Get(instanceHash)
if err != nil {
return err
}
ev, err := event.Create(s, eventKey, eventData)

service, err := e.service.Get(instance.ServiceHash)
if err != nil {
return err
}

go e.ps.Pub(ev, streamTopic)
go e.ps.Pub(ev, subTopic(s.Hash))
if err := service.RequireEventData(eventKey, eventData); err != nil {
return err
}

event := event.Create(instanceHash, eventKey, eventData)

go e.ps.Pub(event, streamTopic)
go e.ps.Pub(event, subTopic(instanceHash))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func New(c container.Container, serviceDB database.ServiceDB, instanceDB databas
serviceSDK := servicesdk.New(c, serviceDB)
instanceSDK := instancesdk.New(c, serviceSDK, instanceDB)
executionSDK := executionsdk.New(ps, serviceSDK, instanceSDK, execDB)
eventSDK := eventsdk.New(ps, serviceSDK)
eventSDK := eventsdk.New(ps, serviceSDK, instanceSDK)
return &SDK{
Service: serviceSDK,
Instance: instanceSDK,
Expand Down
5 changes: 4 additions & 1 deletion server/grpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ func (s *Server) EmitEvent(context context.Context, request *serviceapi.EmitEven
if err := json.Unmarshal([]byte(request.EventData), &data); err != nil {
return nil, err
}
return &serviceapi.EmitEventReply{}, s.sdk.Event.Emit(instance.ServiceHash, request.EventKey, data)
if err := s.sdk.Event.Emit(instance.Hash, request.EventKey, data); err != nil {
return nil, err
}
return &serviceapi.EmitEventReply{}, nil
}

// ListenTask creates a stream that will send data for every task to execute.
Expand Down

0 comments on commit e00cc7a

Please sign in to comment.