diff --git a/pkg/collector/process.go b/pkg/collector/process.go
index bad76377..66c7db3a 100644
--- a/pkg/collector/process.go
+++ b/pkg/collector/process.go
@@ -127,7 +127,7 @@ func (cp *CollectingProcess) GetAddress() net.Addr {
 	return cp.netAddress
 }
 
-func (cp *CollectingProcess) GetMsgChan() chan *entities.Message {
+func (cp *CollectingProcess) GetMsgChan() <-chan *entities.Message {
 	return cp.messageChan
 }
 
diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go
index d63188da..55ae3e3b 100644
--- a/pkg/intermediate/aggregate.go
+++ b/pkg/intermediate/aggregate.go
@@ -43,7 +43,7 @@ type AggregationProcess struct {
 	// mutex allows multiple readers or one writer at the same time
 	mutex sync.RWMutex
 	// messageChan is the channel to receive the message
-	messageChan chan *entities.Message
+	messageChan <-chan *entities.Message
 	// workerNum is the number of workers to process the messages
 	workerNum int
 	// workerList is the list of workers
@@ -71,7 +71,7 @@ type AggregationProcess struct {
 }
 
 type AggregationInput struct {
-	MessageChan       chan *entities.Message
+	MessageChan       <-chan *entities.Message
 	WorkerNum         int
 	CorrelateFields   []string
 	AggregateElements *AggregationElements
diff --git a/pkg/intermediate/worker.go b/pkg/intermediate/worker.go
index fab9c7ff..1051c94b 100644
--- a/pkg/intermediate/worker.go
+++ b/pkg/intermediate/worker.go
@@ -8,12 +8,12 @@ import (
 
 type worker struct {
 	id          int
-	messageChan chan *entities.Message
+	messageChan <-chan *entities.Message
 	errChan     chan bool
 	job         func(*entities.Message) error
 }
 
-func createWorker(id int, messageChan chan *entities.Message, job func(*entities.Message) error) *worker {
+func createWorker(id int, messageChan <-chan *entities.Message, job func(*entities.Message) error) *worker {
 	return &worker{
 		id,
 		messageChan,
diff --git a/pkg/kafka/producer/kafka.go b/pkg/kafka/producer/kafka.go
index 41737e7b..f7bbf757 100644
--- a/pkg/kafka/producer/kafka.go
+++ b/pkg/kafka/producer/kafka.go
@@ -186,7 +186,7 @@ func (kp *KafkaProducer) SendFlowMessage(msg protoreflect.Message, kafkaDelimitM
 // PublishIPFIXMessages takes in a message channel as input and converts all the messages on
 // the message channel to flow messages in proto schema. This function exits when
 // the input message channel is closed.
-func (kp *KafkaProducer) PublishIPFIXMessages(msgCh chan *entities.Message) {
+func (kp *KafkaProducer) PublishIPFIXMessages(msgCh <-chan *entities.Message) {
 	for msg := range msgCh {
 		flowMsgs := kp.input.ProtoSchemaConvertor.ConvertIPFIXMsgToFlowMsgs(msg)
 		for _, flowMsg := range flowMsgs {
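
Illustrative sketch (not part of the patch; drainMessages and the wiring in main are hypothetical): with these signatures, consumers only ever see a receive-only view of the message channel, so ranging over the channel returned by GetMsgChan() still compiles, while sending on it or closing it from the consumer side becomes a compile-time error. Only the producing side keeps the bidirectional reference, which keeps ownership of close() in one place.

    package main

    import (
    	"fmt"

    	"github.com/vmware/go-ipfix/pkg/entities"
    )

    // drainMessages is a hypothetical consumer: it only needs receive permission,
    // so the <-chan *entities.Message returned by GetMsgChan() is enough.
    // The loop exits when the producing side closes the channel.
    func drainMessages(msgCh <-chan *entities.Message) int {
    	n := 0
    	for range msgCh { // receiving still compiles on a receive-only channel
    		n++
    	}
    	// msgCh <- nil  // would no longer compile: cannot send to a receive-only channel
    	// close(msgCh)  // would no longer compile: cannot close a receive-only channel
    	return n
    }

    func main() {
    	// Stand-in for cp.GetMsgChan(): a bidirectional channel converts implicitly
    	// to <-chan *entities.Message when passed to drainMessages.
    	msgCh := make(chan *entities.Message, 1)
    	msgCh <- nil // placeholder message just to exercise the loop
    	close(msgCh)
    	fmt.Println("drained", drainMessages(msgCh), "message(s)")
    }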