
Modify correlation and add stats support in aggregation process #99

Merged: 3 commits merged into vmware:master from fix_correlation on Dec 8, 2020

Conversation

srikartati (Contributor) commented Dec 3, 2020:

  • Create an AggregationFlowRecord structure to store the required metadata (a sketch follows below).
  • Simplify the correlation process so that we store just one record per flow.
  • Add stats support in the aggregation process.
  • There are issues in the present code for intra-node flows, which do not need correlation; this PR fixes them.
  • Add tests for intra-node flows and stats support.
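A hypothetical sketch of the kind of metadata the new structure could carry (the field names here are illustrative, not the merged definition):

// AggregationFlowRecord keeps one correlated record per flow key plus the
// metadata the aggregation process needs.
type AggregationFlowRecord struct {
	// The single record stored and updated for this flow key.
	Record entities.Record
	// Whether a record from the source/destination node has been seen, so that
	// intra-node flows and not-yet-correlated inter-node flows can be told apart.
	IsRecordFromSrcSeen bool
	IsRecordFromDstSeen bool
}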

codecov bot commented Dec 3, 2020:

Codecov Report

Merging #99 (cd87dd3) into master (72176b5) will decrease coverage by 1.31%.
The diff coverage is 69.41%.


@@            Coverage Diff             @@
##           master      #99      +/-   ##
==========================================
- Coverage   80.26%   78.94%   -1.32%     
==========================================
  Files          13       13              
  Lines        1591     1729     +138     
==========================================
+ Hits         1277     1365      +88     
- Misses        212      242      +30     
- Partials      102      122      +20     
Flag                Coverage Δ
integration-tests   57.77% <25.32%> (-9.31%) ⬇️
unit-tests          78.36% <69.41%> (-1.27%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files                    Coverage Δ
pkg/intermediate/aggregate.go     68.79% <66.23%> (-6.78%) ⬇️
pkg/registry/registry_antrea.go   100.00% <100.00%> (ø)

srikartati (Contributor, Author):

@zyiou
I modified the correlation part of the code to make it slightly simpler and to maintain some metadata.
I added some comments where stats can be updated and maintained. PTAL.
Since you started working on the stats part, I want to make sure the approach is as expected.
I can fill up the stats and timestamp code too, if you haven't made too much progress on that front.

srikartati (Contributor, Author) commented Dec 3, 2020:

Please treat reviewing this as a priority. It is an important PR for the flow aggregator process, and it would be great if we can ship it with go-ipfix v0.4.0.

zyiou (Contributor) left a comment:

Thanks for the PR. The current changes make sense to me. My only concern is that, for the inter-node case, if two consecutive records arrive from the source node without a record from the destination node, the second record from the source node is discarded (only the stats are updated).

> I can fill up the stats and timestamp code too, if you haven't made too much progress on that front.

Since the structure has changed quite a bit, I may need to start over on the stats. It would be helpful if you have time to do that; I can also pick it up next week. Thanks!

Comment on lines 147 to 149
} else {
// If the record from the node is already present, update the stats
// and timestamps.
zyiou (Contributor):

We will discard this record then? There would be only one data point across two intervals if we consider the visualization.

srikartati (Contributor, Author):

We are not discarding the record's information: the previous record from the same node (either source or destination) has the same flow key and metadata fields; only the stats and timestamps differ, and those are updated in the stored record.

zyiou (Contributor):

Got it. We only update the existing record based on the incoming record. What if the existing record has not been exported yet when the incoming record arrives? Will the flow aggregator export one record instead of two?

srikartati (Contributor, Author):

Yes, the exporter in the flow aggregator will export one record instead of two: the most up-to-date version of it.

dummyIP := net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
if ipInString == dummyIP.To16().String() {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)
ipInString := existingIeWithValue.Value.(net.IP).To4().String()
zyiou (Contributor):

should be To16() right?

srikartati (Contributor, Author):

yes
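For reference, the IPv6 branch with this fix applied would read as follows; this is just the fragment from the diff with To16() substituted, and the rest of the correlation logic is unchanged:

dummyIP := net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
if ipInString == dummyIP.To16().String() {
	existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)
	// Fixed: the stored value is an IPv6 address, so read it with To16(), not To4().
	ipInString := existingIeWithValue.Value.(net.IP).To16().String()
	// ... rest of the correlation logic uses ipInString as before.
}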

}
case entities.Ipv4Address:
ipInString := ieWithValue.Value.(net.IP).To4().String()
if ipInString == "0. 0. 0. 0" {
zyiou (Contributor):

There should be no spaces in the address string; it should be "0.0.0.0".

if ipInString == "0. 0. 0. 0" {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)
ipInString := existingIeWithValue.Value.(net.IP).To4().String()
if ipInString != "0. 0. 0. 0" {
zyiou (Contributor):

same as above.

}
case entities.Ipv4Address:
ipInString := ieWithValue.Value.(net.IP).To4().String()
if ipInString == "0. 0. 0. 0" {
zyiou (Contributor):

And == is supposed to be !=, right?

srikartati (Contributor, Author):

yes
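Putting the two fixes together (no spaces in the address literal, and != instead of ==), the IPv4 branch would look like this; again just the fragment from the diff, with the remaining correlation logic unchanged:

case entities.Ipv4Address:
	ipInString := ieWithValue.Value.(net.IP).To4().String()
	// Fixed: "0.0.0.0" without spaces, and != so that only a non-dummy
	// incoming address is used for correlation.
	if ipInString != "0.0.0.0" {
		// ... fill up the corresponding field as before.
	}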

@@ -112,8 +117,8 @@ func createMsgwithDataSet2() *entities.Message {
ie5 := entities.NewInfoElementWithValue(entities.NewInfoElement("protocolIdentifier", 4, 1, 0, 1), proto)
ie6 := entities.NewInfoElementWithValue(entities.NewInfoElement("sourcePodName", 101, 13, 55829, 65535), srcPod)
ie7 := entities.NewInfoElementWithValue(entities.NewInfoElement("destinationPodName", 103, 13, 55829, 65535), dstPod)
ie8 := entities.NewInfoElementWithValue(entities.NewInfoElement("destinationClusterIP", 106, 18, 55829, 4), nil)
ie9 := entities.NewInfoElementWithValue(entities.NewInfoElement("destinationServicePort", 107, 2, 55829, 2), nil)
ie8 := entities.NewInfoElementWithValue(entities.NewInfoElement("destinationClusterIPv4", 106, 18, 55829, 4), net.IP{0, 0, 0, 0})
zyiou (Contributor):

net.IP{0,0,0,0} => bytes.NewBuffer(net.IP{0, 0, 0, 0})
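Applied to the test fixture above, the suggestion would read (other arguments unchanged):

ie8 := entities.NewInfoElementWithValue(entities.NewInfoElement("destinationClusterIPv4", 106, 18, 55829, 4), bytes.NewBuffer(net.IP{0, 0, 0, 0}))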

Comment on lines 274 to 275
aggregationProcess.aggregateRecord(*flowKey1, record1)
aggregationProcess.aggregateRecord(*flowKey2, record2)
zyiou (Contributor):

The logic problem when correlating IPv4 fields is not reflected in the unit tests. I think we should add a case that reverses the order of sending these two records, which will cover the case where the destination record is received first.

srikartati (Contributor, Author):

Yes, there was a typo in the uint/int cases at L187, and it carried over.
We are missing that scenario. In addition, we are also missing tests for intra-node flows; I was planning to add them. (A rough sketch of the reversed-order test follows.)
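A sketch of the reversed-order case, mirroring the existing test above; the flow-key/record helpers are placeholders for whatever the test file already provides:

// Hypothetical sketch: the destination record is aggregated before the source record.
flowKey1, record1 := createTestFlowKeyAndRecordFromSrc() // assumed helper
flowKey2, record2 := createTestFlowKeyAndRecordFromDst() // assumed helper
aggregationProcess.aggregateRecord(*flowKey2, record2)
aggregationProcess.aggregateRecord(*flowKey1, record1)
// Assert that the single stored record has correctly correlated IPv4 fields.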

- Create AggregationFlowRecord structure to store required metadata
- Change correlation process, so that we can store one record
- Can be easily extended to maintain stats.
srikartati changed the title from "Modify correlation in aggregation process" to "Modify correlation and add stats support in aggregation process" on Dec 4, 2020
srikartati requested a review from zyiou on December 4, 2020 at 20:21
zyiou (Contributor) left a comment:

Thanks for adding the stats part.
I have a question about the concept of the aggregated stats, e.g. for octetDeltaCountFromSourceNode, we are summing up all the octetDeltaCount values if the record is from the source node, no matter what the source node is, right? What would be the use cases for that? I thought we would aggregate based on the current source node name. Just want to make sure about this. Thanks!

}
// Update the corresponding source element in antreaStatsElement list.
if fillSrcStats {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(antreaSourceStatsElements[i])
zyiou (Contributor):

The index i at L297 and L306 assumes that the provided statsElements, antreaSourceStatsElements, and antreaDestinationStatsElements lists are matched respectively, right? Maybe we can add that as a comment in type.go.

Reviewer:

Yes, I would suggest making statsElementList, antreaSourceStatsElements, and antreaDestinationStatsElements all independent string lists. Also, it looks like there should be no duplicated elements across these three lists; otherwise the stats could be double or triple counted?

srikartati (Contributor, Author) commented Dec 6, 2020:

Yes, this part needs more checks to validate the assumptions taken in the code. As Antrea is the only user, we control the input for now. In addition, the code could be made more generic, for example to handle flow records with different templates; at that point the assumptions will change.
I want to add a TODO and move on, considering the time factor. What do you say? (A sketch of such a TODO follows this thread.)

zyiou (Contributor):

I'm OK with adding a TODO comment.

Reviewer:

OK too.
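A sketch of how the TODO and the index-alignment assumption could be captured in type.go; the element names are taken from this discussion and may differ from the actual lists:

// TODO: Validate that these three lists stay index-aligned and share no
// elements; otherwise stats could be double or triple counted. Revisit once
// flow records with different templates are supported.
var (
	statsElementList = []string{
		"octetDeltaCount",
		"octetTotalCount",
	}
	antreaSourceStatsElements = []string{
		"octetDeltaCountFromSourceNode",
		"octetTotalCountFromSourceNode",
	}
	antreaDestinationStatsElements = []string{
		"octetDeltaCountFromDestinationNode",
		"octetTotalCountFromDestinationNode",
	}
)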

type AggregationInput struct {
MessageChan chan *entities.Message
WorkerNum int
CorrelateFields []string
aggregateElements *AggregationElements

Reviewer:

This is a private member, so callers cannot pass it. Do we need it in the public input struct?

srikartati (Contributor, Author):

Done.
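With the private member dropped and the aggregation elements exported so that callers can set them (matching the input.AggregateElements used in the constructor later in this diff), the input struct would look roughly like:

type AggregationInput struct {
	MessageChan       chan *entities.Message
	WorkerNum         int
	CorrelateFields   []string
	AggregateElements *AggregationElements
}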

}
case entities.Unsigned16:
if ieWithValue.Value != uint16(0) {
shihhaoli commented Dec 5, 2020:

Could 0 be a valid value?

srikartati (Contributor, Author):

Yes. For example, for a pod-to-pod flow the destinationServicePort (the K8s Service port) is not set.

Reviewer:

If 0 could be a valid value in some cases, do we still skip storing it when the incoming record has a 0 value? Or does the 0 value actually carry no useful info?

srikartati (Contributor, Author):

No, we do not skip storing it; we store everything sent by the exporter at the collector. If a field is in the IPFIX template, the exporter has to send some value; otherwise the collector cannot validate the corresponding IPFIX data record. A 0 value of course carries no useful info, but it is still needed in some cases where it has to be correlated with its corresponding record and filled up.

We would have to use and manage multiple templates if a field has no use for a specific case; for example, destinationServicePortName/destinationServicePort are not applicable for pod-to-pod flows. Currently, we use only one template in Antrea, the user of the go-ipfix library.

Reviewer:

I guess the default value for the uint16 type here is 0, so only a non-zero value from the incoming record needs to be stored.

srikartati (Contributor, Author):

Yes, we only need to correlate a non-zero uint16 value and fill up the existing record in the flowKeyRecord map.

if ieWithValue, exist := incomingRecord.GetInfoElementWithValue(field); exist {
switch ieWithValue.Element.Name {
case "flowEndSeconds":
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)

Reviewer:

So there is no need to check whether the field exists in the existing record?

srikartati (Contributor, Author):

Right, it's not needed here; we already check for this in addFieldsForStatsAggregation. The assumption is that all the flow records for a given flow follow the same template.

if ieWithValue, exist := record.GetInfoElementWithValue(element); exist {
// Initialize the corresponding source element in antreaStatsElement list.
if fillSrcStats {
existingIeWithValue, _ := record.GetInfoElementWithValue(antreaSourceStatsElements[i])

Reviewer:

Same question as the previous one applies here.

srikartati (Contributor, Author):

Here as well, we check in the function that adds the new fields, and we presume that records come with the same template.

func isRecordFromSrc(record entities.Record) bool {
if isRecordIntraNode(record) {
return false
}
shihhaoli commented Dec 5, 2020:

It looks to me like we may be checking dst twice in some cases, such as src != "" && dst == "". How about doing something like the following?

func isRecordFromSrc(record entities.Record) bool {
	srcIEWithValue, exist := record.GetInfoElementWithValue("sourcePodName")
	if !exist || srcIEWithValue.Value == "" {
		return false
	}
	dstIEWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
	if exist && dstIEWithValue.Value != "" {
		return false
	}
	return true
}

srikartati (Contributor, Author):

Good suggestion. Done.

return false
}
ieWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
if exist && ieWithValue.Value == "" {

Reviewer:

Should this be
if !exist || ieWithValue.Value == "" {
?

srikartati (Contributor, Author):

We expect destinationPodName to be present; again, we presume the same template is used on both source and destination nodes.
In the future, we will have to extend this to handle different templates for source and destination nodes, but then we would probably have to consider different cases: one template for intra-node flow records, one for inter-node source-node flow records, one for inter-node destination-node flow records, etc. That case is not supported yet.

func isRecordFromDst(record entities.Record) bool {
if isRecordIntraNode(record) {
return false
}

Reviewer:

Similarly here.

srikartati (Contributor, Author):

Done.
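A mirrored sketch for the destination-side check, following the same pattern as the suggestion above (not necessarily the exact merged code):

func isRecordFromDst(record entities.Record) bool {
	dstIEWithValue, exist := record.GetInfoElementWithValue("destinationPodName")
	if !exist || dstIEWithValue.Value == "" {
		return false
	}
	srcIEWithValue, exist := record.GetInfoElementWithValue("sourcePodName")
	if exist && srcIEWithValue.Value != "" {
		return false
	}
	return true
}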

return err
}
value := new(bytes.Buffer)
if err = util.Encode(value, binary.BigEndian, uint64(0)); err != nil {

Reviewer:

Just curious, does the 64-bit integer here cover InfoElement ID, field length, and enterprise number?

srikartati (Contributor, Author):

It just initializes the value for new stat fields added for source and destination.

srikartati (Contributor, Author):

> Thanks for adding the stats part.
> I have a question about the concept of the aggregated stats, e.g. for octetDeltaCountFromSourceNode, we are summing up all the octetDeltaCount values if the record is from the source node, no matter what the source node is, right? What would be the use cases for that? I thought we would aggregate based on the current source node name. Just want to make sure about this. Thanks!

Let me understand what you mean by a different source node. At any point in time there is only one source node for a given flow in the cluster.
The presumption is that the 5-tuple key is unique to the flow. If the pod is destroyed and comes up again on a different node, there will be a new flow: the source port varies and the IP can be different.
As we are aggregating records for the same flow key, the source node stays the same.
Ultimately, when the record from the aggregation process is exported, the user needs to reset the stats.

zyiou (Contributor) commented Dec 5, 2020:

> Let me understand what you mean by a different source node. At any point in time there is only one source node for a given flow in the cluster.
> The presumption is that the 5-tuple key is unique to the flow. If the pod is destroyed and comes up again on a different node, there will be a new flow: the source port varies and the IP can be different.
> As we are aggregating records for the same flow key, the source node stays the same.
> Ultimately, when the record from the aggregation process is exported, the user needs to reset the stats.

Sorry, I forgot the context that we are aggregating under the same flow key; then it makes sense to me. So octetTotalCountFromSourceNode is actually the total octet count from the source node for this specific flow, not the total octet count for all flows whose source is the current node, right?

srikartati (Contributor, Author):

> Sorry, I forgot the context that we are aggregating under the same flow key; then it makes sense to me. So octetTotalCountFromSourceNode is actually the total octet count from the source node for this specific flow, not the total octet count for all flows whose source is the current node, right?

Yup that's correct.

srikartati (Contributor, Author) left a comment:

Thanks for the review, @shihhaoli. Addressed comments.


srikartati requested a review from zyiou on December 7, 2020 at 21:16
zyiou (Contributor) left a comment:

LGTM. just ensure the TODO comment is added. Thanks!

Comment on lines 71 to 80
return &AggregationProcess{
make(map[FlowKey][]entities.Record),
make(map[FlowKey]AggregationFlowRecord),
sync.RWMutex{},
input.MessageChan,
input.WorkerNum,
make([]*worker, 0),
input.CorrelateFields,
input.AggregateElements,
make(chan bool),
}, nil
zyiou (Contributor):

Nit: it would be better to specify the corresponding field names here, for future maintenance.
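As a sketch of this nit, the same initializer with named fields; the names are inferred from the positional initializer above and may not match the actual struct exactly:

return &AggregationProcess{
	flowKeyRecordMap:  make(map[FlowKey]AggregationFlowRecord),
	mutex:             sync.RWMutex{},
	messageChan:       input.MessageChan,
	workerNum:         input.WorkerNum,
	workerList:        make([]*worker, 0),
	correlateFields:   input.CorrelateFields,
	aggregateElements: input.AggregateElements,
	stopChan:          make(chan bool),
}, nil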


srikartati (Contributor, Author) commented Dec 8, 2020:

> LGTM. just ensure the TODO comment is added. Thanks!

It was added in L45. Thanks for the review.

srikartati merged commit acbdcfa into vmware:master on Dec 8, 2020
srikartati deleted the fix_correlation branch on December 8, 2020 at 05:23
zyiou pushed a commit to zyiou/go-ipfix that referenced this pull request Feb 5, 2021
…re#99)

* Modify correlation in aggregation process

- Create AggregationFlowRecord structure to store required metadata
- Change correlation process, so that we can store one record
- Can be easily extended to maintain stats.
- Add stats support for intermediate process
- Cleanup of existing unit tests and add more unit tests.