From 0e6c636c856cf90b69d469b1308e35b74e39b91f Mon Sep 17 00:00:00 2001 From: aphilbert Date: Fri, 4 Feb 2022 03:14:51 -0400 Subject: [PATCH 01/18] feature: WEOS-1327 --- .../hydrate-datastore-from-events.feature | 268 ++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 features/hydrate-datastore-from-events.feature diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature new file mode 100644 index 00000000..f968cf6c --- /dev/null +++ b/features/hydrate-datastore-from-events.feature @@ -0,0 +1,268 @@ + +Feature: Hydrate database using events + + The events generated in the API could be used to re-create tables in the base data store or to create new datastores. + The events could be used to do repairs as well (if the handlers on the projection are done in an idempotent way) + + Background: + + Given a developer "Sojourner" + And "Sojourner" has an account with id "1234" + And "Open API 3.0" is used to model the service + And the specification is + """ + openapi: 3.0.3 + info: + title: Blog Aggregator Rest API + version: 0.1.0 + description: REST API for interacting with the Blog Aggregator + servers: + - url: https://prod1.weos.sh/blog/dev + description: WeOS Dev + - url: https://prod1.weos.sh/blog/v1 + x-weos-config: + logger: + level: warn + report-caller: true + formatter: json + database: + driver: sqlite3 + database: e2e.db + event-source: + - title: default + driver: service + endpoint: https://prod1.weos.sh/events/v1 + - title: event + driver: sqlite3 + database: e2e.db + databases: + - title: default + driver: sqlite3 + database: e2e.db + rest: + middleware: + - RequestID + - Recover + - ZapLogger + components: + schemas: + Blog: + type: object + properties: + id: + type: string + title: + type: string + description: blog title + description: + type: string + required: + - title + x-identifier: + - id + Post: + type: object + properties: + title: + type: string + description: + type: string + blog: + $ref: "#/components/schemas/Blog" + publishedDate: + type: string + format: date-time + views: + type: integer + categories: + type: array + items: + $ref: "#/components/schemas/Post" + required: + - title + Category: + type: object + properties: + title: + type: string + description: + type: string + required: + - title + paths: + /: + get: + operationId: Homepage + responses: + 200: + description: Application Homepage + /blog: + get: + operationId: Get Blogs + summary: Get List of Blogs + parameters: + - in: query + name: page + schema: + type: integer + - in: query + name: limit + schema: + type: integer + - in: query + name: filters + schema: + type: array + items: + type: object + properties: + field: + type: string + operator: + type: string + value: + type: array + items: + type: string + + required: false + description: query string + responses: + 200: + description: List of blogs + content: + application/json: + schema: + type: object + properties: + total: + type: integer + page: + type: integer + items: + type: array + items: + $ref: "#/components/schemas/Blog" + post: + operationId: Add Blog + requestBody: + description: Blog info that is submitted + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + application/x-www-form-urlencoded: + schema: + $ref: "#/components/schemas/Blog" + application/xml: + schema: + $ref: "#/components/schemas/Blog" + responses: + 201: + description: Add Blog to Aggregator + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + 400: + description: Invalid blog submitted + /blogs/{id}: + get: + parameters: + - in: path + name: id + schema: + type: string + required: true + description: blog id + - in: query + name: sequence_no + schema: + type: string + summary: Get Blog by id + operationId: Get Blog + responses: + 200: + description: Blog details without any supporting collections + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + put: + parameters: + - in: path + name: id + schema: + type: string + required: true + description: blog id + summary: Update blog details + operationId: Update Blog + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + responses: + 200: + description: Update Blog + content: + application/json: + schema: + $ref: "#/components/schemas/Blog" + delete: + parameters: + - in: path + name: id + schema: + type: string + required: true + description: blog id + summary: Delete blog + operationId: Delete Blog + responses: + 200: + description: Blog Deleted + """ + And the service is running + And blogs in the api + | id | weos_id | sequence_no | title | description | + | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | + | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | + | 164 | 24KjFbp82wGq4qb5LAxLdA5GbR2 | 1 | Blog 6 | Some Blog 6 | + | 3 | 24KjHaQbjEv0ZxfKxFup1dI6iKP | 4 | Blog 3 | Some Blog 3 | + | 4 | 24KjIq8KJhIhWa7d8sNJhRilGpA | 1 | Blog 4 | Some Blog 4 | + | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | + | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | + | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | + + @WEOS-1327 + Scenario: Hydrate tables based on events + + A developer should be able to configure an event repository to replay all it's events on startup. This should trigger + the associated projections + + Given Sojourner" deletes the "Blogs" table + When "Sojourner" calls the replay method on the event repository + Then the "Blogs" table should be populated with + | id | weos_id | sequence_no | title | description | + | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | + | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | + | 164 | 24KjFbp82wGq4qb5LAxLdA5GbR2 | 1 | Blog 6 | Some Blog 6 | + | 3 | 24KjHaQbjEv0ZxfKxFup1dI6iKP | 4 | Blog 3 | Some Blog 3 | + | 4 | 24KjIq8KJhIhWa7d8sNJhRilGpA | 1 | Blog 4 | Some Blog 4 | + | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | + | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | + | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | + + @WEOS-1327 + Scenario: Repair data tables after some was deleted + + @WEOS-1327 + Scenario: Repair tables after some content has been deleted + + Scenario: Repair specific schemas + + Scenario: Set database to a specific state on a given date \ No newline at end of file From 6ec2826e3b7aa13f2f4827a35cf7336eabc25f39 Mon Sep 17 00:00:00 2001 From: aphilbert Date: Fri, 4 Feb 2022 20:29:45 -0400 Subject: [PATCH 02/18] feature: WEOS-1327 added scenario for deleting some data --- .../hydrate-datastore-from-events.feature | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature index f968cf6c..b9b94db5 100644 --- a/features/hydrate-datastore-from-events.feature +++ b/features/hydrate-datastore-from-events.feature @@ -256,12 +256,33 @@ Feature: Hydrate database using events | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | + And the total no. events and processed and failures should be returned @WEOS-1327 Scenario: Repair data tables after some was deleted @WEOS-1327 Scenario: Repair tables after some content has been deleted + Given a "Blog" with id "1237" was deleted + And a "Blog" with id "164" was deleted + When "Sojourner" calls the replay method on the event repository + Then the "Blogs" table should be populated with + | id | weos_id | sequence_no | title | description | + | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | + | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | + | 164 | 24KjFbp82wGq4qb5LAxLdA5GbR2 | 1 | Blog 6 | Some Blog 6 | + | 3 | 24KjHaQbjEv0ZxfKxFup1dI6iKP | 4 | Blog 3 | Some Blog 3 | + | 4 | 24KjIq8KJhIhWa7d8sNJhRilGpA | 1 | Blog 4 | Some Blog 4 | + | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | + | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | + | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | + And the total no. events and processed and failures should be returned + + Scenario: Continue loading events if error occurs + + The event that failed should be logged out WITHOUT the payload + + Scenario: Repair specific schemas From 980306f0fe2d120426791e44cbec828943f81d47 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Mon, 7 Feb 2022 11:49:12 -0400 Subject: [PATCH 03/18] feature:WEOS-1327 - Merged dev - Generated bdd steps --- controllers/rest/controller_standard.go | 37 +++++++++++++++++--- controllers/rest/controller_standard_test.go | 1 + end2end_test.go | 29 +++++++++++++-- model/content_entity.go | 13 ++++--- 4 files changed, 68 insertions(+), 12 deletions(-) diff --git a/controllers/rest/controller_standard.go b/controllers/rest/controller_standard.go index db8a4e12..446e142f 100644 --- a/controllers/rest/controller_standard.go +++ b/controllers/rest/controller_standard.go @@ -166,8 +166,34 @@ func UpdateMiddleware(api *RESTAPI, projection projections.Projection, commandDi } var weosID string var sequenceNo string - //reads the request body - payload, _ := ioutil.ReadAll(ctxt.Request().Body) + var payload []byte + var err error + + ct := ctxt.Request().Header.Get("Content-Type") + + switch ct { + case "application/json": + payload, err = ioutil.ReadAll(ctxt.Request().Body) + if err != nil { + return err + } + case "application/x-www-form-urlencoded": + payload, err = ConvertFormToJson(ctxt.Request(), "application/x-www-form-urlencoded") + if err != nil { + return err + } + default: + if strings.Contains(ct, "multipart/form-data") { + payload, err = ConvertFormToJson(ctxt.Request(), "multipart/form-data") + if err != nil { + return err + } + } else if ct == "" { + return NewControllerError("expected a content-type to be explicitly defined", err, http.StatusBadRequest) + } else { + return NewControllerError("the content-type provided is not supported", err, http.StatusBadRequest) + } + } //getting etag from context etagInterface := newContext.Value("If-Match") if etagInterface != nil { @@ -184,7 +210,7 @@ func UpdateMiddleware(api *RESTAPI, projection projections.Projection, commandDi } } - err := commandDispatcher.Dispatch(newContext, model.Update(newContext, payload, entityFactory.Name()), eventSource, projection, api.EchoInstance().Logger) + err = commandDispatcher.Dispatch(newContext, model.Update(newContext, payload, entityFactory.Name()), eventSource, projection, api.EchoInstance().Logger) if err != nil { api.e.Logger.Errorf("error persisting entity '%s'", err) if errr, ok := err.(*model.DomainError); ok { @@ -340,8 +366,9 @@ func ViewMiddleware(api *RESTAPI, projection projections.Projection, commandDisp useEntity, _ := newContext.Value("use_entity_id").(bool) seqInt, ok = newContext.Value("sequence_no").(int) if !ok { - seq = newContext.Value("sequence_no").(string) - ctxt.Logger().Debugf("invalid sequence no ") + if seq, ok = newContext.Value("sequence_no").(string); ok { + ctxt.Logger().Debugf("invalid sequence no") + } } //if use_entity_id is not set then let's get the item by key diff --git a/controllers/rest/controller_standard_test.go b/controllers/rest/controller_standard_test.go index 6f9918b7..9d862401 100644 --- a/controllers/rest/controller_standard_test.go +++ b/controllers/rest/controller_standard_test.go @@ -487,6 +487,7 @@ func TestStandardControllers_Update(t *testing.T) { resp := httptest.NewRecorder() req := httptest.NewRequest(http.MethodPut, "/blogs/"+weosId, body) req.Header.Set(weoscontext.HeaderXAccountID, accountID) + req.Header.Set("Content-Type", "application/json") req.Header.Set("If-Match", weosId+".1") mw := rest.Context(restAPI, projection, dispatcher, eventRepository, entityFactory, path, path.Put) updateMw := rest.UpdateMiddleware(restAPI, projection, dispatcher, eventRepository, entityFactory, path, path.Put) diff --git a/end2end_test.go b/end2end_test.go index ea80e576..853fbe78 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1007,6 +1007,26 @@ func theShouldBeDeleted(contentEntity string, id int) error { return nil } +func aWithIdWasDeleted(arg1, arg2 string) error { + return godog.ErrPending +} + +func callsTheReplayMethodOnTheEventRepository(arg1 string) error { + return godog.ErrPending +} + +func sojournerDeletesTheTable(arg1 string) error { + return godog.ErrPending +} + +func theTableShouldBePopulatedWith(arg1 string, arg2 *godog.Table) error { + return godog.ErrPending +} + +func theTotalNoEventsAndProcessedAndFailuresShouldBeReturned() error { + return godog.ErrPending +} + func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Before(reset) //add context steps @@ -1063,6 +1083,11 @@ func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Step(`^"([^"]*)" is on the "([^"]*)" delete screen with entity id "([^"]*)" for blog with id "([^"]*)"$`, isOnTheDeleteScreenWithEntityIdForBlogWithId) ctx.Step(`^"([^"]*)" is on the "([^"]*)" delete screen with id "([^"]*)"$`, isOnTheDeleteScreenWithId) ctx.Step(`^the "([^"]*)" "(\d+)" should be deleted$`, theShouldBeDeleted) + ctx.Step(`^a "([^"]*)" with id "([^"]*)" was deleted$`, aWithIdWasDeleted) + ctx.Step(`^"([^"]*)" calls the replay method on the event repository$`, callsTheReplayMethodOnTheEventRepository) + ctx.Step(`^Sojourner" deletes the "([^"]*)" table$`, sojournerDeletesTheTable) + ctx.Step(`^the "([^"]*)" table should be populated with$`, theTableShouldBePopulatedWith) + ctx.Step(`^the total no\. events and processed and failures should be returned$`, theTotalNoEventsAndProcessedAndFailuresShouldBeReturned) } func TestBDD(t *testing.T) { @@ -1072,8 +1097,8 @@ func TestBDD(t *testing.T) { TestSuiteInitializer: InitializeSuite, Options: &godog.Options{ Format: "pretty", - Tags: "~skipped && ~long", - //Tags: "WEOS-1131", + //Tags: "~skipped && ~long", + Tags: "WEOS-1327", //Tags: "WEOS-1110 && ~skipped", }, }.Run() diff --git a/model/content_entity.go b/model/content_entity.go index d620cd23..f6894d13 100644 --- a/model/content_entity.go +++ b/model/content_entity.go @@ -345,13 +345,16 @@ func (w *ContentEntity) ApplyEvents(changes []*Event) error { func (w *ContentEntity) ToMap() map[string]interface{} { result := make(map[string]interface{}) //get all fields and return the map - fields := w.reader.GetAllFields() - for _, field := range fields { - //check if the lowercase version of the field is the same as the schema and use the scehma version instead - if originialFieldName := w.GetOriginalFieldName(field.Name()); originialFieldName != "" { - result[w.GetOriginalFieldName(field.Name())] = field.Interface() + if w.reader != nil { + fields := w.reader.GetAllFields() + for _, field := range fields { + //check if the lowercase version of the field is the same as the schema and use the scehma version instead + if originialFieldName := w.GetOriginalFieldName(field.Name()); originialFieldName != "" { + result[w.GetOriginalFieldName(field.Name())] = field.Interface() + } } } + return result } From 682b73ab35a86d7a85700cfb591400364f9275d1 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Mon, 7 Feb 2022 12:57:02 -0400 Subject: [PATCH 04/18] feature:WEOS-1327 - Working on the ReplayEvents --- model/repositories.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/model/repositories.go b/model/repositories.go index 47a2d7be..d75f03c4 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -7,6 +7,7 @@ import ( "golang.org/x/net/context" "gorm.io/datatypes" "gorm.io/gorm" + "time" ) type EventRepositoryGorm struct { @@ -286,6 +287,22 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { return nil } +func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time) error { + var events []GormEvent + + if date.IsZero() { + result := e.DB.Table("gorm_events").Find(&events) + if result.Error != nil { + return result.Error + } + + } else { + //TODO use the date to query the database + } + + return nil +} + func NewBasicEventRepository(gormDB *gorm.DB, logger Log, useUnitOfWork bool, accountID string, applicationID string) (EventRepository, error) { if useUnitOfWork { transaction := gormDB.Begin() From d4c8568d09e412252260d52b070b7adddb73cebe Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Tue, 8 Feb 2022 00:26:28 -0400 Subject: [PATCH 05/18] feature:WEOS-1327 - Working on the ReplayEvents --- model/interfaces.go | 2 ++ model/repositories.go | 31 ++++++++++++++++++++++++++++++- server.go | 18 ++++++++++++++++-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/model/interfaces.go b/model/interfaces.go index 36671f23..cda8d04f 100644 --- a/model/interfaces.go +++ b/model/interfaces.go @@ -4,6 +4,7 @@ package model import ( ds "github.com/ompluscator/dynamic-struct" "golang.org/x/net/context" + "time" ) type CommandDispatcher interface { @@ -68,6 +69,7 @@ type EventRepository interface { GetByAggregateAndSequenceRange(ID string, start int64, end int64) ([]*Event, error) AddSubscriber(handler EventHandler) GetSubscribers() ([]EventHandler, error) + ReplayEvents(ctxt context.Context, date time.Time) error } type Datastore interface { diff --git a/model/repositories.go b/model/repositories.go index d75f03c4..01b97995 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -287,17 +287,46 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { return nil } +//Content may not be applicable to this func since there would be an instance of it being called at server.go run. Therefore we won't have a "proper" content which would contain the EntityFactory func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time) error { var events []GormEvent if date.IsZero() { result := e.DB.Table("gorm_events").Find(&events) if result.Error != nil { + e.logger.Errorf("got error pulling events '%s'", result.Error) return result.Error } + var tEvents []*Event + + for _, event := range events { + tEvents = append(tEvents, &Event{ + ID: event.ID, + Type: event.Type, + Payload: json.RawMessage(event.Payload), + Meta: EventMeta{ + EntityID: event.EntityID, + EntityType: event.EntityType, + RootID: event.RootID, + Module: event.ApplicationID, + User: event.User, + SequenceNo: event.SequenceNo, + }, + Version: 0, + }) + } + + for _, event := range tEvents { + e.eventDispatcher.Dispatch(ctxt, *event) + } + } else { - //TODO use the date to query the database + result := e.DB.Table("gorm_events").Where("created_at = ?", date).Find(&events) + if result.Error != nil { + e.logger.Errorf("got error pulling events '%s'", result.Error) + return result.Error + } } return nil diff --git a/server.go b/server.go index e91f77a9..2488f128 100644 --- a/server.go +++ b/server.go @@ -3,20 +3,34 @@ package main import ( "flag" weos "github.com/wepala/weos/controllers/rest" + "golang.org/x/net/context" "os" + "time" ) var port = flag.String("port", "8681", "-port=8681") var schema = flag.String("spec", "./api.yaml", "schema for initialization") +var replay = flag.Bool("replay events", true, "replay events from gorm events") + +//Pass flag for replay events func main() { flag.Parse() apiFlag := *schema var apiEnv string + var restAPI *weos.RESTAPI apiEnv = os.Getenv("WEOS_SPEC") if apiEnv != "" { - weos.Start(*port, apiEnv) + restAPI = weos.Start(*port, apiEnv) } else if *schema != "" { - weos.Start(*port, apiFlag) + restAPI = weos.Start(*port, apiFlag) } + + if *replay == true { + e, _ := restAPI.GetEventStore("default") + + //Entity factory will be needed in this context. + e.ReplayEvents(context.Background(), time.Time{}) + } + } From d3af9edc9b9b74cce76c184741e03f8fba551bf0 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Tue, 8 Feb 2022 00:27:48 -0400 Subject: [PATCH 06/18] feature:WEOS-1327 - Updated Replay events func --- model/repositories.go | 47 +++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/model/repositories.go b/model/repositories.go index 01b97995..b2172ff7 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -297,30 +297,6 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time) e.logger.Errorf("got error pulling events '%s'", result.Error) return result.Error } - - var tEvents []*Event - - for _, event := range events { - tEvents = append(tEvents, &Event{ - ID: event.ID, - Type: event.Type, - Payload: json.RawMessage(event.Payload), - Meta: EventMeta{ - EntityID: event.EntityID, - EntityType: event.EntityType, - RootID: event.RootID, - Module: event.ApplicationID, - User: event.User, - SequenceNo: event.SequenceNo, - }, - Version: 0, - }) - } - - for _, event := range tEvents { - e.eventDispatcher.Dispatch(ctxt, *event) - } - } else { result := e.DB.Table("gorm_events").Where("created_at = ?", date).Find(&events) if result.Error != nil { @@ -329,6 +305,29 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time) } } + var tEvents []*Event + + for _, event := range events { + tEvents = append(tEvents, &Event{ + ID: event.ID, + Type: event.Type, + Payload: json.RawMessage(event.Payload), + Meta: EventMeta{ + EntityID: event.EntityID, + EntityType: event.EntityType, + RootID: event.RootID, + Module: event.ApplicationID, + User: event.User, + SequenceNo: event.SequenceNo, + }, + Version: 0, + }) + } + + for _, event := range tEvents { + e.eventDispatcher.Dispatch(ctxt, *event) + } + return nil } From b2e480ce7f8a1133f19c2e0cba42f118de532fb4 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Tue, 8 Feb 2022 23:12:18 -0400 Subject: [PATCH 07/18] feature:WEOS-1327 - Updated Mocks test - Added a Builder func to entity factory - Updated server.go - Updated repository - Added to the replay events func - Added a check to migrate tables that do not exist in gorm (not ideal) - Other changes --- controllers/rest/api.go | 14 ++ controllers/rest/operation_initializers.go | 6 + controllers/rest/weos_mocks_test.go | 249 +++++++++++++++++++++ model/entity_factory.go | 5 + model/event.go | 1 + model/interfaces.go | 4 +- model/mocks_test.go | 249 +++++++++++++++++++++ model/repositories.go | 54 ++++- model/repositories_test.go | 175 +++++++++++++++ projections/gorm.go | 35 +++ server.go | 11 +- 11 files changed, 789 insertions(+), 14 deletions(-) create mode 100644 model/repositories_test.go diff --git a/controllers/rest/api.go b/controllers/rest/api.go index 0403fc6c..f14a11d2 100644 --- a/controllers/rest/api.go +++ b/controllers/rest/api.go @@ -51,6 +51,7 @@ type RESTAPI struct { registeredPrePathInitializers map[reflect.Value]int postPathInitializers []PathInitializer registeredPostPathInitializers map[reflect.Value]int + entityFactories map[string]model.EntityFactory } type schema struct { @@ -172,6 +173,14 @@ func (p *RESTAPI) RegisterProjection(name string, projection projections.Project p.projections[name] = projection } +//RegisterEntityFactory Adds entity factory so that it can be referenced in the OpenAPI spec +func (p *RESTAPI) RegisterEntityFactory(name string, factory model.EntityFactory) { + if p.entityFactories == nil { + p.entityFactories = make(map[string]model.EntityFactory) + } + p.entityFactories[name] = factory +} + //GetMiddleware get middleware by name func (p *RESTAPI) GetMiddleware(name string) (Middleware, error) { if tmiddleware, ok := p.middlewares[name]; ok { @@ -253,6 +262,11 @@ func (p *RESTAPI) GetSchemas() (map[string]interface{}, error) { return schemes, nil } +//GetEntityFactories get event factories +func (p *RESTAPI) GetEntityFactories() map[string]model.EntityFactory { + return p.entityFactories +} + //Initialize and setup configurations for RESTAPI func (p *RESTAPI) Initialize(ctxt context.Context) error { //register standard controllers diff --git a/controllers/rest/operation_initializers.go b/controllers/rest/operation_initializers.go index 3fb3994d..cc1d8dd7 100644 --- a/controllers/rest/operation_initializers.go +++ b/controllers/rest/operation_initializers.go @@ -41,6 +41,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } @@ -55,6 +56,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } break @@ -65,6 +67,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } @@ -79,6 +82,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } @@ -88,6 +92,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } else { @@ -104,6 +109,7 @@ func EntityFactoryInitializer(ctxt context.Context, api *RESTAPI, path string, m if builder, ok := schemas[contentType]; ok { entityFactory := new(model.DefaultEntityFactory).FromSchemaAndBuilder(contentType, swagger.Components.Schemas[contentType].Value, builder) newContext := context.WithValue(ctxt, weoscontext.ENTITY_FACTORY, entityFactory) + api.RegisterEntityFactory(entityFactory.Name(), entityFactory) return newContext, nil } } diff --git a/controllers/rest/weos_mocks_test.go b/controllers/rest/weos_mocks_test.go index 68f3de34..80edbb38 100644 --- a/controllers/rest/weos_mocks_test.go +++ b/controllers/rest/weos_mocks_test.go @@ -12,6 +12,7 @@ import ( "gorm.io/gorm" "net/http" "sync" + "time" ) // Ensure, that EventRepositoryMock does implement model.EventRepository. @@ -54,6 +55,9 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { +// panic("mock out the ReplayEvents method") +// }, // } // // // use mockedEventRepository in code that requires model.EventRepository @@ -91,6 +95,9 @@ type EventRepositoryMock struct { // PersistFunc mocks the Persist method. PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error + // ReplayEventsFunc mocks the ReplayEvents method. + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) + // calls tracks calls to the methods. calls struct { // AddSubscriber holds details about calls to the AddSubscriber method. @@ -151,6 +158,15 @@ type EventRepositoryMock struct { // Entity is the entity argument value. Entity model.AggregateInterface } + // ReplayEvents holds details about calls to the ReplayEvents method. + ReplayEvents []struct { + // Ctxt is the ctxt argument value. + Ctxt context.Context + // Date is the date argument value. + Date time.Time + // EntityFactories is the entityFactories argument value. + EntityFactories map[string]model.EntityFactory + } } lockAddSubscriber sync.RWMutex lockFlush sync.RWMutex @@ -162,6 +178,7 @@ type EventRepositoryMock struct { lockGetSubscribers sync.RWMutex lockMigrate sync.RWMutex lockPersist sync.RWMutex + lockReplayEvents sync.RWMutex } // AddSubscriber calls AddSubscriberFunc. @@ -488,6 +505,45 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { return calls } +// ReplayEvents calls ReplayEventsFunc. +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { + if mock.ReplayEventsFunc == nil { + panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") + } + callInfo := struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + }{ + Ctxt: ctxt, + Date: date, + EntityFactories: entityFactories, + } + mock.lockReplayEvents.Lock() + mock.calls.ReplayEvents = append(mock.calls.ReplayEvents, callInfo) + mock.lockReplayEvents.Unlock() + return mock.ReplayEventsFunc(ctxt, date, entityFactories) +} + +// ReplayEventsCalls gets all the calls that were made to ReplayEvents. +// Check the length with: +// len(mockedEventRepository.ReplayEventsCalls()) +func (mock *EventRepositoryMock) ReplayEventsCalls() []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory +} { + var calls []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + } + mock.lockReplayEvents.RLock() + calls = mock.calls.ReplayEvents + mock.lockReplayEvents.RUnlock() + return calls +} + // Ensure, that ProjectionMock does implement model.Projection. // If this is not the case, regenerate this file with moq. var _ model.Projection = &ProjectionMock{} @@ -2071,6 +2127,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // // make and configure a mocked model.EntityFactory // mockedEntityFactory := &EntityFactoryMock{ +// BuilderFunc: func(ctx context.Context) ds.Builder { +// panic("mock out the Builder method") +// }, // DynamicStructFunc: func(ctx context.Context) ds.DynamicStruct { // panic("mock out the DynamicStruct method") // }, @@ -2096,6 +2155,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // } type EntityFactoryMock struct { + // BuilderFunc mocks the Builder method. + BuilderFunc func(ctx context.Context) ds.Builder + // DynamicStructFunc mocks the DynamicStruct method. DynamicStructFunc func(ctx context.Context) ds.DynamicStruct @@ -2116,6 +2178,11 @@ type EntityFactoryMock struct { // calls tracks calls to the methods. calls struct { + // Builder holds details about calls to the Builder method. + Builder []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } // DynamicStruct holds details about calls to the DynamicStruct method. DynamicStruct []struct { // Ctx is the ctx argument value. @@ -2145,6 +2212,7 @@ type EntityFactoryMock struct { TableName []struct { } } + lockBuilder sync.RWMutex lockDynamicStruct sync.RWMutex lockFromSchemaAndBuilder sync.RWMutex lockName sync.RWMutex @@ -2153,6 +2221,37 @@ type EntityFactoryMock struct { lockTableName sync.RWMutex } +// Builder calls BuilderFunc. +func (mock *EntityFactoryMock) Builder(ctx context.Context) ds.Builder { + if mock.BuilderFunc == nil { + panic("EntityFactoryMock.BuilderFunc: method is nil but EntityFactory.Builder was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockBuilder.Lock() + mock.calls.Builder = append(mock.calls.Builder, callInfo) + mock.lockBuilder.Unlock() + return mock.BuilderFunc(ctx) +} + +// BuilderCalls gets all the calls that were made to Builder. +// Check the length with: +// len(mockedEntityFactory.BuilderCalls()) +func (mock *EntityFactoryMock) BuilderCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockBuilder.RLock() + calls = mock.calls.Builder + mock.lockBuilder.RUnlock() + return calls +} + // DynamicStruct calls DynamicStructFunc. func (mock *EntityFactoryMock) DynamicStruct(ctx context.Context) ds.DynamicStruct { if mock.DynamicStructFunc == nil { @@ -2331,3 +2430,153 @@ func (mock *EntityFactoryMock) TableNameCalls() []struct { mock.lockTableName.RUnlock() return calls } + +// Ensure, that EventDispatcherMock does implement model.EventDispatcher. +// If this is not the case, regenerate this file with moq. +var _ model.EventDispatcher = &EventDispatcherMock{} + +// EventDispatcherMock is a mock implementation of model.EventDispatcher. +// +// func TestSomethingThatUsesEventDispatcher(t *testing.T) { +// +// // make and configure a mocked model.EventDispatcher +// mockedEventDispatcher := &EventDispatcherMock{ +// AddSubscriberFunc: func(handler model.EventHandler) { +// panic("mock out the AddSubscriber method") +// }, +// DispatchFunc: func(ctx context.Context, event model.Event) { +// panic("mock out the Dispatch method") +// }, +// GetSubscribersFunc: func() []model.EventHandler { +// panic("mock out the GetSubscribers method") +// }, +// } +// +// // use mockedEventDispatcher in code that requires model.EventDispatcher +// // and then make assertions. +// +// } +type EventDispatcherMock struct { + // AddSubscriberFunc mocks the AddSubscriber method. + AddSubscriberFunc func(handler model.EventHandler) + + // DispatchFunc mocks the Dispatch method. + DispatchFunc func(ctx context.Context, event model.Event) + + // GetSubscribersFunc mocks the GetSubscribers method. + GetSubscribersFunc func() []model.EventHandler + + // calls tracks calls to the methods. + calls struct { + // AddSubscriber holds details about calls to the AddSubscriber method. + AddSubscriber []struct { + // Handler is the handler argument value. + Handler model.EventHandler + } + // Dispatch holds details about calls to the Dispatch method. + Dispatch []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Event is the event argument value. + Event model.Event + } + // GetSubscribers holds details about calls to the GetSubscribers method. + GetSubscribers []struct { + } + } + lockAddSubscriber sync.RWMutex + lockDispatch sync.RWMutex + lockGetSubscribers sync.RWMutex +} + +// AddSubscriber calls AddSubscriberFunc. +func (mock *EventDispatcherMock) AddSubscriber(handler model.EventHandler) { + if mock.AddSubscriberFunc == nil { + panic("EventDispatcherMock.AddSubscriberFunc: method is nil but EventDispatcher.AddSubscriber was just called") + } + callInfo := struct { + Handler model.EventHandler + }{ + Handler: handler, + } + mock.lockAddSubscriber.Lock() + mock.calls.AddSubscriber = append(mock.calls.AddSubscriber, callInfo) + mock.lockAddSubscriber.Unlock() + mock.AddSubscriberFunc(handler) +} + +// AddSubscriberCalls gets all the calls that were made to AddSubscriber. +// Check the length with: +// len(mockedEventDispatcher.AddSubscriberCalls()) +func (mock *EventDispatcherMock) AddSubscriberCalls() []struct { + Handler model.EventHandler +} { + var calls []struct { + Handler model.EventHandler + } + mock.lockAddSubscriber.RLock() + calls = mock.calls.AddSubscriber + mock.lockAddSubscriber.RUnlock() + return calls +} + +// Dispatch calls DispatchFunc. +func (mock *EventDispatcherMock) Dispatch(ctx context.Context, event model.Event) { + if mock.DispatchFunc == nil { + panic("EventDispatcherMock.DispatchFunc: method is nil but EventDispatcher.Dispatch was just called") + } + callInfo := struct { + Ctx context.Context + Event model.Event + }{ + Ctx: ctx, + Event: event, + } + mock.lockDispatch.Lock() + mock.calls.Dispatch = append(mock.calls.Dispatch, callInfo) + mock.lockDispatch.Unlock() + mock.DispatchFunc(ctx, event) +} + +// DispatchCalls gets all the calls that were made to Dispatch. +// Check the length with: +// len(mockedEventDispatcher.DispatchCalls()) +func (mock *EventDispatcherMock) DispatchCalls() []struct { + Ctx context.Context + Event model.Event +} { + var calls []struct { + Ctx context.Context + Event model.Event + } + mock.lockDispatch.RLock() + calls = mock.calls.Dispatch + mock.lockDispatch.RUnlock() + return calls +} + +// GetSubscribers calls GetSubscribersFunc. +func (mock *EventDispatcherMock) GetSubscribers() []model.EventHandler { + if mock.GetSubscribersFunc == nil { + panic("EventDispatcherMock.GetSubscribersFunc: method is nil but EventDispatcher.GetSubscribers was just called") + } + callInfo := struct { + }{} + mock.lockGetSubscribers.Lock() + mock.calls.GetSubscribers = append(mock.calls.GetSubscribers, callInfo) + mock.lockGetSubscribers.Unlock() + return mock.GetSubscribersFunc() +} + +// GetSubscribersCalls gets all the calls that were made to GetSubscribers. +// Check the length with: +// len(mockedEventDispatcher.GetSubscribersCalls()) +func (mock *EventDispatcherMock) GetSubscribersCalls() []struct { +} { + var calls []struct { + } + mock.lockGetSubscribers.RLock() + calls = mock.calls.GetSubscribers + mock.lockGetSubscribers.RUnlock() + return calls +} diff --git a/model/entity_factory.go b/model/entity_factory.go index f4db839d..d2246968 100644 --- a/model/entity_factory.go +++ b/model/entity_factory.go @@ -15,6 +15,7 @@ type EntityFactory interface { Name() string TableName() string Schema() *openapi3.Schema + Builder(ctx context.Context) ds.Builder } type DefaultEntityFactory struct { @@ -50,6 +51,10 @@ func (d *DefaultEntityFactory) DynamicStruct(ctx context.Context) ds.DynamicStru return d.builder.Build() } +func (d *DefaultEntityFactory) Builder(ctx context.Context) ds.Builder { + return d.builder +} + //GetEntityFactory get entity factory from context func GetEntityFactory(ctx context.Context) EntityFactory { if value, ok := ctx.Value(weosContext.ENTITY_FACTORY).(EntityFactory); ok { diff --git a/model/event.go b/model/event.go index 49a47e3c..3b6d5bc5 100644 --- a/model/event.go +++ b/model/event.go @@ -101,6 +101,7 @@ type EventMeta struct { RootID string `json:"root_id"` Group string `json:"group"` Created string `json:"created"` + SchemaName string `json:"schema_name"` } func (e *Event) IsValid() bool { diff --git a/model/interfaces.go b/model/interfaces.go index cda8d04f..5d8750b4 100644 --- a/model/interfaces.go +++ b/model/interfaces.go @@ -1,6 +1,6 @@ package model -//go:generate moq -out temp_mocks_test.go -pkg model_test . Projection +//go:generate moq -out temp_mocks_test.go -pkg model_test . EventRepository import ( ds "github.com/ompluscator/dynamic-struct" "golang.org/x/net/context" @@ -69,7 +69,7 @@ type EventRepository interface { GetByAggregateAndSequenceRange(ID string, start int64, end int64) ([]*Event, error) AddSubscriber(handler EventHandler) GetSubscribers() ([]EventHandler, error) - ReplayEvents(ctxt context.Context, date time.Time) error + ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory) (int, int, int, error) } type Datastore interface { diff --git a/model/mocks_test.go b/model/mocks_test.go index debf1e3d..80722ccc 100644 --- a/model/mocks_test.go +++ b/model/mocks_test.go @@ -12,6 +12,7 @@ import ( "gorm.io/gorm" "net/http" "sync" + "time" ) // Ensure, that EventRepositoryMock does implement model.EventRepository. @@ -54,6 +55,9 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { +// panic("mock out the ReplayEvents method") +// }, // } // // // use mockedEventRepository in code that requires model.EventRepository @@ -91,6 +95,9 @@ type EventRepositoryMock struct { // PersistFunc mocks the Persist method. PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error + // ReplayEventsFunc mocks the ReplayEvents method. + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) + // calls tracks calls to the methods. calls struct { // AddSubscriber holds details about calls to the AddSubscriber method. @@ -151,6 +158,15 @@ type EventRepositoryMock struct { // Entity is the entity argument value. Entity model.AggregateInterface } + // ReplayEvents holds details about calls to the ReplayEvents method. + ReplayEvents []struct { + // Ctxt is the ctxt argument value. + Ctxt context.Context + // Date is the date argument value. + Date time.Time + // EntityFactories is the entityFactories argument value. + EntityFactories map[string]model.EntityFactory + } } lockAddSubscriber sync.RWMutex lockFlush sync.RWMutex @@ -162,6 +178,7 @@ type EventRepositoryMock struct { lockGetSubscribers sync.RWMutex lockMigrate sync.RWMutex lockPersist sync.RWMutex + lockReplayEvents sync.RWMutex } // AddSubscriber calls AddSubscriberFunc. @@ -488,6 +505,45 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { return calls } +// ReplayEvents calls ReplayEventsFunc. +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { + if mock.ReplayEventsFunc == nil { + panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") + } + callInfo := struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + }{ + Ctxt: ctxt, + Date: date, + EntityFactories: entityFactories, + } + mock.lockReplayEvents.Lock() + mock.calls.ReplayEvents = append(mock.calls.ReplayEvents, callInfo) + mock.lockReplayEvents.Unlock() + return mock.ReplayEventsFunc(ctxt, date, entityFactories) +} + +// ReplayEventsCalls gets all the calls that were made to ReplayEvents. +// Check the length with: +// len(mockedEventRepository.ReplayEventsCalls()) +func (mock *EventRepositoryMock) ReplayEventsCalls() []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory +} { + var calls []struct { + Ctxt context.Context + Date time.Time + EntityFactories map[string]model.EntityFactory + } + mock.lockReplayEvents.RLock() + calls = mock.calls.ReplayEvents + mock.lockReplayEvents.RUnlock() + return calls +} + // Ensure, that ProjectionMock does implement model.Projection. // If this is not the case, regenerate this file with moq. var _ model.Projection = &ProjectionMock{} @@ -2071,6 +2127,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // // make and configure a mocked model.EntityFactory // mockedEntityFactory := &EntityFactoryMock{ +// BuilderFunc: func(ctx context.Context) ds.Builder { +// panic("mock out the Builder method") +// }, // DynamicStructFunc: func(ctx context.Context) ds.DynamicStruct { // panic("mock out the DynamicStruct method") // }, @@ -2096,6 +2155,9 @@ var _ model.EntityFactory = &EntityFactoryMock{} // // } type EntityFactoryMock struct { + // BuilderFunc mocks the Builder method. + BuilderFunc func(ctx context.Context) ds.Builder + // DynamicStructFunc mocks the DynamicStruct method. DynamicStructFunc func(ctx context.Context) ds.DynamicStruct @@ -2116,6 +2178,11 @@ type EntityFactoryMock struct { // calls tracks calls to the methods. calls struct { + // Builder holds details about calls to the Builder method. + Builder []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } // DynamicStruct holds details about calls to the DynamicStruct method. DynamicStruct []struct { // Ctx is the ctx argument value. @@ -2145,6 +2212,7 @@ type EntityFactoryMock struct { TableName []struct { } } + lockBuilder sync.RWMutex lockDynamicStruct sync.RWMutex lockFromSchemaAndBuilder sync.RWMutex lockName sync.RWMutex @@ -2153,6 +2221,37 @@ type EntityFactoryMock struct { lockTableName sync.RWMutex } +// Builder calls BuilderFunc. +func (mock *EntityFactoryMock) Builder(ctx context.Context) ds.Builder { + if mock.BuilderFunc == nil { + panic("EntityFactoryMock.BuilderFunc: method is nil but EntityFactory.Builder was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockBuilder.Lock() + mock.calls.Builder = append(mock.calls.Builder, callInfo) + mock.lockBuilder.Unlock() + return mock.BuilderFunc(ctx) +} + +// BuilderCalls gets all the calls that were made to Builder. +// Check the length with: +// len(mockedEntityFactory.BuilderCalls()) +func (mock *EntityFactoryMock) BuilderCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockBuilder.RLock() + calls = mock.calls.Builder + mock.lockBuilder.RUnlock() + return calls +} + // DynamicStruct calls DynamicStructFunc. func (mock *EntityFactoryMock) DynamicStruct(ctx context.Context) ds.DynamicStruct { if mock.DynamicStructFunc == nil { @@ -2331,3 +2430,153 @@ func (mock *EntityFactoryMock) TableNameCalls() []struct { mock.lockTableName.RUnlock() return calls } + +// Ensure, that EventDispatcherMock does implement model.EventDispatcher. +// If this is not the case, regenerate this file with moq. +var _ model.EventDispatcher = &EventDispatcherMock{} + +// EventDispatcherMock is a mock implementation of model.EventDispatcher. +// +// func TestSomethingThatUsesEventDispatcher(t *testing.T) { +// +// // make and configure a mocked model.EventDispatcher +// mockedEventDispatcher := &EventDispatcherMock{ +// AddSubscriberFunc: func(handler model.EventHandler) { +// panic("mock out the AddSubscriber method") +// }, +// DispatchFunc: func(ctx context.Context, event model.Event) { +// panic("mock out the Dispatch method") +// }, +// GetSubscribersFunc: func() []model.EventHandler { +// panic("mock out the GetSubscribers method") +// }, +// } +// +// // use mockedEventDispatcher in code that requires model.EventDispatcher +// // and then make assertions. +// +// } +type EventDispatcherMock struct { + // AddSubscriberFunc mocks the AddSubscriber method. + AddSubscriberFunc func(handler model.EventHandler) + + // DispatchFunc mocks the Dispatch method. + DispatchFunc func(ctx context.Context, event model.Event) + + // GetSubscribersFunc mocks the GetSubscribers method. + GetSubscribersFunc func() []model.EventHandler + + // calls tracks calls to the methods. + calls struct { + // AddSubscriber holds details about calls to the AddSubscriber method. + AddSubscriber []struct { + // Handler is the handler argument value. + Handler model.EventHandler + } + // Dispatch holds details about calls to the Dispatch method. + Dispatch []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Event is the event argument value. + Event model.Event + } + // GetSubscribers holds details about calls to the GetSubscribers method. + GetSubscribers []struct { + } + } + lockAddSubscriber sync.RWMutex + lockDispatch sync.RWMutex + lockGetSubscribers sync.RWMutex +} + +// AddSubscriber calls AddSubscriberFunc. +func (mock *EventDispatcherMock) AddSubscriber(handler model.EventHandler) { + if mock.AddSubscriberFunc == nil { + panic("EventDispatcherMock.AddSubscriberFunc: method is nil but EventDispatcher.AddSubscriber was just called") + } + callInfo := struct { + Handler model.EventHandler + }{ + Handler: handler, + } + mock.lockAddSubscriber.Lock() + mock.calls.AddSubscriber = append(mock.calls.AddSubscriber, callInfo) + mock.lockAddSubscriber.Unlock() + mock.AddSubscriberFunc(handler) +} + +// AddSubscriberCalls gets all the calls that were made to AddSubscriber. +// Check the length with: +// len(mockedEventDispatcher.AddSubscriberCalls()) +func (mock *EventDispatcherMock) AddSubscriberCalls() []struct { + Handler model.EventHandler +} { + var calls []struct { + Handler model.EventHandler + } + mock.lockAddSubscriber.RLock() + calls = mock.calls.AddSubscriber + mock.lockAddSubscriber.RUnlock() + return calls +} + +// Dispatch calls DispatchFunc. +func (mock *EventDispatcherMock) Dispatch(ctx context.Context, event model.Event) { + if mock.DispatchFunc == nil { + panic("EventDispatcherMock.DispatchFunc: method is nil but EventDispatcher.Dispatch was just called") + } + callInfo := struct { + Ctx context.Context + Event model.Event + }{ + Ctx: ctx, + Event: event, + } + mock.lockDispatch.Lock() + mock.calls.Dispatch = append(mock.calls.Dispatch, callInfo) + mock.lockDispatch.Unlock() + mock.DispatchFunc(ctx, event) +} + +// DispatchCalls gets all the calls that were made to Dispatch. +// Check the length with: +// len(mockedEventDispatcher.DispatchCalls()) +func (mock *EventDispatcherMock) DispatchCalls() []struct { + Ctx context.Context + Event model.Event +} { + var calls []struct { + Ctx context.Context + Event model.Event + } + mock.lockDispatch.RLock() + calls = mock.calls.Dispatch + mock.lockDispatch.RUnlock() + return calls +} + +// GetSubscribers calls GetSubscribersFunc. +func (mock *EventDispatcherMock) GetSubscribers() []model.EventHandler { + if mock.GetSubscribersFunc == nil { + panic("EventDispatcherMock.GetSubscribersFunc: method is nil but EventDispatcher.GetSubscribers was just called") + } + callInfo := struct { + }{} + mock.lockGetSubscribers.Lock() + mock.calls.GetSubscribers = append(mock.calls.GetSubscribers, callInfo) + mock.lockGetSubscribers.Unlock() + return mock.GetSubscribersFunc() +} + +// GetSubscribersCalls gets all the calls that were made to GetSubscribers. +// Check the length with: +// len(mockedEventDispatcher.GetSubscribersCalls()) +func (mock *EventDispatcherMock) GetSubscribersCalls() []struct { +} { + var calls []struct { + } + mock.lockGetSubscribers.RLock() + calls = mock.calls.GetSubscribers + mock.lockGetSubscribers.RUnlock() + return calls +} diff --git a/model/repositories.go b/model/repositories.go index b2172ff7..a05a8cc0 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -33,6 +33,7 @@ type GormEvent struct { ApplicationID string `gorm:"index"` User string `gorm:"index"` SequenceNo int64 + SchemaName string `gorm:"index"` } //NewGormEvent converts a domain event to something that is a bit easier for Gorm to work with @@ -52,11 +53,15 @@ func NewGormEvent(event *Event) (GormEvent, error) { ApplicationID: event.Meta.Module, User: event.Meta.User, SequenceNo: event.Meta.SequenceNo, + SchemaName: event.Meta.SchemaName, }, nil } func (e *EventRepositoryGorm) Persist(ctxt context.Context, entity AggregateInterface) error { //TODO use the information in the context to get account info, module info. //didn't think it should barf if an empty list is passed + entityFact := ctxt.Value(context2.ENTITY_FACTORY) + schemaName := entityFact.(EntityFactory).Name() + var gormEvents []GormEvent entities := entity.GetNewChanges() savePointID := "s" + ksuid.New().String() //NOTE the save point can't start with a number @@ -80,6 +85,9 @@ func (e *EventRepositoryGorm) Persist(ctxt context.Context, entity AggregateInte if event.Meta.Group == "" { event.Meta.Group = e.GroupID } + if event.Meta.SchemaName == "" { + event.Meta.SchemaName = schemaName + } if !event.IsValid() { for _, terr := range event.GetErrors() { e.logger.Errorf("error encountered persisting entity '%s', '%s'", event.Meta.EntityID, terr) @@ -288,20 +296,20 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { } //Content may not be applicable to this func since there would be an instance of it being called at server.go run. Therefore we won't have a "proper" content which would contain the EntityFactory -func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time) error { +func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory) (int, int, int, error) { var events []GormEvent if date.IsZero() { result := e.DB.Table("gorm_events").Find(&events) if result.Error != nil { e.logger.Errorf("got error pulling events '%s'", result.Error) - return result.Error + return 0, 0, 0, result.Error } } else { result := e.DB.Table("gorm_events").Where("created_at = ?", date).Find(&events) if result.Error != nil { e.logger.Errorf("got error pulling events '%s'", result.Error) - return result.Error + return 0, 0, 0, result.Error } } @@ -319,16 +327,50 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time) Module: event.ApplicationID, User: event.User, SequenceNo: event.SequenceNo, + SchemaName: event.SchemaName, }, Version: 0, }) } + totalEvents := len(tEvents) + successfulEvents := 0 + failedEvents := 0 + entity := map[string]interface{}{} + dispatchEntity := true + for _, event := range tEvents { - e.eventDispatcher.Dispatch(ctxt, *event) + if entityFactories[event.Meta.SchemaName] == nil { + e.logger.Errorf("no entity factory found for schema %s", event.Meta.SchemaName) + + failedEvents++ + } else { + newContext := context.WithValue(ctxt, context2.ENTITY_FACTORY, entityFactories[event.Meta.SchemaName]) + + if !e.DB.Migrator().HasTable(event.Meta.SchemaName) { + e.eventDispatcher.Dispatch(newContext, *event) + successfulEvents++ + } else { + result := e.DB.Table(event.Meta.SchemaName).Find(&entity, "weos_id = ? ", event.Meta.EntityID) + if result.Error != nil { + e.logger.Errorf("got error pulling events '%s'", result.Error) + return 0, 0, 0, result.Error + } + + //entity["weos_id"] != event.Meta.EntityID || entity["weos_id"] == event.Meta.EntityID + if result.RowsAffected != 0 { + dispatchEntity = false + failedEvents++ + } + + if dispatchEntity == true { + e.eventDispatcher.Dispatch(newContext, *event) + successfulEvents++ + } + } + } } - - return nil + return totalEvents, successfulEvents, failedEvents, nil } func NewBasicEventRepository(gormDB *gorm.DB, logger Log, useUnitOfWork bool, accountID string, applicationID string) (EventRepository, error) { diff --git a/model/repositories_test.go b/model/repositories_test.go new file mode 100644 index 00000000..122f5259 --- /dev/null +++ b/model/repositories_test.go @@ -0,0 +1,175 @@ +package model_test + +import ( + "context" + weoscontext "github.com/wepala/weos/context" + "github.com/wepala/weos/controllers/rest" + "github.com/wepala/weos/model" + "os" + "testing" + "time" +) + +func TestEventRepository_ReplayEvents(t *testing.T) { + + ctx := context.Background() + + api, err := rest.New("../controllers/rest/fixtures/blog.yaml") + err = api.Initialize(context.TODO()) + if err != nil { + t.Fatal(err) + } + + schemaName := "Blog" + + factories := api.GetEntityFactories() + newContext := context.WithValue(ctx, weoscontext.ENTITY_FACTORY, factories[schemaName]) + + mockPayload1 := map[string]interface{}{"weos_id": "12345", "sequence_no": int64(1), "title": "Test Blog", "url": "testing.com"} + entity1 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "12345", + }, + SequenceNo: int64(0), + }, + Property: mockPayload1, + } + event1 := model.NewEntityEvent("create", entity1, "12345", mockPayload1) + entity1.NewChange(event1) + + mockPayload2 := map[string]interface{}{"weos_id": "123456", "sequence_no": int64(1), "title": "Test Blog1", "url": "testing1.com"} + entity2 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "123456", + }, + SequenceNo: int64(0), + }, + Property: mockPayload2, + } + event2 := model.NewEntityEvent("create", entity2, "123456", mockPayload2) + entity2.NewChange(event2) + + repo, err := api.GetEventStore("Default") + if err != nil { + t.Fatal(err) + } + + eventRepo := repo.(*model.EventRepositoryGorm) + + eventRepo.Persist(newContext, entity1) + eventRepo.Persist(newContext, entity2) + + t.Run("replay events - drop tables", func(t *testing.T) { + err = eventRepo.DB.Migrator().DropTable("Blog") + if err != nil { + t.Fatal(err) + } + + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories) + if err != nil { + t.Fatal(err) + } + + if total != 2 { + t.Fatalf("expected total events to be %d, got %d", 2, total) + } + + if successful != 2 { + t.Fatalf("expected successful events to be %d, got %d", 2, successful) + } + + if failed != 0 { + t.Fatalf("expected failed events to be %d, got %d", 0, failed) + } + }) + os.Remove("test.db") +} + +func TestEventRepository_ReplayEvents2(t *testing.T) { + + ctx := context.Background() + + api, err := rest.New("../controllers/rest/fixtures/blog.yaml") + err = api.Initialize(context.TODO()) + if err != nil { + t.Fatal(err) + } + + schemaName := "Blog" + + factories := api.GetEntityFactories() + newContext := context.WithValue(ctx, weoscontext.ENTITY_FACTORY, factories[schemaName]) + + mockPayload1 := map[string]interface{}{"weos_id": "12345", "sequence_no": int64(1), "title": "Test Blog", "url": "testing.com"} + entity1 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "12345", + }, + SequenceNo: int64(0), + }, + Property: mockPayload1, + } + event1 := model.NewEntityEvent("create", entity1, "12345", mockPayload1) + entity1.NewChange(event1) + + mockPayload2 := map[string]interface{}{"weos_id": "123456", "sequence_no": int64(1), "title": "Test Blog1", "url": "testing1.com"} + entity2 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "123456", + }, + SequenceNo: int64(0), + }, + Property: mockPayload2, + } + event2 := model.NewEntityEvent("create", entity2, "123456", mockPayload2) + entity2.NewChange(event2) + + mockPayload3 := map[string]interface{}{"weos_id": "1234567", "sequence_no": int64(1), "title": "Test Blog2", "url": "testing2.com"} + entity3 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "1234567", + }, + SequenceNo: int64(0), + }, + Property: mockPayload3, + } + event3 := model.NewEntityEvent("create", entity3, "1234567", mockPayload3) + entity3.NewChange(event3) + + repo, err := api.GetEventStore("Default") + if err != nil { + t.Fatal(err) + } + + eventRepo := repo.(*model.EventRepositoryGorm) + + eventRepo.Persist(newContext, entity1) + eventRepo.Persist(newContext, entity2) + eventRepo.Persist(newContext, entity3) + + t.Run("replay events - existing data", func(t *testing.T) { + + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories) + if err != nil { + t.Fatal(err) + } + + if total != 3 { + t.Fatalf("expected total events to be %d, got %d", 3, total) + } + + if successful != 0 { + t.Fatalf("expected successful events to be %d, got %d", 0, successful) + } + + if failed != 3 { + t.Fatalf("expected failed events to be %d, got %d", 3, failed) + } + }) + os.Remove("test.db") +} diff --git a/projections/gorm.go b/projections/gorm.go index 3c2d44f4..c7a37963 100644 --- a/projections/gorm.go +++ b/projections/gorm.go @@ -146,6 +146,18 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) } + + //If the table was dropped, recreate it + if !p.db.Migrator().HasTable(event.Meta.SchemaName) { + schemas := make(map[string]ds.Builder) + schemas[event.Meta.SchemaName] = entityFactory.Builder(context.Background()) + + err := p.Migrate(context.Background(), schemas) + if err != nil { + p.logger.Errorf("error migrating table: %s", event.Meta.SchemaName) + } + } + db := p.db.Table(entityFactory.Name()).Create(eventPayload) if db.Error != nil { p.logger.Errorf("error creating %s, got %s", entityFactory.Name(), db.Error) @@ -187,6 +199,17 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { } } + //If the table was dropped, recreate it + if !p.db.Migrator().HasTable(event.Meta.SchemaName) { + schemas := make(map[string]ds.Builder) + schemas[event.Meta.SchemaName] = entityFactory.Builder(context.Background()) + + err := p.Migrate(context.Background(), schemas) + if err != nil { + p.logger.Errorf("error migrating table: %s", event.Meta.SchemaName) + } + } + //update database value db := p.db.Table(entityFactory.Name()).Updates(eventPayload) if db.Error != nil { @@ -199,6 +222,18 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { if err != nil { p.logger.Errorf("error creating entity '%s'", err) } + + //If the table was dropped, recreate it + if !p.db.Migrator().HasTable(event.Meta.SchemaName) { + schemas := make(map[string]ds.Builder) + schemas[event.Meta.SchemaName] = entityFactory.Builder(context.Background()) + + err := p.Migrate(context.Background(), schemas) + if err != nil { + p.logger.Errorf("error migrating table: %s", event.Meta.SchemaName) + } + } + db := p.db.Table(entityFactory.Name()).Where("weos_id = ?", event.Meta.EntityID).Delete(entity.Property) if db.Error != nil { p.logger.Errorf("error deleting %s, got %s", entityFactory.Name(), db.Error) diff --git a/server.go b/server.go index 2488f128..84db54fe 100644 --- a/server.go +++ b/server.go @@ -1,18 +1,18 @@ package main import ( + "context" "flag" weos "github.com/wepala/weos/controllers/rest" - "golang.org/x/net/context" "os" "time" ) var port = flag.String("port", "8681", "-port=8681") var schema = flag.String("spec", "./api.yaml", "schema for initialization") -var replay = flag.Bool("replay events", true, "replay events from gorm events") +var replay = flag.Bool("replay events", false, "replay events from gorm events") -//Pass flag for replay events +//TODO Add a flag for the time func main() { flag.Parse() @@ -28,9 +28,8 @@ func main() { if *replay == true { e, _ := restAPI.GetEventStore("default") - - //Entity factory will be needed in this context. - e.ReplayEvents(context.Background(), time.Time{}) + factories := restAPI.GetEntityFactories() + e.ReplayEvents(context.Background(), time.Time{}, factories) } } From 8710b744dee6c61df068294a37a3e2e49d8f1a91 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Wed, 9 Feb 2022 01:35:56 -0400 Subject: [PATCH 08/18] feature:WEOS-1327 - Working on e2e tests --- end2end_test.go | 89 +++++++++++++++++-- .../hydrate-datastore-from-events.feature | 4 +- 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/end2end_test.go b/end2end_test.go index 853fbe78..57bbf965 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "github.com/testcontainers/testcontainers-go" + "github.com/wepala/weos/model" "github.com/wepala/weos/projections" "mime/multipart" "net/http" @@ -56,6 +57,9 @@ var page int var contentType string var result api.ListApiResponse var scenarioContext context.Context +var total int +var success int +var failed int type User struct { Name string @@ -82,6 +86,7 @@ func InitializeSuite(ctx *godog.TestSuiteContext) { contentTypeID = map[string]bool{} Developer = &User{} result = api.ListApiResponse{} + total, success, failed = 0, 0, 0 os.Remove("e2e.db") openAPI = `openapi: 3.0.3 info: @@ -149,6 +154,7 @@ func reset(ctx context.Context, sc *godog.Scenario) (context.Context, error) { if err != nil { fmt.Errorf("unexpected error '%s'", err) } + total, success, failed = 0, 0, 0 db.Exec("PRAGMA foreign_keys = ON") e = echo.New() openAPI = `openapi: 3.0.3 @@ -1007,24 +1013,93 @@ func theShouldBeDeleted(contentEntity string, id int) error { return nil } -func aWithIdWasDeleted(arg1, arg2 string) error { +func aWithIdWasDeleted(contentType, id string) error { return godog.ErrPending } func callsTheReplayMethodOnTheEventRepository(arg1 string) error { - return godog.ErrPending + repo, err := API.GetEventStore("Default") + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } + eventRepo := repo.(*model.EventRepositoryGorm) + + factories := API.GetEntityFactories() + total, success, failed, err = eventRepo.ReplayEvents(context.Background(), time.Time{}, factories) + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } + return nil } -func sojournerDeletesTheTable(arg1 string) error { - return godog.ErrPending +func sojournerDeletesTheTable(tableName string) error { + repo, err := API.GetEventStore("Default") + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } + eventRepo := repo.(*model.EventRepositoryGorm) + + err = eventRepo.DB.Migrator().DropTable(strings.Title(tableName)) + if err != nil { + return fmt.Errorf("error dropping table: %s got err: %s", tableName, err) + } + return nil } -func theTableShouldBePopulatedWith(arg1 string, arg2 *godog.Table) error { - return godog.ErrPending +func theTableShouldBePopulatedWith(contentType string, details *godog.Table) error { + contentEntity := map[string]interface{}{} + var result *gorm.DB + + head := details.Rows[0].Cells + compare := map[string]interface{}{} + + for i := 1; i < len(details.Rows); i++ { + for n, cell := range details.Rows[i].Cells { + compare[head[n].Value] = cell.Value + } + + for key, value := range compare { + apiProjection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("unexpected error getting projection: %s", err) + } + apiProjection1 := apiProjection.(*projections.GORMProjection) + result = apiProjection1.DB().Table(strings.Title(contentType)).Find(&contentEntity, key+" = ?", value) + if contentEntity != nil { + break + } + } + + if contentEntity == nil { + return fmt.Errorf("unexpected error finding content type in db") + } + + if result.Error != nil { + return fmt.Errorf("unexpected error finding content type: %s", result.Error) + } + + for key, value := range compare { + if key == "sequence_no" { + strSeq := strconv.Itoa(int(contentEntity[key].(int64))) + if strSeq != value { + return fmt.Errorf("expected %s %s %s, got %s", contentType, key, value, contentEntity[key]) + } + } else { + if contentEntity[key] != value { + return fmt.Errorf("expected %s %s %s, got %s", contentType, key, value, contentEntity[key]) + } + } + } + + } + return nil } func theTotalNoEventsAndProcessedAndFailuresShouldBeReturned() error { - return godog.ErrPending + if total == 0 && success == 0 && failed == 0 { + return fmt.Errorf("expected total, success and failed to be non 0 values") + } + return nil } func InitializeScenario(ctx *godog.ScenarioContext) { diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature index b9b94db5..9bb4a66a 100644 --- a/features/hydrate-datastore-from-events.feature +++ b/features/hydrate-datastore-from-events.feature @@ -246,7 +246,7 @@ Feature: Hydrate database using events Given Sojourner" deletes the "Blogs" table When "Sojourner" calls the replay method on the event repository - Then the "Blogs" table should be populated with + Then the "Blog" table should be populated with | id | weos_id | sequence_no | title | description | | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | @@ -266,7 +266,7 @@ Feature: Hydrate database using events Given a "Blog" with id "1237" was deleted And a "Blog" with id "164" was deleted When "Sojourner" calls the replay method on the event repository - Then the "Blogs" table should be populated with + Then the "Blog" table should be populated with | id | weos_id | sequence_no | title | description | | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | From 53a2fdba07dad312271c48b1321075ca7010f6d2 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Wed, 9 Feb 2022 08:57:31 -0400 Subject: [PATCH 09/18] feature:WEOS-1327 - Working on E2E tests - Edited feature file Blogs -> Blog - Edited replayEvents --- end2end_test.go | 30 ++++++++++++++----- .../hydrate-datastore-from-events.feature | 2 +- model/repositories.go | 2 +- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/end2end_test.go b/end2end_test.go index 57bbf965..b1b9d5ad 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1013,8 +1013,20 @@ func theShouldBeDeleted(contentEntity string, id int) error { return nil } -func aWithIdWasDeleted(contentType, id string) error { - return godog.ErrPending +func aWithIdWasDeleted(contentEntity, id string) error { + output := map[string]interface{}{} + + apiProjection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("unexpected error getting projection: %s", err) + } + apiProjection1 := apiProjection.(*projections.GORMProjection) + + searchResult := apiProjection1.DB().Table(strings.Title(contentEntity)).Where("id = ?", id).Delete(&output) + if searchResult.Error != nil { + return fmt.Errorf("got error from db query: %s", searchResult.Error) + } + return nil } func callsTheReplayMethodOnTheEventRepository(arg1 string) error { @@ -1033,15 +1045,17 @@ func callsTheReplayMethodOnTheEventRepository(arg1 string) error { } func sojournerDeletesTheTable(tableName string) error { - repo, err := API.GetEventStore("Default") + //output := map[string]interface{}{} + + apiProjection, err := API.GetProjection("Default") if err != nil { - return fmt.Errorf("error getting event store: %s", err) + return fmt.Errorf("unexpected error getting projection: %s", err) } - eventRepo := repo.(*model.EventRepositoryGorm) + apiProjection1 := apiProjection.(*projections.GORMProjection) - err = eventRepo.DB.Migrator().DropTable(strings.Title(tableName)) - if err != nil { - return fmt.Errorf("error dropping table: %s got err: %s", tableName, err) + result := apiProjection1.DB().Migrator().DropTable(strings.Title(tableName)) + if result != nil { + return fmt.Errorf("error dropping table: %s got err: %s", tableName, result) } return nil } diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature index 9bb4a66a..e2e9b63a 100644 --- a/features/hydrate-datastore-from-events.feature +++ b/features/hydrate-datastore-from-events.feature @@ -244,7 +244,7 @@ Feature: Hydrate database using events A developer should be able to configure an event repository to replay all it's events on startup. This should trigger the associated projections - Given Sojourner" deletes the "Blogs" table + Given Sojourner" deletes the "Blog" table When "Sojourner" calls the replay method on the event repository Then the "Blog" table should be populated with | id | weos_id | sequence_no | title | description | diff --git a/model/repositories.go b/model/repositories.go index a05a8cc0..f21b8b0d 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -337,9 +337,9 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, successfulEvents := 0 failedEvents := 0 entity := map[string]interface{}{} - dispatchEntity := true for _, event := range tEvents { + dispatchEntity := true if entityFactories[event.Meta.SchemaName] == nil { e.logger.Errorf("no entity factory found for schema %s", event.Meta.SchemaName) From 242ea1e1e68858b8697632bf00e39b0a8ab9bd09 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Wed, 9 Feb 2022 13:46:01 -0400 Subject: [PATCH 10/18] feature:WEOS-1327 - Fixed E2E tests - reverted gorm - Edited Interfaces - Regen Mocks - Edited server.go - Edited Replay func - Edited Replay test --- controllers/rest/weos_mocks_test.go | 14 +- end2end_test.go | 8 +- .../hydrate-datastore-from-events.feature | 2 +- model/interfaces.go | 2 +- model/mocks_test.go | 14 +- model/repositories.go | 53 ++++---- model/repositories_test.go | 126 ++++++++---------- projections/gorm.go | 33 ----- server.go | 5 +- 9 files changed, 110 insertions(+), 147 deletions(-) diff --git a/controllers/rest/weos_mocks_test.go b/controllers/rest/weos_mocks_test.go index 80edbb38..c5d84a24 100644 --- a/controllers/rest/weos_mocks_test.go +++ b/controllers/rest/weos_mocks_test.go @@ -55,7 +55,7 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, -// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { // panic("mock out the ReplayEvents method") // }, // } @@ -96,7 +96,7 @@ type EventRepositoryMock struct { PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error // ReplayEventsFunc mocks the ReplayEvents method. - ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) // calls tracks calls to the methods. calls struct { @@ -166,6 +166,8 @@ type EventRepositoryMock struct { Date time.Time // EntityFactories is the entityFactories argument value. EntityFactories map[string]model.EntityFactory + // Projection is the projection argument value. + Projection model.Projection } } lockAddSubscriber sync.RWMutex @@ -506,7 +508,7 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { } // ReplayEvents calls ReplayEventsFunc. -func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { if mock.ReplayEventsFunc == nil { panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") } @@ -514,15 +516,17 @@ func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Ti Ctxt context.Context Date time.Time EntityFactories map[string]model.EntityFactory + Projection model.Projection }{ Ctxt: ctxt, Date: date, EntityFactories: entityFactories, + Projection: projection, } mock.lockReplayEvents.Lock() mock.calls.ReplayEvents = append(mock.calls.ReplayEvents, callInfo) mock.lockReplayEvents.Unlock() - return mock.ReplayEventsFunc(ctxt, date, entityFactories) + return mock.ReplayEventsFunc(ctxt, date, entityFactories, projection) } // ReplayEventsCalls gets all the calls that were made to ReplayEvents. @@ -532,11 +536,13 @@ func (mock *EventRepositoryMock) ReplayEventsCalls() []struct { Ctxt context.Context Date time.Time EntityFactories map[string]model.EntityFactory + Projection model.Projection } { var calls []struct { Ctxt context.Context Date time.Time EntityFactories map[string]model.EntityFactory + Projection model.Projection } mock.lockReplayEvents.RLock() calls = mock.calls.ReplayEvents diff --git a/end2end_test.go b/end2end_test.go index b1b9d5ad..7f4eb28b 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1035,9 +1035,13 @@ func callsTheReplayMethodOnTheEventRepository(arg1 string) error { return fmt.Errorf("error getting event store: %s", err) } eventRepo := repo.(*model.EventRepositoryGorm) + projection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("error getting event store: %s", err) + } factories := API.GetEntityFactories() - total, success, failed, err = eventRepo.ReplayEvents(context.Background(), time.Time{}, factories) + total, success, failed, err = eventRepo.ReplayEvents(context.Background(), time.Time{}, factories, projection) if err != nil { return fmt.Errorf("error getting event store: %s", err) } @@ -1187,7 +1191,7 @@ func TestBDD(t *testing.T) { Options: &godog.Options{ Format: "pretty", //Tags: "~skipped && ~long", - Tags: "WEOS-1327", + Tags: "focus", //Tags: "WEOS-1110 && ~skipped", }, }.Run() diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature index e2e9b63a..9907c2b8 100644 --- a/features/hydrate-datastore-from-events.feature +++ b/features/hydrate-datastore-from-events.feature @@ -261,7 +261,7 @@ Feature: Hydrate database using events @WEOS-1327 Scenario: Repair data tables after some was deleted - @WEOS-1327 + @WEOS-1327 @focus Scenario: Repair tables after some content has been deleted Given a "Blog" with id "1237" was deleted And a "Blog" with id "164" was deleted diff --git a/model/interfaces.go b/model/interfaces.go index 5d8750b4..7fc327fb 100644 --- a/model/interfaces.go +++ b/model/interfaces.go @@ -69,7 +69,7 @@ type EventRepository interface { GetByAggregateAndSequenceRange(ID string, start int64, end int64) ([]*Event, error) AddSubscriber(handler EventHandler) GetSubscribers() ([]EventHandler, error) - ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory) (int, int, int, error) + ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projection Projection) (int, int, int, error) } type Datastore interface { diff --git a/model/mocks_test.go b/model/mocks_test.go index 80722ccc..b98cb113 100644 --- a/model/mocks_test.go +++ b/model/mocks_test.go @@ -55,7 +55,7 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, -// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { // panic("mock out the ReplayEvents method") // }, // } @@ -96,7 +96,7 @@ type EventRepositoryMock struct { PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error // ReplayEventsFunc mocks the ReplayEvents method. - ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) // calls tracks calls to the methods. calls struct { @@ -166,6 +166,8 @@ type EventRepositoryMock struct { Date time.Time // EntityFactories is the entityFactories argument value. EntityFactories map[string]model.EntityFactory + // Projection is the projection argument value. + Projection model.Projection } } lockAddSubscriber sync.RWMutex @@ -506,7 +508,7 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { } // ReplayEvents calls ReplayEventsFunc. -func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory) (int, int, int, error) { +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { if mock.ReplayEventsFunc == nil { panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") } @@ -514,15 +516,17 @@ func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Ti Ctxt context.Context Date time.Time EntityFactories map[string]model.EntityFactory + Projection model.Projection }{ Ctxt: ctxt, Date: date, EntityFactories: entityFactories, + Projection: projection, } mock.lockReplayEvents.Lock() mock.calls.ReplayEvents = append(mock.calls.ReplayEvents, callInfo) mock.lockReplayEvents.Unlock() - return mock.ReplayEventsFunc(ctxt, date, entityFactories) + return mock.ReplayEventsFunc(ctxt, date, entityFactories, projection) } // ReplayEventsCalls gets all the calls that were made to ReplayEvents. @@ -532,11 +536,13 @@ func (mock *EventRepositoryMock) ReplayEventsCalls() []struct { Ctxt context.Context Date time.Time EntityFactories map[string]model.EntityFactory + Projection model.Projection } { var calls []struct { Ctxt context.Context Date time.Time EntityFactories map[string]model.EntityFactory + Projection model.Projection } mock.lockReplayEvents.RLock() calls = mock.calls.ReplayEvents diff --git a/model/repositories.go b/model/repositories.go index f21b8b0d..e9b7c402 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -2,6 +2,7 @@ package model import ( "encoding/json" + ds "github.com/ompluscator/dynamic-struct" "github.com/segmentio/ksuid" context2 "github.com/wepala/weos/context" "golang.org/x/net/context" @@ -296,7 +297,18 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { } //Content may not be applicable to this func since there would be an instance of it being called at server.go run. Therefore we won't have a "proper" content which would contain the EntityFactory -func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory) (int, int, int, error) { +func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projections Projection) (int, int, int, error) { + schemas := make(map[string]ds.Builder) + + for _, value := range entityFactories { + schemas[value.Name()] = value.Builder(context.Background()) + } + + err := projections.Migrate(ctxt, schemas) + if err != nil { + e.logger.Errorf("error migrating tables: %s", err) + } + var events []GormEvent if date.IsZero() { @@ -339,35 +351,20 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entity := map[string]interface{}{} for _, event := range tEvents { - dispatchEntity := true - if entityFactories[event.Meta.SchemaName] == nil { - e.logger.Errorf("no entity factory found for schema %s", event.Meta.SchemaName) + newContext := context.WithValue(ctxt, context2.ENTITY_FACTORY, entityFactories[event.Meta.SchemaName]) + + result := e.DB.Table(event.Meta.SchemaName).Find(&entity, "weos_id = ? and sequence_no = ?", event.Meta.EntityID, event.Meta.SequenceNo) + if result.Error != nil { + e.logger.Errorf("got error pulling events '%s'", result.Error) + return 0, 0, 0, result.Error + } + + if result.RowsAffected != 0 { failedEvents++ - } else { - newContext := context.WithValue(ctxt, context2.ENTITY_FACTORY, entityFactories[event.Meta.SchemaName]) - - if !e.DB.Migrator().HasTable(event.Meta.SchemaName) { - e.eventDispatcher.Dispatch(newContext, *event) - successfulEvents++ - } else { - result := e.DB.Table(event.Meta.SchemaName).Find(&entity, "weos_id = ? ", event.Meta.EntityID) - if result.Error != nil { - e.logger.Errorf("got error pulling events '%s'", result.Error) - return 0, 0, 0, result.Error - } - - //entity["weos_id"] != event.Meta.EntityID || entity["weos_id"] == event.Meta.EntityID - if result.RowsAffected != 0 { - dispatchEntity = false - failedEvents++ - } - - if dispatchEntity == true { - e.eventDispatcher.Dispatch(newContext, *event) - successfulEvents++ - } - } + } else if result.RowsAffected == 0 { + e.eventDispatcher.Dispatch(newContext, *event) + successfulEvents++ } } return totalEvents, successfulEvents, failedEvents, nil diff --git a/model/repositories_test.go b/model/repositories_test.go index 122f5259..e764f938 100644 --- a/model/repositories_test.go +++ b/model/repositories_test.go @@ -51,15 +51,33 @@ func TestEventRepository_ReplayEvents(t *testing.T) { event2 := model.NewEntityEvent("create", entity2, "123456", mockPayload2) entity2.NewChange(event2) + mockPayload3 := map[string]interface{}{"weos_id": "1234567", "sequence_no": int64(1), "title": "Test Blog2", "url": "testing2.com"} + entity3 := &model.ContentEntity{ + AggregateRoot: model.AggregateRoot{ + BasicEntity: model.BasicEntity{ + ID: "1234567", + }, + SequenceNo: int64(0), + }, + Property: mockPayload3, + } + event3 := model.NewEntityEvent("create", entity3, "1234567", mockPayload3) + entity3.NewChange(event3) + repo, err := api.GetEventStore("Default") if err != nil { t.Fatal(err) } eventRepo := repo.(*model.EventRepositoryGorm) + projection, err := api.GetProjection("Default") + if err != nil { + t.Fatal(err) + } eventRepo.Persist(newContext, entity1) eventRepo.Persist(newContext, entity2) + eventRepo.Persist(newContext, entity3) t.Run("replay events - drop tables", func(t *testing.T) { err = eventRepo.DB.Migrator().DropTable("Blog") @@ -67,94 +85,58 @@ func TestEventRepository_ReplayEvents(t *testing.T) { t.Fatal(err) } - total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories) + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) if err != nil { t.Fatal(err) } - if total != 2 { - t.Fatalf("expected total events to be %d, got %d", 2, total) + if total != 3 { + t.Fatalf("expected total events to be %d, got %d", 3, total) } - if successful != 2 { - t.Fatalf("expected successful events to be %d, got %d", 2, successful) + if successful != 3 { + t.Fatalf("expected successful events to be %d, got %d", 3, successful) } if failed != 0 { t.Fatalf("expected failed events to be %d, got %d", 0, failed) } }) - os.Remove("test.db") -} - -func TestEventRepository_ReplayEvents2(t *testing.T) { - - ctx := context.Background() - - api, err := rest.New("../controllers/rest/fixtures/blog.yaml") - err = api.Initialize(context.TODO()) - if err != nil { - t.Fatal(err) - } - - schemaName := "Blog" - - factories := api.GetEntityFactories() - newContext := context.WithValue(ctx, weoscontext.ENTITY_FACTORY, factories[schemaName]) + t.Run("replay events - existing data", func(t *testing.T) { - mockPayload1 := map[string]interface{}{"weos_id": "12345", "sequence_no": int64(1), "title": "Test Blog", "url": "testing.com"} - entity1 := &model.ContentEntity{ - AggregateRoot: model.AggregateRoot{ - BasicEntity: model.BasicEntity{ - ID: "12345", - }, - SequenceNo: int64(0), - }, - Property: mockPayload1, - } - event1 := model.NewEntityEvent("create", entity1, "12345", mockPayload1) - entity1.NewChange(event1) + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) + if err != nil { + t.Fatal(err) + } - mockPayload2 := map[string]interface{}{"weos_id": "123456", "sequence_no": int64(1), "title": "Test Blog1", "url": "testing1.com"} - entity2 := &model.ContentEntity{ - AggregateRoot: model.AggregateRoot{ - BasicEntity: model.BasicEntity{ - ID: "123456", - }, - SequenceNo: int64(0), - }, - Property: mockPayload2, - } - event2 := model.NewEntityEvent("create", entity2, "123456", mockPayload2) - entity2.NewChange(event2) + if total != 3 { + t.Fatalf("expected total events to be %d, got %d", 3, total) + } - mockPayload3 := map[string]interface{}{"weos_id": "1234567", "sequence_no": int64(1), "title": "Test Blog2", "url": "testing2.com"} - entity3 := &model.ContentEntity{ - AggregateRoot: model.AggregateRoot{ - BasicEntity: model.BasicEntity{ - ID: "1234567", - }, - SequenceNo: int64(0), - }, - Property: mockPayload3, - } - event3 := model.NewEntityEvent("create", entity3, "1234567", mockPayload3) - entity3.NewChange(event3) + if successful != 0 { + t.Fatalf("expected successful events to be %d, got %d", 0, successful) + } - repo, err := api.GetEventStore("Default") - if err != nil { - t.Fatal(err) - } + if failed != 3 { + t.Fatalf("expected failed events to be %d, got %d", 3, failed) + } + }) + t.Run("replay events - remove rows", func(t *testing.T) { + output := map[string]interface{}{} - eventRepo := repo.(*model.EventRepositoryGorm) + //apiProjection := projection.(*projections.GORMProjection) - eventRepo.Persist(newContext, entity1) - eventRepo.Persist(newContext, entity2) - eventRepo.Persist(newContext, entity3) + searchResult := eventRepo.DB.Table("Blog").Where("weos_id = ?", "12345").Delete(&output) + if searchResult.Error != nil { + t.Fatal(searchResult.Error) + } - t.Run("replay events - existing data", func(t *testing.T) { + searchResult = eventRepo.DB.Table("Blog").Where("weos_id = ?", "123456").Delete(&output) + if searchResult.Error != nil { + t.Fatal(searchResult.Error) + } - total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories) + total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) if err != nil { t.Fatal(err) } @@ -163,12 +145,12 @@ func TestEventRepository_ReplayEvents2(t *testing.T) { t.Fatalf("expected total events to be %d, got %d", 3, total) } - if successful != 0 { - t.Fatalf("expected successful events to be %d, got %d", 0, successful) + if successful != 2 { + t.Fatalf("expected successful events to be %d, got %d", 2, successful) } - if failed != 3 { - t.Fatalf("expected failed events to be %d, got %d", 3, failed) + if failed != 1 { + t.Fatalf("expected failed events to be %d, got %d", 1, failed) } }) os.Remove("test.db") diff --git a/projections/gorm.go b/projections/gorm.go index c7a37963..085d858a 100644 --- a/projections/gorm.go +++ b/projections/gorm.go @@ -147,17 +147,6 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { p.logger.Errorf("error unmarshalling event '%s'", err) } - //If the table was dropped, recreate it - if !p.db.Migrator().HasTable(event.Meta.SchemaName) { - schemas := make(map[string]ds.Builder) - schemas[event.Meta.SchemaName] = entityFactory.Builder(context.Background()) - - err := p.Migrate(context.Background(), schemas) - if err != nil { - p.logger.Errorf("error migrating table: %s", event.Meta.SchemaName) - } - } - db := p.db.Table(entityFactory.Name()).Create(eventPayload) if db.Error != nil { p.logger.Errorf("error creating %s, got %s", entityFactory.Name(), db.Error) @@ -199,17 +188,6 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { } } - //If the table was dropped, recreate it - if !p.db.Migrator().HasTable(event.Meta.SchemaName) { - schemas := make(map[string]ds.Builder) - schemas[event.Meta.SchemaName] = entityFactory.Builder(context.Background()) - - err := p.Migrate(context.Background(), schemas) - if err != nil { - p.logger.Errorf("error migrating table: %s", event.Meta.SchemaName) - } - } - //update database value db := p.db.Table(entityFactory.Name()).Updates(eventPayload) if db.Error != nil { @@ -223,17 +201,6 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { p.logger.Errorf("error creating entity '%s'", err) } - //If the table was dropped, recreate it - if !p.db.Migrator().HasTable(event.Meta.SchemaName) { - schemas := make(map[string]ds.Builder) - schemas[event.Meta.SchemaName] = entityFactory.Builder(context.Background()) - - err := p.Migrate(context.Background(), schemas) - if err != nil { - p.logger.Errorf("error migrating table: %s", event.Meta.SchemaName) - } - } - db := p.db.Table(entityFactory.Name()).Where("weos_id = ?", event.Meta.EntityID).Delete(entity.Property) if db.Error != nil { p.logger.Errorf("error deleting %s, got %s", entityFactory.Name(), db.Error) diff --git a/server.go b/server.go index 84db54fe..ba18fd53 100644 --- a/server.go +++ b/server.go @@ -12,7 +12,7 @@ var port = flag.String("port", "8681", "-port=8681") var schema = flag.String("spec", "./api.yaml", "schema for initialization") var replay = flag.Bool("replay events", false, "replay events from gorm events") -//TODO Add a flag for the time +//TODO Add a flag for the time ? func main() { flag.Parse() @@ -29,7 +29,8 @@ func main() { if *replay == true { e, _ := restAPI.GetEventStore("default") factories := restAPI.GetEntityFactories() - e.ReplayEvents(context.Background(), time.Time{}, factories) + projection, _ := restAPI.GetProjection("default") + e.ReplayEvents(context.Background(), time.Time{}, factories, projection) } } From 1afe5323b47b6257a64ec9edd97fe69d993f3f28 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Wed, 9 Feb 2022 14:46:49 -0400 Subject: [PATCH 11/18] feature:WEOS-1327 - Edited server.go - Edited main.go - Made small edits to e2e --- controllers/rest/main.go | 17 +++++++++++++++-- end2end_test.go | 4 ++-- server.go | 19 +++---------------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/controllers/rest/main.go b/controllers/rest/main.go index 3a5c316b..0815b2ed 100644 --- a/controllers/rest/main.go +++ b/controllers/rest/main.go @@ -1,12 +1,15 @@ package rest import ( + "context" "encoding/json" "github.com/getkin/kin-openapi/openapi3" "github.com/labstack/echo/v4" + "github.com/wepala/weos/model" "io/ioutil" "os" "strings" + "time" ) //New instantiates and initializes the api @@ -21,8 +24,7 @@ func New(apiConfig string) (*RESTAPI, error) { return api, err } -//Start API -func Start(port string, apiConfig string) *RESTAPI { +func Start(port string, apiConfig string, replay bool) *RESTAPI { api, err := New(apiConfig) if err != nil { api.EchoInstance().Logger.Error(err) @@ -31,6 +33,17 @@ func Start(port string, apiConfig string) *RESTAPI { if err != nil { api.EchoInstance().Logger.Fatal(err) } + + if replay == true { + e, _ := api.GetEventStore("Default") + eventRepo := e.(*model.EventRepositoryGorm) + projection, _ := api.GetProjection("Default") + factories := api.GetEntityFactories() + + total, success, failed, err := eventRepo.ReplayEvents(context.Background(), time.Time{}, factories, projection) + api.EchoInstance().Logger.Debugf("total: %d, success: %d, failed: %d, err: %s", total, success, failed, err) + } + api.EchoInstance().Logger.Fatal(api.EchoInstance().Start(":" + port)) return api } diff --git a/end2end_test.go b/end2end_test.go index 7f4eb28b..53bba02b 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1190,8 +1190,8 @@ func TestBDD(t *testing.T) { TestSuiteInitializer: InitializeSuite, Options: &godog.Options{ Format: "pretty", - //Tags: "~skipped && ~long", - Tags: "focus", + Tags: "~skipped && ~long", + //Tags: "focus", //Tags: "WEOS-1110 && ~skipped", }, }.Run() diff --git a/server.go b/server.go index ba18fd53..e1361a60 100644 --- a/server.go +++ b/server.go @@ -1,36 +1,23 @@ package main import ( - "context" "flag" weos "github.com/wepala/weos/controllers/rest" "os" - "time" ) var port = flag.String("port", "8681", "-port=8681") var schema = flag.String("spec", "./api.yaml", "schema for initialization") -var replay = flag.Bool("replay events", false, "replay events from gorm events") - -//TODO Add a flag for the time ? +var replay = flag.Bool("replay events", true, "replay events from gorm events") func main() { flag.Parse() apiFlag := *schema var apiEnv string - var restAPI *weos.RESTAPI apiEnv = os.Getenv("WEOS_SPEC") if apiEnv != "" { - restAPI = weos.Start(*port, apiEnv) + weos.Start(*port, apiEnv, *replay) } else if *schema != "" { - restAPI = weos.Start(*port, apiFlag) - } - - if *replay == true { - e, _ := restAPI.GetEventStore("default") - factories := restAPI.GetEntityFactories() - projection, _ := restAPI.GetProjection("default") - e.ReplayEvents(context.Background(), time.Time{}, factories, projection) + weos.Start(*port, apiFlag, *replay) } - } From 4479823cd6ede1269bbbd5852c0e1583903655db Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Thu, 10 Feb 2022 11:06:05 -0400 Subject: [PATCH 12/18] feature:WEOS-1327 - Updated E2E test - Removed SchemaName from event - Updated the event dispatcher to return and error - Updated EventHandler to return an error - Updated replay events - Updated replay tests - Regenerated Mocks --- controllers/rest/weos_mocks_test.go | 6 ++--- end2end_test.go | 4 +-- model/event.go | 1 - model/event_dispatcher.go | 15 ++++++++--- model/event_dispatcher_test.go | 8 ++++-- model/interfaces.go | 2 +- model/mocks_test.go | 6 ++--- model/module_test.go | 4 +-- model/repositories.go | 42 ++++++++++++++--------------- model/repositories_test.go | 14 +++++----- projections/gorm.go | 14 +++++++++- 11 files changed, 67 insertions(+), 49 deletions(-) diff --git a/controllers/rest/weos_mocks_test.go b/controllers/rest/weos_mocks_test.go index c5d84a24..70cd6bcd 100644 --- a/controllers/rest/weos_mocks_test.go +++ b/controllers/rest/weos_mocks_test.go @@ -55,7 +55,7 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, -// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { // panic("mock out the ReplayEvents method") // }, // } @@ -96,7 +96,7 @@ type EventRepositoryMock struct { PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error // ReplayEventsFunc mocks the ReplayEvents method. - ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) // calls tracks calls to the methods. calls struct { @@ -508,7 +508,7 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { } // ReplayEvents calls ReplayEventsFunc. -func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { if mock.ReplayEventsFunc == nil { panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") } diff --git a/end2end_test.go b/end2end_test.go index 53bba02b..47f3f4dd 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -60,6 +60,7 @@ var scenarioContext context.Context var total int var success int var failed int +var errArray [][]error type User struct { Name string @@ -1041,7 +1042,7 @@ func callsTheReplayMethodOnTheEventRepository(arg1 string) error { } factories := API.GetEntityFactories() - total, success, failed, err = eventRepo.ReplayEvents(context.Background(), time.Time{}, factories, projection) + total, success, failed, errArray = eventRepo.ReplayEvents(context.Background(), time.Time{}, factories, projection) if err != nil { return fmt.Errorf("error getting event store: %s", err) } @@ -1191,7 +1192,6 @@ func TestBDD(t *testing.T) { Options: &godog.Options{ Format: "pretty", Tags: "~skipped && ~long", - //Tags: "focus", //Tags: "WEOS-1110 && ~skipped", }, }.Run() diff --git a/model/event.go b/model/event.go index 3b6d5bc5..49a47e3c 100644 --- a/model/event.go +++ b/model/event.go @@ -101,7 +101,6 @@ type EventMeta struct { RootID string `json:"root_id"` Group string `json:"group"` Created string `json:"created"` - SchemaName string `json:"schema_name"` } func (e *Event) IsValid() bool { diff --git a/model/event_dispatcher.go b/model/event_dispatcher.go index ec9842c2..7a92e0c7 100644 --- a/model/event_dispatcher.go +++ b/model/event_dispatcher.go @@ -11,8 +11,10 @@ type DefaultEventDisptacher struct { dispatch sync.Mutex } -func (e *DefaultEventDisptacher) Dispatch(ctx context.Context, event Event) { +func (e *DefaultEventDisptacher) Dispatch(ctx context.Context, event Event) []error { //mutex helps keep state between routines + var errors []error + e.dispatch.Lock() defer e.dispatch.Unlock() var wg sync.WaitGroup @@ -26,11 +28,16 @@ func (e *DefaultEventDisptacher) Dispatch(ctx context.Context, event Event) { } wg.Done() }() - handler(ctx, event) + + err := handler(ctx, event) + if err != nil { + errors = append(errors, err) + } + }() } - wg.Wait() + return errors } func (e *DefaultEventDisptacher) AddSubscriber(handler EventHandler) { @@ -41,4 +48,4 @@ func (e *DefaultEventDisptacher) GetSubscribers() []EventHandler { return e.handlers } -type EventHandler func(ctx context.Context, event Event) +type EventHandler func(ctx context.Context, event Event) error diff --git a/model/event_dispatcher_test.go b/model/event_dispatcher_test.go index e9cdc87f..9820bd20 100644 --- a/model/event_dispatcher_test.go +++ b/model/event_dispatcher_test.go @@ -1,6 +1,7 @@ package model_test import ( + "fmt" weos "github.com/wepala/weos/model" "golang.org/x/net/context" "testing" @@ -19,15 +20,18 @@ func TestEventDisptacher_Dispatch(t *testing.T) { } dispatcher := &weos.DefaultEventDisptacher{} handlersCalled := 0 - dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) { + dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) error { handlersCalled += 1 + return nil }) - dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) { + dispatcher.AddSubscriber(func(ctx context.Context, event weos.Event) error { handlersCalled += 1 if event.Type != mockEvent.Type { t.Errorf("expected the type to be '%s', got '%s'", mockEvent.Type, event.Type) + return fmt.Errorf("expected the type to be '%s', got '%s'", mockEvent.Type, event.Type) } + return nil }) dispatcher.Dispatch(context.TODO(), *mockEvent) diff --git a/model/interfaces.go b/model/interfaces.go index 7fc327fb..76f87018 100644 --- a/model/interfaces.go +++ b/model/interfaces.go @@ -69,7 +69,7 @@ type EventRepository interface { GetByAggregateAndSequenceRange(ID string, start int64, end int64) ([]*Event, error) AddSubscriber(handler EventHandler) GetSubscribers() ([]EventHandler, error) - ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projection Projection) (int, int, int, error) + ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projection Projection) (int, int, int, [][]error) } type Datastore interface { diff --git a/model/mocks_test.go b/model/mocks_test.go index b98cb113..c6985683 100644 --- a/model/mocks_test.go +++ b/model/mocks_test.go @@ -55,7 +55,7 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, -// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { // panic("mock out the ReplayEvents method") // }, // } @@ -96,7 +96,7 @@ type EventRepositoryMock struct { PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error // ReplayEventsFunc mocks the ReplayEvents method. - ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) // calls tracks calls to the methods. calls struct { @@ -508,7 +508,7 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { } // ReplayEvents calls ReplayEventsFunc. -func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, error) { +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { if mock.ReplayEventsFunc == nil { panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") } diff --git a/model/module_test.go b/model/module_test.go index 21cb9012..93f67e8b 100644 --- a/model/module_test.go +++ b/model/module_test.go @@ -219,8 +219,8 @@ func TestWeOSApp_AddProjection(t *testing.T) { } mockProjection := &ProjectionMock{ GetEventHandlerFunc: func() weos.EventHandler { - return func(ctx context.Context, event weos.Event) { - + return func(ctx context.Context, event weos.Event) error { + return nil } }, MigrateFunc: func(ctx context.Context, builders map[string]dynamicstruct.Builder) error { diff --git a/model/repositories.go b/model/repositories.go index e9b7c402..35d46245 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -34,7 +34,6 @@ type GormEvent struct { ApplicationID string `gorm:"index"` User string `gorm:"index"` SequenceNo int64 - SchemaName string `gorm:"index"` } //NewGormEvent converts a domain event to something that is a bit easier for Gorm to work with @@ -54,7 +53,6 @@ func NewGormEvent(event *Event) (GormEvent, error) { ApplicationID: event.Meta.Module, User: event.Meta.User, SequenceNo: event.Meta.SequenceNo, - SchemaName: event.Meta.SchemaName, }, nil } @@ -86,8 +84,8 @@ func (e *EventRepositoryGorm) Persist(ctxt context.Context, entity AggregateInte if event.Meta.Group == "" { event.Meta.Group = e.GroupID } - if event.Meta.SchemaName == "" { - event.Meta.SchemaName = schemaName + if event.Meta.EntityType == "ContentEntity" || event.Meta.EntityType == "" { + event.Meta.EntityType = schemaName } if !event.IsValid() { for _, terr := range event.GetErrors() { @@ -297,7 +295,10 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { } //Content may not be applicable to this func since there would be an instance of it being called at server.go run. Therefore we won't have a "proper" content which would contain the EntityFactory -func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projections Projection) (int, int, int, error) { +func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projections Projection) (int, int, int, [][]error) { + var errors [][]error + var errArray []error + schemas := make(map[string]ds.Builder) for _, value := range entityFactories { @@ -315,13 +316,17 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, result := e.DB.Table("gorm_events").Find(&events) if result.Error != nil { e.logger.Errorf("got error pulling events '%s'", result.Error) - return 0, 0, 0, result.Error + errArray = append(errArray, result.Error) + errors = append(errors, errArray) + return 0, 0, 0, errors } } else { result := e.DB.Table("gorm_events").Where("created_at = ?", date).Find(&events) if result.Error != nil { e.logger.Errorf("got error pulling events '%s'", result.Error) - return 0, 0, 0, result.Error + errArray = append(errArray, result.Error) + errors = append(errors, errArray) + return 0, 0, 0, errors } } @@ -339,7 +344,6 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, Module: event.ApplicationID, User: event.User, SequenceNo: event.SequenceNo, - SchemaName: event.SchemaName, }, Version: 0, }) @@ -348,26 +352,20 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, totalEvents := len(tEvents) successfulEvents := 0 failedEvents := 0 - entity := map[string]interface{}{} for _, event := range tEvents { - newContext := context.WithValue(ctxt, context2.ENTITY_FACTORY, entityFactories[event.Meta.SchemaName]) - - result := e.DB.Table(event.Meta.SchemaName).Find(&entity, "weos_id = ? and sequence_no = ?", event.Meta.EntityID, event.Meta.SequenceNo) - if result.Error != nil { - e.logger.Errorf("got error pulling events '%s'", result.Error) - return 0, 0, 0, result.Error - } - - if result.RowsAffected != 0 { - failedEvents++ - } else if result.RowsAffected == 0 { - e.eventDispatcher.Dispatch(newContext, *event) + newContext := context.WithValue(ctxt, context2.ENTITY_FACTORY, entityFactories[event.Meta.EntityType]) + errArray = e.eventDispatcher.Dispatch(newContext, *event) + if len(errArray) == 0 { successfulEvents++ + } else { + errors = append(errors, errArray) + failedEvents++ } + } - return totalEvents, successfulEvents, failedEvents, nil + return totalEvents, successfulEvents, failedEvents, errors } func NewBasicEventRepository(gormDB *gorm.DB, logger Log, useUnitOfWork bool, accountID string, applicationID string) (EventRepository, error) { diff --git a/model/repositories_test.go b/model/repositories_test.go index e764f938..08b78699 100644 --- a/model/repositories_test.go +++ b/model/repositories_test.go @@ -20,10 +20,10 @@ func TestEventRepository_ReplayEvents(t *testing.T) { t.Fatal(err) } - schemaName := "Blog" + entityType := "Blog" factories := api.GetEntityFactories() - newContext := context.WithValue(ctx, weoscontext.ENTITY_FACTORY, factories[schemaName]) + newContext := context.WithValue(ctx, weoscontext.ENTITY_FACTORY, factories[entityType]) mockPayload1 := map[string]interface{}{"weos_id": "12345", "sequence_no": int64(1), "title": "Test Blog", "url": "testing.com"} entity1 := &model.ContentEntity{ @@ -105,8 +105,8 @@ func TestEventRepository_ReplayEvents(t *testing.T) { t.Run("replay events - existing data", func(t *testing.T) { total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) - if err != nil { - t.Fatal(err) + if err == nil { + t.Fatalf("expected there to be errors (unique constraint)") } if total != 3 { @@ -124,8 +124,6 @@ func TestEventRepository_ReplayEvents(t *testing.T) { t.Run("replay events - remove rows", func(t *testing.T) { output := map[string]interface{}{} - //apiProjection := projection.(*projections.GORMProjection) - searchResult := eventRepo.DB.Table("Blog").Where("weos_id = ?", "12345").Delete(&output) if searchResult.Error != nil { t.Fatal(searchResult.Error) @@ -137,8 +135,8 @@ func TestEventRepository_ReplayEvents(t *testing.T) { } total, successful, failed, err := eventRepo.ReplayEvents(ctx, time.Time{}, factories, projection) - if err != nil { - t.Fatal(err) + if err == nil { + t.Fatalf("expected there to be errors (unique constraint)") } if total != 3 { diff --git a/projections/gorm.go b/projections/gorm.go index 085d858a..209faa19 100644 --- a/projections/gorm.go +++ b/projections/gorm.go @@ -123,7 +123,7 @@ func (p *GORMProjection) Migrate(ctx context.Context, builders map[string]ds.Bui } func (p *GORMProjection) GetEventHandler() weos.EventHandler { - return func(ctx context.Context, event weos.Event) { + return func(ctx context.Context, event weos.Event) error { entityFactory := weos.GetEntityFactory(ctx) switch event.Type { case "create": @@ -132,12 +132,14 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { entity, err := entityFactory.NewEntity(ctx) if err != nil { p.logger.Errorf("error get a copy of the entity '%s'", err) + return err } eventPayload := entity.Property mapPayload := map[string]interface{}{} err = json.Unmarshal(event.Payload, &mapPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } mapPayload["sequence_no"] = event.Meta.SequenceNo @@ -145,11 +147,13 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { err = json.Unmarshal(bytes, &eventPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } db := p.db.Table(entityFactory.Name()).Create(eventPayload) if db.Error != nil { p.logger.Errorf("error creating %s, got %s", entityFactory.Name(), db.Error) + return db.Error } } case "update": @@ -157,12 +161,14 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { entity, err := entityFactory.NewEntity(ctx) if err != nil { p.logger.Errorf("error creating entity '%s'", err) + return err } eventPayload := entity.Property mapPayload := map[string]interface{}{} err = json.Unmarshal(event.Payload, &mapPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } //set sequence number @@ -172,6 +178,7 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { err = json.Unmarshal(bytes, &eventPayload) if err != nil { p.logger.Errorf("error unmarshalling event '%s'", err) + return err } reader := ds.NewReader(eventPayload) @@ -184,6 +191,7 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { err = p.db.Model(eventPayload).Association(strings.Title(key)).Replace(field.Interface()) if err != nil { p.logger.Errorf("error clearing association %s for %s, got %s", strings.Title(key), entityFactory.Name(), err) + return err } } } @@ -192,6 +200,7 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { db := p.db.Table(entityFactory.Name()).Updates(eventPayload) if db.Error != nil { p.logger.Errorf("error creating %s, got %s", entityFactory.Name(), db.Error) + return db.Error } } case "delete": @@ -199,14 +208,17 @@ func (p *GORMProjection) GetEventHandler() weos.EventHandler { entity, err := entityFactory.NewEntity(ctx) if err != nil { p.logger.Errorf("error creating entity '%s'", err) + return err } db := p.db.Table(entityFactory.Name()).Where("weos_id = ?", event.Meta.EntityID).Delete(entity.Property) if db.Error != nil { p.logger.Errorf("error deleting %s, got %s", entityFactory.Name(), db.Error) + return db.Error } } } + return nil } } From b1cbf2b1ee5bf4375f62343158b1885747baaa8a Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Thu, 10 Feb 2022 11:35:37 -0400 Subject: [PATCH 13/18] feature:WEOS-1327 - Updated E2E test --- end2end_test.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/end2end_test.go b/end2end_test.go index 47f3f4dd..87212e59 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1077,17 +1077,12 @@ func theTableShouldBePopulatedWith(contentType string, details *godog.Table) err compare[head[n].Value] = cell.Value } - for key, value := range compare { - apiProjection, err := API.GetProjection("Default") - if err != nil { - return fmt.Errorf("unexpected error getting projection: %s", err) - } - apiProjection1 := apiProjection.(*projections.GORMProjection) - result = apiProjection1.DB().Table(strings.Title(contentType)).Find(&contentEntity, key+" = ?", value) - if contentEntity != nil { - break - } + apiProjection, err := API.GetProjection("Default") + if err != nil { + return fmt.Errorf("unexpected error getting projection: %s", err) } + apiProjection1 := apiProjection.(*projections.GORMProjection) + result = apiProjection1.DB().Table(strings.Title(contentType)).Find(&contentEntity, "weos_ID = ?", compare["weos_id"]) if contentEntity == nil { return fmt.Errorf("unexpected error finding content type in db") @@ -1191,7 +1186,8 @@ func TestBDD(t *testing.T) { TestSuiteInitializer: InitializeSuite, Options: &godog.Options{ Format: "pretty", - Tags: "~skipped && ~long", + //Tags: "~skipped && ~long", + Tags: "focus", //Tags: "WEOS-1110 && ~skipped", }, }.Run() From bb43e86c918c9f1bc6318664351c95cba78243f8 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Thu, 10 Feb 2022 13:45:39 -0400 Subject: [PATCH 14/18] feature:WEOS-1327 - Made requested changes from pr --- controllers/rest/weos_mocks_test.go | 6 +++--- end2end_test.go | 6 +++--- model/interfaces.go | 2 +- model/mocks_test.go | 6 +++--- model/repositories.go | 12 +++++------- 5 files changed, 15 insertions(+), 17 deletions(-) diff --git a/controllers/rest/weos_mocks_test.go b/controllers/rest/weos_mocks_test.go index 70cd6bcd..0c8b90b6 100644 --- a/controllers/rest/weos_mocks_test.go +++ b/controllers/rest/weos_mocks_test.go @@ -55,7 +55,7 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, -// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { // panic("mock out the ReplayEvents method") // }, // } @@ -96,7 +96,7 @@ type EventRepositoryMock struct { PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error // ReplayEventsFunc mocks the ReplayEvents method. - ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) // calls tracks calls to the methods. calls struct { @@ -508,7 +508,7 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { } // ReplayEvents calls ReplayEventsFunc. -func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { if mock.ReplayEventsFunc == nil { panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") } diff --git a/end2end_test.go b/end2end_test.go index 87212e59..8f9f7933 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -60,7 +60,7 @@ var scenarioContext context.Context var total int var success int var failed int -var errArray [][]error +var errArray []error type User struct { Name string @@ -1186,8 +1186,8 @@ func TestBDD(t *testing.T) { TestSuiteInitializer: InitializeSuite, Options: &godog.Options{ Format: "pretty", - //Tags: "~skipped && ~long", - Tags: "focus", + Tags: "~skipped && ~long", + //Tags: "focus", //Tags: "WEOS-1110 && ~skipped", }, }.Run() diff --git a/model/interfaces.go b/model/interfaces.go index 76f87018..55dd09c7 100644 --- a/model/interfaces.go +++ b/model/interfaces.go @@ -69,7 +69,7 @@ type EventRepository interface { GetByAggregateAndSequenceRange(ID string, start int64, end int64) ([]*Event, error) AddSubscriber(handler EventHandler) GetSubscribers() ([]EventHandler, error) - ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projection Projection) (int, int, int, [][]error) + ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projection Projection) (int, int, int, []error) } type Datastore interface { diff --git a/model/mocks_test.go b/model/mocks_test.go index c6985683..2a4c6a4d 100644 --- a/model/mocks_test.go +++ b/model/mocks_test.go @@ -55,7 +55,7 @@ var _ model.EventRepository = &EventRepositoryMock{} // PersistFunc: func(ctxt context.Context, entity model.AggregateInterface) error { // panic("mock out the Persist method") // }, -// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { +// ReplayEventsFunc: func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { // panic("mock out the ReplayEvents method") // }, // } @@ -96,7 +96,7 @@ type EventRepositoryMock struct { PersistFunc func(ctxt context.Context, entity model.AggregateInterface) error // ReplayEventsFunc mocks the ReplayEvents method. - ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) + ReplayEventsFunc func(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) // calls tracks calls to the methods. calls struct { @@ -508,7 +508,7 @@ func (mock *EventRepositoryMock) PersistCalls() []struct { } // ReplayEvents calls ReplayEventsFunc. -func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, [][]error) { +func (mock *EventRepositoryMock) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]model.EntityFactory, projection model.Projection) (int, int, int, []error) { if mock.ReplayEventsFunc == nil { panic("EventRepositoryMock.ReplayEventsFunc: method is nil but EventRepository.ReplayEvents was just called") } diff --git a/model/repositories.go b/model/repositories.go index 35d46245..3c998dd6 100644 --- a/model/repositories.go +++ b/model/repositories.go @@ -295,8 +295,8 @@ func (e *EventRepositoryGorm) Remove(entities []Entity) error { } //Content may not be applicable to this func since there would be an instance of it being called at server.go run. Therefore we won't have a "proper" content which would contain the EntityFactory -func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projections Projection) (int, int, int, [][]error) { - var errors [][]error +func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, entityFactories map[string]EntityFactory, projections Projection) (int, int, int, []error) { + var errors []error var errArray []error schemas := make(map[string]ds.Builder) @@ -316,16 +316,14 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, result := e.DB.Table("gorm_events").Find(&events) if result.Error != nil { e.logger.Errorf("got error pulling events '%s'", result.Error) - errArray = append(errArray, result.Error) - errors = append(errors, errArray) + errors = append(errors, result.Error) return 0, 0, 0, errors } } else { result := e.DB.Table("gorm_events").Where("created_at = ?", date).Find(&events) if result.Error != nil { e.logger.Errorf("got error pulling events '%s'", result.Error) - errArray = append(errArray, result.Error) - errors = append(errors, errArray) + errors = append(errors, result.Error) return 0, 0, 0, errors } } @@ -360,7 +358,7 @@ func (e *EventRepositoryGorm) ReplayEvents(ctxt context.Context, date time.Time, if len(errArray) == 0 { successfulEvents++ } else { - errors = append(errors, errArray) + errors = append(errors, errArray...) failedEvents++ } From da579ebf2507c05e0631b97146adc74f394cde08 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Fri, 11 Feb 2022 09:04:12 -0400 Subject: [PATCH 15/18] feature:WEOS-1327 - Changed the replay flag to false by default --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index e1361a60..94e71f2c 100644 --- a/server.go +++ b/server.go @@ -8,7 +8,7 @@ import ( var port = flag.String("port", "8681", "-port=8681") var schema = flag.String("spec", "./api.yaml", "schema for initialization") -var replay = flag.Bool("replay events", true, "replay events from gorm events") +var replay = flag.Bool("replay events", false, "replay events from gorm events") func main() { flag.Parse() From 9c072d8442af67e454bc9148ac140f98b274fad7 Mon Sep 17 00:00:00 2001 From: akeemphilbert Date: Fri, 11 Feb 2022 13:44:51 -0400 Subject: [PATCH 16/18] feature: WEOS-1327 Removed scenario that was tricky to implement * Recovering a table that was partially updates is tricky --- .../hydrate-datastore-from-events.feature | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/features/hydrate-datastore-from-events.feature b/features/hydrate-datastore-from-events.feature index 9907c2b8..76b2c6e2 100644 --- a/features/hydrate-datastore-from-events.feature +++ b/features/hydrate-datastore-from-events.feature @@ -257,33 +257,3 @@ Feature: Hydrate database using events | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | And the total no. events and processed and failures should be returned - - @WEOS-1327 - Scenario: Repair data tables after some was deleted - - @WEOS-1327 @focus - Scenario: Repair tables after some content has been deleted - Given a "Blog" with id "1237" was deleted - And a "Blog" with id "164" was deleted - When "Sojourner" calls the replay method on the event repository - Then the "Blog" table should be populated with - | id | weos_id | sequence_no | title | description | - | 1 | 24Kj7ExtIFvuGgTOTLBgpZgCl0n | 2 | Blog 1 | Some Blog | - | 2 | 24KjDkwfmp8PCslCQ6Detx6yr1N | 1 | Blog 2 | Some Blog 2 | - | 164 | 24KjFbp82wGq4qb5LAxLdA5GbR2 | 1 | Blog 6 | Some Blog 6 | - | 3 | 24KjHaQbjEv0ZxfKxFup1dI6iKP | 4 | Blog 3 | Some Blog 3 | - | 4 | 24KjIq8KJhIhWa7d8sNJhRilGpA | 1 | Blog 4 | Some Blog 4 | - | 5 | 24KjLAP17p3KvTy5YCMWUIRlOSS | 1 | Blog 5 | Some Blog 5 | - | 890 | 24KjMP9uTPxW5Xuhziv1balYskX | 1 | Blog 7 | Some Blog 7 | - | 1237 | 24KjNifBFHrIQcfEe2QCaiHXd22 | 1 | Blog 8 | Some Blog 8 | - And the total no. events and processed and failures should be returned - - Scenario: Continue loading events if error occurs - - The event that failed should be logged out WITHOUT the payload - - - - Scenario: Repair specific schemas - - Scenario: Set database to a specific state on a given date \ No newline at end of file From 9086ecb0389b2fe1075d3f23e9cfcfa64be25614 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Fri, 11 Feb 2022 13:49:48 -0400 Subject: [PATCH 17/18] feature:WEOS-1327 - Removed step for delete single id --- end2end_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/end2end_test.go b/end2end_test.go index 21f83d3f..2b22fe82 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1088,22 +1088,6 @@ func aFilterOnTheFieldNeWithValue(field, value string) error { return nil } -func aWithIdWasDeleted(contentEntity, id string) error { - output := map[string]interface{}{} - - apiProjection, err := API.GetProjection("Default") - if err != nil { - return fmt.Errorf("unexpected error getting projection: %s", err) - } - apiProjection1 := apiProjection.(*projections.GORMProjection) - - searchResult := apiProjection1.DB().Table(strings.Title(contentEntity)).Where("id = ?", id).Delete(&output) - if searchResult.Error != nil { - return fmt.Errorf("got error from db query: %s", searchResult.Error) - } - return nil -} - func callsTheReplayMethodOnTheEventRepository(arg1 string) error { repo, err := API.GetEventStore("Default") if err != nil { @@ -1253,7 +1237,6 @@ func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Step(`^a filter on the field "([^"]*)" "like" with value "([^"]*)"$`, aFilterOnTheFieldLikeWithValue) ctx.Step(`^a filter on the field "([^"]*)" "lt" with value "([^"]*)"$`, aFilterOnTheFieldLtWithValue) ctx.Step(`^a filter on the field "([^"]*)" "ne" with value "([^"]*)"$`, aFilterOnTheFieldNeWithValue) - ctx.Step(`^a "([^"]*)" with id "([^"]*)" was deleted$`, aWithIdWasDeleted) ctx.Step(`^"([^"]*)" calls the replay method on the event repository$`, callsTheReplayMethodOnTheEventRepository) ctx.Step(`^Sojourner" deletes the "([^"]*)" table$`, sojournerDeletesTheTable) ctx.Step(`^the "([^"]*)" table should be populated with$`, theTableShouldBePopulatedWith) From e4562f8c2cb20bbbeb486c4f93b2e0329050b280 Mon Sep 17 00:00:00 2001 From: RandyDeo Date: Mon, 14 Feb 2022 12:57:36 -0400 Subject: [PATCH 18/18] feature:WEOS-1327 - Merged Dev and did some changes (gormdb) --- api.yaml | 6 ++++++ end2end_test.go | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/api.yaml b/api.yaml index 09525f85..87bf46c1 100755 --- a/api.yaml +++ b/api.yaml @@ -295,6 +295,12 @@ paths: application/json: schema: $ref: "#/components/schemas/Blog" + application/x-www-form-urlencoded: + schema: + $ref: "#/components/schemas/Blog" + multipart/form-data: + schema: + $ref: "#/components/schemas/Blog" responses: 200: description: Update Blog diff --git a/end2end_test.go b/end2end_test.go index 31a0d40f..a4ab171f 100644 --- a/end2end_test.go +++ b/end2end_test.go @@ -1114,7 +1114,7 @@ func sojournerDeletesTheTable(tableName string) error { if err != nil { return fmt.Errorf("unexpected error getting projection: %s", err) } - apiProjection1 := apiProjection.(*projections.GORMProjection) + apiProjection1 := apiProjection.(*projections.GORMDB) result := apiProjection1.DB().Migrator().DropTable(strings.Title(tableName)) if result != nil { @@ -1139,7 +1139,7 @@ func theTableShouldBePopulatedWith(contentType string, details *godog.Table) err if err != nil { return fmt.Errorf("unexpected error getting projection: %s", err) } - apiProjection1 := apiProjection.(*projections.GORMProjection) + apiProjection1 := apiProjection.(*projections.GORMDB) result = apiProjection1.DB().Table(strings.Title(contentType)).Find(&contentEntity, "weos_ID = ?", compare["weos_id"]) if contentEntity == nil {