Skip to content

Commit

Permalink
Fix deadlock when accessing dirtyRules in fqdn controller (antrea-io#…
Browse files Browse the repository at this point in the history
…5566)

Signed-off-by: Dyanngg <[email protected]>
  • Loading branch information
Dyanngg authored Oct 16, 2023
1 parent 0be4217 commit 62a440a
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 4 deletions.
13 changes: 9 additions & 4 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type ruleRealizationUpdate struct {
// ruleSyncTracker tracks the realization status of FQDN rules that are
// applied to workloads on this Node.
type ruleSyncTracker struct {
mutex sync.Mutex
mutex sync.RWMutex
// updateCh is the channel used by the rule reconciler to report rule realization status.
updateCh chan ruleRealizationUpdate
// ruleToSubscribers keeps track of the subscribers that are currently subscribed
Expand Down Expand Up @@ -497,14 +497,12 @@ func (f *fqdnController) syncDirtyRules(fqdn string, waitCh chan error, addressU
utilsets.MergeString(dirtyRules, f.selectorItemToRuleIDs[selectorItem])
}
if !addressUpdate {
f.ruleSyncTracker.mutex.Lock()
defer f.ruleSyncTracker.mutex.Unlock()
// If there is no address update for this FQDN, and rules selecting this FQDN
// were all previously realized successfully, then there will be no dirty rules
// left to be synced. On the contrary, if some rules that select this FQDN are
// still in the dirtyRules set of the ruleSyncTracker, then only those rules
// should be retried for reconciliation, and packetOut shall be blocked.
dirtyRules = f.ruleSyncTracker.dirtyRules.Intersection(dirtyRules)
dirtyRules = f.ruleSyncTracker.getDirtyRules().Intersection(dirtyRules)
}
if len(dirtyRules) > 0 {
klog.V(4).InfoS("Dirty rules blocking packetOut", "dirtyRules", dirtyRules)
Expand Down Expand Up @@ -533,6 +531,13 @@ func (rst *ruleSyncTracker) subscribe(waitCh chan error, dirtyRules sets.Set[str
}
}

// getDirtyRules retrieves the current dirty rule set of ruleSyncTracker.
func (rst *ruleSyncTracker) getDirtyRules() sets.Set[string] {
rst.mutex.RLock()
defer rst.mutex.RUnlock()
return rst.dirtyRules
}

func (rst *ruleSyncTracker) Run(stopCh <-chan struct{}) {
for {
select {
Expand Down
134 changes: 134 additions & 0 deletions pkg/agent/controller/networkpolicy/fqdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package networkpolicy

import (
"context"
"fmt"
"net"
"testing"
"time"
Expand Down Expand Up @@ -384,3 +385,136 @@ func TestGetIPsForFQDNSelectors(t *testing.T) {
})
}
}

func TestSyncDirtyRules(t *testing.T) {
testFQDN := "test.antrea.io"
selectorItem := fqdnSelectorItem{
matchName: testFQDN,
}
testFQDN2 := "dev.antrea.io"
selectorItem2 := fqdnSelectorItem{
matchName: testFQDN2,
}
tests := []struct {
name string
fqdnsToSync []string
waitChs []chan error
addressUpdates []bool
prevDirtyRules sets.Set[string]
notifications []ruleRealizationUpdate
expectedDirtyRuleSyncCalls []string
expectedDirtyRulesRemaining sets.Set[string]
expectErr bool
}{
{
name: "test non-blocking dirty rule sync with address update",
fqdnsToSync: []string{testFQDN},
prevDirtyRules: sets.New[string](),
addressUpdates: []bool{true},
waitChs: []chan error{nil},
notifications: []ruleRealizationUpdate{{"1", nil}, {"2", nil}},
expectedDirtyRuleSyncCalls: []string{"1", "2"},
expectedDirtyRulesRemaining: sets.New[string](),
expectErr: false,
},
{
name: "test blocking dirty rule sync with address update",
fqdnsToSync: []string{testFQDN},
prevDirtyRules: sets.New[string](),
waitChs: []chan error{make(chan error, 1)},
addressUpdates: []bool{true},
notifications: []ruleRealizationUpdate{{"1", nil}, {"2", nil}},
expectedDirtyRuleSyncCalls: []string{"1", "2"},
expectedDirtyRulesRemaining: sets.New[string](),
expectErr: false,
},
{
name: "test blocking dirty rule sync with failed rule realization",
fqdnsToSync: []string{testFQDN},
prevDirtyRules: sets.New[string](),
waitChs: []chan error{make(chan error, 1)},
addressUpdates: []bool{true},
notifications: []ruleRealizationUpdate{{"1", nil}, {"2", fmt.Errorf("ovs err")}},
expectedDirtyRuleSyncCalls: []string{"1", "2"},
expectedDirtyRulesRemaining: sets.New[string]("2"),
expectErr: true,
},
{
name: "test blocking dirty rule sync without address update but previously failed rule realization",
fqdnsToSync: []string{testFQDN},
prevDirtyRules: sets.New[string]("2"),
waitChs: []chan error{make(chan error, 1)},
addressUpdates: []bool{false},
notifications: []ruleRealizationUpdate{{"2", nil}},
expectedDirtyRuleSyncCalls: []string{"2"},
expectedDirtyRulesRemaining: sets.New[string](),
expectErr: false,
},
{
name: "test blocking dirty rule sync without address update",
fqdnsToSync: []string{testFQDN},
prevDirtyRules: sets.New[string](),
waitChs: []chan error{make(chan error, 1)},
addressUpdates: []bool{false},
notifications: []ruleRealizationUpdate{},
expectedDirtyRuleSyncCalls: []string{},
expectedDirtyRulesRemaining: sets.New[string](),
expectErr: false,
},
{
name: "test blocking single dirty rule multiple FQDN concurrent updates",
fqdnsToSync: []string{testFQDN, testFQDN2},
prevDirtyRules: sets.New[string](),
waitChs: []chan error{make(chan error, 1), make(chan error, 1)},
addressUpdates: []bool{true, false},
notifications: []ruleRealizationUpdate{{"1", nil}, {"2", nil}},
expectedDirtyRuleSyncCalls: []string{"1", "2", "2"},
expectedDirtyRulesRemaining: sets.New[string](),
expectErr: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)
f, _ := newMockFQDNController(t, controller, nil)
var dirtyRuleSyncCalls []string
f.dirtyRuleHandler = func(s string) {
dirtyRuleSyncCalls = append(dirtyRuleSyncCalls, s)
}
f.addFQDNSelector("1", []string{testFQDN})
f.addFQDNSelector("2", []string{testFQDN})
f.addFQDNSelector("2", []string{testFQDN2})
f.setFQDNMatchSelector(testFQDN, selectorItem)
f.setFQDNMatchSelector(testFQDN2, selectorItem2)
// This simulates failed rule syncs in previous syncDirtyRules() calls
if len(tc.prevDirtyRules) > 0 {
f.ruleSyncTracker.dirtyRules = tc.prevDirtyRules
}
stopCh := make(chan struct{})
defer close(stopCh)
go f.runRuleSyncTracker(stopCh)

for i, fqdn := range tc.fqdnsToSync {
f.syncDirtyRules(fqdn, tc.waitChs[i], tc.addressUpdates[i])
}
for _, update := range tc.notifications {
f.ruleSyncTracker.updateCh <- update
}
assert.ElementsMatch(t, tc.expectedDirtyRuleSyncCalls, dirtyRuleSyncCalls)
for _, waitCh := range tc.waitChs {
if waitCh != nil {
assert.Eventually(t, func() bool {
select {
case err := <-waitCh:
if err != nil && !tc.expectErr {
return false
}
}
return true
}, ruleRealizationTimeout, time.Millisecond*10, "Failed to successfully wait for rule syncs")
}
}
assert.Equal(t, tc.expectedDirtyRulesRemaining, f.ruleSyncTracker.getDirtyRules())
})
}
}

0 comments on commit 62a440a

Please sign in to comment.