Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] v1.0 preparations #17

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open

[WIP] v1.0 preparations #17

wants to merge 20 commits into from

Conversation

roblaszczak
Copy link
Member

No description provided.

README.md Outdated
@@ -16,8 +16,7 @@ You can run the unit tests by simply running the command: `go test -cover -race

#### Development and Testing

To try the in test mode (using [goaws](https://github.com/p4tin/goaws))
To try the in test mode (using [localstack](https://hub.docker.com/r/localstack/localstack))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

goaws is not compatible with newer AWS SDK - I needed to replace it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why but I remember experiencing strange behaviours with localstack.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunetly goaws doesn't work with never AWS SDK: Admiral-Piett/goaws#279

but it looks lighter than localstack so we can potentially go back

@@ -93,7 +93,7 @@ func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {

sub, err := sqs.NewSubscriber(sqs.SubscriberConfig{
AWSConfig: cfg,
CreateQueueInitializerConfig: sqs.QueueConfigAtrributes{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a typo in QueueConfigAttributes

sqs/config.go Outdated
Comment on lines 20 to 28
CreateQueueInitializerConfig QueueConfigAttributes

GenerateCreateQueueInput GenerateCreateQueueInputFunc

GenerateGetQueueUrlInput GenerateGetQueueUrlInputFunc

GenerateReceiveMessageInput GenerateReceiveMessageInputFunc

GenerateDeleteMessageInput GenerateDeleteMessageInputFunc
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added config for all things where we can specify in AWS requests - all those has sane defaults

looking for feedback

I need to add comments here

return &sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MessageAttributeNames: []string{"All"},
WaitTimeSeconds: 20, // 20 is max at the moment
Copy link
Member Author

@roblaszczak roblaszczak Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is time that for what request send by sdk will wait for messages if they are none yet - to avoid calling AWS SDK like crazy

@@ -13,7 +13,7 @@ type Marshaler interface {
Marshal(msg *message.Message) (*types.Message, error)
}

type UnMarshaler interface {
type Unmarshaler interface {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make it consistent with stdlib and other watermill stuff

sqs/sqs.go Outdated
b, _ := json.Marshal(q)
// QueueConfigAttributesBool is a custom type for bool values in QueueConfigAttributes
// that supports marshaling to string.
type QueueConfigAttributesBool bool
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it didn't worked earlier - it errored because json wasn't able to marshal bool to string - this type fixes it

sqs/sqs.go Outdated
return []byte("false"), nil
}

func (q QueueConfigAttributes) Attributes() (map[string]string, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i started to return errors from here as it was silently failing earlier 🙈

sqs/sqs.go Outdated
getQueueOutput, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(topic),
})
func getQueueUrl(ctx context.Context, sqsClient *sqs.Client, topic string, generateGetQueueUrlInput GenerateGetQueueUrlInputFunc) (*string, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it private to not increase public API surface (and now it requires generateGetQueueUrlInput), but if anybody see it useful please let me know

@@ -94,33 +98,41 @@ func (s *Subscriber) receive(ctx context.Context, queueURL string, output chan *
"queue": queueURL,
}

go func() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without that tests were hanging - I added err check for all places that may return canceled ctx after that

}
case <-msg.Nacked():
// Do not delete message, it will be redelivered
return false // we don't want to process next messages to preserve order
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we were not doing it earlier, but I guess that it's expected? if we'll continue we can process next messages and we may lose ordering

eventually we can add config here

return s.SubscribeInitializeWithContext(context.Background(), topic)
}

func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just in case if someone wanted to pass ctx

SqsTopic: sqsTopic,
SnsTopicArn: snsTopicArn,
SqsQueueArn: *sqsQueueArn,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing err check?


input, err := s.config.GenerateSubscribeInput(ctx, GenerateSubscribeInputParams{
SqsTopic: sqsTopic,
SnsTopicArn: snsTopicArn,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SnsTopicArn: snsTopicArn,
SqsTopicArn: snsTopicArn,

Typo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

czemu?

@Mariscal6
Copy link
Contributor

thanks @roblaszczak, looks much better

@roblaszczak roblaszczak marked this pull request as ready for review September 5, 2024 19:53
@@ -8,18 +8,20 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

// todo: check if it can be renamed
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Mariscal6 is UUID special in any way? or we could rename it to _watermill_uuid to avoid conflicts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not special, we can modify it

// client side uuid
// there is a deduplication id that can be use for
// fifo queues
// todo: check how it works
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Mariscal6 connected to previous question 😉

@@ -1,98 +0,0 @@
package main
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roblaszczak I found these examples useful. Maybe we can put them in an example folder....

@@ -8,18 +8,20 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

// todo: check if it can be renamed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not special, we can modify it

SnsTopicArn: snsTopicArn,
SqsURL: sqsURL,
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing err check

return nil
}

func (s *Subscriber) setSqsQuePolicy(ctx context.Context, sqsQueueArn sqs.QueueArn, snsTopicArn TopicArn, sqsURL sqs.QueueURL) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setSqsQuePolicy to setSqsQueuePolicy?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants