Skip to content

Commit

Permalink
fix(command/cmdbus): track dispatch before publishing event
Browse files Browse the repository at this point in the history
fix(builtin_test.go): prevent panic on context.Canceled error to avoid unnecessary panic when context is cancelled
fix(handler_test.go): replace context.WithTimeout with context.WithCancel to avoid unnecessary timeout
fix(bus.go): change WithArtificialDelay return type to Option for better type safety
  • Loading branch information
bounoable committed Nov 6, 2023
1 parent 5cf3f58 commit 6e491ac
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
5 changes: 4 additions & 1 deletion command/builtin/builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package builtin_test

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -297,7 +298,9 @@ func TestDeleteAggregate_CustomEvent_MatchAll(t *testing.T) {

func panicOn(errs <-chan error) {
for err := range errs {
panic(err)
if !errors.Is(err, context.Canceled) {
panic(err)
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions command/cmdbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,6 @@ func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts
Payload: load,
})

b.debugLog("publishing %q event ...", evt.Name())

if err := b.bus.Publish(ctx, evt.Any()); err != nil {
return fmt.Errorf("publish %q event: %w", evt.Name(), err)
}

out := make(chan error)
accepted := make(chan struct{})
aborted := make(chan struct{})
Expand All @@ -313,6 +307,12 @@ func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts

defer b.cleanupDispatch(cmd.ID())

b.debugLog("publishing %q event ...", evt.Name())

if err := b.bus.Publish(ctx, evt.Any()); err != nil {
return fmt.Errorf("publish %q event: %w", evt.Name(), err)
}

var timeout <-chan time.Time
if b.assignTimeout > 0 {
timer := time.NewTimer(b.assignTimeout)
Expand Down
6 changes: 2 additions & 4 deletions command/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func TestHandler_Handle(t *testing.T) {
enc := newEncoder()
ebus := eventbus.New()
subBus := cmdbus.New[int](enc, ebus)
pubBus := cmdbus.New[int](enc, ebus)
pubBus := cmdbus.New[int](enc, ebus, cmdbus.AssignTimeout(0))
h := command.NewHandler[any](subBus)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

handled := make(chan command.Command)
Expand Down Expand Up @@ -52,9 +52,7 @@ func TestHandler_Handle(t *testing.T) {
case err, ok := <-errs:
if ok {
t.Fatal(err)
break
}
break
case h := <-handled:
if h.ID() != cmd.ID() || h.Name() != cmd.Name() || !reflect.DeepEqual(h.Payload(), cmd.Payload()) {
t.Fatalf("handled Command differs from dispatched Command. want=%v got=%v", cmd, h)
Expand Down
2 changes: 1 addition & 1 deletion event/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Option func(*chanbus)
// the rate of event publishing. The delay duration is specified by the provided
// time.Duration value. The function returns an Option that can be used to
// configure a chanbus instance.
func WithArtificialDelay(delay time.Duration) func(*chanbus) {
func WithArtificialDelay(delay time.Duration) Option {
return func(c *chanbus) {
c.artificialDelay = delay
}
Expand Down

0 comments on commit 6e491ac

Please sign in to comment.