diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 5f4013292b9..98f96bd981e 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1170,6 +1170,9 @@ data: # Service traffic. # AntreaProxy: true + # Enable NodePort Service support in AntreaProxy in antrea-agent. + # AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1276,6 +1279,16 @@ data: # whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), # and all Node traffic directed to that port will be forwarded to the Pod. #nplPortRange: 40000-41000 + + # The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIP: 169.254.169.110 + + # The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIPv6: fec0::ffee:ddcc:bbaa + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1332,7 +1345,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-b8hh7hm486 + name: antrea-config-dccfh2f227 namespace: kube-system --- apiVersion: v1 @@ -1443,7 +1456,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-dccfh2f227 name: antrea-config - name: antrea-controller-tls secret: @@ -1708,7 +1721,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-dccfh2f227 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index c1c243bd170..ac5d8c2f614 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1170,6 +1170,9 @@ data: # Service traffic. # AntreaProxy: true + # Enable NodePort Service support in AntreaProxy in antrea-agent. + # AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1276,6 +1279,16 @@ data: # whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), # and all Node traffic directed to that port will be forwarded to the Pod. #nplPortRange: 40000-41000 + + # The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIP: 169.254.169.110 + + # The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIPv6: fec0::ffee:ddcc:bbaa + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1332,7 +1345,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-b8hh7hm486 + name: antrea-config-dccfh2f227 namespace: kube-system --- apiVersion: v1 @@ -1443,7 +1456,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-dccfh2f227 name: antrea-config - name: antrea-controller-tls secret: @@ -1710,7 +1723,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-dccfh2f227 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 6dbdee6102e..5d2be4a8051 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1170,6 +1170,9 @@ data: # Service traffic. # AntreaProxy: true + # Enable NodePort Service support in AntreaProxy in antrea-agent. + # AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1276,6 +1279,16 @@ data: # whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), # and all Node traffic directed to that port will be forwarded to the Pod. #nplPortRange: 40000-41000 + + # The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIP: 169.254.169.110 + + # The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIPv6: fec0::ffee:ddcc:bbaa + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1332,7 +1345,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-hhfkgg2fg5 + name: antrea-config-ft2hkgk4g5 namespace: kube-system --- apiVersion: v1 @@ -1443,7 +1456,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-hhfkgg2fg5 + name: antrea-config-ft2hkgk4g5 name: antrea-config - name: antrea-controller-tls secret: @@ -1711,7 +1724,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-hhfkgg2fg5 + name: antrea-config-ft2hkgk4g5 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index e752315b49e..203a3872278 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1170,6 +1170,9 @@ data: # Service traffic. # AntreaProxy: true + # Enable NodePort Service support in AntreaProxy in antrea-agent. + # AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1281,6 +1284,16 @@ data: # whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), # and all Node traffic directed to that port will be forwarded to the Pod. #nplPortRange: 40000-41000 + + # The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIP: 169.254.169.110 + + # The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIPv6: fec0::ffee:ddcc:bbaa + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1337,7 +1350,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-bdc66g4872 + name: antrea-config-h2d64449d2 namespace: kube-system --- apiVersion: v1 @@ -1457,7 +1470,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-bdc66g4872 + name: antrea-config-h2d64449d2 name: antrea-config - name: antrea-controller-tls secret: @@ -1757,7 +1770,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-bdc66g4872 + name: antrea-config-h2d64449d2 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index d0f9ac2615a..a20cfcd4e7b 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1170,6 +1170,9 @@ data: # Service traffic. # AntreaProxy: true + # Enable NodePort Service support in AntreaProxy in antrea-agent. + # AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1281,6 +1284,16 @@ data: # whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), # and all Node traffic directed to that port will be forwarded to the Pod. #nplPortRange: 40000-41000 + + # The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIP: 169.254.169.110 + + # The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. + #nodePortVirtualIPv6: fec0::ffee:ddcc:bbaa + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1337,7 +1350,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-9964gfgbb4 + name: antrea-config-td97b2gf56 namespace: kube-system --- apiVersion: v1 @@ -1448,7 +1461,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-9964gfgbb4 + name: antrea-config-td97b2gf56 name: antrea-config - name: antrea-controller-tls secret: @@ -1713,7 +1726,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-9964gfgbb4 + name: antrea-config-td97b2gf56 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 8316dfa6fc0..24596d27604 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -5,6 +5,9 @@ featureGates: # Service traffic. # AntreaProxy: true +# Enable NodePort Service support in AntreaProxy in antrea-agent. +# AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -116,3 +119,13 @@ featureGates: # whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), # and all Node traffic directed to that port will be forwarded to the Pod. #nplPortRange: 40000-41000 + +# The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. +#nodePortVirtualIP: 169.254.169.110 + +# The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. +#nodePortVirtualIPv6: fec0::ffee:ddcc:bbaa + +# A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks +# (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. +#nodePortAddresses: [] diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index c01ddad3a71..1a9e8d11d72 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -89,9 +89,13 @@ func run(o *Options) error { } defer ovsdbConnection.Close() + nodePortVirtualIP := net.ParseIP(o.config.NodePortVirtualIP) + nodePortVirtualIPv6 := net.ParseIP(o.config.NodePortVirtualIPv6) ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, o.config.OVSDatapathType, ovsdbConnection) ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge) ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr, + nodePortVirtualIP, + nodePortVirtualIPv6, features.DefaultFeatureGate.Enabled(features.AntreaProxy), features.DefaultFeatureGate.Enabled(features.AntreaPolicy)) @@ -108,7 +112,7 @@ func run(o *Options) error { TrafficEncapMode: encapMode, EnableIPSecTunnel: o.config.EnableIPSecTunnel} - routeClient, err := route.NewClient(serviceCIDRNet, networkConfig, o.config.NoSNAT) + routeClient, err := route.NewClient(nodePortVirtualIP, nodePortVirtualIPv6, serviceCIDRNet, networkConfig, o.config.NoSNAT, features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort)) if err != nil { return fmt.Errorf("error creating route client: %v", err) } @@ -180,18 +184,28 @@ func run(o *Options) error { var proxier k8sproxy.Provider if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + var nodePortAddresses []*net.IPNet + for _, nodePortAddress := range o.config.NodePortAddresses { + _, ipNet, _ := net.ParseCIDR(nodePortAddress) + nodePortAddresses = append(nodePortAddresses, ipNet) + } v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) + nodePortSupport := features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort) + var err error switch { case v4Enabled && v6Enabled: - proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient) + proxier, err = proxy.NewDualStackProxier(nodePortVirtualIP, nodePortVirtualIPv6, nodePortAddresses, nodeConfig.Name, nodeConfig.PodIPv4CIDR, nodeConfig.PodIPv6CIDR, informerFactory, ofClient, routeClient, nodePortSupport) case v4Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false) + proxier, err = proxy.NewProxier(nodePortVirtualIP, nodePortAddresses, nodeConfig.Name, nodeConfig.PodIPv4CIDR, informerFactory, ofClient, routeClient, false, nodePortSupport) case v6Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true) + proxier, err = proxy.NewProxier(nodePortVirtualIP, nodePortAddresses, nodeConfig.Name, nodeConfig.PodIPv6CIDR, informerFactory, ofClient, routeClient, true, nodePortSupport) default: return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") } + if err != nil { + return fmt.Errorf("error when creating Antrea Proxy: %w", err) + } } isChaining := false diff --git a/cmd/antrea-agent/config.go b/cmd/antrea-agent/config.go index a2a6cf01c8b..cc783ce0543 100644 --- a/cmd/antrea-agent/config.go +++ b/cmd/antrea-agent/config.go @@ -124,4 +124,11 @@ type AgentConfig struct { // whenever a Pod's container defines a specific port to be exposed (each container can define a list of ports as pod.spec.containers[].ports), // and all Node traffic directed to that port will be forwarded to the Pod. NPLPortRange string `yaml:"nplPortRange,omitempty"` + // The virtual IP for NodePort Service support. It must be a link-local IP otherwise the Agents will report error. + NodePortVirtualIP string `yaml:"nodePortVirtualIP,omitempty"` + // The virtual IPv6 for NodePort Service support. It must not be a link-local IP otherwise the Agents will report error. + NodePortVirtualIPv6 string `yaml:"nodePortVirtualIPv6,omitempty"` + // A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + // (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + NodePortAddresses []string `yaml:"nodePortAddresses,omitempty"` } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 2f70017676f..bc0ca462db2 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -43,6 +43,8 @@ const ( defaultFlowPollInterval = 5 * time.Second defaultFlowExportFrequency = 12 defaultNPLPortRange = "40000-41000" + defaultNodePortVirtualIP = "169.254.169.110" + defaultNodePortVirtualIPv6 = "fec0::ffee:ddcc:bbaa" ) type Options struct { @@ -140,6 +142,9 @@ func (o *Options) validate(args []string) error { // (but SNAT can be done by the primary CNI). o.config.NoSNAT = true } + if err := o.validateAntreaProxyConfig(); err != nil { + return fmt.Errorf("proxy config is invalid: %w", err) + } if err := o.validateFlowExporterConfig(); err != nil { return fmt.Errorf("failed to validate flow exporter config: %v", err) } @@ -205,6 +210,39 @@ func (o *Options) setDefaults() { o.config.NPLPortRange = defaultNPLPortRange } } + + if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + if len(o.config.NodePortVirtualIP) == 0 { + o.config.NodePortVirtualIP = defaultNodePortVirtualIP + } + if len(o.config.NodePortVirtualIPv6) == 0 { + o.config.NodePortVirtualIPv6 = defaultNodePortVirtualIPv6 + } + } + +} + +func (o *Options) validateAntreaProxyConfig() error { + if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { + if o.config.NodePortVirtualIP != "" { + _, linkLocalNet, _ := net.ParseCIDR("169.0.0.1/8") + if !linkLocalNet.Contains(net.ParseIP(o.config.NodePortVirtualIP)) { + return fmt.Errorf("NodePortVirtualIP %s is not an valid link-local IP address", o.config.NodePortVirtualIP) + } + } + if o.config.NodePortVirtualIPv6 != "" { + _, linkLocalNet, _ := net.ParseCIDR("fe80::/10") + if linkLocalNet.Contains(net.ParseIP(o.config.NodePortVirtualIPv6)) { + return fmt.Errorf("NodePortVirtualIPv6 %s must not be a link-local IP address", o.config.NodePortVirtualIPv6) + } + } + for _, nodePortAddress := range o.config.NodePortAddresses { + if _, _, err := net.ParseCIDR(nodePortAddress); err != nil { + return fmt.Errorf("NodePortAddress is not valid, can not parse `%s`: %w", nodePortAddress, err) + } + } + } + return nil } func (o *Options) validateFlowExporterConfig() error { diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index 2b6adb31b4d..64770a58a8c 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -228,6 +228,7 @@ fi if $ALLFEATURES; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaProxyNodePort[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaProxyNodePort: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*FlowExporter[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ FlowExporter: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*NetworkPolicyStats[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ NetworkPolicyStats: true/" antrea-agent.conf fi diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 8b168eb82fc..c5df231c882 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -303,7 +303,7 @@ func (i *Initializer) initOpenFlowPipeline() error { // Set up flow entries for gateway interface, including classifier, skip spoof guard check, // L3 forwarding and L2 forwarding - if err := i.ofClient.InstallGatewayFlows(); err != nil { + if err := i.ofClient.InstallGatewayFlows(i.nodeConfig.NodeIPAddr.IP); err != nil { klog.Errorf("Failed to setup openflow entries for gateway: %v", err) return err } diff --git a/pkg/agent/nodeportlocal/rules/iptable_rule.go b/pkg/agent/nodeportlocal/rules/iptable_rule.go index dc6773bfc68..ccca990a763 100644 --- a/pkg/agent/nodeportlocal/rules/iptable_rule.go +++ b/pkg/agent/nodeportlocal/rules/iptable_rule.go @@ -35,7 +35,7 @@ type IPTableRules struct { table *iptables.Client } -// NewIPTableRules retruns a new instance of IPTableRules +// NewIPTableRules returns a new instance of IPTableRules func NewIPTableRules() *IPTableRules { iptInstance, _ := iptables.New(true, false) iptRule := IPTableRules{ @@ -64,7 +64,7 @@ func (ipt *IPTableRules) CreateChains() error { ruleSpec := []string{ "-p", "tcp", "-j", NodePortLocalChain, } - err = ipt.table.EnsureRule(iptables.NATTable, iptables.PreRoutingChain, ruleSpec) + err = ipt.table.EnsureRule(iptables.NATTable, iptables.PreRoutingChain, ruleSpec, false) if err != nil { return fmt.Errorf("IPTABLES rule creation in NAT table failed for NPL with error: %v", err) } @@ -77,7 +77,7 @@ func (ipt *IPTableRules) AddRule(port int, podIP string) error { "-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint(port), "-j", "DNAT", "--to-destination", podIP, } - err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec) + err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec, false) if err != nil { return fmt.Errorf("IPTABLES rule creation failed for NPL with error: %v", err) } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 7ebcc6765ce..3ad6ccb481a 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -42,7 +42,7 @@ type Client interface { Initialize(roundInfo types.RoundInfo, config *config.NodeConfig, encapMode config.TrafficEncapModeType) (<-chan struct{}, error) // InstallGatewayFlows sets up flows related to an OVS gateway port, the gateway must exist. - InstallGatewayFlows() error + InstallGatewayFlows(nodeIP net.IP) error // InstallClusterServiceCIDRFlows sets up the appropriate flows so that traffic can reach // the different Services running in the Cluster. This method needs to be invoked once with @@ -508,7 +508,7 @@ func (c *client) InstallClusterServiceCIDRFlows(serviceNets []*net.IPNet) error return nil } -func (c *client) InstallGatewayFlows() error { +func (c *client) InstallGatewayFlows(nodeIP net.IP) error { gatewayConfig := c.nodeConfig.GatewayConfig gatewayIPs := []net.IP{} @@ -530,6 +530,13 @@ func (c *client) InstallGatewayFlows() error { // Add flow to ensure the liveness check packet could be forwarded correctly. flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...) flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...) + // TODO: check functionality + if c.enableProxy { + flows = append(flows, + c.serviceGatewayFlow(), + c.arpNodePortVirtualResponderFlow(), + ) + } // In NoEncap , no traffic from tunnel port if c.encapMode.SupportsEncap() { flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index e218ace653e..f500e1bbc7c 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -38,7 +38,10 @@ import ( const bridgeName = "dummy-br" -var bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName) +var ( + bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName) + nodePortVirtualIP = net.ParseIP("169.254.169.110") +) func installNodeFlows(ofClient Client, cacheKey string) (int, error) { hostName := cacheKey @@ -91,7 +94,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -122,7 +125,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -166,7 +169,7 @@ func TestFlowInstallationFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -203,7 +206,7 @@ func TestConcurrentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -290,7 +293,7 @@ func Test_client_InstallTraceflowFlows(t *testing.T) { } func prepareTraceflowFlow(ctrl *gomock.Controller) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodePortVirtualIP, nil, true, true) c := ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.nodeConfig = &config.NodeConfig{} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index e9550006c54..153ce09b50e 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -343,10 +343,12 @@ type client struct { // conjMatchFlowContext. globalConjMatchFlowCache map[string]*conjMatchFlowContext // replayMutex provides exclusive access to the OFSwitch to the ReplayFlows method. - replayMutex sync.RWMutex - nodeConfig *config.NodeConfig - encapMode config.TrafficEncapModeType - gatewayOFPort uint32 + replayMutex sync.RWMutex + nodeConfig *config.NodeConfig + encapMode config.TrafficEncapModeType + gatewayOFPort uint32 + nodePortVirtualIPv4 net.IP // The virtual IPv4 used for NodePort, it will be nil if Antrea Proxy is not enabled. + nodePortVirtualIPv6 net.IP // The virtual IPv6 used for NodePort, it will be nil if Antrea Proxy is not enabled. // packetInHandlers stores handler to process PacketIn event. Each packetin reason can have multiple handlers registered. // When a packetin arrives, openflow send packet to registered handlers in this map. packetInHandlers map[uint8]map[string]PacketInHandler @@ -933,6 +935,22 @@ func (c *client) arpResponderFlow(peerGatewayIP net.IP, category cookie.Category Done() } +func (c *client) arpNodePortVirtualResponderFlow() binding.Flow { + return c.pipeline[arpResponderTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolARP). + MatchARPOp(1). + MatchARPTpa(c.nodePortVirtualIPv4). + Action().Move(binding.NxmFieldSrcMAC, binding.NxmFieldDstMAC). + Action().SetSrcMAC(globalVirtualMAC). + Action().LoadARPOperation(2). + Action().Move(binding.NxmFieldARPSha, binding.NxmFieldARPTha). + Action().SetARPSha(globalVirtualMAC). + Action().Move(binding.NxmFieldARPSpa, binding.NxmFieldARPTpa). + Action().SetARPSpa(c.nodePortVirtualIPv4). + Action().OutputInPort(). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). + Done() +} + // arpResponderStaticFlow generates ARP reply for any ARP request with the same global virtual MAC. // This flow is used in policy-only mode, where traffic are routed via IP not MAC. func (c *client) arpResponderStaticFlow(category cookie.Category) binding.Flow { @@ -1783,6 +1801,19 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf return group } +// serviceGatewayFlow replies Service packets back to external addresses without entering Gateway CTZone. +func (c *client) serviceGatewayFlow() binding.Flow { + return c.pipeline[conntrackCommitTable].BuildFlow(priorityHigh). + MatchProtocol(binding.ProtocolIP). + MatchCTMark(ServiceCTMark, nil). + MatchCTStateNew(true). + MatchCTStateTrk(true). + MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}). + Action().GotoTable(L2ForwardingOutTable). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). + Done() +} + // decTTLFlows decrements TTL by one for the packets forwarded across Nodes. // The TTL decrement should be skipped for the packets which enter OVS pipeline // from the gateway interface, as the host IP stack should have decremented the @@ -1878,7 +1909,7 @@ func (c *client) generatePipeline() { } // NewClient is the constructor of the Client interface. -func NewClient(bridgeName, mgmtAddr string, enableProxy, enableAntreaPolicy bool) Client { +func NewClient(bridgeName string, mgmtAddr string, nodePortVirtualIPv4 net.IP, nodePortVirtualIPv6 net.IP, enableProxy, enableAntreaPolicy bool) Client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) policyCache := cache.NewIndexer( policyConjKeyFunc, @@ -1886,6 +1917,8 @@ func NewClient(bridgeName, mgmtAddr string, enableProxy, enableAntreaPolicy bool ) c := &client{ bridge: bridge, + nodePortVirtualIPv4: nodePortVirtualIPv4, + nodePortVirtualIPv6: nodePortVirtualIPv6, enableProxy: enableProxy, enableAntreaPolicy: enableAntreaPolicy, nodeFlowCache: newFlowCategoryCache(), diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index a87a5004a81..71cdc188fa3 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -308,17 +308,17 @@ func (mr *MockClientMockRecorder) InstallExternalFlows() *gomock.Call { } // InstallGatewayFlows mocks base method -func (m *MockClient) InstallGatewayFlows() error { +func (m *MockClient) InstallGatewayFlows(arg0 net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallGatewayFlows") + ret := m.ctrl.Call(m, "InstallGatewayFlows", arg0) ret0, _ := ret[0].(error) return ret0 } // InstallGatewayFlows indicates an expected call of InstallGatewayFlows -func (mr *MockClientMockRecorder) InstallGatewayFlows() *gomock.Call { +func (mr *MockClientMockRecorder) InstallGatewayFlows(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallGatewayFlows", reflect.TypeOf((*MockClient)(nil).InstallGatewayFlows)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallGatewayFlows", reflect.TypeOf((*MockClient)(nil).InstallGatewayFlows), arg0) } // InstallLoadBalancerServiceFromOutsideFlows mocks base method diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 37c8c45f5e2..ec07b8b81cc 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -30,6 +30,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/querier" + "github.com/vmware-tanzu/antrea/pkg/agent/route" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" "github.com/vmware-tanzu/antrea/third_party/proxy/config" @@ -41,9 +42,10 @@ const ( ) type proxier struct { - once sync.Once - endpointsConfig *config.EndpointsConfig - serviceConfig *config.ServiceConfig + onceRun sync.Once + onceInitializedReconcile sync.Once + endpointsConfig *config.EndpointsConfig + serviceConfig *config.ServiceConfig // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since last syncProxyRules call. For a single object, // changes are accumulated. Once both endpointsChanges and serviceChanges @@ -64,11 +66,15 @@ type proxier struct { // serviceStringMapMutex protects serviceStringMap object. serviceStringMapMutex sync.Mutex - runner *k8sproxy.BoundedFrequencyRunner - stopChan <-chan struct{} - agentQuerier querier.AgentQuerier - ofClient openflow.Client - isIPv6 bool + runner *k8sproxy.BoundedFrequencyRunner + stopChan <-chan struct{} + agentQuerier querier.AgentQuerier + ofClient openflow.Client + routeClient route.Interface + nodeIPs []net.IP + virtualNodePortIP net.IP + isIPv6 bool + nodePortSupport bool } func (p *proxier) isInitialized() bool { @@ -151,7 +157,8 @@ func (p *proxier) removeStaleEndpoints(staleEndpoints map[k8sproxy.ServicePortNa func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { return svcInfo.ClusterIP().String() != pSvcInfo.ClusterIP().String() || svcInfo.Port() != pSvcInfo.Port() || - svcInfo.OFProtocol != pSvcInfo.OFProtocol + svcInfo.OFProtocol != pSvcInfo.OFProtocol || + svcInfo.NodePort() != pSvcInfo.NodePort() } // smallSliceDifference builds a slice which includes all the strings from s1 @@ -274,6 +281,28 @@ func (p *proxier) installServices() { } } } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + if needRemoval { + err := p.ofClient.UninstallServiceFlows(p.virtualNodePortIP, uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol) + if err != nil { + klog.Errorf("Error when removing NodePort Service flows: %v", err) + continue + } + err = p.routeClient.DeleteNodePort(p.nodeIPs, pSvcInfo, p.isIPv6) + if err != nil { + klog.Errorf("Error when removing NodePort Service entries in IPSet: %v", err) + continue + } + } + if err := p.ofClient.InstallServiceFlows(groupID, p.virtualNodePortIP, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds())); err != nil { + klog.Errorf("Error when installing Service NodePort flow: %v", err) + continue + } + if err := p.routeClient.AddNodePort(p.nodeIPs, svcInfo, p.isIPv6); err != nil { + klog.Errorf("Error when installing Service NodePort rules: %v", err) + continue + } + } p.serviceInstalledMap[svcPortName] = svcPort p.addServiceByIP(svcInfo.String(), svcPortName) @@ -410,7 +439,12 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { } func (p *proxier) Run(stopCh <-chan struct{}) { - p.once.Do(func() { + p.onceRun.Do(func() { + if p.nodePortSupport { + if err := p.routeClient.AddNodePortRoute(p.isIPv6); err != nil { + panic(err) + } + } go p.serviceConfig.Run(stopCh) go p.endpointsConfig.Run(stopCh) p.stopChan = stopCh @@ -418,11 +452,76 @@ func (p *proxier) Run(stopCh <-chan struct{}) { }) } +// getLocalAddrs returns a list of all network addresses on the local system. +func getLocalAddrs() ([]net.IP, error) { + var localAddrs []net.IP + + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil, err + } + + for _, addr := range addrs { + ip, _, err := net.ParseCIDR(addr.String()) + if err != nil { + return nil, err + } + localAddrs = append(localAddrs, ip) + } + + return localAddrs, nil +} + +func filterIPFamily(isV6 bool, ips ...net.IP) []net.IP { + var result []net.IP + for _, ip := range ips { + if !isV6 && !utilnet.IsIPv6(ip) { + result = append(result, ip) + } else if isV6 && utilnet.IsIPv6(ip) { + result = append(result, ip) + } + } + return result +} + +func getAvailableAddresses(nodePortAddresses []*net.IPNet, podCIDR *net.IPNet, ipv6 bool) ([]net.IP, error) { + localAddresses, err := getLocalAddrs() + if err != nil { + return nil, err + } + var nodeIPs []net.IP + for _, nodeIP := range filterIPFamily(ipv6, localAddresses...) { + if podCIDR.Contains(nodeIP) { + continue + } + var contains bool + for _, nodePortAddress := range nodePortAddresses { + if nodePortAddress.Contains(nodeIP) { + contains = true + break + } + } + if len(nodePortAddresses) == 0 || contains { + nodeIPs = append(nodeIPs, nodeIP) + } + } + if len(nodeIPs) == 0 { + klog.Warningln("No qualified node IP found.") + } + return nodeIPs, nil +} + func NewProxier( + virtualNodePortIP net.IP, + nodePortAddresses []*net.IPNet, hostname string, + podCIDR *net.IPNet, informerFactory informers.SharedInformerFactory, ofClient openflow.Client, - isIPv6 bool) *proxier { + routeClient route.Interface, + isIPv6 bool, + nodePortSupport bool, +) (*proxier, error) { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, @@ -441,25 +540,51 @@ func NewProxier( serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + routeClient: routeClient, + virtualNodePortIP: virtualNodePortIP, isIPv6: isIPv6, + nodePortSupport: nodePortSupport, } + nodeIPs, err := getAvailableAddresses(nodePortAddresses, podCIDR, isIPv6) + if err != nil { + return nil, err + } + klog.Infof("Proxy NodePort Services on addresses: %v", nodeIPs) + p.nodeIPs = nodeIPs + p.serviceConfig.RegisterEventHandler(p) p.endpointsConfig.RegisterEventHandler(p) p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, 0, 30*time.Second, -1) - return p + return p, nil } func NewDualStackProxier( - hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) k8sproxy.Provider { - - // Create an ipv4 instance of the single-stack proxier - ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false) + virtualNodePortIP net.IP, + virtualNodePortIPv6 net.IP, + nodePortAddresses []*net.IPNet, + hostname string, + podIPv4CIDR *net.IPNet, + podIPv6CIDR *net.IPNet, + informerFactory informers.SharedInformerFactory, + ofClient openflow.Client, + routeClient route.Interface, + nodePortSupport bool, +) (k8sproxy.Provider, error) { + + // Create an ipv4 instance of the single-stack proxier. + ipv4Proxier, err := NewProxier(virtualNodePortIP, nodePortAddresses, hostname, podIPv4CIDR, informerFactory, ofClient, routeClient, false, nodePortSupport) + if err != nil { + return nil, err + } - // Create an ipv6 instance of the single-stack proxier - ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true) + // Create an ipv6 instance of the single-stack proxier. + ipv6Proxier, err := NewProxier(virtualNodePortIPv6, nodePortAddresses, hostname, podIPv6CIDR, informerFactory, ofClient, routeClient, true, nodePortSupport) + if err != nil { + return nil, err + } // Return a meta-proxier that dispatch calls between the two - // single-stack proxier instances + // single-stack proxier instances. metaProxier := k8sproxy.NewMetaProxier(ipv4Proxier, ipv6Proxier) - return metaProxier + return metaProxier, nil } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 3efe5349d49..2535ff2a7e0 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -33,10 +33,14 @@ import ( ofmock "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" + "github.com/vmware-tanzu/antrea/pkg/agent/route" + routetesting "github.com/vmware-tanzu/antrea/pkg/agent/route/testing" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) +var virtualNodePortIP = net.ParseIP("169.254.169.110") + func makeNamespaceName(namespace, name string) apimachinerytypes.NamespacedName { return apimachinerytypes.NamespacedName{Namespace: namespace, Name: name} } @@ -80,7 +84,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) return ept } -func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { +func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, isIPv6, nodeportSupport bool) *proxier { hostname := "localhost" eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder( @@ -96,8 +100,12 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { endpointsMap: types.EndpointsMap{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + routeClient: routeClient, serviceStringMap: map[string]k8sproxy.ServicePortName{}, isIPv6: isIPv6, + nodeIPs: []net.IP{net.ParseIP("192.168.0.1")}, + virtualNodePortIP: virtualNodePortIP, + nodePortSupport: nodeportSupport, } return p } @@ -106,7 +114,8 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -152,6 +161,73 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { fp.syncProxyRules() } +func TestExternalNameToNodePort(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient := ofmock.NewMockClient(ctrl) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, true) + + svcIPv4 := net.ParseIP("10.20.30.41") + svcPort := 80 + svcNodePort := 31000 + svcPortName := k8sproxy.ServicePortName{ + NamespacedName: makeNamespaceName("ns1", "svc1"), + Port: "80", + Protocol: corev1.ProtocolTCP, + } + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() // Should be ignored. + svc.Spec.Type = corev1.ServiceTypeExternalName + svc.Spec.ExternalName = "a.b.c.com" + svc.Spec.Ports = []corev1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + fp.syncProxyRules() + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() + svc.Spec.Type = corev1.ServiceTypeNodePort + svc.Spec.Ports = []corev1.ServicePort{{ + NodePort: int32(svcNodePort), + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + epIP := net.ParseIP("10.180.0.1") + epFunc := func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: epIP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + } + ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) + makeEndpointsMap(fp, ep) + groupID, _ := fp.groupCounter.Get(svcPortName) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + bindingProtocol := binding.ProtocolTCP + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, virtualNodePortIP, uint16(svcNodePort), bindingProtocol, uint16(0)).Times(1) + mockRouteClient.EXPECT().AddNodePort(gomock.Any(), gomock.Any(), false).Times(1) + + fp.syncProxyRules() +} + func TestClusterIPv4(t *testing.T) { testClusterIP(t, net.ParseIP("10.20.30.41"), net.ParseIP("10.180.0.1"), false) } @@ -164,7 +240,8 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, true) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -228,7 +305,8 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -266,7 +344,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -357,7 +436,8 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -416,7 +496,8 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -429,7 +510,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { - svc.Spec.Type = "NodePort" + svc.Spec.Type = corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP @@ -484,7 +565,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + fp := NewFakeProxier(nil, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -497,7 +578,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { - svc.Spec.Type = "NodePort" + svc.Spec.Type = corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP @@ -531,7 +612,8 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort1 := 80 svcPort2 := 8080 diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 25c5c6cbed6..e6612e9e36e 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -18,6 +18,7 @@ import ( "net" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" ) // Interface is the interface for routing container packets in host network. @@ -38,6 +39,20 @@ type Interface interface { // It should do nothing if the routes don't exist, without error. DeleteRoutes(podCIDR *net.IPNet) error + // AddRoutes should add the route to the NodePort virtual IP. + AddNodePortRoute(isIPv6 bool) error + + // ReconcileNodePort should remove orphaned NodePort ipset entries. + ReconcileNodePort(nodeIPs []net.IP, svcEntries []*types.ServiceInfo) error + + // AddNodePort should add entries of the NodePort Service to the ipset. + // It should have no effect if the entry already exist, without error. + AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error + + // DeleteNodePort should delete entries of the NodePort Service from ipset. + // It should do nothing if entries of the NodePort Service don't exist, without error. + DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error + // MigrateRoutesToGw should move routes from device linkname to local gateway. MigrateRoutesToGw(linkName string) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 148c4c2f70a..1277fd1ef9c 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -17,9 +17,11 @@ package route import ( "bytes" "fmt" + "io" "net" "reflect" "strconv" + "strings" "sync" "time" @@ -30,6 +32,7 @@ import ( "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/agent/util/ipset" "github.com/vmware-tanzu/antrea/pkg/agent/util/iptables" @@ -46,13 +49,24 @@ const ( antreaPodIPSet = "ANTREA-POD-IP" // antreaPodIP6Set contains all IPv6 Pod CIDRs of this cluster. antreaPodIP6Set = "ANTREA-POD-IP6" + // antreaNodePortClusterSet contains all Cluster type NodePort Services Addresses. + antreaNodePortClusterSet = "ANTREA-NODEPORT-CLUSTER" + antreaNodePortClusterSet6 = "ANTREA-NODEPORT-CLUSTER6" + // antreaNodePortLocalSet contains all Local type NodePort Services Addresses. + antreaNodePortLocalSet = "ANTREA-NODEPORT-LOCAL" + antreaNodePortLocalSet6 = "ANTREA-NODEPORT-LOCAL6" // Antrea managed iptables chains. - antreaForwardChain = "ANTREA-FORWARD" - antreaPreRoutingChain = "ANTREA-PREROUTING" - antreaPostRoutingChain = "ANTREA-POSTROUTING" - antreaOutputChain = "ANTREA-OUTPUT" - antreaMangleChain = "ANTREA-MANGLE" + antreaForwardChain = "ANTREA-FORWARD" + antreaPreRoutingChain = "ANTREA-PREROUTING" + antreaNodePortServicesChain = "ANTREA-NODEPORT" + antreaNodePortServicesMasqChain = "ANTREA-NODEPORT-MASQ" + antreaPostRoutingChain = "ANTREA-POSTROUTING" + antreaOutputChain = "ANTREA-OUTPUT" + antreaMangleChain = "ANTREA-MANGLE" + + localNodePortCtMark = "0xf0" + clusterNodePortCtMark = "0xf1" ) // Client implements Interface. @@ -74,17 +88,29 @@ type Client struct { // nodeRoutes caches ip routes to remote Pods. It's a map of podCIDR to routes. nodeRoutes sync.Map // nodeNeighbors caches IPv6 Neighbors to remote host gateway - nodeNeighbors sync.Map + nodeNeighbors sync.Map + nodePortVirtualIP, nodePortVirtualIPv6 net.IP + nodeportSupport bool } // NewClient returns a route client. // TODO: remove param serviceCIDR after kube-proxy is replaced by Antrea Proxy. This param is not used in this file; // leaving it here is to be compatible with the implementation on Windows. -func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSNAT bool) (*Client, error) { +func NewClient( + nodePortVirtualIP net.IP, + nodePortVirtualIPv6 net.IP, + serviceCIDR *net.IPNet, + networkConfig *config.NetworkConfig, + noSNAT bool, + nodeportSupport bool, +) (*Client, error) { return &Client{ - serviceCIDR: serviceCIDR, - networkConfig: networkConfig, - noSNAT: noSNAT, + nodePortVirtualIP: nodePortVirtualIP, + nodePortVirtualIPv6: nodePortVirtualIPv6, + serviceCIDR: serviceCIDR, + networkConfig: networkConfig, + noSNAT: noSNAT, + nodeportSupport: nodeportSupport, }, nil } @@ -128,6 +154,20 @@ func (c *Client) initIPTablesOnce(done func()) { // initIPSet ensures that the required ipset exists and it has the initial members. func (c *Client) initIPSet() error { + if c.nodeportSupport { + if err := ipset.CreateIPSet(antreaNodePortClusterSet, ipset.HashIPPort, false); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortLocalSet, ipset.HashIPPort, false); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortClusterSet6, ipset.HashIPPort, true); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortLocalSet6, ipset.HashIPPort, true); err != nil { + return err + } + } // In policy-only mode, Node Pod CIDR is undefined. if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { return nil @@ -194,19 +234,34 @@ func (c *Client) initIPTables() error { // Create the antrea managed chains and link them to built-in chains. // We cannot use iptables-restore for these jump rules because there // are non antrea managed rules in built-in chains. - jumpRules := []struct{ table, srcChain, dstChain, comment string }{ - {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}, - {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, - {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules"}, - {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules"}, - {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules"}, + jumpRules := []struct { + need bool + table, srcChain, dstChain, comment string + prepend bool + }{ + {true, iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", false}, + {true, iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {true, iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules", false}, + {true, iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules", false}, + {true, iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules", false}, + {c.nodeportSupport, iptables.NATTable, iptables.PreRoutingChain, antreaNodePortServicesChain, "Antrea: jump to Antrea NodePort Service rules", true}, + {c.nodeportSupport, iptables.NATTable, iptables.OutputChain, antreaNodePortServicesChain, "Antrea: jump to Antrea NodePort Service rules", true}, + {c.nodeportSupport, iptables.NATTable, iptables.PostRoutingChain, antreaNodePortServicesMasqChain, "Antrea: jump to Antrea NodePort Service masquerade rules", true}, } for _, rule := range jumpRules { + ruleSpec := []string{ + "-j", rule.dstChain, + "-m", "comment", "--comment", rule.comment, + } + if !rule.need { + _ = c.ipt.DeleteChain(rule.table, rule.dstChain) + _ = c.ipt.DeleteRule(rule.table, rule.srcChain, ruleSpec) + continue + } if err := c.ipt.EnsureChain(rule.table, rule.dstChain); err != nil { return err } - ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.ipt.EnsureRule(rule.table, rule.srcChain, ruleSpec); err != nil { + if err := c.ipt.EnsureRule(rule.table, rule.srcChain, ruleSpec, rule.prepend); err != nil { return err } } @@ -231,11 +286,108 @@ func (c *Client) initIPTables() error { return nil } +func (c *Client) restoreNodePortV4IptablesDataV4(hostGateway string) *bytes.Buffer { + iptablesData := new(bytes.Buffer) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", antreaNodePortLocalSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", localNodePortCtMark, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", antreaNodePortClusterSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", clusterNodePortCtMark, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", localNodePortCtMark, + "-j", iptables.DNATTarget, "--to-destination", c.nodePortVirtualIP.String(), + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", clusterNodePortCtMark, + "-j", iptables.DNATTarget, "--to-destination", c.nodePortVirtualIP.String(), + }...) + writeLine(iptablesData, + "-A", antreaNodePortServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets with a loopback address"`, + "-s", "127.0.0.1", + "-d", c.nodePortVirtualIP.String(), + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + writeLine(iptablesData, + "-A", antreaNodePortServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets which target Service with Local externalTrafficPolicy"`, + "-m", "mark", "--mark", clusterNodePortCtMark, + "-d", c.nodePortVirtualIP.String(), + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + return iptablesData +} + +func (c *Client) restoreNodePortIptablesData(hostGateway string, isIPv6 bool) *bytes.Buffer { + localSet := antreaNodePortLocalSet + clusterSet := antreaNodePortClusterSet + loopbackAddr := "127.0.0.1" + nodePortVirtualIP := c.nodePortVirtualIP.String() + if isIPv6 { + localSet = antreaNodePortLocalSet6 + clusterSet = antreaNodePortClusterSet6 + loopbackAddr = net.IPv6loopback.String() + nodePortVirtualIP = c.nodePortVirtualIPv6.String() + } + iptablesData := new(bytes.Buffer) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", localSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", localNodePortCtMark, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", clusterSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", clusterNodePortCtMark, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", localNodePortCtMark, + "-j", iptables.DNATTarget, "--to-destination", nodePortVirtualIP, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", clusterNodePortCtMark, + "-j", iptables.DNATTarget, "--to-destination", nodePortVirtualIP, + }...) + writeLine(iptablesData, + "-A", antreaNodePortServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets with a loopback address"`, + "-s", loopbackAddr, + "-d", nodePortVirtualIP, + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + writeLine(iptablesData, + "-A", antreaNodePortServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets which target Service with Local externalTrafficPolicy"`, + "-m", "mark", "--mark", clusterNodePortCtMark, + "-d", nodePortVirtualIP, + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + return iptablesData +} + func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet string) *bytes.Buffer { // Create required rules in the antrea chains. // Use iptables-restore as it flushes the involved chains and creates the desired rules // with a single call, instead of string matching to clean up stale rules. iptablesData := bytes.NewBuffer(nil) + // Determined which version of IP is being processed by podCIDR. + isIPv6 := true + if podCIDR.IP.To4() != nil { + isIPv6 = false + } // Write head lines anyway so the undesired rules can be deleted when changing encap mode. writeLine(iptablesData, "*raw") writeLine(iptablesData, iptables.MakeChainLine(antreaPreRoutingChain)) @@ -307,6 +459,11 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet string) *bytes "-j", iptables.MasqueradeTarget, }...) } + if c.nodeportSupport { + writeLine(iptablesData, iptables.MakeChainLine(antreaNodePortServicesChain)) + writeLine(iptablesData, iptables.MakeChainLine(antreaNodePortServicesMasqChain)) + io.Copy(iptablesData, c.restoreNodePortIptablesData(hostGateway, isIPv6)) + } writeLine(iptablesData, "COMMIT") return iptablesData } @@ -322,6 +479,22 @@ func (c *Client) initIPRoutes() error { return nil } +func generateNodePortIPSETEntries(nodeIP net.IP, isIPv6 bool, svcInfos []*types.ServiceInfo) sets.String { + stringSet := sets.NewString() + for _, svcInfo := range svcInfos { + if svcInfo.NodePort() > 0 { + protocolPort := fmt.Sprintf("%s:%d", strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort()) + stringSet.Insert(fmt.Sprintf("%s,%s", nodeIP.String(), protocolPort)) + if isIPv6 { + stringSet.Insert(fmt.Sprintf("%s,%s", net.IPv6loopback.String(), protocolPort)) + } else { + stringSet.Insert(fmt.Sprintf("127.0.0.1,%s", protocolPort)) + } + } + } + return stringSet +} + // Reconcile removes orphaned podCIDRs from ipset and removes routes to orphaned podCIDRs // based on the desired podCIDRs. func (c *Client) Reconcile(podCIDRs []string) error { @@ -362,7 +535,10 @@ func (c *Client) Reconcile(podCIDRs []string) error { if reflect.DeepEqual(route.Dst, c.nodeConfig.PodIPv4CIDR) || reflect.DeepEqual(route.Dst, c.nodeConfig.PodIPv6CIDR) { continue } - if desiredPodCIDRs.Has(route.Dst.String()) { + if c.nodeportSupport && route.Dst != nil && (route.Dst.Contains(c.nodePortVirtualIP) || route.Dst.Contains(c.nodePortVirtualIPv6)) { + continue + } + if route.Dst != nil && desiredPodCIDRs.Has(route.Dst.String()) { continue } klog.Infof("Deleting unknown route %v", route) @@ -386,6 +562,9 @@ func (c *Client) Reconcile(podCIDRs []string) error { if desiredGWs.Has(neighIP) { continue } + if c.nodeportSupport && c.nodePortVirtualIPv6 != nil && c.nodePortVirtualIPv6.String() == neighIP { + continue + } klog.V(4).Infof("Deleting orphaned IPv6 neighbor %v", actualNeigh) if err := netlink.NeighDel(actualNeigh); err != nil { return err @@ -616,3 +795,146 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error } return nil } + +func (c *Client) ReconcileNodePort(nodeIPs []net.IP, svcEntries []*types.ServiceInfo) error { + var cluster, local []*types.ServiceInfo + for _, entry := range svcEntries { + if entry.OnlyNodeLocalEndpoints() { + local = append(local, entry) + } else { + cluster = append(cluster, entry) + } + } + reconcile := func(setName string, isIPv6 bool, desiredSvcEntries []*types.ServiceInfo) error { + existEntries, err := ipset.ListEntries(setName) + if err != nil { + return err + } + desiredEntries := sets.NewString() + for _, nodeIP := range nodeIPs { + desiredEntries.Insert(generateNodePortIPSETEntries(nodeIP, isIPv6, svcEntries).List()...) + } + for _, entry := range existEntries { + if desiredEntries.Has(entry) { + continue + } + klog.Infof("Deleting an orphaned NodePort Service entry %s from ipset", entry) + if err := ipset.DelEntry(setName, entry); err != nil { + return err + } + } + return nil + } + if err := reconcile(antreaNodePortLocalSet, false, local); err != nil { + return err + } + if err := reconcile(antreaNodePortClusterSet, false, cluster); err != nil { + return err + } + if err := reconcile(antreaNodePortLocalSet6, true, local); err != nil { + return err + } + if err := reconcile(antreaNodePortClusterSet6, true, cluster); err != nil { + return err + } + return nil +} + +func (c *Client) AddNodePortRoute(isIPv6 bool) error { + var route *netlink.Route + var nodeportIP *net.IP + if !isIPv6 { + nodeportIP = &c.nodePortVirtualIP + route = &netlink.Route{ + Dst: &net.IPNet{ + IP: *nodeportIP, + Mask: net.IPv4Mask(255, 255, 255, 255), + }, + Gw: *nodeportIP, + Flags: int(netlink.FLAG_ONLINK), + } + route.LinkIndex = c.nodeConfig.GatewayConfig.LinkIndex + } else { + nodeportIP = &c.nodePortVirtualIPv6 + route = &netlink.Route{ + Dst: &net.IPNet{ + IP: *nodeportIP, + Mask: net.IPMask{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + Gw: c.nodePortVirtualIPv6, + } + } + if err := netlink.RouteReplace(route); err != nil { + return fmt.Errorf("failed to install NodePort route: %w", err) + } + if isIPv6 { + neigh := &netlink.Neigh{ + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + Family: netlink.FAMILY_V6, + State: netlink.NUD_PERMANENT, + IP: *nodeportIP, + HardwareAddr: globalVMAC, + } + if err := netlink.NeighSet(neigh); err != nil { + return fmt.Errorf("failed to add nodeport neighbor %v to gw %s: %v", neigh, c.nodeConfig.GatewayConfig.Name, err) + } + c.nodeNeighbors.Store(nodeportIP.String(), neigh) + } + c.nodeRoutes.Store(nodeportIP.String(), route) + return nil +} + +func (c *Client) AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + setName := antreaNodePortClusterSet + if isIPv6 { + setName = antreaNodePortClusterSet6 + } + if svcInfo.OnlyNodeLocalEndpoints() { + if isIPv6 { + setName = antreaNodePortLocalSet6 + } else { + setName = antreaNodePortLocalSet + } + } + for _, nodeIP := range nodeIPs { + if err := ipset.AddEntry(setName, fmt.Sprintf("%s,%s:%d", nodeIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when adding NodePort to ipset %s: %v", setName, err) + } + } + loopbackIP := "127.0.0.1" + if isIPv6 { + loopbackIP = net.IPv6loopback.String() + } + if err := ipset.AddEntry(setName, fmt.Sprintf("%s,%s:%d", loopbackIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when adding NodePort to ipset %s: %v", setName, err) + } + return nil +} + +func (c *Client) DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + setName := antreaNodePortClusterSet + if isIPv6 { + setName = antreaNodePortClusterSet6 + } + if svcInfo.OnlyNodeLocalEndpoints() { + if isIPv6 { + setName = antreaNodePortLocalSet6 + } else { + setName = antreaNodePortLocalSet + } + } + for _, nodeIP := range nodeIPs { + if err := ipset.DelEntry(setName, fmt.Sprintf("%s,%s:%d", nodeIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when removing NodePort from ipset %s: %v", setName, err) + } + } + loopbackIP := "127.0.0.1" + if isIPv6 { + loopbackIP = net.IPv6loopback.String() + } + if err := ipset.DelEntry(setName, fmt.Sprintf("%s,%s:%d", loopbackIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when removing NodePort from ipset %s: %v", setName, err) + } + return nil +} diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 01cc4f4d640..325f21e864b 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/agent/util/winfirewall" ) @@ -35,6 +36,9 @@ const ( outboundFirewallRuleName = "Antrea: accept packets to local Pods" ) +// Client implements Interface. +var _ Interface = &Client{} + type Client struct { nr netroute.Interface nodeConfig *config.NodeConfig @@ -43,9 +47,32 @@ type Client struct { fwClient *winfirewall.Client } +func (c *Client) AddNodePortRoute(isIPv6 bool) error { + return nil +} + +func (c *Client) AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + return nil +} + +func (c *Client) DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + return nil +} + +func (c *Client) ReconcileNodePort(nodeIPs []net.IP, ports []*types.ServiceInfo) error { + return nil +} + // NewClient returns a route client. // Todo: remove param serviceCIDR after kube-proxy is replaced by Antrea Proxy completely. -func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSNAT bool) (*Client, error) { +func NewClient( + nodePortVirtualIP net.IP, + nodePortVirtualIPv6 net.IP, + serviceCIDR *net.IPNet, + networkConfig *config.NetworkConfig, + noSNAT bool, + nodeportSupport bool, +) (*Client, error) { nr := netroute.New() return &Client{ nr: nr, diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index f83840b75f6..ae5267aef93 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -53,7 +53,7 @@ func TestRouteOperation(t *testing.T) { nr := netroute.New() defer nr.Exit() - client, err := NewClient(serviceCIDR, &config.NetworkConfig{}, false) + client, err := NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{}, false, false) require.Nil(t, err) nodeConfig := &config.NodeConfig{ GatewayConfig: &config.GatewayConfig{ diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 6108c38b263..50785d7b5b9 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ package testing import ( gomock "github.com/golang/mock/gomock" config "github.com/vmware-tanzu/antrea/pkg/agent/config" + types "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" net "net" reflect "reflect" ) @@ -49,6 +50,34 @@ func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } +// AddNodePort mocks base method +func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 *types.ServiceInfo, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodePort indicates an expected call of AddNodePort +func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) +} + +// AddNodePortRoute mocks base method +func (m *MockInterface) AddNodePortRoute(arg0 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodePortRoute", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodePortRoute indicates an expected call of AddNodePortRoute +func (mr *MockInterfaceMockRecorder) AddNodePortRoute(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePortRoute", reflect.TypeOf((*MockInterface)(nil).AddNodePortRoute), arg0) +} + // AddRoutes mocks base method func (m *MockInterface) AddRoutes(arg0 *net.IPNet, arg1, arg2 net.IP) error { m.ctrl.T.Helper() @@ -63,6 +92,20 @@ func (mr *MockInterfaceMockRecorder) AddRoutes(arg0, arg1, arg2 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRoutes", reflect.TypeOf((*MockInterface)(nil).AddRoutes), arg0, arg1, arg2) } +// DeleteNodePort mocks base method +func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 *types.ServiceInfo, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteNodePort indicates an expected call of DeleteNodePort +func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) +} + // DeleteRoutes mocks base method func (m *MockInterface) DeleteRoutes(arg0 *net.IPNet) error { m.ctrl.T.Helper() @@ -119,6 +162,20 @@ func (mr *MockInterfaceMockRecorder) Reconcile(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockInterface)(nil).Reconcile), arg0) } +// ReconcileNodePort mocks base method +func (m *MockInterface) ReconcileNodePort(arg0 []net.IP, arg1 []*types.ServiceInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReconcileNodePort", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReconcileNodePort indicates an expected call of ReconcileNodePort +func (mr *MockInterfaceMockRecorder) ReconcileNodePort(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReconcileNodePort", reflect.TypeOf((*MockInterface)(nil).ReconcileNodePort), arg0, arg1) +} + // UnMigrateRoutesFromGw mocks base method func (m *MockInterface) UnMigrateRoutesFromGw(arg0 *net.IPNet, arg1 string) error { m.ctrl.T.Helper() diff --git a/pkg/agent/util/ipset/ipset.go b/pkg/agent/util/ipset/ipset.go index 4bfd933f43a..6e02a3214ec 100644 --- a/pkg/agent/util/ipset/ipset.go +++ b/pkg/agent/util/ipset/ipset.go @@ -28,6 +28,8 @@ const ( // The lookup time grows linearly with the number of the different prefix values added to the set. HashNet SetType = "hash:net" HashIP SetType = "hash:ip" + // The hash:ip,port set type uses a hash to store IP address and protocol-port pairs in it. + HashIPPort SetType = "hash:ip,port" ) // memberPattern is used to match the members part of ipset list result. diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 1b70345eb82..8bdf1b438b7 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -38,6 +38,8 @@ const ( MasqueradeTarget = "MASQUERADE" MarkTarget = "MARK" ConnTrackTarget = "CT" + DNATTarget = "DNAT" + SNATTarget = "SNAT" NoTrackTarget = "NOTRACK" PreRoutingChain = "PREROUTING" @@ -122,8 +124,9 @@ func (c *Client) ChainExists(table string, chain string) (bool, error) { return false, nil } -// EnsureRule checks if target rule already exists, appends it if not. -func (c *Client) EnsureRule(table string, chain string, ruleSpec []string) error { +// EnsureRule checks if target rule already exists, add it if not. If prepend is true, the rule will be added to the top +// of the chain. Otherwise, the rule will be appended to the chain. +func (c *Client) EnsureRule(table string, chain string, ruleSpec []string, prepend bool) error { for idx := range c.ipts { ipt := c.ipts[idx] exist, err := ipt.Exists(table, chain, ruleSpec...) @@ -133,7 +136,13 @@ func (c *Client) EnsureRule(table string, chain string, ruleSpec []string) error if exist { return nil } - if err := ipt.Append(table, chain, ruleSpec...); err != nil { + var f func() error + if !prepend { + f = func() error { return ipt.Append(table, chain, ruleSpec...) } + } else { + f = func() error { return ipt.Insert(table, chain, 1, ruleSpec...) } + } + if err := f(); err != nil { return fmt.Errorf("error appending rule %v to table %s chain %s: %v", ruleSpec, table, chain, err) } } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 1ae07e0d4b1..98d698f0210 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -40,6 +40,10 @@ const ( // Service traffic. AntreaProxy featuregate.Feature = "AntreaProxy" + // alpha: v0.13 + // Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort featuregate.Feature = "AntreaProxyNodePort" + // alpha: v0.8 // beta: v0.11 // Allows to trace path from a generated packet. @@ -70,12 +74,13 @@ var ( // To add a new feature, define a key for it above and add it here. The features will be // available throughout Antrea binaries. defaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - AntreaPolicy: {Default: false, PreRelease: featuregate.Alpha}, - AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, - Traceflow: {Default: true, PreRelease: featuregate.Beta}, - FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, - NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, - NodePortLocal: {Default: false, PreRelease: featuregate.Alpha}, + AntreaPolicy: {Default: false, PreRelease: featuregate.Alpha}, + AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, + AntreaProxyNodePort: {Default: false, PreRelease: featuregate.Alpha}, + Traceflow: {Default: true, PreRelease: featuregate.Beta}, + FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, + NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, + NodePortLocal: {Default: false, PreRelease: featuregate.Alpha}, } // UnsupportedFeaturesOnWindows records the features not supported on @@ -89,7 +94,8 @@ var ( // can have different FeatureSpecs between Linux and Windows, we should // still define a separate defaultAntreaFeatureGates map for Windows. unsupportedFeaturesOnWindows = map[featuregate.Feature]struct{}{ - NodePortLocal: {}, + AntreaProxyNodePort: {}, + NodePortLocal: {}, } ) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 5eeb0519071..d6fad585437 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1160,6 +1160,11 @@ func (data *TestData) createNginxClusterIPService(affinity bool, ipFamily *corev return data.createService("nginx", 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeClusterIP, ipFamily) } +// createNginxNodePortService create a NodePort nginx service with the given name. +func (data *TestData) createNginxNodePortService(affinity bool, ipFamily *corev1.IPFamily) (*corev1.Service, error) { + return data.createService("nginx", 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeNodePort, ipFamily) +} + func (data *TestData) createNginxLoadBalancerService(affinity bool, ingressIPs []string, ipFamily *corev1.IPFamily) (*corev1.Service, error) { svc, err := data.createService(nginxLBService, 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeLoadBalancer, ipFamily) if err != nil { diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index 8613944c9aa..6edb3c49270 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -17,6 +17,7 @@ package e2e import ( "encoding/hex" "fmt" + "net" "strings" "testing" "time" @@ -41,6 +42,46 @@ func skipIfProxyDisabled(t *testing.T, data *TestData) { } } +func TestProxyNodePortService(t *testing.T) { + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + skipIfProxyDisabled(t, data) + skipIfNumNodesLessThan(t, 2) + + nodeName := nodeName(1) + require.NoError(t, data.createNginxPod("nginx", nodeName)) + _, err = data.podWaitForIPs(defaultTimeout, "nginx", testNamespace) + require.NoError(t, err) + require.NoError(t, data.podWaitForRunning(defaultTimeout, "nginx", testNamespace)) + ipProctol := corev1.IPv4Protocol + svc, err := data.createNginxNodePortService(true, &ipProctol) + require.NoError(t, err) + require.NoError(t, data.createBusyboxPodOnNode("busybox", nodeName)) + require.NoError(t, data.podWaitForRunning(defaultTimeout, "busybox", testNamespace)) + var nodePort string + for _, port := range svc.Spec.Ports { + if port.NodePort != 0 { + nodePort = fmt.Sprint(port.NodePort) + break + } + } + busyboxPod, err := data.podWaitFor(defaultTimeout, "busybox", testNamespace, func(pod *corev1.Pod) (bool, error) { + return pod.Status.Phase == corev1.PodRunning, nil + }) + require.NoError(t, err) + require.NotNil(t, busyboxPod.Status) + _, _, err = data.runCommandFromPod(testNamespace, "busybox", busyboxContainerName, []string{"wget", "-O", "-", net.JoinHostPort(busyboxPod.Status.HostIP, nodePort), "-T", "1"}) + require.NoError(t, err, "Service NodePort should be able to be connected from Pod") + _, _, _, err = RunCommandOnNode(controlPlaneNodeName(), strings.Join([]string{"wget", "-O", "-", net.JoinHostPort(busyboxPod.Status.HostIP, nodePort), "-T", "1"}, " ")) + require.NoError(t, err, "Service NodePort should be able to be connected from Node IP address on Node which does not have Endpoint") + _, _, _, err = RunCommandOnNode(controlPlaneNodeName(), strings.Join([]string{"wget", "-O", "-", net.JoinHostPort("127.0.0.1", nodePort), "-T", "1"}, " ")) + require.NoError(t, err, "Service NodePort should be able to be connected from loopback address on Node which does not have Endpoint") +} + func TestProxyServiceSessionAffinity(t *testing.T) { skipIfProviderIs(t, "kind", "#881 Does not work in Kind, needs to be investigated.") data, err := setupTest(t) diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index a53af507a8e..fb58e4c8949 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -81,14 +81,15 @@ type testPeerConfig struct { } type testConfig struct { - bridge string - nodeConfig *config1.NodeConfig - localPods []*testLocalPodConfig - peers []*testPeerConfig - serviceCIDR *net.IPNet - globalMAC net.HardwareAddr - enableIPv6 bool - enableIPv4 bool + bridge string + nodeConfig *config1.NodeConfig + localPods []*testLocalPodConfig + peers []*testPeerConfig + serviceCIDR *net.IPNet + globalMAC net.HardwareAddr + enableIPv6 bool + enableIPv4 bool + localPodCIDR net.IPNet } var ( @@ -100,7 +101,7 @@ func TestConnectivityFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -127,7 +128,7 @@ func TestConnectivityFlows(t *testing.T) { } func TestReplayFlowsConnectivityFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -154,7 +155,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { } func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -326,7 +327,7 @@ func TestNetworkPolicyFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -436,7 +437,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, nil, nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -467,7 +468,7 @@ type svcConfig struct { } func TestProxyServiceFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -805,7 +806,7 @@ func checkOVSFlowMetrics(t *testing.T, client ofClient.Client) { func testInstallGatewayFlows(t *testing.T, config *testConfig) { gatewayConfig := config.nodeConfig.GatewayConfig - err := c.InstallGatewayFlows() + err := c.InstallGatewayFlows(config.nodeConfig.NodeIPAddr.IP) if err != nil { t.Fatalf("Failed to install Openflow entries for gateway: %v", err) } @@ -824,6 +825,7 @@ func testInstallGatewayFlows(t *testing.T, config *testConfig) { func prepareConfiguration() *testConfig { podMAC, _ := net.ParseMAC("aa:aa:aa:aa:aa:13") gwMAC, _ := net.ParseMAC("aa:aa:aa:aa:aa:11") + _, podCIDR, _ := net.ParseCIDR("192.168.1.1/24") nodeIP, nodeSubnet, _ := net.ParseCIDR("10.10.10.1/24") nodeSubnet.IP = nodeIP @@ -855,14 +857,15 @@ func prepareConfiguration() *testConfig { } vMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") return &testConfig{ - bridge: br, - nodeConfig: nodeConfig, - localPods: []*testLocalPodConfig{podCfg}, - peers: []*testPeerConfig{peerNode}, - serviceCIDR: serviceCIDR, - globalMAC: vMAC, - enableIPv4: true, - enableIPv6: false, + bridge: br, + nodeConfig: nodeConfig, + localPods: []*testLocalPodConfig{podCfg}, + localPodCIDR: *podCIDR, + peers: []*testPeerConfig{peerNode}, + serviceCIDR: serviceCIDR, + globalMAC: vMAC, + enableIPv4: true, + enableIPv6: false, } } diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index ae8956032ba..3190f3b185a 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -137,7 +137,7 @@ func TestInitialize(t *testing.T) { for _, tc := range tcs { t.Logf("Running Initialize test with mode %s node config %s", tc.networkConfig.TrafficEncapMode, nodeConfig) - routeClient, err := route.NewClient(serviceCIDR, tc.networkConfig, tc.noSNAT) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, tc.networkConfig, tc.noSNAT, false) assert.NoError(t, err) var xtablesReleasedTime, initializedTime time.Time @@ -257,7 +257,7 @@ func TestAddAndDeleteRoutes(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false, false) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -353,7 +353,7 @@ func TestReconcile(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s added routes %v desired routes %v", tc.mode, tc.addedRoutes, tc.desiredPeerCIDRs) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false, false) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -391,9 +391,9 @@ func TestRouteTablePolicyOnly(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly}, false, false) assert.NoError(t, err) - err = routeClient.Initialize(nodeConfig, func() {}) + routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) // Verify gw IP gwName := nodeConfig.GatewayConfig.Name @@ -450,7 +450,7 @@ func TestIPv6RoutesAndNeighbors(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false) + routeClient, err := route.NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false) assert.Nil(t, err) _, ipv6Subnet, _ := net.ParseCIDR("fd74:ca9b:172:19::/64") gwIPv6 := net.ParseIP("fd74:ca9b:172:19::1")