Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Modify Aggregation process methods (#158)" #172

Merged
merged 1 commit into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,32 +148,19 @@ func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack)
return nil
}

// GetLastUpdatedTimeOfFlow provides the last updated time in the format of IPFIX
// field "flowEndSeconds".
func (a *AggregationProcess) GetLastUpdatedTimeOfFlow(flowKey FlowKey) (uint32, error) {
func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey) {
a.mutex.Lock()
defer a.mutex.Unlock()
record, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return 0, fmt.Errorf("flow key is not present in the map")
}
flowEndField, exists := record.Record.GetInfoElementWithValue("flowEndSeconds")
if exists {
return flowEndField.Value.(uint32), nil
} else {
return 0, fmt.Errorf("flowEndSeconds field is not present in the record")
}
delete(a.flowKeyRecordMap, flowKey)
}

func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) error {
a.mutex.Lock()
defer a.mutex.Unlock()
_, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return fmt.Errorf("flow key is not present in the map")
}
// DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already
// acquired the lock. For example, this can be used in a callback of ForAllRecordsDo
// function.
// TODO:Remove this when there is notion of invalid flows supported in aggregation
// process.
func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey) {
delete(a.flowKeyRecordMap, flowKey)
return nil
}

// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
Expand Down
46 changes: 26 additions & 20 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,20 +515,21 @@ func TestCorrelateRecordsForInterNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, record2, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record2, record1, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
err = ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)

// Test IPv6 fields.
// Test the scenario, where record1 is added first and then record2.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, record2, true, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
Expand All @@ -548,8 +549,7 @@ func TestCorrelateRecordsForIntraNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false)
Expand All @@ -568,8 +568,7 @@ func TestCorrelateRecordsForToExternalFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, true)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, true).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, true)
Expand Down Expand Up @@ -617,15 +616,13 @@ func TestDeleteFlowKeyFromMapWithLock(t *testing.T) {
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
err := aggregationProcess.DeleteFlowKeyFromMap(flowKey2)
assert.Error(t, err)
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey2)
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
err = aggregationProcess.DeleteFlowKeyFromMap(flowKey1)
assert.NoError(t, err)
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey1)
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
messageChan := make(chan *entities.Message)
input := AggregationInput{
MessageChan: messageChan,
Expand All @@ -635,13 +632,22 @@ func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
aggregationProcess, _ := InitAggregationProcess(input)
message := createDataMsgForSrc(t, false, false, false, false)
flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678}
_, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.Error(t, err)
err = aggregationProcess.addOrUpdateRecordInMap(&flowKey1, message.GetSet().GetRecords()[0])
assert.NoError(t, err)
flowUpdatedTime, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.NoError(t, err)
assert.Equal(t, uint32(1), flowUpdatedTime)
flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678}
aggFlowRecord := AggregationFlowRecord{
message.GetSet().GetRecords()[0],
true,
true,
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey2)
aggregationProcess.mutex.Unlock()
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey1)
aggregationProcess.mutex.Unlock()
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, isToExternal bool) {
Expand Down