Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #4419: Set NO_FLOOD to IPsec tunnel ports #4470: Fix that Service routes may get lost when starting on Windows #4654: Restore NO_FLOOD to OVS ports after reconnecting the OVS #4711: Fix route deletion for Service ClusterIP and LoadBalancerIP #4761

7 changes: 4 additions & 3 deletions ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,9 @@ function deliver_antrea_windows {
sleep 5
# Some tests need us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13 image but it is not for windows/amd64 10.0.17763
# Use e2eteam/agnhost:2.13 instead
harbor_images=("sigwindowstools-kube-proxy:v1.18.0" "agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2")
antrea_images=("sigwindowstools/kube-proxy:v1.18.0" "e2eteam/agnhost:2.13" "us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "k8s.gcr.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "e2eteam/pause:3.2")
harbor_images=("sigwindowstools-kube-proxy:v1.18.0" "agnhost:2.13" "agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2" "e2eteam-pause:3.2" "e2eteam-busybox:1.29-windows-amd64-1809")
antrea_images=("sigwindowstools/kube-proxy:v1.18.0" "e2eteam/agnhost:2.13" "us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.13" "k8s.gcr.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "gcr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.0" "e2eteam/pause:3.2" "k8s.gcr.io/pause:3.2" "docker.io/library/busybox:1.29")
common_images=("mcr.microsoft.com/windows/servercore/iis:latest")
# Pull necessary images in advance to avoid transient error
for i in "${!harbor_images[@]}"; do
ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "docker pull -q ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} && docker tag ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} ${antrea_images[i]}" || true
Expand Down Expand Up @@ -410,7 +411,7 @@ function deliver_antrea_windows {
done
echo "=== Build Windows on Windows Node==="
ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "docker pull ${DOCKER_REGISTRY}/antrea/golang:${GO_VERSION}-nanoserver && docker tag ${DOCKER_REGISTRY}/antrea/golang:${GO_VERSION}-nanoserver golang:${GO_VERSION}-nanoserver"
ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "rm -rf antrea && mkdir antrea && cd antrea && tar -xzf ../antrea_repo.tar.gz > /dev/null && sed -i \"s|build/images/base-windows/Dockerfile|build/images/base-windows/Dockerfile --network host|g\" Makefile && sed -i \"s|build/images/Dockerfile.build.windows|build/images/Dockerfile.build.windows --network host|g\" Makefile && NO_PULL=${NO_PULL} make build-windows && docker save -o antrea-windows.tar ${DOCKER_REGISTRY}/antrea/antrea-windows:latest && gzip -f antrea-windows.tar" || true
ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "rm -rf antrea && mkdir antrea && cd antrea && tar -xzf ../antrea_repo.tar.gz > /dev/null && NO_PULL=${NO_PULL}; DOCKER_NETWORK=host make build-windows && docker save -o antrea-windows.tar antrea/antrea-windows:latest && gzip -f antrea-windows.tar" || true
for i in `seq 2`; do
timeout 2m scp -o StrictHostKeyChecking=no -T Administrator@${IP}:antrea/antrea-windows.tar.gz . && break
done
Expand Down
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func run(o *Options) error {
k8sClient,
crdClient,
ovsBridgeClient,
ovsctl.NewClient(o.config.OVSBridge),
ofClient,
routeClient,
ifaceStore,
Expand Down Expand Up @@ -268,6 +269,7 @@ func run(o *Options) error {
k8sClient,
informerFactory,
ofClient,
ovsctl.NewClient(o.config.OVSBridge),
ovsBridgeClient,
routeClient,
ifaceStore,
Expand Down
47 changes: 41 additions & 6 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Initializer struct {
client clientset.Interface
crdClient versioned.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ovsCtlClient ovsctl.OVSCtlClient
ofClient openflow.Client
routeClient route.Interface
wireGuardClient wireguard.Interface
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewInitializer(
k8sClient clientset.Interface,
crdClient versioned.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ovsCtlClient ovsctl.OVSCtlClient,
ofClient openflow.Client,
routeClient route.Interface,
ifaceStore interfacestore.InterfaceStore,
Expand All @@ -142,6 +144,7 @@ func NewInitializer(
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
Expand Down Expand Up @@ -279,7 +282,6 @@ func (i *Initializer) initInterfaceStore() error {
return intf
}
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
ovsCtlClient := ovsctl.NewClient(i.ovsBridge)
for index := range ovsPorts {
port := &ovsPorts[index]
ovsPort := &interfacestore.OVSPortConfig{
Expand All @@ -297,6 +299,8 @@ func (i *Initializer) initInterfaceStore() error {
case interfacestore.AntreaUplink:
intf = parseUplinkInterfaceFunc(port, ovsPort)
case interfacestore.AntreaTunnel:
fallthrough
case interfacestore.AntreaIPsecTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
case interfacestore.AntreaHost:
if port.Name == i.ovsBridge {
Expand All @@ -314,9 +318,6 @@ func (i *Initializer) initInterfaceStore() error {
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true)
case interfacestore.AntreaTrafficControl:
intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort)
if err := ovsCtlClient.SetPortNoFlood(int(ovsPort.OFPort)); err != nil {
klog.ErrorS(err, "Failed to set port with no-flood config", "PortName", port.Name)
}
default:
klog.InfoS("Unknown Antrea interface type", "type", interfaceType)
}
Expand All @@ -340,7 +341,11 @@ func (i *Initializer) initInterfaceStore() error {
fallthrough
case port.IFType == ovsconfig.STTTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
antreaIFType = interfacestore.AntreaTunnel
if intf.Type == interfacestore.IPSecTunnelInterface {
antreaIFType = interfacestore.AntreaIPsecTunnel
} else {
antreaIFType = interfacestore.AntreaTunnel
}
case port.Name == i.ovsBridge:
intf = nil
antreaIFType = interfacestore.AntreaHost
Expand Down Expand Up @@ -368,6 +373,26 @@ func (i *Initializer) initInterfaceStore() error {
return nil
}

func (i *Initializer) restorePortConfigs() error {
interfaces := i.ifaceStore.ListInterfaces()
for _, intf := range interfaces {
switch intf.Type {
case interfacestore.IPSecTunnelInterface:
fallthrough
case interfacestore.TrafficControlInterface:
if intf.OFPort < 0 {
klog.InfoS("Skipped setting no-flood for port due to invalid ofPort", "port", intf.InterfaceName, "ofport", intf.OFPort)
continue
}
if err := i.ovsCtlClient.SetPortNoFlood(int(intf.OFPort)); err != nil {
return fmt.Errorf("failed to set no-flood for port %s: %w", intf.InterfaceName, err)
}
klog.InfoS("Set no-flood for port", "port", intf.InterfaceName)
}
}
return nil
}

// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
Expand All @@ -386,6 +411,10 @@ func (i *Initializer) Initialize() error {
return err
}

if err := i.restorePortConfigs(); err != nil {
return err
}

// initializeWireGuard must be executed after setupOVSBridge as it requires gateway addresses on the OVS bridge.
if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeWireGuard {
if err := i.initializeWireGuard(); err != nil {
Expand Down Expand Up @@ -553,11 +582,17 @@ func (i *Initializer) initOpenFlowPipeline() error {
i.ofClient.ReplayFlows()
klog.Info("Flow replay completed")

klog.InfoS("Restoring OF port configs to OVS bridge")
if err := i.restorePortConfigs(); err != nil {
klog.ErrorS(err, "Failed to restore OF port configs")
} else {
klog.InfoS("Port configs restoration completed")
}
// ofClient and ovsBridgeClient have their own mechanisms to restore connections with OVS, and it could
// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
// with a timeout that is longer time than ovsBridgeClient's maximum connecting retry interval (8 seconds)
// to ensure the flag can be removed successfully.
err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
err = wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if err := i.FlowRestoreComplete(); err != nil {
return false, nil
}
Expand Down
68 changes: 68 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/runtime"
Expand Down Expand Up @@ -647,3 +648,70 @@ func mockConfigureLinkAddress(returnedErr error) func() {
configureLinkAddresses = originalConfigureLinkAddresses
}
}

func TestRestorePortConfigs(t *testing.T) {
tests := []struct {
name string
existingInterfaces []*interfacestore.InterfaceConfig
expectedOVSCtlCalls func(client *ovsctltest.MockOVSCtlClientMockRecorder)
expectedErr string
}{
{
name: "success",
existingInterfaces: []*interfacestore.InterfaceConfig{
interfacestore.NewIPSecTunnelInterface("antrea-ipsec1",
ovsconfig.GeneveTunnel,
"node1",
net.ParseIP("1.1.1.1"),
"abcdefg",
"node1",
&interfacestore.OVSPortConfig{OFPort: 11, PortUUID: "uuid1"}),
interfacestore.NewTunnelInterface(defaultTunInterfaceName,
ovsconfig.GeneveTunnel,
0,
net.ParseIP("1.1.1.10"),
true,
&interfacestore.OVSPortConfig{OFPort: 12}),
interfacestore.NewTrafficControlInterface("antrea-tap1",
&interfacestore.OVSPortConfig{OFPort: 13, PortUUID: "uuid3"}),
interfacestore.NewTrafficControlInterface("antrea-tap2",
&interfacestore.OVSPortConfig{OFPort: -1, PortUUID: "uuid3"}),
},
expectedOVSCtlCalls: func(client *ovsctltest.MockOVSCtlClientMockRecorder) {
client.SetPortNoFlood(11).Return(nil)
client.SetPortNoFlood(13).Return(nil)
},
},
{
name: "fail",
existingInterfaces: []*interfacestore.InterfaceConfig{
interfacestore.NewTrafficControlInterface("antrea-tap1",
&interfacestore.OVSPortConfig{OFPort: 10, PortUUID: "uuid3"}),
},
expectedOVSCtlCalls: func(client *ovsctltest.MockOVSCtlClientMockRecorder) {
client.SetPortNoFlood(10).Return(fmt.Errorf("server unavailable"))
},
expectedErr: "failed to set no-flood for port antrea-tap1: server unavailable",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
controller := mock.NewController(t)
defer controller.Finish()
mockOVSCtlClient := ovsctltest.NewMockOVSCtlClient(controller)
ifaceStore := interfacestore.NewInterfaceStore()
initializer := &Initializer{
ifaceStore: ifaceStore,
ovsCtlClient: mockOVSCtlClient,
}
ifaceStore.Initialize(tt.existingInterfaces)
tt.expectedOVSCtlCalls(mockOVSCtlClient.EXPECT())
err := initializer.restorePortConfigs()
if tt.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.ErrorContains(t, err, tt.expectedErr)
}
})
}
}
46 changes: 17 additions & 29 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/wireguard"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/runtime"
)

const (
Expand All @@ -65,14 +65,14 @@ type Controller struct {
kubeClient clientset.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ofClient openflow.Client
ovsCtlClient ovsctl.OVSCtlClient
routeClient route.Interface
interfaceStore interfacestore.InterfaceStore
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
nodeInformer coreinformers.NodeInformer
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
svcLister corelisters.ServiceLister
queue workqueue.RateLimitingInterface
// installedNodes records routes and flows installation states of Nodes.
// The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
Expand All @@ -92,6 +92,7 @@ func NewNodeRouteController(
kubeClient clientset.Interface,
informerFactory informers.SharedInformerFactory,
client openflow.Client,
ovsCtlClient ovsctl.OVSCtlClient,
ovsBridgeClient ovsconfig.OVSBridgeClient,
routeClient route.Interface,
interfaceStore interfacestore.InterfaceStore,
Expand All @@ -102,19 +103,18 @@ func NewNodeRouteController(
ipsecCertificateManager ipseccertificate.Manager,
) *Controller {
nodeInformer := informerFactory.Core().V1().Nodes()
svcLister := informerFactory.Core().V1().Services()
controller := &Controller{
kubeClient: kubeClient,
ovsBridgeClient: ovsBridgeClient,
ofClient: client,
ovsCtlClient: ovsCtlClient,
routeClient: routeClient,
interfaceStore: interfaceStore,
networkConfig: networkConfig,
nodeConfig: nodeConfig,
nodeInformer: nodeInformer,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
svcLister: svcLister.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
wireGuardClient: wireguardClient,
Expand Down Expand Up @@ -203,27 +203,10 @@ func (c *Controller) removeStaleGatewayRoutes() error {
desiredPodCIDRs = append(desiredPodCIDRs, podCIDRs...)
}

// TODO: This is not the best place to keep the ClusterIP Service routes.
desiredClusterIPSvcIPs := map[string]bool{}
if c.proxyAll && runtime.IsWindowsPlatform() {
// The route for virtual IP -> antrea-gw0 should be always kept.
desiredClusterIPSvcIPs[config.VirtualServiceIPv4.String()] = true

svcs, err := c.svcLister.List(labels.Everything())
for _, svc := range svcs {
for _, ip := range svc.Spec.ClusterIPs {
desiredClusterIPSvcIPs[ip] = true
}
}
if err != nil {
return fmt.Errorf("error when listing ClusterIP Service IPs: %v", err)
}
}

// routeClient will remove orphaned routes whose destinations are not in desiredPodCIDRs.
// If proxyAll enabled, it will also remove routes that are for Windows ClusterIP Services
// which no longer exist.
if err := c.routeClient.Reconcile(desiredPodCIDRs, desiredClusterIPSvcIPs); err != nil {
if err := c.routeClient.Reconcile(desiredPodCIDRs); err != nil {
return err
}
return nil
Expand All @@ -244,7 +227,7 @@ func (c *Controller) removeStaleTunnelPorts() error {
// will not include it in the set.
desiredInterfaces := make(map[string]bool)
// knownInterfaces is the list of interfaces currently in the local cache.
knownInterfaces := c.interfaceStore.GetInterfaceKeysByType(interfacestore.TunnelInterface)
knownInterfaces := c.interfaceStore.GetInterfaceKeysByType(interfacestore.IPSecTunnelInterface)

if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
for _, node := range nodes {
Expand Down Expand Up @@ -671,15 +654,14 @@ func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int3
}
c.interfaceStore.DeleteInterface(interfaceConfig)
exists = false
} else {
if interfaceConfig.OFPort != 0 {
klog.V(2).InfoS("Found cached IPsec tunnel interface", "node", nodeName, "interface", interfaceConfig.InterfaceName, "port", interfaceConfig.OFPort)
return interfaceConfig.OFPort, nil
}
}
}

if !exists {
ovsExternalIDs := map[string]interface{}{ovsExternalIDNodeName: nodeName}
ovsExternalIDs := map[string]interface{}{
ovsExternalIDNodeName: nodeName,
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaIPsecTunnel,
}
portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(
portName,
c.networkConfig.TunnelType,
Expand Down Expand Up @@ -715,6 +697,12 @@ func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int3
// Let NodeRouteController retry at errors.
return 0, fmt.Errorf("failed to get of_port of IPsec tunnel port for Node %s", nodeName)
}

// Set the port with no-flood to reject ARP flood packets.
if err := c.ovsCtlClient.SetPortNoFlood(int(ofPort)); err != nil {
return 0, fmt.Errorf("failed to set port %s with no-flood config: %w", portName, err)
}

interfaceConfig.OFPort = ofPort
return ofPort, nil
}
Expand Down
Loading