Skip to content

Commit

Permalink
fix: Guarantee event processing order (#1352)
Browse files Browse the repository at this point in the history
* Guarantee event processing order

* Make Close synchronous

This fixes #1349 where events are attempted to be handled after database close
  • Loading branch information
AndrewSisley authored Apr 14, 2023
1 parent a5cf397 commit 5b08fa0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 35 deletions.
4 changes: 2 additions & 2 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ var _ Channel[int] = (*simpleChannel[int])(nil)
//
// At the moment this will always return a new simpleChannel, however that may change in
// the future as this feature gets fleshed out.
func New[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] {
return NewSimpleChannel[T](subscriberBufferSize, eventBufferSize)
func New[T any](commandBufferSize int, eventBufferSize int) Channel[T] {
return NewSimpleChannel[T](commandBufferSize, eventBufferSize)
}

// Events hold the supported event types
Expand Down
80 changes: 47 additions & 33 deletions events/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,39 @@
package events

type simpleChannel[T any] struct {
subscribers []chan T
subscriptionChannel chan chan T
unsubscribeChannel chan chan T
eventChannel chan T
eventBufferSize int
closeChannel chan struct{}
isClosed bool
subscribers []chan T
// commandChannel manages all commands sent to this simpleChannel.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// WARNING: This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed bool
}

// NewSimpleChannel creates a new simpleChannel with the given subscriberBufferSize and
type subscribeCommand[T any] Subscription[T]

type unsubscribeCommand[T any] Subscription[T]

type publishCommand[T any] struct {
item T
}

type closeCommand struct{}

// NewSimpleChannel creates a new simpleChannel with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled subsequent calls to functions on this object may start to block.
func NewSimpleChannel[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] {
func NewSimpleChannel[T any](commandBufferSize int, eventBufferSize int) Channel[T] {
c := simpleChannel[T]{
subscriptionChannel: make(chan chan T, subscriberBufferSize),
unsubscribeChannel: make(chan chan T, subscriberBufferSize),
eventChannel: make(chan T, eventBufferSize),
eventBufferSize: eventBufferSize,
closeChannel: make(chan struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}

go c.handleChannel()
Expand All @@ -46,50 +59,54 @@ func (c *simpleChannel[T]) Subscribe() (Subscription[T], error) {
// It is important to set this buffer size too, else we may end up blocked in the handleChannel func
ch := make(chan T, c.eventBufferSize)

c.subscriptionChannel <- ch
c.commandChannel <- subscribeCommand[T](ch)
return ch, nil
}

func (c *simpleChannel[T]) Unsubscribe(ch Subscription[T]) {
if c.isClosed {
return
}
c.unsubscribeChannel <- ch
c.commandChannel <- unsubscribeCommand[T](ch)
}

func (c *simpleChannel[T]) Publish(item T) {
if c.isClosed {
return
}
c.eventChannel <- item
c.commandChannel <- publishCommand[T]{item}
}

func (c *simpleChannel[T]) Close() {
if c.isClosed {
return
}
c.isClosed = true
c.closeChannel <- struct{}{}
c.commandChannel <- closeCommand{}

// Wait for the close command to be handled, in order, before returning
<-c.hasClosedChan
}

func (c *simpleChannel[T]) handleChannel() {
for {
select {
case <-c.closeChannel:
close(c.closeChannel)
for cmd := range c.commandChannel {
switch command := cmd.(type) {
case closeCommand:
for _, subscriber := range c.subscribers {
close(subscriber)
}
close(c.subscriptionChannel)
close(c.unsubscribeChannel)
close(c.eventChannel)
close(c.commandChannel)
close(c.hasClosedChan)
return

case ch := <-c.unsubscribeChannel:
case subscribeCommand[T]:
c.subscribers = append(c.subscribers, command)

case unsubscribeCommand[T]:
var isFound bool
var index int
for i, subscriber := range c.subscribers {
if ch == subscriber {
if command == subscriber {
index = i
isFound = true
break
Expand All @@ -103,14 +120,11 @@ func (c *simpleChannel[T]) handleChannel() {
c.subscribers[index] = c.subscribers[len(c.subscribers)-1]
c.subscribers = c.subscribers[:len(c.subscribers)-1]

close(ch)

case newSubscriber := <-c.subscriptionChannel:
c.subscribers = append(c.subscribers, newSubscriber)
close(command)

case item := <-c.eventChannel:
case publishCommand[T]:
for _, subscriber := range c.subscribers {
subscriber <- item
subscriber <- command.item
}
}
}
Expand Down

0 comments on commit 5b08fa0

Please sign in to comment.