Skip to content

Commit

Permalink
Realize Egress for a Pod once its network is created
Browse files Browse the repository at this point in the history
Previously antrea-controller only included a Pod in an EgressGroup only
when its IP has been presented in K8s API. If a Pod tries to access
external right after it's up, Node IP will be used as the SNAT IP even
when an Egress applying to it has been created because its Pod IP may
haven't been reported to K8s API or antrea-controller may haven't
included the Pod in the EgressGroup.

This patch fixes it by making CNIServer notify EgressController that it
has processed CNI ADD request of a Pod, then EgressController can
reconcile the corresponding Egress immediately, instead of waiting for
the Pod to be reported to K8s API. As NetworkPolicyController relies on
that event as well, we introduce a channel implementation which supports
multiple subscribers.

Fixes #3361

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Feb 25, 2022
1 parent 26c039c commit 8697a9d
Show file tree
Hide file tree
Showing 19 changed files with 377 additions and 108 deletions.
20 changes: 11 additions & 9 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
"antrea.io/antrea/pkg/agent/stats"
"antrea.io/antrea/pkg/agent/types"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/controller/externalippool"
"antrea.io/antrea/pkg/features"
Expand All @@ -62,6 +61,7 @@ import (
ofconfig "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/cipher"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/version"
Expand Down Expand Up @@ -258,10 +258,10 @@ func run(o *Options) error {
}
}

// entityUpdates is a channel for receiving entity updates from CNIServer and
// notifying NetworkPolicyController to reconcile rules related to the
// updated entities.
entityUpdates := make(chan types.EntityReference, 100)
// podUpdateChannel is a channel for receiving Pod updates from CNIServer and
// notifying NetworkPolicyController and EgressController to reconcile rules
// related to the updated Pods.
podUpdateChannel := channel.NewChannel("PodUpdate", 100)
// We set flow poll interval as the time interval for rule deletion in the async
// rule cache, which is implemented as part of the idAllocator. This is to preserve
// the rule info for populating NetworkPolicy fields in the Flow Exporter even
Expand All @@ -279,7 +279,7 @@ func run(o *Options) error {
ofClient,
ifaceStore,
nodeConfig.Name,
entityUpdates,
podUpdateChannel,
groupCounters,
groupIDUpdates,
antreaPolicyEnabled,
Expand Down Expand Up @@ -329,7 +329,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.Egress) {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer,
memberlistCluster, egressInformer, podUpdateChannel,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
Expand Down Expand Up @@ -367,12 +367,12 @@ func run(o *Options) error {
var cniPodInfoStore cnipodcache.CNIPodInfoStore
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
cniPodInfoStore = cnipodcache.NewCNIPodInfoStore()
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, cniPodInfoStore)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, nil)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
}
Expand Down Expand Up @@ -461,6 +461,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go podUpdateChannel.Run(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)
Expand Down
31 changes: 14 additions & 17 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -63,9 +62,9 @@ type podConfigurator struct {
ifaceStore interfacestore.InterfaceStore
gatewayMAC net.HardwareAddr
ifConfigurator *ifConfigurator
// entityUpdates is a channel for notifying updates of local endpoints / entities (most notably Pod)
// to other components which may benefit from this information, i.e NetworkPolicyController.
entityUpdates chan<- types.EntityReference
// podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this
// information, i.e. NetworkPolicyController, EgressController.
podUpdateNotifier channel.Notifier
// consumed by secondary network creation.
podInfoStore cnipodcache.CNIPodInfoStore
}
Expand All @@ -78,22 +77,22 @@ func newPodConfigurator(
gatewayMAC net.HardwareAddr,
ovsDatapathType ovsconfig.OVSDatapathType,
isOvsHardwareOffloadEnabled bool,
entityUpdates chan<- types.EntityReference,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled)
if err != nil {
return nil, err
}
return &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
entityUpdates: entityUpdates,
podInfoStore: podInfoStore,
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podInfoStore: podInfoStore,
}, nil
}

Expand Down Expand Up @@ -486,9 +485,7 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta
// Add containerConfig into local cache
pc.ifaceStore.AddInterface(containerConfig)
// Notify the Pod update event to required components.
pc.entityUpdates <- types.EntityReference{
Pod: &v1beta2.PodReference{Name: containerConfig.PodName, Namespace: containerConfig.PodNamespace},
}
pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName))
return nil
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/util/k8s"
)

Expand All @@ -51,9 +49,7 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I
// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
// Notify the Pod update event to required components.
pc.entityUpdates <- types.EntityReference{
Pod: &v1beta2.PodReference{Name: ifConfig.PodName, Namespace: ifConfig.PodNamespace},
}
pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName))
return nil
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
)

const (
Expand Down Expand Up @@ -589,7 +589,7 @@ func (s *CNIServer) Initialize(
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
ifaceStore interfacestore.InterfaceStore,
entityUpdates chan<- types.EntityReference,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) error {
var err error
Expand All @@ -602,7 +602,7 @@ func (s *CNIServer) Initialize(

s.podConfigurator, err = newPodConfigurator(
ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), entityUpdates,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), podUpdateNotifier,
podInfoStore,
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
antreatypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
)

const (
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestValidatePrevResult(t *testing.T) {
cniConfig.Netns = "invalid_netns"
sriovVFDeviceID := ""
prevResult.Interfaces = []*current.Interface{hostIface, containerIface}
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, make(chan antreatypes.EntityReference, 100), nil)
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, channel.NewChannel("PodUpdate", 100), nil)
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID)
checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "")
})
Expand All @@ -411,7 +411,7 @@ func TestValidatePrevResult(t *testing.T) {
cniConfig.Netns = "invalid_netns"
sriovVFDeviceID := "0000:03:00.6"
prevResult.Interfaces = []*current.Interface{hostIface, containerIface}
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, make(chan antreatypes.EntityReference, 100), nil)
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, channel.NewChannel("PodUpdate", 100), nil)
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID)
checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "")
})
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestRemoveInterface(t *testing.T) {
ifaceStore := interfacestore.NewInterfaceStore()
routeMock := routetest.NewMockInterface(controller)
gwMAC, _ := net.ParseMAC("00:00:11:11:11:11")
podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, make(chan antreatypes.EntityReference, 100), nil)
podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, channel.NewChannel("PodUpdate", 100), nil)
require.Nil(t, err, "No error expected in podConfigurator constructor")

containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff")
Expand Down
19 changes: 17 additions & 2 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
Expand All @@ -48,6 +47,7 @@ import (
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2"
"antrea.io/antrea/pkg/controller/metrics"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -154,7 +154,7 @@ func NewEgressController(
nodeTransportInterface string,
cluster *memberlist.Cluster,
egressInformer crdinformers.EgressInformer,
nodeInformer coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
) (*EgressController, error) {
c := &EgressController{
ofClient: ofClient,
Expand Down Expand Up @@ -207,11 +207,26 @@ func NewEgressController(
},
resyncPeriod,
)
// Subscribe Pod update events from CNIServer to enforce Egress earlier, instead of waiting for their IPs are
// reported to kube-apiserver and processed by antrea-controller.
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool)
return c, nil
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(pod string) {
c.egressBindingsMutex.Lock()
defer c.egressBindingsMutex.Unlock()
binding, exists := c.egressBindings[pod]
if !exists {
return
}
c.queue.Add(binding.effectiveEgress)
}

// addEgress processes Egress ADD events.
func (c *EgressController) addEgress(obj interface{}) {
egress := obj.(*crdv1a2.Egress)
Expand Down
51 changes: 51 additions & 0 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned"
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ type fakeController struct {
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
podUpdateChannel *channel.Channel
}

func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
Expand All @@ -114,6 +116,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
addPodInterface(ifaceStore, "ns3", "pod3", 3)
addPodInterface(ifaceStore, "ns4", "pod4", 4)

podUpdateChannel := channel.NewChannel("PodUpdate", 100)

egressController := &EgressController{
ofClient: mockOFClient,
routeClient: mockRouteClient,
Expand All @@ -133,6 +137,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
egressIPStates: map[string]*egressIPState{},
ipAssigner: mockIPAssigner,
}
podUpdateChannel.Subscribe(egressController.processPodUpdate)
return &fakeController{
EgressController: egressController,
mockController: controller,
Expand All @@ -141,6 +146,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
mockIPAssigner: mockIPAssigner,
podUpdateChannel: podUpdateChannel,
}
}

Expand Down Expand Up @@ -541,6 +547,51 @@ func TestSyncEgress(t *testing.T) {
}
}

func TestPodUpdateShouldSyncEgress(t *testing.T) {
egress := &crdv1a2.Egress{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Spec: crdv1a2.EgressSpec{EgressIP: fakeLocalEgressIP1},
}
egressGroup := &cpv1b2.EgressGroup{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
GroupMembers: []cpv1b2.GroupMember{
{Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}},
{Pod: &cpv1b2.PodReference{Name: "pendingPod", Namespace: "ns1"}},
},
}
c := newFakeController(t, []runtime.Object{egress})
defer c.mockController.Finish()
stopCh := make(chan struct{})
defer close(stopCh)
go c.podUpdateChannel.Run(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
c.addEgressGroup(egressGroup)
require.Equal(t, 1, c.queue.Len())
item, _ := c.queue.Get()
require.Equal(t, egress.Name, item)
require.NoError(t, c.syncEgress(item.(string)))
c.queue.Done(item)

c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(10), net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
// Mock CNIServer
addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10)
c.podUpdateChannel.Notify("ns1/pendingPod")
require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) {
return c.queue.Len() == 1, nil
}))
item, _ = c.queue.Get()
require.Equal(t, egress.Name, item)
require.NoError(t, c.syncEgress(item.(string)))
c.queue.Done(item)
}

func TestSyncOverlappingEgress(t *testing.T) {
egress1 := &crdv1a2.Egress{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Expand Down
Loading

0 comments on commit 8697a9d

Please sign in to comment.