-
Notifications
You must be signed in to change notification settings - Fork 370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change flow export mechanism for Flow Aggregator #1949
Change flow export mechanism for Flow Aggregator #1949
Conversation
de8c85e
to
4d15b90
Compare
Codecov Report
@@ Coverage Diff @@
## main #1949 +/- ##
===========================================
- Coverage 65.39% 42.01% -23.38%
===========================================
Files 197 125 -72
Lines 17217 15448 -1769
===========================================
- Hits 11259 6491 -4768
- Misses 4785 8415 +3630
+ Partials 1173 542 -631
Flags with carried forward coverage won't be shown. Click here to find out more.
|
4d15b90
to
845cf6a
Compare
845cf6a
to
886c22f
Compare
67995f5
to
02fe6a2
Compare
/test-all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments @zyiou
e2e tests are failing. Still working on fixing them. Moving the PR to draft state.
Reverting this commit 7c0a0ae. Reason is the PR(antrea-io/antrea#1949) that depends on this commit is not yet ready. And to move the tag of go-ipfix in Antrea for other PRs this commit should be present in the latest release tags. Therefore, reverting this.
Reverting this commit 7c0a0ae. Reason is the PR(antrea-io/antrea#1949) that depends on this commit is not yet ready. And to move the tag of go-ipfix in Antrea for other PRs this commit should be present in the latest release tags. Therefore, reverting this.
02fe6a2
to
5c3396a
Compare
Codecov Report
@@ Coverage Diff @@
## main #1949 +/- ##
==========================================
+ Coverage 61.30% 65.09% +3.79%
==========================================
Files 273 273
Lines 20646 20644 -2
==========================================
+ Hits 12656 13438 +782
+ Misses 6688 5823 -865
- Partials 1302 1383 +81
Flags with carried forward coverage won't be shown. Click here to find out more.
|
64bf933
to
f4bbf2a
Compare
/test-all |
/test-e2e |
pkg/flowaggregator/flowaggregator.go
Outdated
} | ||
|
||
func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) { | ||
expireTicker := time.NewTicker(fa.activeFlowRecordTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think a ticker is the best construct anymore? We essentially call fa.aggregationProcess.GetExpiryFromExpirePriorityQueue
and set the ticker to the returned value, but we will only use the ticker for a single tick... Maybe use a timer instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the timer makes more sense and it's simpler too. Modified.
@@ -609,7 +609,8 @@ func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollector string) error | |||
} | |||
flowAggregatorConf, _ := configMap.Data[flowAggregatorConfName] | |||
flowAggregatorConf = strings.Replace(flowAggregatorConf, "#externalFlowCollectorAddr: \"\"", fmt.Sprintf("externalFlowCollectorAddr: \"%s\"", ipfixCollector), 1) | |||
flowAggregatorConf = strings.Replace(flowAggregatorConf, "#flowExportInterval: 60s", "flowExportInterval: 5s", 1) | |||
flowAggregatorConf = strings.Replace(flowAggregatorConf, "#activeFlowRecordTimeout: 60s", "activeFlowRecordTimeout: 4s", 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this change for? Maybe add a comment about this choice of values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment.
2576412
to
cf8e1ad
Compare
cf8e1ad
to
cd1e6ac
Compare
/test-all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the change.
What is left is to wait a v0.5.2 tag with klog fix. I don't know why it's not shown here.
pkg/flowaggregator/flowaggregator.go
Outdated
// Initializing exporting process fails, will retry in next exportInterval | ||
klog.Errorf("Error when initializing exporting process: %v, will retry in %s", err, fa.activeFlowRecordTimeout) | ||
// Initializing exporting process fails, will retry in next cycle. | ||
expireTimer = time.NewTimer(fa.activeFlowRecordTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can use expireTimer.Reset
to avoid creating new timers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was not aware of the method.. Reset makes sense here.
pkg/flowaggregator/flowaggregator.go
Outdated
return err | ||
} | ||
|
||
klog.V(4).Infof("Data set sent successfully: %d Bytes sent.", sentBytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove the trailing period to keep same log style
c08f7af
to
05e584b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Introduce active and inactive flow timeout configuration knobs. With active flow record timeout, every flow record is sent to the collector based on its own active expiry timeout value. With inactive flow record timeout, a flow record is sent to the collector if no records are seen by the flow aggregator for the flow, since it was last updated in the flow aggregator map. Used priority queue approach to keep track of individual expiry values for records. In addition, we changed the default active flow timeout at the flow exporter to 30s.
05e584b
to
36b301c
Compare
/test-ipv6-e2e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/test-all |
/test-ipv6-e2e |
IPv6 jenkins e2e test seem to be broken. There are v6 specific changes. Merging for now. |
Introduced active and inactive flow timeout configuration knobs.
With active flow record timeout, every flow record is sent to the collector based on its own active expiry timeout value.
With inactive flow record timeout, a flow record is sent to the collector if no records are seen by the flow aggregator for the flow since it was last updated.
Followed the mechanism proposed in RFC: https://tools.ietf.org/html/rfc6183#section-5.3.1
Used priority queue approach to keep track of individual expiry values for records.
In addition, changing the Flow Exporter
activeFlowTimeout
to 30s from 60s, to keep it lower than theactiveFlowTimeout
at Flow Aggregator.Fixes: #1637