Skip to content

Commit

Permalink
Change flow export mechanism for Flow Aggregator
Browse files Browse the repository at this point in the history
Introduce active and inactive flow timeout configuration
knobs.
With active flow record timeout, every flow record is sent to the
collector based on its own active expiry timeout value.
With inactive flow record timeout, a flow record is sent to the
collector if no records are seen by the flow aggregator for the flow,
since it was last updated.

Used priority queue approach to keep track of individual expiry values
for records.
  • Loading branch information
srikartati committed Mar 24, 2021
1 parent 5b5ed2b commit 4d15b90
Show file tree
Hide file tree
Showing 3 changed files with 461 additions and 137 deletions.
230 changes: 98 additions & 132 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,21 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) {
defer fa.collectingProcess.Stop()
go fa.aggregationProcess.Start()
defer fa.aggregationProcess.Stop()
go fa.activeFlowRecordExpiryCheck(stopCh)
go fa.inactiveFlowRecordExpiryCheck(stopCh)
go fa.flowRecordExpiryCheck(true, stopCh)
go fa.flowRecordExpiryCheck(false, stopCh)

<-stopCh
}

func (fa *flowAggregator) activeFlowRecordExpiryCheck(stopCh <-chan struct{}) {
expireTicker := time.NewTicker(fa.activeFlowRecordTimeout)
OUTER:
func (fa *flowAggregator) flowRecordExpiryCheck(isActiveTimeout bool, stopCh <-chan struct{}) {
var timeout time.Duration
if isActiveTimeout {
timeout = fa.activeFlowRecordTimeout
} else {
timeout = fa.inactiveFlowRecordTimeout
}

expireTicker := time.NewTicker(timeout)
for {
select {
case <-stopCh:
Expand All @@ -364,60 +370,105 @@ OUTER:
err := fa.initExportingProcess()
if err != nil {
klog.Errorf("Error when initializing exporting process: %v, will retry in %s", err, fa.activeFlowRecordTimeout)
// Initializing exporting process fails, will retry in next activeFlowRecordTimeout
// Initializing exporting process fails, will retry in next cycle.
continue
}
}
// Pop the flow record item from priority queue and send the flow records
if fa.activeExpiryPriorityQueue.Len() > 0 {
pqItem := fa.activeExpiryPriorityQueue.Peek().(ItemToExpire)
for pqItem.TimeToExpire == fa.currentActiveExpiry {
// Pop the record item from the priority queue
pqItem = fa.activeExpiryPriorityQueue.Pop().(ItemToExpire)
// Update the record metadata corresponding priority queue item.
recMetada, exists := fa.getRecordFromMetadataMap(pqItem.FlowKey)
if !exists {
klog.Warningf("Record metadata should be there in the metadata map.")
}
recMetada.itemInActiveTimeoutPQ = nil
if err := fa.sendFlowKeyRecord(pqItem.FlowKey, pqItem.FlowRecord); err != nil {
klog.Errorf("Error when triggering active timeout expiry actions: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
expireTicker.Stop()
expireTicker = time.NewTicker(fa.activeFlowRecordTimeout)
continue OUTER
}
if fa.activeExpiryPriorityQueue.Len() == 0 {
break
} else {
pqItem = fa.activeExpiryPriorityQueue.Peek().(ItemToExpire)
}
}
}

err := fa.aggregationProcess.ForAllRecordsDo(fa.updateActiveExpiryPriorityQueue)
if err != nil {
if err := fa.popAndSendFlowRecords(isActiveTimeout); err != nil {
klog.Errorf("Error when triggering active timeout expiry actions: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
expireTicker.Stop()
expireTicker = time.NewTicker(fa.activeFlowRecordTimeout)
expireTicker = time.NewTicker(timeout)
continue
}

if err := fa.aggregationProcess.ForAllRecordsDo(fa.updateActiveExpiryPriorityQueue); err != nil {
klog.Errorf("Error when updating active timeout priority queue: %v", err)
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
expireTicker.Stop()
expireTicker = time.NewTicker(timeout)
} else {
// Get the top item of the priority queue to change the ticker interval.
topFlowItem := fa.activeExpiryPriorityQueue.Peek().(*ItemToExpire)
fa.currentActiveExpiry = topFlowItem.TimeToExpire
expireTicker.Stop()
expireTicker = time.NewTicker(fa.currentActiveExpiry)
if isActiveTimeout {
expireTicker = time.NewTicker(fa.activeExpiryPriorityQueue.Peek().(*ItemToExpire).TimeToExpire)
} else {
expireTicker = time.NewTicker(fa.inactiveExpiryPriorityQueue.Peek().(*ItemToExpire).TimeToExpire)
}
}
}
}
}

// popAndSendFlowRecords pops the flow item from the indicated priority queue and
// sends the flow records.
func (fa *flowAggregator) popAndSendFlowRecords(isActivePQ bool) error {
var pq *TimeToExpirePriorityQueue
var expiryTime time.Duration
if isActivePQ {
pq = &fa.activeExpiryPriorityQueue
expiryTime = fa.currentActiveExpiry
} else {
pq = &fa.inactiveExpiryPriorityQueue
expiryTime = fa.currentActiveExpiry
}

if pq.Len() == 0 {
return nil
}
for pq.Len() > 0 {
topItem := pq.Peek().(*ItemToExpire)
if topItem.TimeToExpire != expiryTime {
// We do not have to check other items anymore.
break
}
// Pop the record item from the priority queue
pqItem := heap.Pop(pq).(*ItemToExpire)
// Update the record metadata corresponding priority queue item.
recMetadata, exists := fa.getRecordFromMetadataMap(pqItem.FlowKey)
if !exists {
klog.Warningf("Record metadata should be there in the metadata map.")
}
if isActivePQ {
recMetadata.itemInActiveTimeoutPQ = nil
} else {
// Add extra check for flow item in inactive timeout priority queue
// to see if the last flow updated time has changed. If it changes,
// we do not send the record, we move onto the next one. The popped
// record will be added back with reset queue.
flowEndField, fieldExists := pqItem.FlowRecord.Record.GetInfoElementWithValue("flowEndSeconds")
if !fieldExists {
klog.Warningf("record is invalid because flowEndSeconds IPFIX field is not present")
}
if recMetadata.lastUpdatedTime != flowEndField.Value.(uint32) {
recMetadata.itemInInactiveTimeoutPQ = nil
continue
}
recMetadata.itemInInactiveTimeoutPQ = nil
}
if err := fa.sendFlowKeyRecord(pqItem.FlowKey, pqItem.FlowRecord); err != nil {
return err
}
if !isActivePQ {
// If the routine is for inactive priority queue, delete the flow
// record from record map in the intermediate process.
fa.aggregationProcess.DeleteFlowKeyFromMapWithoutLock(pqItem.FlowKey)
// Delete the record from record metadata map.
if err := fa.deleteRecordFromMetadataMap(pqItem.FlowKey); err != nil {
klog.Warningf("Flow record should be present in record metadata map.")
}
}
// Add the updated record metadata to metadata map
fa.addRecordToMetadataMap(pqItem.FlowKey, recMetadata)
}
return nil
}

func (fa *flowAggregator) updateActiveExpiryPriorityQueue(key ipfixintermediate.FlowKey, record ipfixintermediate.AggregationFlowRecord) error {
if !record.ReadyToSend {
klog.V(4).Info("Skip the update for this record as it is not ready.")
Expand Down Expand Up @@ -448,81 +499,6 @@ func (fa *flowAggregator) updateActiveExpiryPriorityQueue(key ipfixintermediate.
return nil
}

func (fa *flowAggregator) inactiveFlowRecordExpiryCheck(stopCh <-chan struct{}) {
expireTicker := time.NewTicker(fa.inactiveFlowRecordTimeout)
OUTER:
for {
select {
case <-stopCh:
if fa.exportingProcess != nil {
fa.exportingProcess.CloseConnToCollector()
}
expireTicker.Stop()
return
case <-expireTicker.C:
if fa.exportingProcess == nil {
err := fa.initExportingProcess()
if err != nil {
klog.Errorf("Error when initializing exporting process: %v, will retry in %s", err, fa.activeFlowRecordTimeout)
// Initializing exporting process fails, will retry in next activeFlowRecordTimeout
continue
}
}
// Pop the flow record item from priority queue and send the flow records
if fa.inactiveExpiryPriorityQueue.Len() > 0 {
pqItem := fa.inactiveExpiryPriorityQueue.Peek().(ItemToExpire)
for pqItem.TimeToExpire == fa.currentInactiveExpiry {
// Pop the record item from the priority queue
pqItem = fa.inactiveExpiryPriorityQueue.Pop().(ItemToExpire)
// Update the record metadata corresponding priority queue item.
recMetada, exists := fa.getRecordFromMetadataMap(pqItem.FlowKey)
if !exists {
klog.Warningf("Record metadata should be there in the metadata map.")
}
recMetada.itemInInactiveTimeoutPQ = nil

if err := fa.sendFlowKeyRecord(pqItem.FlowKey, pqItem.FlowRecord); err != nil {
klog.Errorf("Error when triggering active timeout expiry actions: %v", err)
// If there is an error when sending flow records because of intermittent connectivity, we reset the connection
// to IPFIX collector and retry in the next export cycle to reinitialize the connection and send flow records.
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
expireTicker.Stop()
expireTicker = time.NewTicker(fa.inactiveFlowRecordTimeout)
continue OUTER
}
// Delete the flow record from record map in the intermediate process.
fa.aggregationProcess.DeleteFlowKeyFromMapWithoutLock(pqItem.FlowKey)
// Delete the record from record metadata map.
if err := fa.deleteRecordFromMetadataMap(pqItem.FlowKey); err != nil {
klog.Warningf("Flow record should be present in metadata map")
}
if fa.inactiveExpiryPriorityQueue.Len() == 0 {
break
} else {
pqItem = fa.inactiveExpiryPriorityQueue.Peek().(ItemToExpire)
}
}
}
err := fa.aggregationProcess.ForAllRecordsDo(fa.updateInactiveExpiryPriorityQueue)
if err != nil {
klog.Errorf("Error when triggering inactive timeout expiry actions : %v", err)
// If there is an error when triggering actions for flow record,
// we reset the connection to IPFIX collector and retry later.
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
} else {
// Peek into the top item of the priority queue to change the ticker
// interval dynamically.
topFlowItem := fa.inactiveExpiryPriorityQueue.Peek().(*ItemToExpire)
fa.currentInactiveExpiry = topFlowItem.TimeToExpire
expireTicker.Stop()
expireTicker = time.NewTicker(fa.currentInactiveExpiry)
}
}
}
}

func (fa *flowAggregator) updateInactiveExpiryPriorityQueue(key ipfixintermediate.FlowKey, record ipfixintermediate.AggregationFlowRecord) error {
if !record.ReadyToSend {
klog.V(4).Info("Skip sending the record as it is not ready.")
Expand Down Expand Up @@ -550,11 +526,10 @@ func (fa *flowAggregator) updateInactiveExpiryPriorityQueue(key ipfixintermediat
fa.addRecordToMetadataMap(key, recMetadata)
} else {
var newTimeoutVal time.Duration
// Check if the inactive expiry timeout valid or not, and update the value
// based on that. If the last updated time of the flow is changed, then
// reset the expiry timeout value.
// If the last updated time of the flow is changed, then
// reset the expiry timeout value, otherwise update the value.
if recMetadata.lastUpdatedTime == flowEndField.Value.(uint32) {
newTimeoutVal = fa.inactiveFlowRecordTimeout - fa.currentInactiveExpiry
newTimeoutVal = recMetadata.itemInInactiveTimeoutPQ.TimeToExpire - fa.currentInactiveExpiry
} else {
recMetadata.lastUpdatedTime = flowEndField.Value.(uint32)
newTimeoutVal = fa.inactiveFlowRecordTimeout
Expand All @@ -564,7 +539,6 @@ func (fa *flowAggregator) updateInactiveExpiryPriorityQueue(key ipfixintermediat
// Update the record Metadata in metadata map.
fa.addRecordToMetadataMap(key, recMetadata)
}

return nil
}

Expand All @@ -582,10 +556,11 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
if err != nil {
return fmt.Errorf("error when adding the record to the set: %v", err)
}
_, err = fa.sendDataSet(fa.set)
sentBytes, err := fa.exportingProcess.SendSet(fa.set)
if err != nil {
return err
return fmt.Errorf("error when sending data set: %v", err)
}
klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
return nil
}

Expand Down Expand Up @@ -657,15 +632,6 @@ func (fa *flowAggregator) sendTemplateSet(templateSet ipfix.IPFIXSet, isIPv6 boo
return bytesSent, err
}

func (fa *flowAggregator) sendDataSet(dataSet ipfix.IPFIXSet) (int, error) {
sentBytes, err := fa.exportingProcess.SendSet(dataSet)
if err != nil {
return 0, fmt.Errorf("error when sending data set: %v", err)
}
klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
return sentBytes, nil
}

func (fa *flowAggregator) addRecordToMetadataMap(flowKey ipfixintermediate.FlowKey, recMetadata recordMetadata) {
fa.recordMetadataMutex.Lock()
defer fa.recordMetadataMutex.Unlock()
Expand Down
Loading

0 comments on commit 4d15b90

Please sign in to comment.