Skip to content

Commit

Permalink
Return receive-only channel in GetMsgChan() (#350)
Browse files Browse the repository at this point in the history
This is a small improvement: the message channel retrieved from the
collecting process should be "receive-only".

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored May 29, 2024
1 parent 1c17dec commit 5acb2ca
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (cp *CollectingProcess) GetAddress() net.Addr {
return cp.netAddress
}

func (cp *CollectingProcess) GetMsgChan() chan *entities.Message {
func (cp *CollectingProcess) GetMsgChan() <-chan *entities.Message {
return cp.messageChan
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type AggregationProcess struct {
// mutex allows multiple readers or one writer at the same time
mutex sync.RWMutex
// messageChan is the channel to receive the message
messageChan chan *entities.Message
messageChan <-chan *entities.Message
// workerNum is the number of workers to process the messages
workerNum int
// workerList is the list of workers
Expand Down Expand Up @@ -71,7 +71,7 @@ type AggregationProcess struct {
}

type AggregationInput struct {
MessageChan chan *entities.Message
MessageChan <-chan *entities.Message
WorkerNum int
CorrelateFields []string
AggregateElements *AggregationElements
Expand Down
4 changes: 2 additions & 2 deletions pkg/intermediate/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (

type worker struct {
id int
messageChan chan *entities.Message
messageChan <-chan *entities.Message
errChan chan bool
job func(*entities.Message) error
}

func createWorker(id int, messageChan chan *entities.Message, job func(*entities.Message) error) *worker {
func createWorker(id int, messageChan <-chan *entities.Message, job func(*entities.Message) error) *worker {
return &worker{
id,
messageChan,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/producer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitM
// PublishIPFIXMessages takes in a message channel as input and converts all the messages on
// the message channel to flow messages in proto schema. This function exits when
// the input message channel is closed.
func (kp *KafkaProducer) PublishIPFIXMessages(msgCh chan *entities.Message) {
func (kp *KafkaProducer) PublishIPFIXMessages(msgCh <-chan *entities.Message) {
for msg := range msgCh {
flowMsgs := kp.input.ProtoSchemaConvertor.ConvertIPFIXMsgToFlowMsgs(msg)
for _, flowMsg := range flowMsgs {
Expand Down

0 comments on commit 5acb2ca

Please sign in to comment.