From a761abc4b143b13d8fc55a5efd362d8ae52cdfa2 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 13 Apr 2023 17:00:28 -0400 Subject: [PATCH 1/4] Guarantee event processing order --- events/events.go | 4 +-- events/simple.go | 75 +++++++++++++++++++++++++++--------------------- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/events/events.go b/events/events.go index 1113cad3ab..4f910ab454 100644 --- a/events/events.go +++ b/events/events.go @@ -41,8 +41,8 @@ var _ Channel[int] = (*simpleChannel[int])(nil) // // At the moment this will always return a new simpleChannel, however that may change in // the future as this feature gets fleshed out. -func New[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] { - return NewSimpleChannel[T](subscriberBufferSize, eventBufferSize) +func New[T any](commandBufferSize int, eventBufferSize int) Channel[T] { + return NewSimpleChannel[T](commandBufferSize, eventBufferSize) } // Events hold the supported event types diff --git a/events/simple.go b/events/simple.go index 59777a857f..eca6539003 100644 --- a/events/simple.go +++ b/events/simple.go @@ -11,26 +11,38 @@ package events type simpleChannel[T any] struct { - subscribers []chan T - subscriptionChannel chan chan T - unsubscribeChannel chan chan T - eventChannel chan T - eventBufferSize int - closeChannel chan struct{} - isClosed bool + subscribers []chan T + // commandChannel manages all commands sent to this simpleChannel. + // + // It is important that all stuff gets sent through this single channel to ensure + // that the order of operations is preserved. + commandChannel chan any + eventBufferSize int + isClosed bool } -// NewSimpleChannel creates a new simpleChannel with the given subscriberBufferSize and +type subscribeCommand[T any] struct { + subscriptionChannel Subscription[T] +} + +type unsubscribeCommand[T any] struct { + subscriptionChannel Subscription[T] +} + +type publishCommand[T any] struct { + item T +} + +type closeCommand struct{} + +// NewSimpleChannel creates a new simpleChannel with the given commandBufferSize and // eventBufferSize. // // Should the buffers be filled subsequent calls to functions on this object may start to block. -func NewSimpleChannel[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] { +func NewSimpleChannel[T any](commandBufferSize int, eventBufferSize int) Channel[T] { c := simpleChannel[T]{ - subscriptionChannel: make(chan chan T, subscriberBufferSize), - unsubscribeChannel: make(chan chan T, subscriberBufferSize), - eventChannel: make(chan T, eventBufferSize), - eventBufferSize: eventBufferSize, - closeChannel: make(chan struct{}), + commandChannel: make(chan any, commandBufferSize), + eventBufferSize: eventBufferSize, } go c.handleChannel() @@ -46,7 +58,7 @@ func (c *simpleChannel[T]) Subscribe() (Subscription[T], error) { // It is important to set this buffer size too, else we may end up blocked in the handleChannel func ch := make(chan T, c.eventBufferSize) - c.subscriptionChannel <- ch + c.commandChannel <- subscribeCommand[T]{ch} return ch, nil } @@ -54,14 +66,14 @@ func (c *simpleChannel[T]) Unsubscribe(ch Subscription[T]) { if c.isClosed { return } - c.unsubscribeChannel <- ch + c.commandChannel <- unsubscribeCommand[T]{ch} } func (c *simpleChannel[T]) Publish(item T) { if c.isClosed { return } - c.eventChannel <- item + c.commandChannel <- publishCommand[T]{item} } func (c *simpleChannel[T]) Close() { @@ -69,27 +81,27 @@ func (c *simpleChannel[T]) Close() { return } c.isClosed = true - c.closeChannel <- struct{}{} + c.commandChannel <- closeCommand{} } func (c *simpleChannel[T]) handleChannel() { - for { - select { - case <-c.closeChannel: - close(c.closeChannel) + for cmd := range c.commandChannel { + switch command := cmd.(type) { + case closeCommand: for _, subscriber := range c.subscribers { close(subscriber) } - close(c.subscriptionChannel) - close(c.unsubscribeChannel) - close(c.eventChannel) + close(c.commandChannel) return - case ch := <-c.unsubscribeChannel: + case subscribeCommand[T]: + c.subscribers = append(c.subscribers, command.subscriptionChannel) + + case unsubscribeCommand[T]: var isFound bool var index int for i, subscriber := range c.subscribers { - if ch == subscriber { + if command.subscriptionChannel == subscriber { index = i isFound = true break @@ -103,14 +115,11 @@ func (c *simpleChannel[T]) handleChannel() { c.subscribers[index] = c.subscribers[len(c.subscribers)-1] c.subscribers = c.subscribers[:len(c.subscribers)-1] - close(ch) - - case newSubscriber := <-c.subscriptionChannel: - c.subscribers = append(c.subscribers, newSubscriber) + close(command.subscriptionChannel) - case item := <-c.eventChannel: + case publishCommand[T]: for _, subscriber := range c.subscribers { - subscriber <- item + subscriber <- command.item } } } From 5e6928820d6d19796720a8802c1b3295e51a4b3e Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 13 Apr 2023 17:45:22 -0400 Subject: [PATCH 2/4] Make Close synchronous This fixes #1349 where events are attempted to be handled after database close --- events/simple.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/events/simple.go b/events/simple.go index eca6539003..08bacc1f4b 100644 --- a/events/simple.go +++ b/events/simple.go @@ -18,6 +18,7 @@ type simpleChannel[T any] struct { // that the order of operations is preserved. commandChannel chan any eventBufferSize int + hasClosedChan chan struct{} isClosed bool } @@ -42,6 +43,7 @@ type closeCommand struct{} func NewSimpleChannel[T any](commandBufferSize int, eventBufferSize int) Channel[T] { c := simpleChannel[T]{ commandChannel: make(chan any, commandBufferSize), + hasClosedChan: make(chan struct{}), eventBufferSize: eventBufferSize, } @@ -82,6 +84,9 @@ func (c *simpleChannel[T]) Close() { } c.isClosed = true c.commandChannel <- closeCommand{} + + // Wait for the close command to be handled, in order, before returning + <-c.hasClosedChan } func (c *simpleChannel[T]) handleChannel() { @@ -92,6 +97,7 @@ func (c *simpleChannel[T]) handleChannel() { close(subscriber) } close(c.commandChannel) + close(c.hasClosedChan) return case subscribeCommand[T]: From b138c079c42bfffd6df0bd768db08c8cf27314fa Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 13 Apr 2023 19:52:12 -0400 Subject: [PATCH 3/4] PR FIXUP - Simplify command declaration Language rules prevented the same for publishCommand as the generic is not allowed (would be ). --- events/simple.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/events/simple.go b/events/simple.go index 08bacc1f4b..6583931200 100644 --- a/events/simple.go +++ b/events/simple.go @@ -22,13 +22,9 @@ type simpleChannel[T any] struct { isClosed bool } -type subscribeCommand[T any] struct { - subscriptionChannel Subscription[T] -} +type subscribeCommand[T any] Subscription[T] -type unsubscribeCommand[T any] struct { - subscriptionChannel Subscription[T] -} +type unsubscribeCommand[T any] Subscription[T] type publishCommand[T any] struct { item T @@ -60,7 +56,7 @@ func (c *simpleChannel[T]) Subscribe() (Subscription[T], error) { // It is important to set this buffer size too, else we may end up blocked in the handleChannel func ch := make(chan T, c.eventBufferSize) - c.commandChannel <- subscribeCommand[T]{ch} + c.commandChannel <- subscribeCommand[T](ch) return ch, nil } @@ -68,7 +64,7 @@ func (c *simpleChannel[T]) Unsubscribe(ch Subscription[T]) { if c.isClosed { return } - c.commandChannel <- unsubscribeCommand[T]{ch} + c.commandChannel <- unsubscribeCommand[T](ch) } func (c *simpleChannel[T]) Publish(item T) { @@ -101,13 +97,13 @@ func (c *simpleChannel[T]) handleChannel() { return case subscribeCommand[T]: - c.subscribers = append(c.subscribers, command.subscriptionChannel) + c.subscribers = append(c.subscribers, command) case unsubscribeCommand[T]: var isFound bool var index int for i, subscriber := range c.subscribers { - if command.subscriptionChannel == subscriber { + if command == subscriber { index = i isFound = true break @@ -121,7 +117,7 @@ func (c *simpleChannel[T]) handleChannel() { c.subscribers[index] = c.subscribers[len(c.subscribers)-1] c.subscribers = c.subscribers[:len(c.subscribers)-1] - close(command.subscriptionChannel) + close(command) case publishCommand[T]: for _, subscriber := range c.subscribers { From 55b61c7899dd808b84ed4f4cac3aea19bc15d6ef Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 13 Apr 2023 19:55:27 -0400 Subject: [PATCH 4/4] PR FIXUP - Document command-buffer limitation This is a change from the prior solutino, as the (un)subscribe and event channels were separate. --- events/simple.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/events/simple.go b/events/simple.go index 6583931200..bf247a7a16 100644 --- a/events/simple.go +++ b/events/simple.go @@ -16,6 +16,9 @@ type simpleChannel[T any] struct { // // It is important that all stuff gets sent through this single channel to ensure // that the order of operations is preserved. + // + // WARNING: This does mean that non-event commands can block the database if the buffer + // size is breached (e.g. if many subscribe commands occupy the buffer). commandChannel chan any eventBufferSize int hasClosedChan chan struct{}