Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swap message after sometime #459

Open
dramirezp opened this issue Jul 24, 2024 · 1 comment
Open

Swap message after sometime #459

dramirezp opened this issue Jul 24, 2024 · 1 comment

Comments

@dramirezp
Copy link

I am using Watermill to develop software where I send a message, and it goes through service1, service2, and the last service. I use a slice to control the order of the messages (FIFO, as GoChannel should respect FIFO). After several runs, I am encountering an issue where Watermill is swapping messages. For example, I send message A and message B, but in the last service, message B arrives first and then message A. Attached is a small script where this problem is reflected. It seems to be a race condition because it doesn't always happen, but when the script is run and the issue occurs, it shows a message like this: Slice value 68ad9d74-c0eb-476f-9cc0-5da98d947b61 value in message f01fff7d-1fb6-45a4-bda6-07e021511d3f.

/*
This application is a test of Watermill, a Go library for working efficiently with message streams.
Sending and recieving menssages from a channel.
*/

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var pubSub1 *gochannel.GoChannel
var safeSlice *SafeSlice

// Safe Slice struct just for control of the messages
type SafeSlice struct {
	mu    sync.Mutex
	slice []string
}

func (s *SafeSlice) Append(value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.slice = append(s.slice, value)
}

func (s *SafeSlice) Get(index int) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if index < 0 || index >= len(s.slice) {
		return "Index out of scope", false
	}
	return s.slice[index], true
}

func (s *SafeSlice) Remove(index int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if index < 0 || index >= len(s.slice) {
		return false
	}
	s.slice = append(s.slice[:index], s.slice[index+1:]...)
	return true
}

// service1 function is a handler for the "service-1" service. It appends the message UUID to the
// safe slice and publishes the message to the "service-2-input" channel.
func service1(msg *message.Message) error {
	safeSlice.Append(msg.UUID)
	err := pubSub1.Publish("service-2-input", msg)
	if err != nil {
		panic(err)
	}

	return nil
}

// service2 function is a handler for the "service-2" service. It receives a message, performs
// some logic, and returns a slice of messages.
func service2(msg *message.Message) ([]*message.Message, error) {
	fmt.Printf("Message in service 2 %v\n", msg)

	// Add some logic

	return message.Messages{msg}, nil
}

// service_last function is a handler for the "service_last" service. It compares the message
// UUID with the first UUID in the safe slice and removes the first UUID if they match.
func service_last(msg *message.Message) error {
	uuid, _ := safeSlice.Get(0)

	fmt.Printf("service_last %v\n", msg)

	if msg.UUID == uuid {
		fmt.Println("OK")
		safeSlice.Remove(0)
	} else {
		fmt.Printf("Slice value %s value in message %s\n", uuid, msg.UUID)
		os.Exit(0)
	}

	return nil
}

func main() {

	logger := watermill.NewStdLogger(true, true)
	safeSlice = &SafeSlice{}

	pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Fatalf("could not create router: %v", err)
	}

	// Create handlers for each service
	router.AddNoPublisherHandler("service-1", "service-1-input", pubSub1, service1)
	router.AddHandler("service-2", "service-2-input", pubSub1, "service_last-input", pubSub1, service2)
	router.AddNoPublisherHandler("service_last", "service_last-input", pubSub1, service_last)

	// Start the router
	go func() {
		if err := router.Run(context.Background()); err != nil {
			log.Fatalf("could not run router: %v", err)
		}
	}()

	time.Sleep(1 * time.Second)

	for {
		// Publish a message to start the pipeline
		msg := message.NewMessage(watermill.NewUUID(), []byte{})
		if err := pubSub1.Publish("service-1-input", msg); err != nil {
			log.Fatalf("could not publish message: %v", err)
		}

		//time.Sleep(1000 * time.Millisecond)
	}

	// Allow some time for the message to be processed
	select {}
}

@yashb042
Copy link

The GoChannel approach is using no go-routines internally if you don't explicitly add Multiplier.

I tried running the code and seems like there's some problem indeed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants