-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Go in a more idiomatic way #126
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
a439489
peer: remove unnecessary accidental complexity
daniel-abramov 8d0a14e
message_sink: clarify the meaning of `sealed`
daniel-abramov 4386f4d
conference: remove a buffer in peer messages sink
daniel-abramov 11d83f6
message_sink: refactor the usage of seal logic
daniel-abramov 36b3fc7
subscription: reduce the buffer on worker
daniel-abramov 5376a7a
router: remove the buffer on incoming channel
daniel-abramov b03eb1a
heartbeat: refactor heartbeat channel usage
daniel-abramov 9486575
channel: get rid of last `UnboundedChannelSize`
daniel-abramov 9315323
router: refactor the router in a more idiomatic Go
daniel-abramov 4ce8b07
track: move track info out of `common`
daniel-abramov 974f469
worker: move to its own package
daniel-abramov 20b5085
sink: move to the `channel` module
daniel-abramov 94d0b62
sink: handle an important edge-case
daniel-abramov 4ae95a8
conference: get back buffers
daniel-abramov eff418b
router: rename `RunRouter` to `StartRouter`
daniel-abramov b40d6ad
signaling: don't panic on errors inside a package
daniel-abramov 7c4ea8f
conference: don't store `done` channel in state
daniel-abramov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package channel | ||
|
||
import ( | ||
"errors" | ||
"sync/atomic" | ||
) | ||
|
||
var ErrSinkSealed = errors.New("The channel is sealed") | ||
|
||
// SinkWithSender is a helper struct that allows to send messages to a message sink. | ||
// The SinkWithSender abstracts the message sink which has a certain sender, so that | ||
// the sender does not have to be specified every time a message is sent. | ||
// At the same it guarantees that the caller can't alter the `sender`, which means that | ||
// the sender can't impersonate another sender (and we guarantee this on a compile-time). | ||
type SinkWithSender[SenderType comparable, MessageType any] struct { | ||
// The sender of the messages. This is useful for multiple-producer-single-consumer scenarios. | ||
sender SenderType | ||
// The message sink to which the messages are sent. | ||
messageSink chan<- Message[SenderType, MessageType] | ||
// A channel that is used to indicate that our channel is considered sealed. It's akin | ||
// to a close indication without really closing the channel. We don't want to close | ||
// the channel here since we know that the sink is shared between multiple producers, | ||
// so we only disallow sending to the sink at this point. | ||
sealed chan struct{} | ||
// A "mutex" that is used to protect the act of closing `sealed`. | ||
alreadySealed atomic.Bool | ||
} | ||
|
||
// Creates a new MessageSink. The function is generic allowing us to use it for multiple use cases. | ||
// Note that since the current implementation accepts a channel, it's **not responsible** for closing it. | ||
func NewSink[S comparable, M any](sender S, messageSink chan<- Message[S, M]) *SinkWithSender[S, M] { | ||
return &SinkWithSender[S, M]{ | ||
sender: sender, | ||
messageSink: messageSink, | ||
sealed: make(chan struct{}), | ||
} | ||
} | ||
|
||
// Sends a message to the message sink. Blocks if the sink is full! | ||
func (s *SinkWithSender[S, M]) Send(message M) error { | ||
if s.alreadySealed.Load() { | ||
return ErrSinkSealed | ||
} | ||
|
||
messageWithSender := Message[S, M]{ | ||
Sender: s.sender, | ||
Content: message, | ||
} | ||
|
||
select { | ||
case <-s.sealed: | ||
return ErrSinkSealed | ||
case s.messageSink <- messageWithSender: | ||
return nil | ||
} | ||
} | ||
|
||
// Seals the channel, which means that no messages could be sent via this channel. | ||
// Any attempt to send a message after `Seal()` returns will result in an error. | ||
// Note that it does not mean (does not guarantee) that any existing senders that are | ||
// waiting on the send to unblock won't send the message to the recipient (this case | ||
// can happen if buffered channels are used). The existing senders will either unblock | ||
// at this point and get an error that the channel is sealed or will unblock by sending | ||
// the message to the recipient (should the recipient be ready to consume at this point). | ||
func (s *SinkWithSender[S, M]) Seal() { | ||
if !s.alreadySealed.CompareAndSwap(false, true) { | ||
return | ||
} | ||
|
||
select { | ||
case <-s.sealed: | ||
return | ||
default: | ||
close(s.sealed) | ||
} | ||
} | ||
|
||
// Messages that are sent from the peer to the conference in order to communicate with other peers. | ||
// Since each peer is isolated from others, it can't influence the state of other peers directly. | ||
type Message[SenderType comparable, MessageType any] struct { | ||
// The sender of the message. | ||
Sender SenderType | ||
// The content of the message. | ||
Content MessageType | ||
} |
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You do not know which of the two cases is executed at runtime. With 'default' you make sure that the case is checked first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But is it not different semantically? I.e. what if the sender gets to the
s.messageSink <- messageWithSender
part and blocks there waiting for the reader to get ready? We must ensure that once the reader is not ready to accept new messages (e.g. not interested in them anymore), then we don't continue waiting on the channel. Essentially that's what I tried to describe, i.e. it's semantically I wanted thisThough your concern is valid, I tried to catch this case by checking the atomic variable at the beginning of the function (this does not guarantee that those who called
Send()
just beforeClose()
won't send a value though).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok got it better send a message more than blocking.