Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return receive-only channel in GetMsgChan() #350

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (cp *CollectingProcess) GetAddress() net.Addr {
return cp.netAddress
}

func (cp *CollectingProcess) GetMsgChan() chan *entities.Message {
func (cp *CollectingProcess) GetMsgChan() <-chan *entities.Message {
return cp.messageChan
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type AggregationProcess struct {
// mutex allows multiple readers or one writer at the same time
mutex sync.RWMutex
// messageChan is the channel to receive the message
messageChan chan *entities.Message
messageChan <-chan *entities.Message
// workerNum is the number of workers to process the messages
workerNum int
// workerList is the list of workers
Expand Down Expand Up @@ -71,7 +71,7 @@ type AggregationProcess struct {
}

type AggregationInput struct {
MessageChan chan *entities.Message
MessageChan <-chan *entities.Message
WorkerNum int
CorrelateFields []string
AggregateElements *AggregationElements
Expand Down
4 changes: 2 additions & 2 deletions pkg/intermediate/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (

type worker struct {
id int
messageChan chan *entities.Message
messageChan <-chan *entities.Message
errChan chan bool
job func(*entities.Message) error
}

func createWorker(id int, messageChan chan *entities.Message, job func(*entities.Message) error) *worker {
func createWorker(id int, messageChan <-chan *entities.Message, job func(*entities.Message) error) *worker {
return &worker{
id,
messageChan,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kafka/producer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitM
// PublishIPFIXMessages takes in a message channel as input and converts all the messages on
// the message channel to flow messages in proto schema. This function exits when
// the input message channel is closed.
func (kp *KafkaProducer) PublishIPFIXMessages(msgCh chan *entities.Message) {
func (kp *KafkaProducer) PublishIPFIXMessages(msgCh <-chan *entities.Message) {
for msg := range msgCh {
flowMsgs := kp.input.ProtoSchemaConvertor.ConvertIPFIXMsgToFlowMsgs(msg)
for _, flowMsg := range flowMsgs {
Expand Down
Loading