Skip to content

Commit

Permalink
Merge branch 'main' into zyiou/ci_security
Browse files Browse the repository at this point in the history
  • Loading branch information
zyiou authored Apr 21, 2021
2 parents ca730d3 + 3b4806b commit ea6bbf6
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 42 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,32 @@ All notable changes to this project will be documented in this file. The format
Changelog](https://keepachangelog.com/en/1.0.0/).

## Unreleased
## 0.5.0 04-16-2020
Includes all the bug fixes from [0.4.1](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#041-12-09-2020),
[0.4.2](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#042-12-15-2020),
[0.4.3](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#043-02-04-2021),
[0.4.4](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#044-02-10-2021),
[0.4.5](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#045-02-17-2021),
[0.4.6](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#046-02-25-2021),
[0.4.7](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#047-03-15-2021),
and [0.4.8](https://github.com/vmware/go-ipfix/blob/main/CHANGELOG.md#048-03-19-2021).
### Added
- Added Kafka Producer that is initialized given the address of Kafka broker
system. It gathers the IPFIX messages from the collecting process
and turns them into Kafka messages. (#88, @stati)
- Demonstrate the ability to support multiple proto schemas in Kafka Producer. (#99, @stati)
- Add new fields to Antrea Registry for enhancing network policy info and adding
all the tcp states of the connection. (#165, @zyiou)
### Changed
- Change the name of master branch to main. (#144, @zyiou)
- Change the names of Flow Types. (#171, @zyiou)
- Enhance the debug logs with useful info. (#170, @zyiou)
### Fixed
- Fix the default expiration time of TLS certificates in tests by increasing it
from one month to one year. (#127, @zyiou)
- Fix the code in pkg/producer when cherrypicking commits from v0.4.5 release. (#143, @stati)
- Fix the branch name in go-ipfix collector image workflow. (#160, @stati)
- Fix an issue of cleaning up slice in the IPFIX set Reset method. (#163, @stati)
## 0.4.8 03-19-2021
### Changed
- Move from klog to klog/v2. (#155, @zyiou)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.5.0-dev
v0.6.0-dev
29 changes: 8 additions & 21 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,32 +148,19 @@ func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack)
return nil
}

// GetLastUpdatedTimeOfFlow provides the last updated time in the format of IPFIX
// field "flowEndSeconds".
func (a *AggregationProcess) GetLastUpdatedTimeOfFlow(flowKey FlowKey) (uint32, error) {
func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey) {
a.mutex.Lock()
defer a.mutex.Unlock()
record, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return 0, fmt.Errorf("flow key is not present in the map")
}
flowEndField, exists := record.Record.GetInfoElementWithValue("flowEndSeconds")
if exists {
return flowEndField.Value.(uint32), nil
} else {
return 0, fmt.Errorf("flowEndSeconds field is not present in the record")
}
delete(a.flowKeyRecordMap, flowKey)
}

func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) error {
a.mutex.Lock()
defer a.mutex.Unlock()
_, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return fmt.Errorf("flow key is not present in the map")
}
// DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already
// acquired the lock. For example, this can be used in a callback of ForAllRecordsDo
// function.
// TODO:Remove this when there is notion of invalid flows supported in aggregation
// process.
func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey) {
delete(a.flowKeyRecordMap, flowKey)
return nil
}

// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
Expand Down
46 changes: 26 additions & 20 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,20 +515,21 @@ func TestCorrelateRecordsForInterNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, record2, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record2, record1, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
err = ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)

// Test IPv6 fields.
// Test the scenario, where record1 is added first and then record2.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, record2, true, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
Expand All @@ -548,8 +549,7 @@ func TestCorrelateRecordsForIntraNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false)
Expand All @@ -568,8 +568,7 @@ func TestCorrelateRecordsForToExternalFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, true)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, true).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, true)
Expand Down Expand Up @@ -617,15 +616,13 @@ func TestDeleteFlowKeyFromMapWithLock(t *testing.T) {
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
err := aggregationProcess.DeleteFlowKeyFromMap(flowKey2)
assert.Error(t, err)
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey2)
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
err = aggregationProcess.DeleteFlowKeyFromMap(flowKey1)
assert.NoError(t, err)
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey1)
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
messageChan := make(chan *entities.Message)
input := AggregationInput{
MessageChan: messageChan,
Expand All @@ -635,13 +632,22 @@ func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
aggregationProcess, _ := InitAggregationProcess(input)
message := createDataMsgForSrc(t, false, false, false, false)
flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678}
_, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.Error(t, err)
err = aggregationProcess.addOrUpdateRecordInMap(&flowKey1, message.GetSet().GetRecords()[0])
assert.NoError(t, err)
flowUpdatedTime, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.NoError(t, err)
assert.Equal(t, uint32(1), flowUpdatedTime)
flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678}
aggFlowRecord := AggregationFlowRecord{
message.GetSet().GetRecords()[0],
true,
true,
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey2)
aggregationProcess.mutex.Unlock()
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey1)
aggregationProcess.mutex.Unlock()
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, isToExternal bool) {
Expand Down

0 comments on commit ea6bbf6

Please sign in to comment.