Skip to content

Commit

Permalink
Revert "add buffer to subscription manager"
Browse files Browse the repository at this point in the history
This reverts commit ed43bcf.
  • Loading branch information
mathnogueira committed Apr 6, 2023
1 parent ed43bcf commit ec41795
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 198 deletions.
114 changes: 0 additions & 114 deletions server/subscription/buffer.go

This file was deleted.

68 changes: 45 additions & 23 deletions server/subscription/manager.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,67 @@
package subscription

import "time"
import "sync"

type Manager struct {
subscriptions *BufferedSubscriptions
cleanupPeriod time.Duration
subscriptions map[string][]Subscriber
mutex sync.Mutex
}

func NewManager() *Manager {
manager := &Manager{
subscriptions: NewBufferedSubscriptions(),
cleanupPeriod: 10 * time.Second,
return &Manager{
subscriptions: make(map[string][]Subscriber),
mutex: sync.Mutex{},
}

manager.startCleanupProcess()

return manager
}

func (m *Manager) Subscribe(resourceID string, subscriber Subscriber) {
m.subscriptions.AddSubscriber(resourceID, subscriber)
m.mutex.Lock()
defer m.mutex.Unlock()

array := make([]Subscriber, 0)

if existingArray, ok := m.subscriptions[resourceID]; ok {
array = existingArray
}

array = append(array, subscriber)
m.subscriptions[resourceID] = array
}

func (m *Manager) Unsubscribe(resourceID string, subscriptionID string) {
m.subscriptions.RemoveSubscriber(resourceID, subscriptionID)
m.mutex.Lock()
defer m.mutex.Unlock()

array, exists := m.subscriptions[resourceID]
if !exists {
return
}

newArray := make([]Subscriber, 0, len(array)-1)
for _, item := range array {
if item.ID() != subscriptionID {
newArray = append(newArray, item)
}
}

m.subscriptions[resourceID] = newArray
}

func (m *Manager) PublishUpdate(message Message) {
m.subscriptions.NotifySubscribers(message.ResourceID, message)
if subscribers, ok := m.subscriptions[message.ResourceID]; ok {
for _, subscriber := range subscribers {
subscriber.Notify(message)
}
}
}

func (m *Manager) Publish(resourceID string, message any) {
m.subscriptions.NotifySubscribers(resourceID, Message{ResourceID: resourceID, Content: message})
}

func (m *Manager) startCleanupProcess() {
go func() {
ticker := time.NewTicker(m.cleanupPeriod)
select {
case <-ticker.C:
m.subscriptions.CleanUp()
if subscribers, ok := m.subscriptions[resourceID]; ok {
for _, subscriber := range subscribers {
subscriber.Notify(Message{
ResourceID: resourceID,
Content: message,
})
}
}()
}
}
61 changes: 0 additions & 61 deletions server/subscription/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package subscription_test

import (
"testing"
"time"

"github.com/kubeshop/tracetest/server/subscription"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -107,63 +106,3 @@ func TestManagerUnsubscribe(t *testing.T) {
assert.Equal(t, message1.Content, receivedMessage.Content, "subscriber should not be notified after unsubscribed")

}

func TestBufferedMessages(t *testing.T) {
manager := subscription.NewManager()
receivedMessages := make([]subscription.Message, 0)

subscriber := subscription.NewSubscriberFunction(func(message subscription.Message) error {
receivedMessages = append(receivedMessages, message)
return nil
})

message1 := subscription.Message{
ResourceID: "test:1",
Type: "test_update",
Content: "Test was updated",
}

message2 := subscription.Message{
ResourceID: "test:1",
Type: "test_deleted",
Content: "Test was deleted",
}

manager.PublishUpdate(message1)
time.Sleep(1 * time.Second)
manager.Subscribe("test:1", subscriber)

manager.PublishUpdate(message2)

assert.Len(t, receivedMessages, 2)
}

func TestBufferedMessagesDisapearAfter10S(t *testing.T) {
manager := subscription.NewManager()
receivedMessages := make([]subscription.Message, 0)

subscriber := subscription.NewSubscriberFunction(func(message subscription.Message) error {
receivedMessages = append(receivedMessages, message)
return nil
})

message1 := subscription.Message{
ResourceID: "test:1",
Type: "test_update",
Content: "Test was updated",
}

message2 := subscription.Message{
ResourceID: "test:1",
Type: "test_deleted",
Content: "Test was deleted",
}

manager.PublishUpdate(message1)
time.Sleep(10 * time.Second)
manager.Subscribe("test:1", subscriber)

manager.PublishUpdate(message2)

assert.Len(t, receivedMessages, 1)
}

0 comments on commit ec41795

Please sign in to comment.