Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Skip processing ADD events of init Pods and Namespaces #636

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 51 additions & 21 deletions pkg/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ type NetworkPolicyController struct {
// heartbeatCh is an internal channel for testing. It's used to know whether all tasks have been
// processed, and to count executions of each function.
heartbeatCh chan heartbeat

// initPodSet caches UIDs of the Pods that exist before starting computation.
// We don't need to handle their ADD events as all AddressGroups and AppliedToGroups
// will be computed once at the beginning and they have counted these Pods in.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of "all AddressGroups and AppliedToGroups will be computed once at the beginning and they have counted these Pods in.", do you think it would be more accurate to say "all AddressGroups and AppliedToGroups will be computed when the ADD events for NetworkPolicies are processed and they will account for these Pods"?

initPodSet sets.String

// initNamespaceSet caches UIDs of the Namespaces that exist before starting computation.
// We don't need to handle their ADD events as all AddressGroups will be computed once at
// the beginning and they have counted these Namespaces in.
initNamespaceSet sets.String
}

type heartbeat struct {
Expand Down Expand Up @@ -177,25 +187,9 @@ func NewNetworkPolicyController(kubeClient clientset.Interface,
appliedToGroupQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "appliedToGroup"),
addressGroupQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "addressGroup"),
internalNetworkPolicyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "internalNetworkPolicy"),
initPodSet: sets.NewString(),
initNamespaceSet: sets.NewString(),
}
// Add handlers for Pod events.
podInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: n.addPod,
UpdateFunc: n.updatePod,
DeleteFunc: n.deletePod,
},
resyncPeriod,
)
// Add handlers for Namespace events.
namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: n.addNamespace,
UpdateFunc: n.updateNamespace,
DeleteFunc: n.deleteNamespace,
},
resyncPeriod,
)
// Add handlers for NetworkPolicy events.
networkPolicyInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -675,6 +669,9 @@ func (n *NetworkPolicyController) addPod(obj interface{}) {
defer n.heartbeat("addPod")
pod := obj.(*v1.Pod)
klog.V(2).Infof("Processing Pod %s/%s ADD event, labels: %v", pod.Namespace, pod.Name, pod.Labels)
if n.initPodSet.Has(string(pod.UID)) {
return
}
// Find all AppliedToGroup keys which match the Pod's labels.
appliedToGroupKeySet := n.filterAppliedToGroupsForPod(pod)
// Find all AddressGroup keys which match the Pod's labels.
Expand Down Expand Up @@ -776,6 +773,9 @@ func (n *NetworkPolicyController) addNamespace(obj interface{}) {
defer n.heartbeat("addNamespace")
namespace := obj.(*v1.Namespace)
klog.V(2).Infof("Processing Namespace %s ADD event, labels: %v", namespace.Name, namespace.Labels)
if n.initNamespaceSet.Has(string(namespace.UID)) {
return
}
addressGroupKeys := n.filterAddressGroupsForNamespace(namespace)
for group := range addressGroupKeys {
n.enqueueAddressGroup(group)
Expand Down Expand Up @@ -915,6 +915,36 @@ func (n *NetworkPolicyController) Run(stopCh <-chan struct{}) {
}
klog.Info("Caches are synced for NetworkPolicy controller")

// Initialize initPodSet first before registering Pod event handlers.
pods, _ := n.podLister.List(labels.Everything())
for i := range pods {
n.initPodSet.Insert(string(pods[i].UID))
}
// Add handlers for Pod events.
n.podInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: n.addPod,
UpdateFunc: n.updatePod,
DeleteFunc: n.deletePod,
},
resyncPeriod,
)

// Initialize initNamespaceSet first before registering Namespace event handlers.
namespaces, _ := n.namespaceLister.List(labels.Everything())
for i := range namespaces {
n.initNamespaceSet.Insert(string(namespaces[i].UID))
}
// Add handlers for Namespace events.
n.namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: n.addNamespace,
UpdateFunc: n.updateNamespace,
DeleteFunc: n.deleteNamespace,
},
resyncPeriod,
)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(n.appliedToGroupWorker, time.Second, stopCh)
go wait.Until(n.addressGroupWorker, time.Second, stopCh)
Expand Down Expand Up @@ -1208,11 +1238,11 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error {
// Lock the internal NetworkPolicy store as we may have a case where in the
// same internal NetworkPolicy is being updated in the NetworkPolicy UPDATE
// handler.
n.internalNetworkPolicyMutex.Lock()
n.internalNetworkPolicyMutex.RLock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be an orthogonal change, do you think you can put it in a separate PR that we can merge right away?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually we do update the store at L1271.. should this be R?

internalNPObj, found, err := n.internalNetworkPolicyStore.Get(key)
if !found {
// Make sure to unlock the store before returning.
n.internalNetworkPolicyMutex.Unlock()
n.internalNetworkPolicyMutex.RUnlock()
return fmt.Errorf("internal NetworkPolicy %s not found: %v", key, err)
}
internalNP := internalNPObj.(*antreatypes.NetworkPolicy)
Expand Down Expand Up @@ -1241,7 +1271,7 @@ func (n *NetworkPolicyController) syncInternalNetworkPolicy(key string) error {
n.internalNetworkPolicyStore.Update(updatedNetworkPolicy)
// Internal NetworkPolicy update is complete. Safe to unlock the
// critical section.
n.internalNetworkPolicyMutex.Unlock()
n.internalNetworkPolicyMutex.RUnlock()
if nodeNames.Equal(oldNodeNames) {
// Node span for internal NetworkPolicy was not modified. No need to enqueue
// AddressGroups.
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/networkpolicy/networkpolicy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func newController(objects ...runtime.Object) (*fake.Clientset, *networkPolicyCo
addressGroupStore := store.NewAddressGroupStore()
internalNetworkPolicyStore := store.NewNetworkPolicyStore()
npController := NewNetworkPolicyController(client, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Namespaces(), informerFactory.Networking().V1().NetworkPolicies(), addressGroupStore, appliedToGroupStore, internalNetworkPolicyStore)
npController.podListerSynced = alwaysReady
npController.namespaceListerSynced = alwaysReady
npController.networkPolicyListerSynced = alwaysReady
//npController.podListerSynced = alwaysReady
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just checking: do we need to remove this because otherwise the List operations may not return the entire set of Pods / Namespaces?

do you know why we used alwaysReady in the first place, I can't remember?

//npController.namespaceListerSynced = alwaysReady
//npController.networkPolicyListerSynced = alwaysReady
return client, &networkPolicyController{
npController,
informerFactory.Core().V1().Pods().Informer().GetStore(),
Expand Down