diff --git a/api.yaml b/api.yaml index 09525f85..87bf46c1 100755 --- a/api.yaml +++ b/api.yaml @@ -295,6 +295,12 @@ paths: application/json: schema: $ref: "#/components/schemas/Blog" + application/x-www-form-urlencoded: + schema: + $ref: "#/components/schemas/Blog" + multipart/form-data: + schema: + $ref: "#/components/schemas/Blog" responses: 200: description: Update Blog diff --git a/controllers/rest/api.go b/controllers/rest/api.go index 8f951cd3..465a2ef2 100644 --- a/controllers/rest/api.go +++ b/controllers/rest/api.go @@ -51,6 +51,7 @@ type RESTAPI struct { registeredPrePathInitializers map[reflect.Value]int postPathInitializers []PathInitializer registeredPostPathInitializers map[reflect.Value]int + entityFactories map[string]model.EntityFactory } type schema struct { @@ -175,6 +176,14 @@ func (p *RESTAPI) RegisterProjection(name string, projection projections.Project p.projections[name] = projection } +//RegisterEntityFactory Adds entity factory so that it can be referenced in the OpenAPI spec +func (p *RESTAPI) RegisterEntityFactory(name string, factory model.EntityFactory) { + if p.entityFactories == nil { + p.entityFactories = make(map[string]model.EntityFactory) + } + p.entityFactories[name] = factory +} + //GetMiddleware get middleware by name func (p *RESTAPI) GetMiddleware(name string) (Middleware, error) { if tmiddleware, ok := p.middlewares[name]; ok { @@ -256,6 +265,11 @@ func (p *RESTAPI) GetSchemas() (map[string]interface{}, error) { return schemes, nil } +//GetEntityFactories get event factories +func (p *RESTAPI) GetEntityFactories() map[string]model.EntityFactory { + return p.entityFactories +} + //Initialize and setup configurations for RESTAPI func (p *RESTAPI) Initialize(ctxt context.Context) error { //register standard controllers diff --git a/controllers/rest/main.go b/controllers/rest/main.go index 3a5c316b..0815b2ed 100644 --- a/controllers/rest/main.go +++ b/controllers/rest/main.go @@ -1,12 +1,15 @@ package rest import ( + "context" "encoding/json" "github.com/getkin/kin-openapi/openapi3" "github.com/labstack/echo/v4" + "github.com/wepala/weos/model" "io/ioutil" "os" "strings" + "time" ) //New instantiates and initializes the api @@ -21,8 +24,7 @@ func New(apiConfig string) (*RESTAPI, error) { return api, err } -//Start API -func Start(port string, apiConfig string) *RESTAPI { +func Start(port string, apiConfig string, replay bool) *RESTAPI { api, err := New(apiConfig) if err != nil { api.EchoInstance().Logger.Error(err) @@ -31,6 +33,17 @@ func Start(port string, apiConfig string) *RESTAPI { if err != nil { api.EchoInstance().Logger.Fatal(err) } + + if replay == true { + e, _ := api.GetEventStore("Default") + eventRepo := e.(*model.EventRepositoryGorm) + projection, _ := api.GetProjection("Default") + factories := api.GetEntityFactories() + + total, success, failed, err := eventRepo.ReplayEvents(context.Background(), time.Time{}, factories, projection) + api.EchoInstance().Logger.Debugf("total: %d, success: %d, failed: %d, err: %s", total, success, failed, err) + } + api.EchoInstance().Logger.Fatal(api.EchoInstance().Start(":" + port)) return api } diff --git a/controllers/rest/operation_initializers.go b/controllers/rest/operation_initializers.go index 197cb96b..49c3fde0 100644 --- a/controllers/rest/operation_initializers.go +++ b/controllers/rest/operation_initializers.go @@ -41,6 +41,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } @@ -55,6 +56,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } break @@ -65,6 +67,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } @@ -79,6 +82,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } @@ -88,6 +92,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } else { @@ -104,6 +109,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } diff --git a/controllers/rest/weos_mocks_test.go b/controllers/rest/weos_mocks_test.go index 68f3de34..0c8b90b6 100644 --- a/controllers/rest/weos_mocks_test.go +++ b/controllers/rest/weos_mocks_test.go @@ -12,6 +12,7 @@ import ( "gorm.io/gorm" "net/http" "sync" + "time" ) // Ensure, that EventRepositoryMock does implement model.EventRepository. @@ -54,6 +55,9 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { +// panic("mock out the ReplayEvents method") +// }, // } // // // use mockedEventRepository in code that requires model.EventRepository @@ -91,6 +95,9 @@ type EventRepositoryMock struct { // PersistFunc mocks the Persist method. PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error + // ReplayEventsFunc mocks the ReplayEvents method. + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) + // calls tracks calls to the methods. calls struct { // AddSubscriber holds details about calls to the AddSubscriber method. @@ -151,6 +158,17 @@ type EventRepositoryMock struct { // Entity is the entity argument value. Entity model.AggregateInterface } + // ReplayEvents holds details about calls to the ReplayEvents method. + ReplayEvents []struct { + // Ctxt is the ctxt argument value. + Ctxt context.Context + // Date is the date argument value. + Date time.Time + // EntityFactories is the entityFactories argument value. + EntityFactories map[string]model.EntityFactory + // Projection is the projection argument value. + Projection model.Projection + } } lockAddSubscriber sync.RWMutex lockFlush sync.RWMutex @@ -162,6 +180,7 @@ type EventRepositoryMock struct { lockGetSubscribers sync.RWMutex lockMigrate sync.RWMutex lockPersist sync.RWMutex + lockReplayEvents sync.RWMutex } // AddSubscriber calls AddSubscriberFunc. @@ -488,6 +507,49 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { return calls } +// ReplayEvents calls ReplayEventsFunc. +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { + if mock.ReplayEventsFunc == nil { + panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") + } + callInfo := struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + Projection model.Projection + }{ + Ctxt: ctxt, + Date: date, + EntityFactories: entityFactories, + Projection: projection, + } + mock.lockReplayEvents.Lock() + mock.calls.ReplayEvents = append(mock.calls.ReplayEvents, callInfo) + mock.lockReplayEvents.Unlock() + return mock.ReplayEventsFunc(ctxt, date, entityFactories, projection) +} + +// ReplayEventsCalls gets all the calls that were made to ReplayEvents. +// Check the length with: +// len(mockedEventRepository.ReplayEventsCalls()) +func (mock *EventRepositoryMock) ReplayEventsCalls() []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + Projection model.Projection +} { + var calls []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + Projection model.Projection + } + mock.lockReplayEvents.RLock() + calls = mock.calls.ReplayEvents + mock.lockReplayEvents.RUnlock() + return calls +} + // Ensure, that ProjectionMock does implement model.Projection. // If this is not the case, regenerate this file with moq. var _ model.Projection = &ProjectionMock{} @@ -2071,6 +2133,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // // make and configure a mocked model.EntityFactory // mockedEntityFactory := &EntityFactoryMock{ +// BuilderFunc: func(ctx context.Context) ds.Builder { +// panic("mock out the Builder method") +// }, // DynamicStructFunc: func(ctx context.Context) ds.DynamicStruct { // panic("mock out the DynamicStruct method") // }, @@ -2096,6 +2161,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // } type EntityFactoryMock struct { + // BuilderFunc mocks the Builder method. + BuilderFunc func(ctx context.Context) ds.Builder + // DynamicStructFunc mocks the DynamicStruct method. DynamicStructFunc func(ctx context.Context) ds.DynamicStruct @@ -2116,6 +2184,11 @@ type EntityFactoryMock struct { // calls tracks calls to the methods. calls struct { + // Builder holds details about calls to the Builder method. + Builder []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } // DynamicStruct holds details about calls to the DynamicStruct method. DynamicStruct []struct { // Ctx is the ctx argument value. @@ -2145,6 +2218,7 @@ type EntityFactoryMock struct { TableName []struct { } } + lockBuilder sync.RWMutex lockDynamicStruct sync.RWMutex lockFromSchemaAndBuilder sync.RWMutex lockName sync.RWMutex @@ -2153,6 +2227,37 @@ type EntityFactoryMock struct { lockTableName sync.RWMutex } +// Builder calls BuilderFunc. +func (mock *EntityFactoryMock) Builder(ctx context.Context) ds.Builder { + if mock.BuilderFunc == nil { + panic("EntityFactoryMock.BuilderFunc: method is nil but EntityFactory.Builder was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockBuilder.Lock() + mock.calls.Builder = append(mock.calls.Builder, callInfo) + mock.lockBuilder.Unlock() + return mock.BuilderFunc(ctx) +} + +// BuilderCalls gets all the calls that were made to Builder. +// Check the length with: +// len(mockedEntityFactory.BuilderCalls()) +func (mock *EntityFactoryMock) BuilderCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockBuilder.RLock() + calls = mock.calls.Builder + mock.lockBuilder.RUnlock() + return calls +} + // DynamicStruct calls DynamicStructFunc. func (mock *EntityFactoryMock) DynamicStruct(ctx context.Context) ds.DynamicStruct { if mock.DynamicStructFunc == nil { @@ -2331,3 +2436,153 @@ func (mock *EntityFactoryMock) TableNameCalls() []struct { mock.lockTableName.RUnlock() return calls } + +// Ensure, that EventDispatcherMock does implement model.EventDispatcher. +// If this is not the case, regenerate this file with moq. +var _ model.EventDispatcher = &EventDispatcherMock{} + +// EventDispatcherMock is a mock implementation of model.EventDispatcher. +// +// func TestSomethingThatUsesEventDispatcher(t *testing.T) { +// +// // make and configure a mocked model.EventDispatcher +// mockedEventDispatcher := &EventDispatcherMock{ +// AddSubscriberFunc: func(handler model.EventHandler) { +// panic("mock out the AddSubscriber method") +// }, +// DispatchFunc: func(ctx context.Context, event model.Event) { +// panic("mock out the Dispatch method") +// }, +// GetSubscribersFunc: func() []model.EventHandler { +// panic("mock out the GetSubscribers method") +// }, +// } +// +// // use mockedEventDispatcher in code that requires model.EventDispatcher +// // and then make assertions. +// +// } +type EventDispatcherMock struct { + // AddSubscriberFunc mocks the AddSubscriber method. + AddSubscriberFunc func(handler model.EventHandler) + + // DispatchFunc mocks the Dispatch method. + DispatchFunc func(ctx context.Context, event model.Event) + + // GetSubscribersFunc mocks the GetSubscribers method. + GetSubscribersFunc func() []model.EventHandler + + // calls tracks calls to the methods. + calls struct { + // AddSubscriber holds details about calls to the AddSubscriber method. + AddSubscriber []struct { + // Handler is the handler argument value. + Handler model.EventHandler + } + // Dispatch holds details about calls to the Dispatch method. + Dispatch []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Event is the event argument value. + Event model.Event + } + // GetSubscribers holds details about calls to the GetSubscribers method. + GetSubscribers []struct { + } + } + lockAddSubscriber sync.RWMutex + lockDispatch sync.RWMutex + lockGetSubscribers sync.RWMutex +} + +// AddSubscriber calls AddSubscriberFunc. +func (mock *EventDispatcherMock) AddSubscriber(handler model.EventHandler) { + if mock.AddSubscriberFunc == nil { + panic("EventDispatcherMock.AddSubscriberFunc: method is nil but EventDispatcher.AddSubscriber was just called") + } + callInfo := struct { + Handler model.EventHandler + }{ + Handler: handler, + } + mock.lockAddSubscriber.Lock() + mock.calls.AddSubscriber = append(mock.calls.AddSubscriber, callInfo) + mock.lockAddSubscriber.Unlock() + mock.AddSubscriberFunc(handler) +} + +// AddSubscriberCalls gets all the calls that were made to AddSubscriber. +// Check the length with: +// len(mockedEventDispatcher.AddSubscriberCalls()) +func (mock *EventDispatcherMock) AddSubscriberCalls() []struct { + Handler model.EventHandler +} { + var calls []struct { + Handler model.EventHandler + } + mock.lockAddSubscriber.RLock() + calls = mock.calls.AddSubscriber + mock.lockAddSubscriber.RUnlock() + return calls +} + +// Dispatch calls DispatchFunc. +func (mock *EventDispatcherMock) Dispatch(ctx context.Context, event model.Event) { + if mock.DispatchFunc == nil { + panic("EventDispatcherMock.DispatchFunc: method is nil but EventDispatcher.Dispatch was just called") + } + callInfo := struct { + Ctx context.Context + Event model.Event + }{ + Ctx: ctx, + Event: event, + } + mock.lockDispatch.Lock() + mock.calls.Dispatch = append(mock.calls.Dispatch, callInfo) + mock.lockDispatch.Unlock() + mock.DispatchFunc(ctx, event) +} + +// DispatchCalls gets all the calls that were made to Dispatch. +// Check the length with: +// len(mockedEventDispatcher.DispatchCalls()) +func (mock *EventDispatcherMock) DispatchCalls() []struct { + Ctx context.Context + Event model.Event +} { + var calls []struct { + Ctx context.Context + Event model.Event + } + mock.lockDispatch.RLock() + calls = mock.calls.Dispatch + mock.lockDispatch.RUnlock() + return calls +} + +// GetSubscribers calls GetSubscribersFunc. +func (mock *EventDispatcherMock) GetSubscribers() []model.EventHandler { + if mock.GetSubscribersFunc == nil { + panic("EventDispatcherMock.GetSubscribersFunc: method is nil but EventDispatcher.GetSubscribers was just called") + } + callInfo := struct { + }{} + mock.lockGetSubscribers.Lock() + mock.calls.GetSubscribers = append(mock.calls.GetSubscribers, callInfo) + mock.lockGetSubscribers.Unlock() + return mock.GetSubscribersFunc() +} + +// GetSubscribersCalls gets all the calls that were made to GetSubscribers. +// Check the length with: +// len(mockedEventDispatcher.GetSubscribersCalls()) +func (mock *EventDispatcherMock) GetSubscribersCalls() []struct { +} { + var calls []struct { + } + mock.lockGetSubscribers.RLock() + calls = mock.calls.GetSubscribers + mock.lockGetSubscribers.RUnlock() + return calls +} diff --git a/end2end_test.go b/end2end_test.go index 19175ee4..a4ab171f 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "github.com/testcontainers/testcontainers-go" + "github.com/wepala/weos/model" "github.com/wepala/weos/projections" "mime/multipart" "net/http" @@ -56,6 +57,10 @@ var page int var contentType string var result api.ListApiResponse var scenarioContext context.Context +var total int +var success int +var failed int +var errArray []error var filters string type FilterProperties struct { @@ -92,6 +97,7 @@ func InitializeSuite(ctx *godog.TestSuiteContext) { page = 1 limit = 0 result = api.ListApiResponse{} + total, success, failed = 0, 0, 0 os.Remove("e2e.db") openAPI = `openapi: 3.0.3 info: @@ -162,6 +168,7 @@ func reset(ctx context.Context, sc *godog.Scenario) (context.Context, error) { if err != nil { fmt.Errorf("unexpected error '%s'", err) } + total, success, failed = 0, 0, 0 db.Exec("PRAGMA foreign_keys = ON") e = echo.New() openAPI = `openapi: 3.0.3 @@ -1081,6 +1088,92 @@ func aFilterOnTheFieldNeWithValue(field, value string) error { return nil } +func callsTheReplayMethodOnTheEventRepository(arg1 string) error { + repo, err := API.GetEventStore("Default") + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } + eventRepo := repo.(*model.EventRepositoryGorm) + projection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } + + factories := API.GetEntityFactories() + total, success, failed, errArray = eventRepo.ReplayEvents(context.Background(), time.Time{}, factories, projection) + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } + return nil +} + +func sojournerDeletesTheTable(tableName string) error { + //output := map[string]interface{}{} + + apiProjection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("unexpected error getting projection: %s", err) + } + apiProjection1 := apiProjection.(*projections.GORMDB) + + result := apiProjection1.DB().Migrator().DropTable(strings.Title(tableName)) + if result != nil { + return fmt.Errorf("error dropping table: %s got err: %s", tableName, result) + } + return nil +} + +func theTableShouldBePopulatedWith(contentType string, details *godog.Table) error { + contentEntity := map[string]interface{}{} + var result *gorm.DB + + head := details.Rows[0].Cells + compare := map[string]interface{}{} + + for i := 1; i < len(details.Rows); i++ { + for n, cell := range details.Rows[i].Cells { + compare[head[n].Value] = cell.Value + } + + apiProjection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("unexpected error getting projection: %s", err) + } + apiProjection1 := apiProjection.(*projections.GORMDB) + result = apiProjection1.DB().Table(strings.Title(contentType)).Find(&contentEntity, "weos_ID = ?", compare["weos_id"]) + + if contentEntity == nil { + return fmt.Errorf("unexpected error finding content type in db") + } + + if result.Error != nil { + return fmt.Errorf("unexpected error finding content type: %s", result.Error) + } + + for key, value := range compare { + if key == "sequence_no" { + strSeq := strconv.Itoa(int(contentEntity[key].(int64))) + if strSeq != value { + return fmt.Errorf("expected %s %s %s, got %s", contentType, key, value, contentEntity[key]) + } + } else { + if contentEntity[key] != value { + return fmt.Errorf("expected %s %s %s, got %s", contentType, key, value, contentEntity[key]) + } + } + } + + } + return nil +} + +func theTotalNoEventsAndProcessedAndFailuresShouldBeReturned() error { + if total == 0 && success == 0 && failed == 0 { + return fmt.Errorf("expected total, success and failed to be non 0 values") + } + return nil +} + func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Before(reset) //add context steps @@ -1144,6 +1237,10 @@ func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Step(`^a filter on the field "([^"]*)" "like" with value "([^"]*)"$`, aFilterOnTheFieldLikeWithValue) ctx.Step(`^a filter on the field "([^"]*)" "lt" with value "([^"]*)"$`, aFilterOnTheFieldLtWithValue) ctx.Step(`^a filter on the field "([^"]*)" "ne" with value "([^"]*)"$`, aFilterOnTheFieldNeWithValue) + ctx.Step(`^"([^"]*)" calls the replay method on the event repository$`, callsTheReplayMethodOnTheEventRepository) + ctx.Step(`^Sojourner" deletes the "([^"]*)" table$`, sojournerDeletesTheTable) + ctx.Step(`^the "([^"]*)" table should be populated with$`, theTableShouldBePopulatedWith) + ctx.Step(`^the total no\. events and processed and failures should be returned$`, theTotalNoEventsAndProcessedAndFailuresShouldBeReturned) } func TestBDD(t *testing.T) { diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature new file mode 100644 index 00000000..76b2c6e2 --- /dev/null +++ b/features/hydrate-datastore-from-events.feature @@ -0,0 +1,259 @@ + +Feature: Hydrate database using events + + The events generated in the API could be used to re-create tables in the base data store or to create new datastores. + The events could be used to do repairs as well (if the handlers on the projection are done in an idempotent way) + + Background: + + Given a developer "Sojourner" + And "Sojourner" has an account with id "1234" + And "Open API 3.0" is used to model the service + And the specification is + """ + openapi: 3.0.3 + info: + title: Blog Aggregator Rest API + version: 0.1.0 + description: REST API for interacting with the Blog Aggregator + servers: + - url: https://prod1.weos.sh/blog/dev + description: WeOS Dev + - url: https://prod1.weos.sh/blog/v1 + x-weos-config: + logger: + level: warn + report-caller: true + formatter: json + database: + driver: sqlite3 + database: e2e.db + event-source: + - title: default + driver: service + endpoint: https://prod1.weos.sh/events/v1 + - title: event + driver: sqlite3 + database: e2e.db + databases: + - title: default + driver: sqlite3 + database: e2e.db + rest: + middleware: + - RequestID + - Recover + - ZapLogger + components: + schemas: + Blog: + type: object + properties: + id: + type: string + title: + type: string + description: blog title + description: + type: string + required: + - title + x-identifier: + - id + Post: + type: object + properties: + title: + type: string + description: + type: string + blog: + $ref: "#/components/schemas/Blog" + publishedDate: + type: string + format: date-time + views: + type: integer + categories: + type: array + items: + $ref: "#/components/schemas/Post" + required: + - title + Category: + type: object + properties: + title: + type: string + description: + type: string + required: + - title + paths: + /: + get: + operationId: Homepage + responses: + 200: + description: Application Homepage + /blog: + get: + operationId: Get Blogs + summary: Get List of Blogs + parameters: + - in: query + name: page + schema: + type: integer + - in: query + name: limit + schema: + type: integer + - in: query + name: filters + schema: + type: array + items: + type: object + properties: + field: + type: string + operator: + type: string + value: + type: array + items: + type: string + + required: false + description: query string + responses: + 200: + description: List of blogs + content: + application/json: + schema: + type: object + properties: + total: + type: integer + page: + type: integer + items: + type: array + items: + $ref: "#/components/schemas/Blog" + post: + operationId: Add Blog + requestBody: + description: Blog info that is submitted + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + application/x-www-form-urlencoded: + schema: + $ref: "#/components/schemas/Blog" + application/xml: + schema: + $ref: "#/components/schemas/Blog" + responses: + 201: + description: Add Blog to Aggregator + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + 400: + description: Invalid blog submitted + /blogs/{id}: + get: + parameters: + - in: path + name: id + schema: + type: string + required: true + description: blog id + - in: query + name: sequence_no + schema: + type: string + summary: Get Blog by id + operationId: Get Blog + responses: + 200: + description: Blog details without any supporting collections + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + put: + parameters: + - in: path + name: id + schema: + type: string + required: true + description: blog id + summary: Update blog details + operationId: Update Blog + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + responses: + 200: + description: Update Blog + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + delete: + parameters: + - in: path + name: id + schema: + type: string + required: true + description: blog id + summary: Delete blog + operationId: Delete Blog + responses: + 200: + description: Blog Deleted + """ + And the service is running + And blogs in the api + | id | weos_id | sequence_no | title | description | + | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | + | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | + | 164 | 24KjFbp82wGq4qb5LAxLdA5GbR2 | 1 | Blog 6 | Some Blog 6 | + | 3 | 24KjHaQbjEv0ZxfKxFup1dI6iKP | 4 | Blog 3 | Some Blog 3 | + | 4 | 24KjIq8KJhIhWa7d8sNJhRilGpA | 1 | Blog 4 | Some Blog 4 | + | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | + | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | + | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | + + @WEOS-1327 + Scenario: Hydrate tables based on events + + A developer should be able to configure an event repository to replay all it's events on startup. This should trigger + the associated projections + + Given Sojourner" deletes the "Blog" table + When "Sojourner" calls the replay method on the event repository + Then the "Blog" table should be populated with + | id | weos_id | sequence_no | title | description | + | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | + | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | + | 164 | 24KjFbp82wGq4qb5LAxLdA5GbR2 | 1 | Blog 6 | Some Blog 6 | + | 3 | 24KjHaQbjEv0ZxfKxFup1dI6iKP | 4 | Blog 3 | Some Blog 3 | + | 4 | 24KjIq8KJhIhWa7d8sNJhRilGpA | 1 | Blog 4 | Some Blog 4 | + | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | + | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | + | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | + And the total no. events and processed and failures should be returned diff --git a/model/entity_factory.go b/model/entity_factory.go index f4db839d..d2246968 100644 --- a/model/entity_factory.go +++ b/model/entity_factory.go @@ -15,6 +15,7 @@ type EntityFactory interface { Name() string TableName() string Schema() *openapi3.Schema + Builder(ctx context.Context) ds.Builder } type DefaultEntityFactory struct { @@ -50,6 +51,10 @@ func (d *DefaultEntityFactory) DynamicStruct(ctx context.Context) ds.DynamicStru return d.builder.Build() } +func (d *DefaultEntityFactory) Builder(ctx context.Context) ds.Builder { + return d.builder +} + //GetEntityFactory get entity factory from context func GetEntityFactory(ctx context.Context) EntityFactory { if value, ok := ctx.Value(weosContext.ENTITY_FACTORY).(EntityFactory); ok { diff --git a/model/event_dispatcher.go b/model/event_dispatcher.go index ec9842c2..7a92e0c7 100644 --- a/model/event_dispatcher.go +++ b/model/event_dispatcher.go @@ -11,8 +11,10 @@ type DefaultEventDisptacher struct { dispatch sync.Mutex } -func (e *DefaultEventDisptacher) Dispatch(ctx context.Context, event Event) { +func (e *DefaultEventDisptacher) Dispatch(ctx context.Context, event Event) []error { //mutex helps keep state between routines + var errors []error + e.dispatch.Lock() defer e.dispatch.Unlock() var wg sync.WaitGroup @@ -26,11 +28,16 @@ func (e *DefaultEventDisptacher) Dispatch(ctx context.Context, event Event) { } wg.Done() }() - handler(ctx, event) + + err := handler(ctx, event) + if err != nil { + errors = append(errors, err) + } + }() } - wg.Wait() + return errors } func (e *DefaultEventDisptacher) AddSubscriber(handler EventHandler) { @@ -41,4 +48,4 @@ func (e *DefaultEventDisptacher) GetSubscribers() []EventHandler { return e.handlers } -type EventHandler func(ctx context.Context, event Event) +type EventHandler func(ctx context.Context, event Event) error diff --git a/model/event_dispatcher_test.go b/model/event_dispatcher_test.go index e9cdc87f..9820bd20 100644 --- a/model/event_dispatcher_test.go +++ b/model/event_dispatcher_test.go @@ -1,6 +1,7 @@ package model_test import ( + "fmt" weos "github.com/wepala/weos/model" "golang.org/x/net/context" "testing" @@ -19,15 +20,18 @@ func TestEventDisptacher_Dispatch(t *testing.T) { } dispatcher := &weos.DefaultEventDisptacher{} handlersCalled := 0 - dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) { + dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) error { handlersCalled += 1 + return nil }) - dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) { + dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) error { handlersCalled += 1 if event.Type != mockEvent.Type { t.Errorf("expected the type to be '%s', got '%s'", mockEvent.Type, event.Type) + return fmt.Errorf("expected the type to be '%s', got '%s'", mockEvent.Type, event.Type) } + return nil }) dispatcher.Dispatch(context.TODO(), *mockEvent) diff --git a/model/interfaces.go b/model/interfaces.go index ae490e49..1149078b 100644 --- a/model/interfaces.go +++ b/model/interfaces.go @@ -4,6 +4,7 @@ package model import ( ds "github.com/ompluscator/dynamic-struct" "golang.org/x/net/context" + "time" "gorm.io/gorm" ) @@ -69,6 +70,7 @@ type EventRepository interface { GetByAggregateAndSequenceRange(ID string, start int64, end int64) ([]*Event, error) AddSubscriber(handler EventHandler) GetSubscribers() ([]EventHandler, error) + ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projection Projection) (int, int, int, []error) } type Datastore interface { diff --git a/model/mocks_test.go b/model/mocks_test.go index debf1e3d..2a4c6a4d 100644 --- a/model/mocks_test.go +++ b/model/mocks_test.go @@ -12,6 +12,7 @@ import ( "gorm.io/gorm" "net/http" "sync" + "time" ) // Ensure, that EventRepositoryMock does implement model.EventRepository. @@ -54,6 +55,9 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { +// panic("mock out the ReplayEvents method") +// }, // } // // // use mockedEventRepository in code that requires model.EventRepository @@ -91,6 +95,9 @@ type EventRepositoryMock struct { // PersistFunc mocks the Persist method. PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error + // ReplayEventsFunc mocks the ReplayEvents method. + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) + // calls tracks calls to the methods. calls struct { // AddSubscriber holds details about calls to the AddSubscriber method. @@ -151,6 +158,17 @@ type EventRepositoryMock struct { // Entity is the entity argument value. Entity model.AggregateInterface } + // ReplayEvents holds details about calls to the ReplayEvents method. + ReplayEvents []struct { + // Ctxt is the ctxt argument value. + Ctxt context.Context + // Date is the date argument value. + Date time.Time + // EntityFactories is the entityFactories argument value. + EntityFactories map[string]model.EntityFactory + // Projection is the projection argument value. + Projection model.Projection + } } lockAddSubscriber sync.RWMutex lockFlush sync.RWMutex @@ -162,6 +180,7 @@ type EventRepositoryMock struct { lockGetSubscribers sync.RWMutex lockMigrate sync.RWMutex lockPersist sync.RWMutex + lockReplayEvents sync.RWMutex } // AddSubscriber calls AddSubscriberFunc. @@ -488,6 +507,49 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { return calls } +// ReplayEvents calls ReplayEventsFunc. +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { + if mock.ReplayEventsFunc == nil { + panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") + } + callInfo := struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + Projection model.Projection + }{ + Ctxt: ctxt, + Date: date, + EntityFactories: entityFactories, + Projection: projection, + } + mock.lockReplayEvents.Lock() + mock.calls.ReplayEvents = append(mock.calls.ReplayEvents, callInfo) + mock.lockReplayEvents.Unlock() + return mock.ReplayEventsFunc(ctxt, date, entityFactories, projection) +} + +// ReplayEventsCalls gets all the calls that were made to ReplayEvents. +// Check the length with: +// len(mockedEventRepository.ReplayEventsCalls()) +func (mock *EventRepositoryMock) ReplayEventsCalls() []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + Projection model.Projection +} { + var calls []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + Projection model.Projection + } + mock.lockReplayEvents.RLock() + calls = mock.calls.ReplayEvents + mock.lockReplayEvents.RUnlock() + return calls +} + // Ensure, that ProjectionMock does implement model.Projection. // If this is not the case, regenerate this file with moq. var _ model.Projection = &ProjectionMock{} @@ -2071,6 +2133,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // // make and configure a mocked model.EntityFactory // mockedEntityFactory := &EntityFactoryMock{ +// BuilderFunc: func(ctx context.Context) ds.Builder { +// panic("mock out the Builder method") +// }, // DynamicStructFunc: func(ctx context.Context) ds.DynamicStruct { // panic("mock out the DynamicStruct method") // }, @@ -2096,6 +2161,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // } type EntityFactoryMock struct { + // BuilderFunc mocks the Builder method. + BuilderFunc func(ctx context.Context) ds.Builder + // DynamicStructFunc mocks the DynamicStruct method. DynamicStructFunc func(ctx context.Context) ds.DynamicStruct @@ -2116,6 +2184,11 @@ type EntityFactoryMock struct { // calls tracks calls to the methods. calls struct { + // Builder holds details about calls to the Builder method. + Builder []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } // DynamicStruct holds details about calls to the DynamicStruct method. DynamicStruct []struct { // Ctx is the ctx argument value. @@ -2145,6 +2218,7 @@ type EntityFactoryMock struct { TableName []struct { } } + lockBuilder sync.RWMutex lockDynamicStruct sync.RWMutex lockFromSchemaAndBuilder sync.RWMutex lockName sync.RWMutex @@ -2153,6 +2227,37 @@ type EntityFactoryMock struct { lockTableName sync.RWMutex } +// Builder calls BuilderFunc. +func (mock *EntityFactoryMock) Builder(ctx context.Context) ds.Builder { + if mock.BuilderFunc == nil { + panic("EntityFactoryMock.BuilderFunc: method is nil but EntityFactory.Builder was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockBuilder.Lock() + mock.calls.Builder = append(mock.calls.Builder, callInfo) + mock.lockBuilder.Unlock() + return mock.BuilderFunc(ctx) +} + +// BuilderCalls gets all the calls that were made to Builder. +// Check the length with: +// len(mockedEntityFactory.BuilderCalls()) +func (mock *EntityFactoryMock) BuilderCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockBuilder.RLock() + calls = mock.calls.Builder + mock.lockBuilder.RUnlock() + return calls +} + // DynamicStruct calls DynamicStructFunc. func (mock *EntityFactoryMock) DynamicStruct(ctx context.Context) ds.DynamicStruct { if mock.DynamicStructFunc == nil { @@ -2331,3 +2436,153 @@ func (mock *EntityFactoryMock) TableNameCalls() []struct { mock.lockTableName.RUnlock() return calls } + +// Ensure, that EventDispatcherMock does implement model.EventDispatcher. +// If this is not the case, regenerate this file with moq. +var _ model.EventDispatcher = &EventDispatcherMock{} + +// EventDispatcherMock is a mock implementation of model.EventDispatcher. +// +// func TestSomethingThatUsesEventDispatcher(t *testing.T) { +// +// // make and configure a mocked model.EventDispatcher +// mockedEventDispatcher := &EventDispatcherMock{ +// AddSubscriberFunc: func(handler model.EventHandler) { +// panic("mock out the AddSubscriber method") +// }, +// DispatchFunc: func(ctx context.Context, event model.Event) { +// panic("mock out the Dispatch method") +// }, +// GetSubscribersFunc: func() []model.EventHandler { +// panic("mock out the GetSubscribers method") +// }, +// } +// +// // use mockedEventDispatcher in code that requires model.EventDispatcher +// // and then make assertions. +// +// } +type EventDispatcherMock struct { + // AddSubscriberFunc mocks the AddSubscriber method. + AddSubscriberFunc func(handler model.EventHandler) + + // DispatchFunc mocks the Dispatch method. + DispatchFunc func(ctx context.Context, event model.Event) + + // GetSubscribersFunc mocks the GetSubscribers method. + GetSubscribersFunc func() []model.EventHandler + + // calls tracks calls to the methods. + calls struct { + // AddSubscriber holds details about calls to the AddSubscriber method. + AddSubscriber []struct { + // Handler is the handler argument value. + Handler model.EventHandler + } + // Dispatch holds details about calls to the Dispatch method. + Dispatch []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Event is the event argument value. + Event model.Event + } + // GetSubscribers holds details about calls to the GetSubscribers method. + GetSubscribers []struct { + } + } + lockAddSubscriber sync.RWMutex + lockDispatch sync.RWMutex + lockGetSubscribers sync.RWMutex +} + +// AddSubscriber calls AddSubscriberFunc. +func (mock *EventDispatcherMock) AddSubscriber(handler model.EventHandler) { + if mock.AddSubscriberFunc == nil { + panic("EventDispatcherMock.AddSubscriberFunc: method is nil but EventDispatcher.AddSubscriber was just called") + } + callInfo := struct { + Handler model.EventHandler + }{ + Handler: handler, + } + mock.lockAddSubscriber.Lock() + mock.calls.AddSubscriber = append(mock.calls.AddSubscriber, callInfo) + mock.lockAddSubscriber.Unlock() + mock.AddSubscriberFunc(handler) +} + +// AddSubscriberCalls gets all the calls that were made to AddSubscriber. +// Check the length with: +// len(mockedEventDispatcher.AddSubscriberCalls()) +func (mock *EventDispatcherMock) AddSubscriberCalls() []struct { + Handler model.EventHandler +} { + var calls []struct { + Handler model.EventHandler + } + mock.lockAddSubscriber.RLock() + calls = mock.calls.AddSubscriber + mock.lockAddSubscriber.RUnlock() + return calls +} + +// Dispatch calls DispatchFunc. +func (mock *EventDispatcherMock) Dispatch(ctx context.Context, event model.Event) { + if mock.DispatchFunc == nil { + panic("EventDispatcherMock.DispatchFunc: method is nil but EventDispatcher.Dispatch was just called") + } + callInfo := struct { + Ctx context.Context + Event model.Event + }{ + Ctx: ctx, + Event: event, + } + mock.lockDispatch.Lock() + mock.calls.Dispatch = append(mock.calls.Dispatch, callInfo) + mock.lockDispatch.Unlock() + mock.DispatchFunc(ctx, event) +} + +// DispatchCalls gets all the calls that were made to Dispatch. +// Check the length with: +// len(mockedEventDispatcher.DispatchCalls()) +func (mock *EventDispatcherMock) DispatchCalls() []struct { + Ctx context.Context + Event model.Event +} { + var calls []struct { + Ctx context.Context + Event model.Event + } + mock.lockDispatch.RLock() + calls = mock.calls.Dispatch + mock.lockDispatch.RUnlock() + return calls +} + +// GetSubscribers calls GetSubscribersFunc. +func (mock *EventDispatcherMock) GetSubscribers() []model.EventHandler { + if mock.GetSubscribersFunc == nil { + panic("EventDispatcherMock.GetSubscribersFunc: method is nil but EventDispatcher.GetSubscribers was just called") + } + callInfo := struct { + }{} + mock.lockGetSubscribers.Lock() + mock.calls.GetSubscribers = append(mock.calls.GetSubscribers, callInfo) + mock.lockGetSubscribers.Unlock() + return mock.GetSubscribersFunc() +} + +// GetSubscribersCalls gets all the calls that were made to GetSubscribers. +// Check the length with: +// len(mockedEventDispatcher.GetSubscribersCalls()) +func (mock *EventDispatcherMock) GetSubscribersCalls() []struct { +} { + var calls []struct { + } + mock.lockGetSubscribers.RLock() + calls = mock.calls.GetSubscribers + mock.lockGetSubscribers.RUnlock() + return calls +} diff --git a/model/module_test.go b/model/module_test.go index 21cb9012..93f67e8b 100644 --- a/model/module_test.go +++ b/model/module_test.go @@ -219,8 +219,8 @@ func TestWeOSApp_AddProjection(t *testing.T) { } mockProjection := &ProjectionMock{ GetEventHandlerFunc: func() weos.EventHandler { - return func(ctx context.Context, event weos.Event) { - + return func(ctx context.Context, event weos.Event) error { + return nil } }, MigrateFunc: func(ctx context.Context, builders map[string]dynamicstruct.Builder) error { diff --git a/model/repositories.go b/model/repositories.go index 47a2d7be..3c998dd6 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -2,11 +2,13 @@ package model import ( "encoding/json" + ds "github.com/ompluscator/dynamic-struct" "github.com/segmentio/ksuid" context2 "github.com/wepala/weos/context" "golang.org/x/net/context" "gorm.io/datatypes" "gorm.io/gorm" + "time" ) type EventRepositoryGorm struct { @@ -56,6 +58,9 @@ func NewGormEvent(event *Event) (GormEvent, error) { func (e *EventRepositoryGorm) Persist(ctxt context.Context, entity AggregateInterface) error { //TODO use the information in the context to get account info, module info. //didn't think it should barf if an empty list is passed + entityFact := ctxt.Value(context2.ENTITY_FACTORY) + schemaName := entityFact.(EntityFactory).Name() + var gormEvents []GormEvent entities := entity.GetNewChanges() savePointID := "s" + ksuid.New().String() //NOTE the save point can't start with a number @@ -79,6 +84,9 @@ func (e *EventRepositoryGorm) Persist(ctxt context.Context, entity AggregateInte if event.Meta.Group == "" { event.Meta.Group = e.GroupID } + if event.Meta.EntityType == "ContentEntity" || event.Meta.EntityType == "" { + event.Meta.EntityType = schemaName + } if !event.IsValid() { for _, terr := range event.GetErrors() { e.logger.Errorf("error encountered persisting entity '%s', '%s'", event.Meta.EntityID, terr) @@ -286,6 +294,78 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { return nil } +//Content may not be applicable to this func since there would be an instance of it being called at server.go run. Therefore we won't have a "proper" content which would contain the EntityFactory +func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projections Projection) (int, int, int, []error) { + var errors []error + var errArray []error + + schemas := make(map[string]ds.Builder) + + for _, value := range entityFactories { + schemas[value.Name()] = value.Builder(context.Background()) + } + + err := projections.Migrate(ctxt, schemas) + if err != nil { + e.logger.Errorf("error migrating tables: %s", err) + } + + var events []GormEvent + + if date.IsZero() { + result := e.DB.Table("gorm_events").Find(&events) + if result.Error != nil { + e.logger.Errorf("got error pulling events '%s'", result.Error) + errors = append(errors, result.Error) + return 0, 0, 0, errors + } + } else { + result := e.DB.Table("gorm_events").Where("created_at = ?", date).Find(&events) + if result.Error != nil { + e.logger.Errorf("got error pulling events '%s'", result.Error) + errors = append(errors, result.Error) + return 0, 0, 0, errors + } + } + + var tEvents []*Event + + for _, event := range events { + tEvents = append(tEvents, &Event{ + ID: event.ID, + Type: event.Type, + Payload: json.RawMessage(event.Payload), + Meta: EventMeta{ + EntityID: event.EntityID, + EntityType: event.EntityType, + RootID: event.RootID, + Module: event.ApplicationID, + User: event.User, + SequenceNo: event.SequenceNo, + }, + Version: 0, + }) + } + + totalEvents := len(tEvents) + successfulEvents := 0 + failedEvents := 0 + + for _, event := range tEvents { + + newContext := context.WithValue(ctxt, context2.ENTITY_FACTORY, entityFactories[event.Meta.EntityType]) + errArray = e.eventDispatcher.Dispatch(newContext, *event) + if len(errArray) == 0 { + successfulEvents++ + } else { + errors = append(errors, errArray...) + failedEvents++ + } + + } + return totalEvents, successfulEvents, failedEvents, errors +} + func NewBasicEventRepository(gormDB *gorm.DB, logger Log, useUnitOfWork bool, accountID string, applicationID string) (EventRepository, error) { if useUnitOfWork { transaction := gormDB.Begin() diff --git a/model/repositories_test.go b/model/repositories_test.go new file mode 100644 index 00000000..08b78699 --- /dev/null +++ b/model/repositories_test.go @@ -0,0 +1,155 @@ +package model_test + +import ( + "context" + weoscontext "github.com/wepala/weos/context" + "github.com/wepala/weos/controllers/rest" + "github.com/wepala/weos/model" + "os" + "testing" + "time" +) + +func TestEventRepository_ReplayEvents(t *testing.T) { + + ctx := context.Background() + + api, err := rest.New("../controllers/rest/fixtures/blog.yaml") + err = api.Initialize(context.TODO()) + if err != nil { + t.Fatal(err) + } + + entityType := "Blog" + + factories := api.GetEntityFactories() + newContext := context.WithValue(ctx, weoscontext.ENTITY_FACTORY, factories[entityType]) + + mockPayload1 := map[string]interface{}{"weos_id": "12345", "sequence_no": int64(1), "title": "Test Blog", "url": "testing.com"} + entity1 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "12345", + }, + SequenceNo: int64(0), + }, + Property: mockPayload1, + } + event1 := model.NewEntityEvent("create", entity1, "12345", mockPayload1) + entity1.NewChange(event1) + + mockPayload2 := map[string]interface{}{"weos_id": "123456", "sequence_no": int64(1), "title": "Test Blog1", "url": "testing1.com"} + entity2 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "123456", + }, + SequenceNo: int64(0), + }, + Property: mockPayload2, + } + event2 := model.NewEntityEvent("create", entity2, "123456", mockPayload2) + entity2.NewChange(event2) + + mockPayload3 := map[string]interface{}{"weos_id": "1234567", "sequence_no": int64(1), "title": "Test Blog2", "url": "testing2.com"} + entity3 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "1234567", + }, + SequenceNo: int64(0), + }, + Property: mockPayload3, + } + event3 := model.NewEntityEvent("create", entity3, "1234567", mockPayload3) + entity3.NewChange(event3) + + repo, err := api.GetEventStore("Default") + if err != nil { + t.Fatal(err) + } + + eventRepo := repo.(*model.EventRepositoryGorm) + projection, err := api.GetProjection("Default") + if err != nil { + t.Fatal(err) + } + + eventRepo.Persist(newContext, entity1) + eventRepo.Persist(newContext, entity2) + eventRepo.Persist(newContext, entity3) + + t.Run("replay events - drop tables", func(t *testing.T) { + err = eventRepo.DB.Migrator().DropTable("Blog") + if err != nil { + t.Fatal(err) + } + + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) + if err != nil { + t.Fatal(err) + } + + if total != 3 { + t.Fatalf("expected total events to be %d, got %d", 3, total) + } + + if successful != 3 { + t.Fatalf("expected successful events to be %d, got %d", 3, successful) + } + + if failed != 0 { + t.Fatalf("expected failed events to be %d, got %d", 0, failed) + } + }) + t.Run("replay events - existing data", func(t *testing.T) { + + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) + if err == nil { + t.Fatalf("expected there to be errors (unique constraint)") + } + + if total != 3 { + t.Fatalf("expected total events to be %d, got %d", 3, total) + } + + if successful != 0 { + t.Fatalf("expected successful events to be %d, got %d", 0, successful) + } + + if failed != 3 { + t.Fatalf("expected failed events to be %d, got %d", 3, failed) + } + }) + t.Run("replay events - remove rows", func(t *testing.T) { + output := map[string]interface{}{} + + searchResult := eventRepo.DB.Table("Blog").Where("weos_id = ?", "12345").Delete(&output) + if searchResult.Error != nil { + t.Fatal(searchResult.Error) + } + + searchResult = eventRepo.DB.Table("Blog").Where("weos_id = ?", "123456").Delete(&output) + if searchResult.Error != nil { + t.Fatal(searchResult.Error) + } + + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) + if err == nil { + t.Fatalf("expected there to be errors (unique constraint)") + } + + if total != 3 { + t.Fatalf("expected total events to be %d, got %d", 3, total) + } + + if successful != 2 { + t.Fatalf("expected successful events to be %d, got %d", 2, successful) + } + + if failed != 1 { + t.Fatalf("expected failed events to be %d, got %d", 1, failed) + } + }) + os.Remove("test.db") +} diff --git a/projections/gorm.go b/projections/gorm.go index 9058a598..4c622157 100644 --- a/projections/gorm.go +++ b/projections/gorm.go @@ -132,7 +132,7 @@ func (p *GORMDB) Migrate(ctx context.Context, builders map[string]ds.Builder) er } func (p *GORMDB) GetEventHandler() weos.EventHandler { - return func(ctx context.Context, event weos.Event) { + return func(ctx context.Context, event weos.Event) error { entityFactory := weos.GetEntityFactory(ctx) switch event.Type { case "create": @@ -141,12 +141,14 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { entity, err := entityFactory.NewEntity(ctx) if err != nil { p.logger.Errorf("error get a copy of the entity '%s'", err) + return err } eventPayload := entity.Property mapPayload := map[string]interface{}{} err = json.Unmarshal(event.Payload, &mapPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } mapPayload["sequence_no"] = event.Meta.SequenceNo @@ -154,10 +156,13 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { err = json.Unmarshal(bytes, &eventPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } + db := p.db.Table(entityFactory.Name()).Create(eventPayload) if db.Error != nil { p.logger.Errorf("error creating %s, got %s", entityFactory.Name(), db.Error) + return db.Error } } case "update": @@ -165,12 +170,14 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { entity, err := entityFactory.NewEntity(ctx) if err != nil { p.logger.Errorf("error creating entity '%s'", err) + return err } eventPayload := entity.Property mapPayload := map[string]interface{}{} err = json.Unmarshal(event.Payload, &mapPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } //set sequence number @@ -180,6 +187,7 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { err = json.Unmarshal(bytes, &eventPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } reader := ds.NewReader(eventPayload) @@ -192,6 +200,7 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { err = p.db.Model(eventPayload).Association(strings.Title(key)).Replace(field.Interface()) if err != nil { p.logger.Errorf("error clearing association %s for %s, got %s", strings.Title(key), entityFactory.Name(), err) + return err } } } @@ -200,6 +209,7 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { db := p.db.Table(entityFactory.Name()).Updates(eventPayload) if db.Error != nil { p.logger.Errorf("error creating %s, got %s", entityFactory.Name(), db.Error) + return db.Error } } case "delete": @@ -207,13 +217,17 @@ func (p *GORMDB) GetEventHandler() weos.EventHandler { entity, err := entityFactory.NewEntity(ctx) if err != nil { p.logger.Errorf("error creating entity '%s'", err) + return err } + db := p.db.Table(entityFactory.Name()).Where("weos_id = ?", event.Meta.EntityID).Delete(entity.Property) if db.Error != nil { p.logger.Errorf("error deleting %s, got %s", entityFactory.Name(), db.Error) + return db.Error } } } + return nil } } diff --git a/server.go b/server.go index e91f77a9..94e71f2c 100644 --- a/server.go +++ b/server.go @@ -8,6 +8,7 @@ import ( var port = flag.String("port", "8681", "-port=8681") var schema = flag.String("spec", "./api.yaml", "schema for initialization") +var replay = flag.Bool("replay events", false, "replay events from gorm events") func main() { flag.Parse() @@ -15,8 +16,8 @@ func main() { var apiEnv string apiEnv = os.Getenv("WEOS_SPEC") if apiEnv != "" { - weos.Start(*port, apiEnv) + weos.Start(*port, apiEnv, *replay) } else if *schema != "" { - weos.Start(*port, apiFlag) + weos.Start(*port, apiFlag, *replay) } }