Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Event Handler groups and new CQRS public API #367

Merged
merged 34 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
23422ab
[cqrs] added support for handler groups
roblaszczak Jun 5, 2023
ee51b3b
[forwarder] allow to pass external router
roblaszczak Jun 5, 2023
9a279b0
[fanout] closing router before closing handlers
roblaszczak Jun 5, 2023
0e68e66
update Go to 1.20
roblaszczak Jun 5, 2023
703218f
added OnSend and OnProcess for CommandBus/EventBus + unify Command an…
roblaszczak Jun 8, 2023
5002651
improve commands and events configs
roblaszczak Jun 8, 2023
e932252
rework topic generation functions
roblaszczak Jun 8, 2023
c7180a6
cleanups of commands/events configs
roblaszczak Jun 8, 2023
1ab3d08
simplify backward-compatible functions
roblaszczak Jun 8, 2023
5fb1849
added and updated tests
roblaszczak Jun 10, 2023
c588ba1
added missing tests
roblaszczak Jun 10, 2023
7135a23
cleanups and docs
roblaszczak Jun 20, 2023
10c30f3
more cleanups
roblaszczak Jun 20, 2023
b6bd8f2
added missing tests
roblaszczak Jun 20, 2023
c0e20f1
added missing tests
roblaszczak Jun 20, 2023
b41f6a2
added missing CommandName to params
roblaszczak Jun 20, 2023
5d06d3a
update example
roblaszczak Jun 20, 2023
e12fa6d
update CQRS docs
roblaszczak Jun 20, 2023
0ad663f
update CQRS docs
roblaszczak Jun 20, 2023
09b4f0f
improve configuration structure
roblaszczak Jun 22, 2023
6eb02dc
improve naming
roblaszczak Jun 22, 2023
76b5c23
update cqrs example
roblaszczak Jun 22, 2023
d7fd704
removed not needed check
roblaszczak Jun 22, 2023
ea045fb
add log with handlers count
roblaszczak Jun 22, 2023
a48ff5e
remove deprecated rand.Seed
roblaszczak Jun 22, 2023
1fb6fee
Merge branch 'master' into event-handler-groups-cleanup
roblaszczak Jun 22, 2023
0911a7e
fix lint & tests
roblaszczak Jun 22, 2023
526b704
Merge remote-tracking branch 'origin/event-handler-groups-cleanup' in…
roblaszczak Jun 22, 2023
8e0bbce
Apply suggestions from code review
roblaszczak Jun 23, 2023
14fd999
apply code review comments
roblaszczak Jun 23, 2023
7fe0ca7
adding command/event handlers to router automatically
roblaszczak Jun 23, 2023
66ce909
update docs and example
roblaszczak Jun 23, 2023
28b215c
fix docs indents
roblaszczak Jun 24, 2023
461e392
Apply suggestions from code review
roblaszczak Jun 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions _examples/basic/2-realtime-feed/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ func main() {
logger := watermill.NewStdLogger(false, false)
logger.Info("Starting the producer", watermill.LogFields{})

rand.Seed(time.Now().Unix())

publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: brokers,
Expand Down
2 changes: 1 addition & 1 deletion _examples/basic/5-cqrs-protobuf/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
golang:
image: golang:1.19
image: golang:1.20
restart: unless-stopped
ports:
- 8080:8080
Expand Down
2 changes: 1 addition & 1 deletion _examples/basic/5-cqrs-protobuf/go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module main.go

require (
github.com/ThreeDotsLabs/watermill v1.2.0-rc.11
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1
Expand Down
8 changes: 8 additions & 0 deletions _examples/basic/5-cqrs-protobuf/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 h1:tQJ3L/AnfliXaxaq+ElHOfzi0Vx+AN8cAnIOLcUTrxo=
github.com/ThreeDotsLabs/watermill v1.2.0-rc.11/go.mod h1:QLZSaklpSZ/7yv288LL2DFOgCEi86VYEmQvzmaMlHoA=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620142403-c0e20f18aef0 h1:920Tfprg3Lwn7ieUOtnSZSz73UhZLeqzkf/fozF5vZ4=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620142403-c0e20f18aef0/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620191859-b41f6a2770be h1:7c3tZkJ3w2jB0S9xRkUvMNVUD/AE49c6wAFr1AyMy/g=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620191859-b41f6a2770be/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230622094202-6eb02dc0b0b8 h1:S+6+P94VcfyiJCtmP6q8mGoMuSGmeAZc8lfIbDJ1R/E=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230622094202-6eb02dc0b0b8/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc h1:j8hIjk/pE05TmiEoiljsJPTZCL/4OJEUifWoFlMs0HI=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7 h1:AUSXLqdsA1LXWuoQSkIRG9FhMo6EYM9GSgd+bnf1W0w=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7/go.mod h1:DKOBUoMVtPMV8jBEbRP3NL6TgnOMyRvVza3W297cdqU=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
Expand Down
163 changes: 128 additions & 35 deletions _examples/basic/5-cqrs-protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,56 +207,149 @@ func main() {
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)

// cqrs.Facade is facade for Command and Event buses and processors.
// You can use facade, or create buses and processors manually (you can inspire with cqrs.NewFacade)
cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
GenerateCommandsTopic: func(commandName string) string {
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return commandName
return params.CommandName, nil
},
CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
return []cqrs.CommandHandler{
BookRoomHandler{eb},
OrderBeerHandler{eb},
}
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})

params.Message.Metadata.Set("sent_at", time.Now().String())

return nil
},
CommandsPublisher: commandsPublisher,
CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}

commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()

err := params.Handler.Handle(params.Message.Context(), params.Command)

logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})

return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
GenerateEventsTopic: func(eventName string) string {
)
if err != nil {
panic(err)
}

eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events"
return "events", nil

// we can also use topic per event type
// return eventName
// return params.EventName, nil
},
EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
OrderBeerOnRoomBooked{cb},
NewBookingsFinancialReport(),
}
},
EventsPublisher: eventsPublisher,
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(handlerName),
)

return amqp.NewSubscriber(config, logger)

OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})

params.Message.Metadata.Set("published_at", time.Now().String())

return nil
},
Router: router,
CommandEventMarshaler: cqrsMarshaler,
Logger: logger,

Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}

eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)

return amqp.NewSubscriber(config, logger)
},

OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()

err := params.Handler.Handle(params.Message.Context(), params.Event)

logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})

return err
},

Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}

err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}

err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
m110 marked this conversation as resolved.
Show resolved Hide resolved

NewBookingsFinancialReport(),
m110 marked this conversation as resolved.
Show resolved Hide resolved

cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}

// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(cqrsFacade.CommandBus())
go publishCommands(commandBus)

// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions _examples/real-world-examples/consumer-groups/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"context"
"math/rand"
"net/http"
"sync"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream"
Expand All @@ -16,7 +14,6 @@ import (
)

func main() {
rand.Seed(time.Now().UnixNano())
logger := watermill.NewStdLogger(false, false)

router, err := message.NewRouter(message.RouterConfig{}, logger)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package main

import (
"math/rand"
"net/http"
"time"

"github.com/ThreeDotsLabs/watermill"
)

func main() {
rand.Seed(time.Now().Unix())

logger := watermill.NewStdLogger(false, false)

postsStorage := NewPostsStorage()
Expand Down
Loading