diff --git a/event/event.go b/event/event.go index 366b13498..e763ca353 100644 --- a/event/event.go +++ b/event/event.go @@ -1,28 +1,21 @@ package event import ( - "time" - - "github.com/mesg-foundation/core/service" + "github.com/mesg-foundation/core/hash" ) // Event stores all informations about Events. type Event struct { - Service *service.Service - Key string - Data map[string]interface{} - CreatedAt time.Time + InstanceHash hash.Hash + Key string + Data map[string]interface{} } // Create creates an event eventKey with eventData for service s. -func Create(s *service.Service, eventKey string, eventData map[string]interface{}) (*Event, error) { - if err := s.RequireEventData(eventKey, eventData); err != nil { - return nil, err - } +func Create(instanceHash hash.Hash, eventKey string, eventData map[string]interface{}) *Event { return &Event{ - Service: s, - Key: eventKey, - Data: eventData, - CreatedAt: time.Now(), - }, nil + InstanceHash: instanceHash, + Key: eventKey, + Data: eventData, + } } diff --git a/event/event_test.go b/event/event_test.go deleted file mode 100644 index fe73d7449..000000000 --- a/event/event_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package event - -import ( - "testing" - - "github.com/mesg-foundation/core/service" - "github.com/stretchr/testify/require" -) - -func TestCreate(t *testing.T) { - s := &service.Service{ - Name: "TestCreate", - Events: []*service.Event{ - {Key: "test"}, - }, - } - var data map[string]interface{} - exec, err := Create(s, "test", data) - require.NoError(t, err) - require.Equal(t, s, exec.Service) - require.Equal(t, data, exec.Data) - require.Equal(t, "test", exec.Key) - require.NotNil(t, exec.CreatedAt) -} - -func TestCreateNotPresentEvent(t *testing.T) { - var ( - serviceName = "TestCreateNotPresentEvent" - eventKey = "test" - invalidEventName = "testInvalid" - s = &service.Service{ - Name: serviceName, - Events: []*service.Event{ - { - Key: eventKey, - }, - }, - } - ) - var data map[string]interface{} - _, err := Create(s, invalidEventName, data) - require.Error(t, err) - notFoundErr, ok := err.(*service.EventNotFoundError) - require.True(t, ok) - require.Equal(t, invalidEventName, notFoundErr.EventKey) - require.Equal(t, serviceName, notFoundErr.ServiceName) -} - -func TestCreateInvalidData(t *testing.T) { - var ( - eventKey = "test" - serviceName = "TestCreateInvalidData" - s = &service.Service{ - Name: serviceName, - Events: []*service.Event{ - { - Key: eventKey, - Data: []*service.Parameter{ - {Key: "xxx"}, - }, - }, - }, - } - ) - var data map[string]interface{} - _, err := Create(s, "test", data) - require.Error(t, err) - invalidErr, ok := err.(*service.InvalidEventDataError) - require.True(t, ok) - require.Equal(t, eventKey, invalidErr.EventKey) - require.Equal(t, serviceName, invalidErr.ServiceName) -} diff --git a/sdk/event/event.go b/sdk/event/event.go index 53bc97f65..001af3068 100644 --- a/sdk/event/event.go +++ b/sdk/event/event.go @@ -4,6 +4,7 @@ import ( "github.com/cskr/pubsub" "github.com/mesg-foundation/core/event" "github.com/mesg-foundation/core/hash" + instancesdk "github.com/mesg-foundation/core/sdk/instance" servicesdk "github.com/mesg-foundation/core/sdk/service" ) @@ -15,31 +16,40 @@ const ( // Event exposes event APIs of MESG. type Event struct { - ps *pubsub.PubSub - service *servicesdk.Service + ps *pubsub.PubSub + instance *instancesdk.Instance + service *servicesdk.Service } // New creates a new Event SDK with given options. -func New(ps *pubsub.PubSub, service *servicesdk.Service) *Event { +func New(ps *pubsub.PubSub, service *servicesdk.Service, instance *instancesdk.Instance) *Event { return &Event{ - ps: ps, - service: service, + ps: ps, + service: service, + instance: instance, } } // Emit emits a MESG event eventKey with eventData for service token. -func (e *Event) Emit(serviceHash hash.Hash, eventKey string, eventData map[string]interface{}) error { - s, err := e.service.Get(serviceHash) +func (e *Event) Emit(instanceHash hash.Hash, eventKey string, eventData map[string]interface{}) error { + instance, err := e.instance.Get(instanceHash) if err != nil { return err } - ev, err := event.Create(s, eventKey, eventData) + + service, err := e.service.Get(instance.ServiceHash) if err != nil { return err } - go e.ps.Pub(ev, streamTopic) - go e.ps.Pub(ev, subTopic(s.Hash)) + if err := service.RequireEventData(eventKey, eventData); err != nil { + return err + } + + event := event.Create(instanceHash, eventKey, eventData) + + go e.ps.Pub(event, streamTopic) + go e.ps.Pub(event, subTopic(instanceHash)) return nil } diff --git a/sdk/sdk.go b/sdk/sdk.go index 9b9075757..07d00131c 100644 --- a/sdk/sdk.go +++ b/sdk/sdk.go @@ -24,7 +24,7 @@ func New(c container.Container, serviceDB database.ServiceDB, instanceDB databas serviceSDK := servicesdk.New(c, serviceDB) instanceSDK := instancesdk.New(c, serviceSDK, instanceDB) executionSDK := executionsdk.New(ps, serviceSDK, instanceSDK, execDB) - eventSDK := eventsdk.New(ps, serviceSDK) + eventSDK := eventsdk.New(ps, serviceSDK, instanceSDK) return &SDK{ Service: serviceSDK, Instance: instanceSDK, diff --git a/server/grpc/service/service.go b/server/grpc/service/service.go index 7e4bd4872..282d4c938 100644 --- a/server/grpc/service/service.go +++ b/server/grpc/service/service.go @@ -44,7 +44,10 @@ func (s *Server) EmitEvent(context context.Context, request *serviceapi.EmitEven if err := json.Unmarshal([]byte(request.EventData), &data); err != nil { return nil, err } - return &serviceapi.EmitEventReply{}, s.sdk.Event.Emit(instance.ServiceHash, request.EventKey, data) + if err := s.sdk.Event.Emit(instance.Hash, request.EventKey, data); err != nil { + return nil, err + } + return &serviceapi.EmitEventReply{}, nil } // ListenTask creates a stream that will send data for every task to execute.