Skip to content

Commit

Permalink
Add stats support for intermediate process
Browse files Browse the repository at this point in the history
  • Loading branch information
srikartati committed Dec 6, 2020
1 parent 0c6473d commit cd87dd3
Show file tree
Hide file tree
Showing 6 changed files with 541 additions and 199 deletions.
248 changes: 194 additions & 54 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
// Copyright 2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package intermediate

import (
"bytes"
"encoding/binary"
"fmt"
"github.com/vmware/go-ipfix/pkg/util"
"net"
"strings"
"sync"
Expand All @@ -23,40 +40,26 @@ type AggregationProcess struct {
workerNum int
// workerList is the list of workers
workerList []*worker
// correlateFields are the fields to be filled in correlating process
// correlateFields are the fields to be filled when correlating records from
// two nodes.
correlateFields []string
// aggregateElements consists of stats and non-stats elements that need to be
// updated. In addition, new aggregation elements that has to be added to record
// to handle correlated records from two nodes should be given.
// TODO: Add checks to validate the lists inside such as no duplicates, order
// of stats etc.
aggregateElements *AggregationElements
// stopChan is the channel to receive stop message
stopChan chan bool
}

type FlowKey struct {
SourceAddress string
DestinationAddress string
Protocol uint8
SourcePort uint16
DestinationPort uint16
}

type AggregationInput struct {
MessageChan chan *entities.Message
WorkerNum int
CorrelateFields []string
}

type AggregationFlowRecord struct {
Record entities.Record
// ReadyToSend is an indicator that we received all required records for the
// given flow, i.e., records from source and destination nodes for the case
// inter-node flow and record from the node for the case of intra-node flow.
ReadyToSend bool
// IsActive is a flag that indicates whether the flow is active or not. If
// aggregation process stop receiving flows from collector process, we deem
// the flow as inactive.
IsActive bool
MessageChan chan *entities.Message
WorkerNum int
CorrelateFields []string
AggregateElements *AggregationElements
}

type FlowKeyRecordMapCallBack func(key FlowKey, records []entities.Record) error

// InitAggregationProcess takes in message channel (e.g. from collector) as input channel, workerNum(number of workers to process message)
// and correlateFields (fields to be correlated and filled).
func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error) {
Expand All @@ -72,6 +75,7 @@ func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)
input.WorkerNum,
make([]*worker, 0),
input.CorrelateFields,
input.AggregateElements,
make(chan bool),
}, nil
}
Expand Down Expand Up @@ -111,7 +115,9 @@ func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) er
if err != nil {
return err
}
a.aggregateRecord(flowKey, record)
if err = a.addOrUpdateRecordInMap(flowKey, record); err != nil {
return err
}
}
return nil
}
Expand All @@ -136,9 +142,9 @@ func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) {
delete(a.flowKeyRecordMap, flowKey)
}

// aggregateRecord either adds the record to flowKeyMap or update the record in
// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
// flowKeyMap by doing correlation or updating the stats.
func (a *AggregationProcess) aggregateRecord(flowKey *FlowKey, record entities.Record) {
func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record entities.Record) error {
a.mutex.Lock()
defer a.mutex.Unlock()

Expand All @@ -150,18 +156,43 @@ func (a *AggregationProcess) aggregateRecord(flowKey *FlowKey, record entities.R
if !aggregationRecord.ReadyToSend && !areRecordsFromSameNode(record, aggregationRecord.Record) {
a.correlateRecords(record, aggregationRecord.Record)
aggregationRecord.ReadyToSend = true
}
// Aggregation of incoming flow record with existing by updating stats
// and flow timestamps.
if isRecordFromSrc(record) {
if err := a.aggregateRecords(record, aggregationRecord.Record, true, false); err != nil {
return err
}
} else {
// If the record from the node is already present, update the stats
// and timestamps.
if err := a.aggregateRecords(record, aggregationRecord.Record, false, true); err != nil {
return err
}
}
} else {
// For intra-node flows, just do aggregation of the flow record with
// existing record by updating the stats and flow timestamps. Correlation
// is not required.

if err := a.aggregateRecords(record, aggregationRecord.Record, true, true); err != nil {
return err
}
}

} else {
// Add all the new stat fields and initialize them.
if !isRecordIntraNode(record) {
if isRecordFromSrc(record) {
if err := a.addFieldsForStatsAggregation(record, true, false); err != nil {
return err
}
} else {
if err := a.addFieldsForStatsAggregation(record, false, true); err != nil {
return err
}
}
} else {
if err := a.addFieldsForStatsAggregation(record, true, true); err != nil {
return err
}
}
aggregationRecord = AggregationFlowRecord{
record,
false,
Expand All @@ -172,7 +203,8 @@ func (a *AggregationProcess) aggregateRecord(flowKey *FlowKey, record entities.R
}
}

a.addRecordToMap(flowKey, aggregationRecord)
a.flowKeyRecordMap[*flowKey] = aggregationRecord
return nil
}

// correlateRecords correlate the incomingRecord with existingRecord using correlation
Expand All @@ -190,7 +222,7 @@ func (a *AggregationProcess) correlateRecords(incomingRecord, existingRecord ent
existingIeWithValue.Value = ieWithValue.Value
}
case entities.Unsigned16:
if ieWithValue.Value != uint16(0) {
if ieWithValue.Value != uint16(0) {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)
if existingIeWithValue.Value != uint16(0) {
klog.Warningf("This field with name %v should not have been filled with value %v in existing record.", field, existingIeWithValue.Value)
Expand Down Expand Up @@ -224,20 +256,126 @@ func (a *AggregationProcess) correlateRecords(incomingRecord, existingRecord ent
}
}

// addRecordToMap is currently used only in aggregateRecord().
// For other uses, please acquire the flowKeyRecordLock for protection.
func (a *AggregationProcess) addRecordToMap(flowKey *FlowKey, record AggregationFlowRecord) {
if _, exist := a.flowKeyRecordMap[*flowKey]; !exist {
a.flowKeyRecordMap[*flowKey] = record
// aggregateRecords aggregate the incomingRecord with existingRecord by updating
// stats and flow timestamps.
func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord entities.Record, fillSrcStats, fillDstStats bool) error {
if a.aggregateElements == nil {
return nil
}

for _, element := range a.aggregateElements.nonStatsElements {
if ieWithValue, exist := incomingRecord.GetInfoElementWithValue(element); exist {
switch ieWithValue.Element.Name {
case "flowEndSeconds":
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(element)
// Update flow end timestamp if it is latest.
if ieWithValue.Value.(uint32) > existingIeWithValue.Value.(uint32) {
existingIeWithValue.Value = ieWithValue.Value
}
default:
klog.Errorf("Fields with name %v is not supported in aggregation fields list.", element)
}
} else {
return fmt.Errorf("element with name %v in nonStatsElements not present in the incoming record", element)
}
}

statsElementList := a.aggregateElements.statsElements
antreaSourceStatsElements := a.aggregateElements.aggregatedSourceStatsElements
antreaDestinationStatsElements := a.aggregateElements.aggregatedDestinationStatsElements
for i, element := range statsElementList {
isDelta := false
if strings.Contains(element, "Delta") {
isDelta = true
}
if ieWithValue, exist := incomingRecord.GetInfoElementWithValue(element); exist {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(element)
// Update the corresponding element in existing record.
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
} else {
// We are simply adding the delta stats now. We expect delta stats to be
// reset after sending the record from flowKeyMap in aggregation process.
// Delta stats from source and destination nodes are added, so we will have
// two times the stats approximately.
// For delta stats, it is better to use source and destination specific
// stats.
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
}
// Update the corresponding source element in antreaStatsElement list.
if fillSrcStats {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(antreaSourceStatsElements[i])
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
} else {
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
}
}
// Update the corresponding destination element in antreaStatsElement list.
if fillDstStats {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(antreaDestinationStatsElements[i])
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
} else {
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
}
}
} else {
return fmt.Errorf("element with name %v in statsElements not present in the incoming record", element)
}
}
return nil
}

func (a *AggregationProcess) addFieldsForStatsAggregation(record entities.Record, fillSrcStats, fillDstStats bool) error {
if a.aggregateElements == nil {
return nil
}
statsElementList := a.aggregateElements.statsElements
antreaSourceStatsElements := a.aggregateElements.aggregatedSourceStatsElements
antreaDestinationStatsElements := a.aggregateElements.aggregatedDestinationStatsElements
antreaElements := append(antreaSourceStatsElements, antreaDestinationStatsElements...)

for _, element := range antreaElements {
// Get the new info element from Antrea registry.
// TODO: Take antrea registry enterpriseID as input to make this generic.
ie, err := registry.GetInfoElement(element, registry.AntreaEnterpriseID)
if err != nil {
return err
}
value := new(bytes.Buffer)
if err = util.Encode(value, binary.BigEndian, uint64(0)); err != nil {
return err
}
ieWithValue := entities.NewInfoElementWithValue(ie, value)
_, err = record.AddInfoElement(ieWithValue, true)
if err != nil {
return err
}
}
// Initialize the values of newly added stats info elements.
for i, element := range statsElementList {
if ieWithValue, exist := record.GetInfoElementWithValue(element); exist {
// Initialize the corresponding source element in antreaStatsElement list.
if fillSrcStats {
existingIeWithValue, _ := record.GetInfoElementWithValue(antreaSourceStatsElements[i])
existingIeWithValue.Value = ieWithValue.Value
}
// Initialize the corresponding destination element in antreaStatsElement list.
if fillDstStats {
existingIeWithValue, _ := record.GetInfoElementWithValue(antreaDestinationStatsElements[i])
existingIeWithValue.Value = ieWithValue.Value
}
}
}
return nil
}


// isRecordIntraNode returns true if record belongs to intra-node flow.
func isRecordIntraNode(record entities.Record) bool {
srcIEWithValue, exist := record.GetInfoElementWithValue("sourcePodName")
if exist && srcIEWithValue.Value != "" {
dstIEWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
if exist && dstIEWithValue.Value != "" {
if srcIEWithValue, exist := record.GetInfoElementWithValue("sourcePodName"); exist && srcIEWithValue.Value != "" {
if dstIEWithValue, exist := record.GetInfoElementWithValue("destinationPodName"); exist && dstIEWithValue.Value != "" {
return true
}
}
Expand All @@ -246,26 +384,28 @@ func isRecordIntraNode(record entities.Record) bool {

// isRecordFromSrc returns true if record belongs to inter-node flow and from source node.
func isRecordFromSrc(record entities.Record) bool {
if isRecordIntraNode(record) {
srcIEWithValue, exist := record.GetInfoElementWithValue("sourcePodName")
if !exist || srcIEWithValue.Value == "" {
return false
}
ieWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
if exist && ieWithValue.Value == "" {
return true
dstIEWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
if exist && dstIEWithValue.Value != "" {
return false
}
return false
return true
}

// isRecordFromDst returns true if record belongs to inter-node flow and from destination node.
func isRecordFromDst(record entities.Record) bool {
if isRecordIntraNode(record) {
dstIEWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
if !exist || dstIEWithValue.Value == "" {
return false
}
ieWithValue, exist := record.GetInfoElementWithValue("sourcePodName")
if exist && ieWithValue.Value == "" {
return true
srcIEWithValue, exist := record.GetInfoElementWithValue("sourcePodName")
if exist && srcIEWithValue.Value != "" {
return false
}
return false
return true
}

func areRecordsFromSameNode(record1 entities.Record, record2 entities.Record) bool {
Expand Down
Loading

0 comments on commit cd87dd3

Please sign in to comment.