From 9e79899eabdf03d2a93089eaa6eecb3fc1bcaedc Mon Sep 17 00:00:00 2001 From: Weiqiang Tang Date: Thu, 24 Sep 2020 15:16:29 +0800 Subject: [PATCH] NodePort support for Antrea Proxy on Linux Resolves #1463. --- build/yamls/antrea-aks.yml | 13 +- build/yamls/antrea-eks.yml | 13 +- build/yamls/antrea-gke.yml | 13 +- build/yamls/antrea-ipsec.yml | 13 +- build/yamls/antrea.yml | 13 +- build/yamls/base/conf/antrea-agent.conf | 7 + cmd/antrea-agent/agent.go | 49 ++- cmd/antrea-agent/config.go | 3 + cmd/antrea-agent/options.go | 21 ++ hack/generate-manifest.sh | 1 + pkg/agent/nodeportlocal/rules/iptable_rule.go | 28 +- pkg/agent/openflow/client.go | 9 + pkg/agent/openflow/client_test.go | 17 +- pkg/agent/openflow/pipeline.go | 47 ++- pkg/agent/proxy/proxier.go | 199 ++++++++++- pkg/agent/proxy/proxier_test.go | 130 +++++-- pkg/agent/proxy/types/groupcounter.go | 34 +- pkg/agent/route/interfaces.go | 15 + pkg/agent/route/route_linux.go | 319 ++++++++++++++++-- pkg/agent/route/route_windows.go | 29 +- pkg/agent/route/route_windows_test.go | 2 +- pkg/agent/route/testing/mock_route.go | 57 ++++ pkg/agent/util/ipset/ipset.go | 2 + pkg/agent/util/iptables/iptables.go | 14 +- pkg/features/antrea_features.go | 6 + test/e2e/basic_test.go | 14 +- test/e2e/framework.go | 5 + test/e2e/proxy_test.go | 62 ++++ test/integration/agent/openflow_test.go | 25 +- test/integration/agent/route_test.go | 14 +- 30 files changed, 1034 insertions(+), 140 deletions(-) diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index f48640932ab..16c63d7dce4 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2424,6 +2424,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. 
# Traceflow: true @@ -2558,6 +2561,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -2640,7 +2647,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gg4m728h98 + name: antrea-config-f74gh4mtc2 namespace: kube-system --- apiVersion: v1 @@ -2760,7 +2767,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-f74gh4mtc2 name: antrea-config - name: antrea-controller-tls secret: @@ -3071,7 +3078,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-f74gh4mtc2 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 0215f74f9f6..afbba06e816 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2424,6 +2424,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -2558,6 +2561,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -2640,7 +2647,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-gg4m728h98 + name: antrea-config-f74gh4mtc2 namespace: kube-system --- apiVersion: v1 @@ -2760,7 +2767,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-f74gh4mtc2 name: antrea-config - name: antrea-controller-tls secret: @@ -3073,7 +3080,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-gg4m728h98 + name: antrea-config-f74gh4mtc2 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 3af0cb8a773..8715c313509 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2424,6 +2424,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -2558,6 +2561,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -2640,7 +2647,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-6bb22hc7fg + name: antrea-config-gm7dh5f556 namespace: kube-system --- apiVersion: v1 @@ -2760,7 +2767,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-6bb22hc7fg + name: antrea-config-gm7dh5f556 name: antrea-config - name: antrea-controller-tls secret: @@ -3074,7 +3081,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-6bb22hc7fg + name: antrea-config-gm7dh5f556 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 396afa98122..bd1f6bd6747 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2424,6 +2424,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -2563,6 +2566,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -2645,7 +2652,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-f57t688chc + name: antrea-config-ckh7bt6f4t namespace: kube-system --- apiVersion: v1 @@ -2774,7 +2781,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-f57t688chc + name: antrea-config-ckh7bt6f4t name: antrea-config - name: antrea-controller-tls secret: @@ -3120,7 +3127,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-f57t688chc + name: antrea-config-ckh7bt6f4t name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index c7c316f5210..4785ea3407f 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2424,6 +2424,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -2563,6 +2566,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -2645,7 +2652,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-5ct9ktdt77 + name: antrea-config-8t8hbfmd84 namespace: kube-system --- apiVersion: v1 @@ -2765,7 +2772,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-5ct9ktdt77 + name: antrea-config-8t8hbfmd84 name: antrea-config - name: antrea-controller-tls secret: @@ -3076,7 +3083,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-5ct9ktdt77 + name: antrea-config-8t8hbfmd84 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 201a82e58d5..3ba56c30a0a 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -10,6 +10,9 @@ featureGates: # this flag will not take effect. # EndpointSlice: false +# Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -149,3 +152,7 @@ featureGates: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + +# A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks +# (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+#nodePortAddresses: [] diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 6def7e95eb6..e8fc9777107 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -99,6 +99,8 @@ func run(o *Options) error { ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, ovsDatapathType, ovsdbConnection) ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge) ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr, ovsDatapathType, + o.nodePortVirtualIP, + o.nodePortVirtualIPv6, features.DefaultFeatureGate.Enabled(features.AntreaProxy), features.DefaultFeatureGate.Enabled(features.AntreaPolicy), features.DefaultFeatureGate.Enabled(features.Egress)) @@ -116,7 +118,7 @@ func run(o *Options) error { TrafficEncapMode: encapMode, EnableIPSecTunnel: o.config.EnableIPSecTunnel} - routeClient, err := route.NewClient(serviceCIDRNet, networkConfig, o.config.NoSNAT) + routeClient, err := route.NewClient(o.nodePortVirtualIP, o.nodePortVirtualIPv6, serviceCIDRNet, networkConfig, o.config.NoSNAT, features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort)) if err != nil { return fmt.Errorf("error creating route client: %v", err) } @@ -202,15 +204,52 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) + var nodePortAddresses []*net.IPNet + nodePortSupport := features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort) + if nodePortSupport { + for _, nodePortAddress := range o.config.NodePortAddresses { + _, ipNet, _ := net.ParseCIDR(nodePortAddress) + nodePortAddresses = append(nodePortAddresses, ipNet) + } + } + var err error switch { case v4Enabled && v6Enabled: - proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient) + proxier = proxy.NewDualStackProxier(o.nodePortVirtualIP, 
+ o.nodePortVirtualIPv6, + nodePortAddresses, + nodeConfig.Name, + nodeConfig.PodIPv4CIDR, + nodeConfig.PodIPv6CIDR, + informerFactory, + ofClient, + routeClient, + nodePortSupport) case v4Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false) + proxier = proxy.NewProxier(o.nodePortVirtualIP, + nodePortAddresses, + nodeConfig.Name, + nodeConfig.PodIPv4CIDR, + informerFactory, + ofClient, + routeClient, + v6Enabled, + nodePortSupport) case v6Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true) + proxier = proxy.NewProxier(o.nodePortVirtualIPv6, + nodePortAddresses, + nodeConfig.Name, + nodeConfig.PodIPv6CIDR, + informerFactory, + ofClient, + routeClient, + v6Enabled, + nodePortSupport) default: - return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") + err = fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") + } + if err != nil { + return fmt.Errorf("error when creating Antrea Proxy: %w", err) } } diff --git a/cmd/antrea-agent/config.go b/cmd/antrea-agent/config.go index ec3a4195537..9ff93c6b90f 100644 --- a/cmd/antrea-agent/config.go +++ b/cmd/antrea-agent/config.go @@ -143,4 +143,7 @@ type AgentConfig struct { TLSCipherSuites string `yaml:"tlsCipherSuites,omitempty"` // TLS min version. TLSMinVersion string `yaml:"tlsMinVersion,omitempty"` + // A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + // (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ NodePortAddresses []string `yaml:"nodePortAddresses,omitempty"` } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 9830635aacb..9677c056288 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -42,9 +42,12 @@ const ( defaultFlowCollectorTransport = "tcp" defaultFlowCollectorPort = "4739" defaultFlowPollInterval = 5 * time.Second + defaultFlowExportFrequency = 12 defaultActiveFlowExportTimeout = 60 * time.Second defaultIdleFlowExportTimeout = 15 * time.Second defaultNPLPortRange = "40000-41000" + defaultNodePortVirtualIP = "169.254.169.110" + defaultNodePortVirtualIPv6 = "fec0::ffee:ddcc:bbaa" ) type Options struct { @@ -62,10 +65,14 @@ type Options struct { activeFlowTimeout time.Duration // Idle flow timeout to export records of inactive flows idleFlowTimeout time.Duration + // The virtual IP for NodePort Service support. + nodePortVirtualIP, nodePortVirtualIPv6 net.IP } func newOptions() *Options { return &Options{ + nodePortVirtualIP: net.ParseIP(defaultNodePortVirtualIP), + nodePortVirtualIPv6: net.ParseIP(defaultNodePortVirtualIPv6), config: &AgentConfig{ EnablePrometheusMetrics: true, EnableTLSToFlowAggregator: true, @@ -147,6 +154,9 @@ func (o *Options) validate(args []string) error { // (but SNAT can be done by the primary CNI). 
o.config.NoSNAT = true } + if err := o.validateAntreaProxyConfig(); err != nil { + return fmt.Errorf("proxy config is invalid: %w", err) + } if err := o.validateFlowExporterConfig(); err != nil { return fmt.Errorf("failed to validate flow exporter config: %v", err) } @@ -216,6 +226,17 @@ func (o *Options) setDefaults() { } } +func (o *Options) validateAntreaProxyConfig() error { + if features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort) { + for _, nodePortAddress := range o.config.NodePortAddresses { + if _, _, err := net.ParseCIDR(nodePortAddress); err != nil { + return fmt.Errorf("NodePortAddress is not valid, can not parse `%s`: %w", nodePortAddress, err) + } + } + } + return nil +} + func (o *Options) validateFlowExporterConfig() error { if features.DefaultFeatureGate.Enabled(features.FlowExporter) { host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport) diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index 1129ddd6ada..f25fb1f115c 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -261,6 +261,7 @@ fi if $ALLFEATURES; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaProxyNodePort[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaProxyNodePort: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*FlowExporter[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ FlowExporter: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*NetworkPolicyStats[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ NetworkPolicyStats: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*EndpointSlice[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ EndpointSlice: true/" antrea-agent.conf diff --git a/pkg/agent/nodeportlocal/rules/iptable_rule.go 
b/pkg/agent/nodeportlocal/rules/iptable_rule.go index cccb810952a..50b164cd741 100644 --- a/pkg/agent/nodeportlocal/rules/iptable_rule.go +++ b/pkg/agent/nodeportlocal/rules/iptable_rule.go @@ -29,15 +29,15 @@ import ( const NodePortLocalChain = "ANTREA-NODE-PORT-LOCAL" // IPTableRules provides a client to perform IPTABLES operations -type iptablesRules struct { +type IPTableRules struct { name string table *iptables.Client } -// NewIPTableRules retruns a new instance of IPTableRules -func NewIPTableRules() *iptablesRules { +// NewIPTableRules returns a new instance of IPTableRules +func NewIPTableRules() *IPTableRules { iptInstance, _ := iptables.New(true, false) - iptRule := iptablesRules{ + iptRule := IPTableRules{ name: "NPL", table: iptInstance, } @@ -45,13 +45,17 @@ func NewIPTableRules() *iptablesRules { } // Init initializes IPTABLES rules for NPL. Currently it deletes existing rules to ensure that no stale entries are present. -func (ipt *iptablesRules) Init() error { +func (ipt *IPTableRules) Init() error { + err := ipt.DeleteAllRules() + if err != nil { + return err + } return ipt.CreateChains() } // CreateChains creates the chain NodePortLocalChain in NAT table. // All DNAT rules for NPL would be added in this chain. 
-func (ipt *iptablesRules) CreateChains() error { +func (ipt *IPTableRules) CreateChains() error { err := ipt.table.EnsureChain(iptables.NATTable, NodePortLocalChain) if err != nil { return fmt.Errorf("IPTABLES chain creation in NAT table failed for NPL with error: %v", err) @@ -59,7 +63,7 @@ func (ipt *iptablesRules) CreateChains() error { ruleSpec := []string{ "-p", "tcp", "-j", NodePortLocalChain, } - err = ipt.table.EnsureRule(iptables.NATTable, iptables.PreRoutingChain, ruleSpec) + err = ipt.table.EnsureRule(iptables.NATTable, iptables.PreRoutingChain, ruleSpec, false) if err != nil { return fmt.Errorf("IPTABLES rule creation in NAT table failed for NPL with error: %v", err) } @@ -67,12 +71,12 @@ func (ipt *iptablesRules) CreateChains() error { } // AddRule appends a DNAT rule in NodePortLocalChain chain of NAT table -func (ipt *iptablesRules) AddRule(port int, podIP string) error { +func (ipt *IPTableRules) AddRule(port int, podIP string) error { ruleSpec := []string{ "-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint(port), "-j", "DNAT", "--to-destination", podIP, } - err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec) + err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec, false) if err != nil { return fmt.Errorf("IPTABLES rule creation failed for NPL with error: %v", err) } @@ -82,7 +86,7 @@ func (ipt *iptablesRules) AddRule(port int, podIP string) error { // AddAllRules constructs a list of iptables rules for the NPL chain and performs a // iptables-restore on this chain. It uses --no-flush to keep the previous rules intact. 
-func (ipt *iptablesRules) AddAllRules(nplList []PodNodePort) error { +func (ipt *IPTableRules) AddAllRules(nplList []PodNodePort) error { iptablesData := bytes.NewBuffer(nil) writeLine(iptablesData, "*nat") writeLine(iptablesData, iptables.MakeChainLine(NodePortLocalChain)) @@ -105,7 +109,7 @@ func (ipt *iptablesRules) AddAllRules(nplList []PodNodePort) error { } // DeleteRule deletes a specific NPL rule from NodePortLocalChain chain -func (ipt *iptablesRules) DeleteRule(port int, podip string) error { +func (ipt *IPTableRules) DeleteRule(port int, podip string) error { klog.Infof("Deleting rule with port %v and podip %v", port, podip) ruleSpec := []string{ "-p", "tcp", "-m", "tcp", "--dport", @@ -119,7 +123,7 @@ func (ipt *iptablesRules) DeleteRule(port int, podip string) error { } // DeleteAllRules deletes all NPL rules programmed in the node -func (ipt *iptablesRules) DeleteAllRules() error { +func (ipt *IPTableRules) DeleteAllRules() error { exists, err := ipt.table.ChainExists(iptables.NATTable, NodePortLocalChain) if err != nil { return fmt.Errorf("failed to check if NodePortLocal chain exists in NAT table: %v", err) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 9e6232c41bc..a8597da374c 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -575,6 +575,15 @@ func (c *client) InstallGatewayFlows() error { // Add flow to ensure the liveness check packet could be forwarded correctly. flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...) flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...) 
+ if c.enableProxy { + flows = append(flows, c.arpNodePortVirtualResponderFlow()) + if gatewayConfig.IPv6 != nil { + flows = append(flows, c.serviceGatewayFlow(true)) + } + if gatewayConfig.IPv4 != nil { + flows = append(flows, c.serviceGatewayFlow(false)) + } + } // In NoEncap , no traffic from tunnel port if c.encapMode.SupportsEncap() { flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 8528086037b..f5114b8fa5a 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -48,7 +48,8 @@ var ( IPv6: gwIPv6, MAC: gwMAC, } - nodeConfig = &config.NodeConfig{GatewayConfig: gatewayConfig} + nodeConfig = &config.NodeConfig{GatewayConfig: gatewayConfig} + nodePortVirtualIP = net.ParseIP("169.254.169.110") ) func installNodeFlows(ofClient Client, cacheKey string) (int, error) { @@ -101,7 +102,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -129,7 +130,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -170,7 +171,7 @@ func TestFlowInstallationFailed(t 
*testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -204,7 +205,7 @@ func TestConcurrentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -394,7 +395,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) { } func prepareTraceflowFlow(ctrl *gomock.Controller) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, true, false) c := ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.nodeConfig = nodeConfig @@ -412,7 +413,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { } func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, true, false) c := ofClient.(*client) c.nodeConfig = nodeConfig m := ovsoftest.NewMockBridge(ctrl) @@ -500,7 +501,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl 
*gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, true, false) c := ofClient.(*client) m := ovsoftest.NewMockBridge(ctrl) c.bridge = m diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 2ab968c46a4..9bc52585b6f 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -383,7 +383,9 @@ type client struct { encapMode config.TrafficEncapModeType gatewayOFPort uint32 // ovsDatapathType is the type of the datapath used by the bridge. - ovsDatapathType ovsconfig.OVSDatapathType + ovsDatapathType ovsconfig.OVSDatapathType + nodePortVirtualIPv4 net.IP // The virtual IPv4 used for host forwarding, it can be nil if NodePort support is not enabled. + nodePortVirtualIPv6 net.IP // The virtual IPv6 used for host forwarding, it can be nil if NodePort support is not enabled. // packetInHandlers stores handler to process PacketIn event. Each packetin reason can have multiple handlers registered. // When a packetin arrives, openflow send packet to registered handlers in this map. packetInHandlers map[uint8]map[string]PacketInHandler @@ -1211,6 +1213,22 @@ func (c *client) arpResponderFlow(peerGatewayIP net.IP, category cookie.Category Done() } +func (c *client) arpNodePortVirtualResponderFlow() binding.Flow { + return c.pipeline[arpResponderTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolARP). + MatchARPOp(1). + MatchARPTpa(c.nodePortVirtualIPv4). + Action().Move(binding.NxmFieldSrcMAC, binding.NxmFieldDstMAC). + Action().SetSrcMAC(globalVirtualMAC). + Action().LoadARPOperation(2). + Action().Move(binding.NxmFieldARPSha, binding.NxmFieldARPTha). + Action().SetARPSha(globalVirtualMAC). + Action().Move(binding.NxmFieldARPSpa, binding.NxmFieldARPTpa). + Action().SetARPSpa(c.nodePortVirtualIPv4). 
+ Action().OutputInPort(). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). + Done() +} + // arpResponderStaticFlow generates ARP reply for any ARP request with the same global virtual MAC. // This flow is used in policy-only mode, where traffic are routed via IP not MAC. func (c *client) arpResponderStaticFlow(category cookie.Category) binding.Flow { @@ -2030,6 +2048,22 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf return group } +// serviceGatewayFlow replies Service packets back to external addresses without entering Gateway CTZone. +func (c *client) serviceGatewayFlow(isIPv6 bool) binding.Flow { + builder := c.pipeline[conntrackCommitTable].BuildFlow(priorityHigh). + MatchCTMark(ServiceCTMark, nil). + MatchCTStateNew(true). + MatchCTStateTrk(true). + MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}). + Action().GotoTable(L2ForwardingOutTable). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()) + + if isIPv6 { + return builder.MatchProtocol(binding.ProtocolIPv6).Done() + } + return builder.MatchProtocol(binding.ProtocolIP).Done() +} + // decTTLFlows decrements TTL by one for the packets forwarded across Nodes. // The TTL decrement should be skipped for the packets which enter OVS pipeline // from the gateway interface, as the host IP stack should have decremented the @@ -2119,7 +2153,14 @@ func (c *client) generatePipeline() { } // NewClient is the constructor of the Client interface. 
-func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapathType, enableProxy, enableAntreaPolicy, enableEgress bool) Client { +func NewClient(bridgeName string, + mgmtAddr string, + ovsDatapathType ovsconfig.OVSDatapathType, + nodePortVirtualIPv4 net.IP, + nodePortVirtualIPv6 net.IP, + enableProxy bool, + enableAntreaPolicy bool, + enableEgress bool) Client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) policyCache := cache.NewIndexer( policyConjKeyFunc, @@ -2127,6 +2168,8 @@ func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapat ) c := &client{ bridge: bridge, + nodePortVirtualIPv4: nodePortVirtualIPv4, + nodePortVirtualIPv6: nodePortVirtualIPv6, enableProxy: enableProxy, enableAntreaPolicy: enableAntreaPolicy, enableEgress: enableEgress, diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 6f653e6ae28..07b5146b35a 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -33,6 +33,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/openflow" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" + "github.com/vmware-tanzu/antrea/pkg/agent/route" "github.com/vmware-tanzu/antrea/pkg/features" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" @@ -40,8 +41,9 @@ import ( ) const ( - resyncPeriod = time.Minute - componentName = "antrea-agent-proxy" + resyncPeriod = time.Minute + componentName = "antrea-agent-proxy" + nodePortLocalLabel = "NodePortLocal" ) // Proxier wraps proxy.Provider and adds extra methods. 
It is introduced for @@ -60,10 +62,12 @@ type Proxier interface { } type proxier struct { - once sync.Once - endpointSliceConfig *config.EndpointSliceConfig - endpointsConfig *config.EndpointsConfig - serviceConfig *config.ServiceConfig + onceRun sync.Once + onceInitializedReconcile sync.Once + once sync.Once + endpointSliceConfig *config.EndpointSliceConfig + endpointsConfig *config.EndpointsConfig + serviceConfig *config.ServiceConfig // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since last syncProxyRules call. For a single object, // changes are accumulated. Once both endpointsChanges and serviceChanges @@ -94,7 +98,11 @@ type proxier struct { runner *k8sproxy.BoundedFrequencyRunner stopChan <-chan struct{} ofClient openflow.Client + routeClient route.Interface + nodeIPs []net.IP + virtualNodePortIP net.IP isIPv6 bool + nodePortSupport bool enableEndpointSlice bool } @@ -129,14 +137,32 @@ func (p *proxier) removeStaleServices() { } } } - groupID, _ := p.groupCounter.Get(svcPortName) + groupID, _ := p.groupCounter.Get(svcPortName, "") if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err) continue } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + if svcInfo.OnlyNodeLocalEndpoints() { + nGroupID, _ := p.groupCounter.Get(svcPortName, nodePortLocalLabel) + if err := p.ofClient.UninstallServiceGroup(nGroupID); err != nil { + klog.Errorf("Failed to remove flows of Service NodePort %v: %v", svcPortName, err) + continue + } + p.groupCounter.Recycle(svcPortName, nodePortLocalLabel) + } + if err := p.ofClient.UninstallServiceFlows(p.virtualNodePortIP, uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + klog.Errorf("Failed to remove Service NodePort flows: %v", err) + continue + } + if err := p.routeClient.DeleteNodePort(p.nodeIPs, svcInfo, p.isIPv6); err != nil { + klog.Errorf("Failed to remove Service NodePort 
rules: %v", err) + continue + } + } delete(p.serviceInstalledMap, svcPortName) p.deleteServiceByIP(svcInfo.String()) - p.groupCounter.Recycle(svcPortName) + p.groupCounter.Recycle(svcPortName, "") } } @@ -206,7 +232,8 @@ func (p *proxier) removeStaleEndpoints() { func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { return svcInfo.ClusterIP().String() != pSvcInfo.ClusterIP().String() || svcInfo.Port() != pSvcInfo.Port() || - svcInfo.OFProtocol != pSvcInfo.OFProtocol + svcInfo.OFProtocol != pSvcInfo.OFProtocol || + svcInfo.NodePort() != pSvcInfo.NodePort() } // smallSliceDifference builds a slice which includes all the strings from s1 @@ -233,7 +260,7 @@ func smallSliceDifference(s1, s2 []string) []string { func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) - groupID, _ := p.groupCounter.Get(svcPortName) + groupID, _ := p.groupCounter.Get(svcPortName, "") endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { endpointsInstalled = map[string]k8sproxy.Endpoint{} @@ -303,6 +330,21 @@ func (p *proxier) installServices() { klog.Errorf("Error when installing Endpoints groups: %v", err) continue } + // Install another group for local type NodePort service. + if p.nodePortSupport && svcInfo.NodePort() > 0 && svcInfo.OnlyNodeLocalEndpoints() { + nGroupID, _ := p.groupCounter.Get(svcPortName, nodePortLocalLabel) + var localEndpointList []k8sproxy.Endpoint + for _, ed := range endpointUpdateList { + if !ed.GetIsLocal() { + continue + } + localEndpointList = append(localEndpointList, ed) + } + if err := p.ofClient.InstallServiceGroup(nGroupID, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil { + klog.Errorf("Error when installing Group for Service NodePort local: %v", err) + continue + } + } for _, e := range endpointUpdateList { // If the Endpoint is newly installed, add a reference. 
if _, ok := endpointsInstalled[e.String()]; !ok { @@ -320,6 +362,18 @@ func (p *proxier) installServices() { klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err) continue } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + err := p.ofClient.UninstallServiceFlows(p.virtualNodePortIP, uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol) + if err != nil { + klog.Errorf("Error when removing NodePort Service flows: %v", err) + continue + } + err = p.routeClient.DeleteNodePort(p.nodeIPs, pSvcInfo, p.isIPv6) + if err != nil { + klog.Errorf("Error when removing NodePort Service entries in IPSet: %v", err) + continue + } + } } if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds())); err != nil { klog.Errorf("Error when installing Service flows: %v", err) @@ -354,6 +408,20 @@ func (p *proxier) installServices() { } } } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + nGroupID := groupID + if svcInfo.OnlyNodeLocalEndpoints() { + nGroupID, _ = p.groupCounter.Get(svcPortName, nodePortLocalLabel) + } + if err := p.ofClient.InstallServiceFlows(nGroupID, p.virtualNodePortIP, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds())); err != nil { + klog.Errorf("Error when installing Service NodePort flow: %v", err) + continue + } + if err := p.routeClient.AddNodePort(p.nodeIPs, svcInfo, p.isIPv6); err != nil { + klog.Errorf("Error when installing Service NodePort rules: %v", err) + continue + } + } } p.serviceInstalledMap[svcPortName] = svcPort @@ -522,7 +590,12 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { } func (p *proxier) Run(stopCh <-chan struct{}) { - p.once.Do(func() { + p.onceRun.Do(func() { + if p.nodePortSupport { + if err := p.routeClient.AddNodePortRoute(p.isIPv6); err != nil { + panic(err) + } + } go p.serviceConfig.Run(stopCh) if p.enableEndpointSlice { go p.endpointSliceConfig.Run(stopCh) @@ -534,6 
+607,65 @@ func (p *proxier) Run(stopCh <-chan struct{}) { }) } +// getLocalAddrs returns a list of all network addresses on the local system. +func getLocalAddrs() ([]net.IP, error) { + var localAddrs []net.IP + + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil, err + } + + for _, addr := range addrs { + ip, _, err := net.ParseCIDR(addr.String()) + if err != nil { + return nil, err + } + localAddrs = append(localAddrs, ip) + } + + return localAddrs, nil +} + +func filterIPFamily(isV6 bool, ips ...net.IP) []net.IP { + var result []net.IP + for _, ip := range ips { + if !isV6 && !utilnet.IsIPv6(ip) { + result = append(result, ip) + } else if isV6 && utilnet.IsIPv6(ip) { + result = append(result, ip) + } + } + return result +} + +func getAvailableAddresses(nodePortAddresses []*net.IPNet, podCIDR *net.IPNet, ipv6 bool) ([]net.IP, error) { + localAddresses, err := getLocalAddrs() + if err != nil { + return nil, err + } + var nodeIPs []net.IP + for _, nodeIP := range filterIPFamily(ipv6, localAddresses...) { + if podCIDR.Contains(nodeIP) { + continue + } + var contains bool + for _, nodePortAddress := range nodePortAddresses { + if nodePortAddress.Contains(nodeIP) { + contains = true + break + } + } + if len(nodePortAddresses) == 0 || contains { + nodeIPs = append(nodeIPs, nodeIP) + } + } + if len(nodeIPs) == 0 { + klog.Warningln("No qualified node IP found.") + } + return nodeIPs, nil +} + func (p *proxier) GetProxyProvider() k8sproxy.Provider { // Return myself. return p @@ -572,7 +704,7 @@ func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, [ svcFlows := p.ofClient.GetServiceFlowKeys(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, epList) flows = append(flows, svcFlows...) 
- groupID, _ := p.groupCounter.Get(svcPortName) + groupID, _ := p.groupCounter.Get(svcPortName, "") groups = append(groups, groupID) } @@ -580,10 +712,16 @@ func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, [ } func NewProxier( + virtualNodePortIP net.IP, + nodePortAddresses []*net.IPNet, hostname string, + podCIDR *net.IPNet, informerFactory informers.SharedInformerFactory, ofClient openflow.Client, - isIPv6 bool) *proxier { + routeClient route.Interface, + isIPv6 bool, + nodePortSupport bool, +) *proxier { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, @@ -607,7 +745,20 @@ func NewProxier( serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + routeClient: routeClient, + virtualNodePortIP: virtualNodePortIP, isIPv6: isIPv6, + nodePortSupport: nodePortSupport, + } + + if nodePortSupport { + nodeIPs, err := getAvailableAddresses(nodePortAddresses, podCIDR, isIPv6) + if err != nil { + klog.Errorf("%v", err) + return nil + } + klog.Infof("Proxy NodePort Services on addresses: %v", nodeIPs) + p.nodeIPs = nodeIPs } p.serviceConfig.RegisterEventHandler(p) p.endpointsConfig.RegisterEventHandler(p) @@ -654,16 +805,28 @@ func (p *metaProxierWrapper) GetServiceByIP(serviceStr string) (k8sproxy.Service } func NewDualStackProxier( - hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) *metaProxierWrapper { + virtualNodePortIP net.IP, + virtualNodePortIPv6 net.IP, + nodePortAddresses []*net.IPNet, + hostname string, + podIPv4CIDR *net.IPNet, + podIPv6CIDR *net.IPNet, + informerFactory informers.SharedInformerFactory, + ofClient openflow.Client, + routeClient route.Interface, + nodePortSupport bool, +) *metaProxierWrapper { - // Create an ipv4 instance of the single-stack proxier - ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false) + // Create an 
ipv4 instance of the single-stack proxier. + ipv4Proxier := NewProxier(virtualNodePortIP, nodePortAddresses, hostname, podIPv4CIDR, informerFactory, ofClient, routeClient, false, nodePortSupport) - // Create an ipv6 instance of the single-stack proxier - ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true) + // Create an ipv6 instance of the single-stack proxier. + ipv6Proxier := NewProxier(virtualNodePortIPv6, nodePortAddresses, hostname, podIPv6CIDR, informerFactory, ofClient, routeClient, true, nodePortSupport) // Create a meta-proxier that dispatch calls between the two // single-stack proxier instances. + // Return a meta-proxier that dispatches calls between the two + // single-stack proxier instances. metaProxier := k8sproxy.NewMetaProxier(ipv4Proxier, ipv6Proxier) return &metaProxierWrapper{ipv4Proxier, ipv6Proxier, metaProxier} diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index e9c0113a6c7..ba11ab40760 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -33,10 +33,14 @@ import ( ofmock "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" + "github.com/vmware-tanzu/antrea/pkg/agent/route" + routetesting "github.com/vmware-tanzu/antrea/pkg/agent/route/testing" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) +var virtualNodePortIP = net.ParseIP("169.254.169.110") + func makeNamespaceName(namespace, name string) apimachinerytypes.NamespacedName { return apimachinerytypes.NamespacedName{Namespace: namespace, Name: name} } @@ -80,7 +84,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) return ept } -func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { +func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, isIPv6,
nodeportSupport bool) *proxier { hostname := "localhost" eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder( @@ -97,8 +101,12 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { endpointsMap: types.EndpointsMap{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + routeClient: routeClient, serviceStringMap: map[string]k8sproxy.ServicePortName{}, isIPv6: isIPv6, + nodeIPs: []net.IP{net.ParseIP("192.168.0.1")}, + virtualNodePortIP: virtualNodePortIP, + nodePortSupport: nodeportSupport, } return p } @@ -107,7 +115,8 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -141,7 +150,7 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { }), ) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP if isIPv6 { @@ -157,7 +166,8 @@ func TestLoadbalancer(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, false) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, true) svcIPv4 := net.ParseIP("10.20.30.41") svcPort := 80 @@ -198,7 +208,7 @@ func TestLoadbalancer(t *testing.T) { }), ) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) 
mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) @@ -208,6 +218,73 @@ func TestLoadbalancer(t *testing.T) { fp.syncProxyRules() } +func TestExternalNameToNodePort(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient := ofmock.NewMockClient(ctrl) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, true) + + svcIPv4 := net.ParseIP("10.20.30.41") + svcPort := 80 + svcNodePort := 31000 + svcPortName := k8sproxy.ServicePortName{ + NamespacedName: makeNamespaceName("ns1", "svc1"), + Port: "80", + Protocol: corev1.ProtocolTCP, + } + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() // Should be ignored. + svc.Spec.Type = corev1.ServiceTypeExternalName + svc.Spec.ExternalName = "a.b.c.com" + svc.Spec.Ports = []corev1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + fp.syncProxyRules() + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() + svc.Spec.Type = corev1.ServiceTypeNodePort + svc.Spec.Ports = []corev1.ServicePort{{ + NodePort: int32(svcNodePort), + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + epIP := net.ParseIP("10.180.0.1") + epFunc := func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: epIP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + } + ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) + makeEndpointsMap(fp, ep) + groupID, 
_ := fp.groupCounter.Get(svcPortName, "") + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + bindingProtocol := binding.ProtocolTCP + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, virtualNodePortIP, uint16(svcNodePort), bindingProtocol, uint16(0)).Times(1) + mockRouteClient.EXPECT().AddNodePort(gomock.Any(), gomock.Any(), false).Times(1) + + fp.syncProxyRules() +} + func TestClusterIPv4(t *testing.T) { testClusterIP(t, net.ParseIP("10.20.30.41"), net.ParseIP("10.180.0.1"), false) } @@ -220,7 +297,8 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, true) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -257,7 +335,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) } ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) makeEndpointsMap(fp, ep) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) @@ -284,7 +362,8 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := 
routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -321,7 +400,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -385,8 +465,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP } makeEndpointsMap(fp, ep, epUDP) - groupID, _ := fp.groupCounter.Get(svcPortName) - groupIDUDP, _ := fp.groupCounter.Get(svcPortNameUDP) + groupID, _ := fp.groupCounter.Get(svcPortName, "") + groupIDUDP, _ := fp.groupCounter.Get(svcPortNameUDP, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) @@ -412,7 +492,8 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -448,7 +529,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv bindingProtocol = binding.ProtocolTCPv6 } makeEndpointsMap(fp, ep) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, 
gomock.Any()).Times(2) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) @@ -471,7 +552,8 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -484,7 +566,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { - svc.Spec.Type = "NodePort" + svc.Spec.Type = corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP @@ -519,7 +601,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 } - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, true, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(corev1.DefaultClientIPServiceAffinitySeconds)).Times(1) @@ -539,7 +621,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + fp := NewFakeProxier(nil, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -552,7 +634,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP makeServiceMap(fp, 
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { - svc.Spec.Type = "NodePort" + svc.Spec.Type = corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP @@ -586,7 +668,8 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort1 := 80 svcPort2 := 8080 @@ -625,7 +708,7 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { } ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) makeEndpointsMap(fp, ep) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort1), bindingProtocol, uint16(0)) @@ -659,7 +742,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, false) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, false) epIP := net.ParseIP("10.50.60.71") svcIP1 := net.ParseIP("10.180.30.41") svcIP2 := net.ParseIP("10.180.30.42") @@ -710,8 +794,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { ep1 := epMapFactory(svcPortName1, epIP.String()) ep2 := epMapFactory(svcPortName2, epIP.String()) - groupID1, _ := fp.groupCounter.Get(svcPortName1) - groupID2, _ := fp.groupCounter.Get(svcPortName2) + groupID1, 
_ := fp.groupCounter.Get(svcPortName1, "") + groupID2, _ := fp.groupCounter.Get(svcPortName2, "") mockOFClient.EXPECT().InstallServiceGroup(groupID1, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID2, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP diff --git a/pkg/agent/proxy/types/groupcounter.go b/pkg/agent/proxy/types/groupcounter.go index ef41a539f42..cd9083d800b 100644 --- a/pkg/agent/proxy/types/groupcounter.go +++ b/pkg/agent/proxy/types/groupcounter.go @@ -15,6 +15,7 @@ package types import ( + "fmt" "sync" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" @@ -27,10 +28,10 @@ type GroupCounter interface { // If the group ID of the service has been generated, then return the // prior one. The bool return value indicates whether the groupID is newly // generated. - Get(svcPortName k8sproxy.ServicePortName) (binding.GroupIDType, bool) + Get(svcPortName k8sproxy.ServicePortName, label string) (binding.GroupIDType, bool) // Recycle removes a Service Group ID mapping. The recycled groupID can be // reused. 
- Recycle(svcPortName k8sproxy.ServicePortName) bool + Recycle(svcPortName k8sproxy.ServicePortName, label string) bool } type groupCounter struct { @@ -38,37 +39,46 @@ type groupCounter struct { groupIDCounter binding.GroupIDType recycled []binding.GroupIDType - groupMap map[k8sproxy.ServicePortName]binding.GroupIDType + groupMap map[string]binding.GroupIDType } func NewGroupCounter() *groupCounter { - return &groupCounter{groupMap: map[k8sproxy.ServicePortName]binding.GroupIDType{}} + return &groupCounter{groupMap: map[string]binding.GroupIDType{}} } -func (c *groupCounter) Get(svcPortName k8sproxy.ServicePortName) (binding.GroupIDType, bool) { +func keyString(svcPortName k8sproxy.ServicePortName, label string) string { + key := svcPortName.String() + if len(key) > 0 { + key = fmt.Sprintf("%s/%s", key, label) + } + return key +} + +func (c *groupCounter) Get(svcPortName k8sproxy.ServicePortName, label string) (binding.GroupIDType, bool) { c.mu.Lock() defer c.mu.Unlock() - - if id, ok := c.groupMap[svcPortName]; ok { + key := keyString(svcPortName, label) + if id, ok := c.groupMap[key]; ok { return id, false } else if len(c.recycled) != 0 { id = c.recycled[len(c.recycled)-1] c.recycled = c.recycled[:len(c.recycled)-1] - c.groupMap[svcPortName] = id + c.groupMap[key] = id return id, true } else { c.groupIDCounter += 1 - c.groupMap[svcPortName] = c.groupIDCounter + c.groupMap[key] = c.groupIDCounter return c.groupIDCounter, true } } -func (c *groupCounter) Recycle(svcPortName k8sproxy.ServicePortName) bool { +func (c *groupCounter) Recycle(svcPortName k8sproxy.ServicePortName, label string) bool { c.mu.Lock() defer c.mu.Unlock() - if id, ok := c.groupMap[svcPortName]; ok { - delete(c.groupMap, svcPortName) + key := keyString(svcPortName, label) + if id, ok := c.groupMap[key]; ok { + delete(c.groupMap, key) c.recycled = append(c.recycled, id) return true } diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 0d565ac69ed..d400223fd8b 100644 
--- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -18,6 +18,7 @@ import ( "net" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" ) // Interface is the interface for routing container packets in host network. @@ -38,6 +39,20 @@ type Interface interface { // It should do nothing if the routes don't exist, without error. DeleteRoutes(podCIDR *net.IPNet) error + // AddRoutes should add the route to the NodePort virtual IP. + AddNodePortRoute(isIPv6 bool) error + + // ReconcileNodePort should remove orphaned NodePort ipset entries. + ReconcileNodePort(nodeIPs []net.IP, svcEntries []*types.ServiceInfo) error + + // AddNodePort should add entries of the NodePort Service to the ipset. + // It should have no effect if the entry already exist, without error. + AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error + + // DeleteNodePort should delete entries of the NodePort Service from ipset. + // It should do nothing if entries of the NodePort Service don't exist, without error. + DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error + // MigrateRoutesToGw should move routes from device linkname to local gateway. 
MigrateRoutesToGw(linkName string) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 67983dd5932..ac3e7cac0fa 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -17,9 +17,11 @@ package route import ( "bytes" "fmt" + "io" "net" "reflect" "strconv" + "strings" "sync" "time" @@ -31,6 +33,7 @@ import ( "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" + proxytypes "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/types" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/agent/util/ipset" @@ -48,13 +51,23 @@ const ( antreaPodIPSet = "ANTREA-POD-IP" // antreaPodIP6Set contains all IPv6 Pod CIDRs of this cluster. antreaPodIP6Set = "ANTREA-POD-IP6" + // antreaNodePortClusterSet contains all Cluster type NodePort Services Addresses. + antreaNodePortClusterSet = "ANTREA-SVC-NP-CLUSTER" + antreaNodePortClusterSet6 = "ANTREA-SVC-NP-CLUSTER6" + antreaNodePortLocalSet = "ANTREA-SVC-NP-LOCAL" + antreaNodePortLocalSet6 = "ANTREA-SVC-NP-LOCAL6" // Antrea managed iptables chains. - antreaForwardChain = "ANTREA-FORWARD" - antreaPreRoutingChain = "ANTREA-PREROUTING" - antreaPostRoutingChain = "ANTREA-POSTROUTING" - antreaOutputChain = "ANTREA-OUTPUT" - antreaMangleChain = "ANTREA-MANGLE" + antreaForwardChain = "ANTREA-FORWARD" + antreaPreRoutingChain = "ANTREA-PREROUTING" + antreaNodePortServicesChain = "ANTREA-SVC-NP" + antreaServicesMasqChain = "ANTREA-SVC-MASQ" + antreaPostRoutingChain = "ANTREA-POSTROUTING" + antreaOutputChain = "ANTREA-OUTPUT" + antreaMangleChain = "ANTREA-MANGLE" + + localNodePortMark = "0xf0" + clusterNodePortMark = "0xf1" ) // Client implements Interface. @@ -79,7 +92,9 @@ type Client struct { // nodeRoutes caches ip routes to remote Pods. It's a map of podCIDR to routes. 
nodeRoutes sync.Map // nodeNeighbors caches IPv6 Neighbors to remote host gateway - nodeNeighbors sync.Map + nodeNeighbors sync.Map + nodePortVirtualIP, nodePortVirtualIPv6 net.IP + nodeportSupport bool // markToSNATIP caches marks to SNAT IPs. It's used in Egress feature. markToSNATIP sync.Map // iptablesInitialized is used to notify when iptables initialization is done. @@ -89,11 +104,21 @@ type Client struct { // NewClient returns a route client. // TODO: remove param serviceCIDR after kube-proxy is replaced by Antrea Proxy. This param is not used in this file; // leaving it here is to be compatible with the implementation on Windows. -func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSNAT bool) (*Client, error) { +func NewClient( + nodePortVirtualIP net.IP, + nodePortVirtualIPv6 net.IP, + serviceCIDR *net.IPNet, + networkConfig *config.NetworkConfig, + noSNAT bool, + nodeportSupport bool, +) (*Client, error) { return &Client{ - serviceCIDR: serviceCIDR, - networkConfig: networkConfig, - noSNAT: noSNAT, + nodePortVirtualIP: nodePortVirtualIP, + nodePortVirtualIPv6: nodePortVirtualIPv6, + serviceCIDR: serviceCIDR, + networkConfig: networkConfig, + noSNAT: noSNAT, + nodeportSupport: nodeportSupport, }, nil } @@ -157,6 +182,20 @@ func (c *Client) syncIPInfra() { // syncIPSet ensures that the required ipset exists and it has the initial members. func (c *Client) syncIPSet() error { + if c.nodeportSupport { + if err := ipset.CreateIPSet(antreaNodePortClusterSet, ipset.HashIPPort, false); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortLocalSet, ipset.HashIPPort, false); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortClusterSet6, ipset.HashIPPort, true); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortLocalSet6, ipset.HashIPPort, true); err != nil { + return err + } + } // In policy-only mode, Node Pod CIDR is undefined. 
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { return nil @@ -223,20 +262,35 @@ func (c *Client) syncIPTables() error { // Create the antrea managed chains and link them to built-in chains. // We cannot use iptables-restore for these jump rules because there // are non antrea managed rules in built-in chains. - jumpRules := []struct{ table, srcChain, dstChain, comment string }{ - {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}, - {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, - {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules"}, - {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules"}, - {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules"}, // TODO: unify the chain naming style - {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, + jumpRules := []struct { + need bool + table, srcChain, dstChain, comment string + prepend bool + }{ + {true, iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", false}, + {true, iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {true, iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules", false}, + {true, iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules", false}, + {true, iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules", false}, + {true, iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {c.nodeportSupport, iptables.NATTable, 
iptables.PreRoutingChain, antreaNodePortServicesChain, "Antrea: jump to Antrea NodePort Service rules", true}, + {c.nodeportSupport, iptables.NATTable, iptables.OutputChain, antreaNodePortServicesChain, "Antrea: jump to Antrea NodePort Service rules", true}, + {c.nodeportSupport, iptables.NATTable, iptables.PostRoutingChain, antreaServicesMasqChain, "Antrea: jump to Antrea NodePort Service masquerade rules", true}, } for _, rule := range jumpRules { + ruleSpec := []string{ + "-j", rule.dstChain, + "-m", "comment", "--comment", rule.comment, + } + if !rule.need { + _ = c.ipt.DeleteChain(rule.table, rule.dstChain) + _ = c.ipt.DeleteRule(rule.table, rule.srcChain, ruleSpec) + continue + } if err := c.ipt.EnsureChain(rule.table, rule.dstChain); err != nil { return err } - ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.ipt.EnsureRule(rule.table, rule.srcChain, ruleSpec); err != nil { + if err := c.ipt.EnsureRule(rule.table, rule.srcChain, ruleSpec, rule.prepend); err != nil { return err } } @@ -273,11 +327,67 @@ func (c *Client) syncIPTables() error { return nil } +func (c *Client) restoreNodePortIptablesData(hostGateway string, isIPv6 bool) *bytes.Buffer { + localSet := antreaNodePortLocalSet + clusterSet := antreaNodePortClusterSet + loopbackAddr := "127.0.0.1" + nodePortVirtualIP := c.nodePortVirtualIP.String() + if isIPv6 { + localSet = antreaNodePortLocalSet6 + clusterSet = antreaNodePortClusterSet6 + loopbackAddr = net.IPv6loopback.String() + nodePortVirtualIP = c.nodePortVirtualIPv6.String() + } + iptablesData := new(bytes.Buffer) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", localSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", localNodePortMark, + }...) 
+ writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", clusterSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", clusterNodePortMark, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", localNodePortMark, + "-j", iptables.DNATTarget, "--to-destination", nodePortVirtualIP, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", clusterNodePortMark, + "-j", iptables.DNATTarget, "--to-destination", nodePortVirtualIP, + }...) + writeLine(iptablesData, + "-A", antreaServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets with a loopback address"`, + "-s", loopbackAddr, + "-d", nodePortVirtualIP, + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + writeLine(iptablesData, + "-A", antreaServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets which target Service with Local externalTrafficPolicy"`, + "-m", "mark", "--mark", clusterNodePortMark, + "-d", nodePortVirtualIP, + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + return iptablesData +} + func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet string, snatMarkToIP map[uint32]net.IP) *bytes.Buffer { // Create required rules in the antrea chains. // Use iptables-restore as it flushes the involved chains and creates the desired rules // with a single call, instead of string matching to clean up stale rules. iptablesData := bytes.NewBuffer(nil) + // Determined which version of IP is being processed by podCIDR. + isIPv6 := true + if podCIDR.IP.To4() != nil { + isIPv6 = false + } // Write head lines anyway so the undesired rules can be deleted when changing encap mode. 
writeLine(iptablesData, "*raw") writeLine(iptablesData, iptables.MakeChainLine(antreaPreRoutingChain)) @@ -374,6 +484,11 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet string, snatMa "-j", iptables.MasqueradeTarget, }...) } + if c.nodeportSupport { + writeLine(iptablesData, iptables.MakeChainLine(antreaNodePortServicesChain)) + writeLine(iptablesData, iptables.MakeChainLine(antreaServicesMasqChain)) + io.Copy(iptablesData, c.restoreNodePortIptablesData(hostGateway, isIPv6)) + } writeLine(iptablesData, "COMMIT") return iptablesData } @@ -389,6 +504,22 @@ func (c *Client) initIPRoutes() error { return nil } +func generateNodePortIPSETEntries(nodeIP net.IP, isIPv6 bool, svcInfos []*proxytypes.ServiceInfo) sets.String { + stringSet := sets.NewString() + for _, svcInfo := range svcInfos { + if svcInfo.NodePort() > 0 { + protocolPort := fmt.Sprintf("%s:%d", strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort()) + stringSet.Insert(fmt.Sprintf("%s,%s", nodeIP.String(), protocolPort)) + if isIPv6 { + stringSet.Insert(fmt.Sprintf("%s,%s", net.IPv6loopback.String(), protocolPort)) + } else { + stringSet.Insert(fmt.Sprintf("127.0.0.1,%s", protocolPort)) + } + } + } + return stringSet +} + // Reconcile removes orphaned podCIDRs from ipset and removes routes to orphaned podCIDRs // based on the desired podCIDRs. 
func (c *Client) Reconcile(podCIDRs []string) error { @@ -429,7 +560,10 @@ func (c *Client) Reconcile(podCIDRs []string) error { if reflect.DeepEqual(route.Dst, c.nodeConfig.PodIPv4CIDR) || reflect.DeepEqual(route.Dst, c.nodeConfig.PodIPv6CIDR) { continue } - if desiredPodCIDRs.Has(route.Dst.String()) { + if c.nodeportSupport && route.Dst != nil && (route.Dst.Contains(c.nodePortVirtualIP) || route.Dst.Contains(c.nodePortVirtualIPv6)) { + continue + } + if route.Dst != nil && desiredPodCIDRs.Has(route.Dst.String()) { continue } klog.Infof("Deleting unknown route %v", route) @@ -453,6 +587,9 @@ func (c *Client) Reconcile(podCIDRs []string) error { if desiredGWs.Has(neighIP) { continue } + if c.nodeportSupport && c.nodePortVirtualIPv6 != nil && c.nodePortVirtualIPv6.String() == neighIP { + continue + } klog.V(4).Infof("Deleting orphaned IPv6 neighbor %v", actualNeigh) if err := netlink.NeighDel(actualNeigh); err != nil { return err @@ -714,3 +851,145 @@ func (c *Client) DeleteSNATRule(mark uint32) error { snatIP := value.(net.IP) return c.ipt.DeleteRule(iptables.NATTable, antreaPostRoutingChain, c.snatRuleSpec(snatIP, mark)) } + +func (c *Client) ReconcileNodePort(nodeIPs []net.IP, svcEntries []*proxytypes.ServiceInfo) error { + var cluster, local []*proxytypes.ServiceInfo + for _, entry := range svcEntries { + if entry.OnlyNodeLocalEndpoints() { + local = append(local, entry) + } else { + cluster = append(cluster, entry) + } + } + reconcile := func(setName string, isIPv6 bool, desiredSvcEntries []*proxytypes.ServiceInfo) error { + existEntries, err := ipset.ListEntries(setName) + if err != nil { + return err + } + desiredEntries := sets.NewString() + for _, nodeIP := range nodeIPs { + desiredEntries.Insert(generateNodePortIPSETEntries(nodeIP, isIPv6, svcEntries).List()...) 
+ } + for _, entry := range existEntries { + if desiredEntries.Has(entry) { + continue + } + klog.Infof("Deleting an orphaned NodePort Service entry %s from ipset", entry) + if err := ipset.DelEntry(setName, entry); err != nil { + return err + } + } + return nil + } + if err := reconcile(antreaNodePortLocalSet, false, local); err != nil { + return err + } + if err := reconcile(antreaNodePortClusterSet, false, cluster); err != nil { + return err + } + if err := reconcile(antreaNodePortLocalSet6, true, local); err != nil { + return err + } + if err := reconcile(antreaNodePortClusterSet6, true, cluster); err != nil { + return err + } + return nil +} + +func (c *Client) AddNodePortRoute(isIPv6 bool) error { + var route *netlink.Route + var nodeportIP *net.IP + if !isIPv6 { + nodeportIP = &c.nodePortVirtualIP + route = &netlink.Route{ + Dst: &net.IPNet{ + IP: *nodeportIP, + Mask: net.IPv4Mask(255, 255, 255, 255), + }, + Gw: *nodeportIP, + Flags: int(netlink.FLAG_ONLINK), + } + route.LinkIndex = c.nodeConfig.GatewayConfig.LinkIndex + } else { + nodeportIP = &c.nodePortVirtualIPv6 + route = &netlink.Route{ + Dst: &net.IPNet{ + IP: *nodeportIP, + Mask: net.IPMask{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + } + } + if err := netlink.RouteReplace(route); err != nil { + return fmt.Errorf("failed to install NodePort route: %w", err) + } + if isIPv6 { + neigh := &netlink.Neigh{ + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + Family: netlink.FAMILY_V6, + State: netlink.NUD_PERMANENT, + IP: *nodeportIP, + HardwareAddr: globalVMAC, + } + if err := netlink.NeighSet(neigh); err != nil { + return fmt.Errorf("failed to add nodeport neighbor %v to gw %s: %v", neigh, c.nodeConfig.GatewayConfig.Name, err) + } + c.nodeNeighbors.Store(nodeportIP.String(), neigh) + } + c.nodeRoutes.Store(nodeportIP.String(), route) + return nil +} + +func (c *Client) AddNodePort(nodeIPs 
[]net.IP, svcInfo *proxytypes.ServiceInfo, isIPv6 bool) error { + setName := antreaNodePortClusterSet + if isIPv6 { + setName = antreaNodePortClusterSet6 + } + if svcInfo.OnlyNodeLocalEndpoints() { + if isIPv6 { + setName = antreaNodePortLocalSet6 + } else { + setName = antreaNodePortLocalSet + } + } + for _, nodeIP := range nodeIPs { + if err := ipset.AddEntry(setName, fmt.Sprintf("%s,%s:%d", nodeIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when adding NodePort to ipset %s: %v", setName, err) + } + } + loopbackIP := "127.0.0.1" + if isIPv6 { + loopbackIP = net.IPv6loopback.String() + } + if err := ipset.AddEntry(setName, fmt.Sprintf("%s,%s:%d", loopbackIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when adding NodePort to ipset %s: %v", setName, err) + } + return nil +} + +func (c *Client) DeleteNodePort(nodeIPs []net.IP, svcInfo *proxytypes.ServiceInfo, isIPv6 bool) error { + setName := antreaNodePortClusterSet + if isIPv6 { + setName = antreaNodePortClusterSet6 + } + if svcInfo.OnlyNodeLocalEndpoints() { + if isIPv6 { + setName = antreaNodePortLocalSet6 + } else { + setName = antreaNodePortLocalSet + } + } + for _, nodeIP := range nodeIPs { + if err := ipset.DelEntry(setName, fmt.Sprintf("%s,%s:%d", nodeIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when removing NodePort from ipset %s: %v", setName, err) + } + } + loopbackIP := "127.0.0.1" + if isIPv6 { + loopbackIP = net.IPv6loopback.String() + } + if err := ipset.DelEntry(setName, fmt.Sprintf("%s,%s:%d", loopbackIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when removing NodePort from ipset %s: %v", setName, err) + } + return nil +} diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 7eb9033d320..d2f32c6abc4 100644 --- 
a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/agent/util/winfirewall" ) @@ -35,6 +36,9 @@ const ( outboundFirewallRuleName = "Antrea: accept packets to local Pods" ) +// Client implements Interface. +var _ Interface = &Client{} + type Client struct { nr netroute.Interface nodeConfig *config.NodeConfig @@ -43,9 +47,32 @@ type Client struct { fwClient *winfirewall.Client } +func (c *Client) AddNodePortRoute(isIPv6 bool) error { + return nil +} + +func (c *Client) AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + return nil +} + +func (c *Client) DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + return nil +} + +func (c *Client) ReconcileNodePort(nodeIPs []net.IP, ports []*types.ServiceInfo) error { + return nil +} + // NewClient returns a route client. // Todo: remove param serviceCIDR after kube-proxy is replaced by Antrea Proxy completely. 
-func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSNAT bool) (*Client, error) { +func NewClient( + nodePortVirtualIP net.IP, + nodePortVirtualIPv6 net.IP, + serviceCIDR *net.IPNet, + networkConfig *config.NetworkConfig, + noSNAT bool, + nodeportSupport bool, +) (*Client, error) { nr := netroute.New() return &Client{ nr: nr, diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index 68e02a34646..72f611c4107 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -53,7 +53,7 @@ func TestRouteOperation(t *testing.T) { nr := netroute.New() defer nr.Exit() - client, err := NewClient(serviceCIDR, &config.NetworkConfig{}, false) + client, err := NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{}, false, false) require.Nil(t, err) nodeConfig := &config.NodeConfig{ GatewayConfig: &config.GatewayConfig{ diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 7536447f46e..4e00ccc4d51 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -22,6 +22,7 @@ package testing import ( gomock "github.com/golang/mock/gomock" config "github.com/vmware-tanzu/antrea/pkg/agent/config" + types "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" net "net" reflect "reflect" ) @@ -49,6 +50,34 @@ func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } +// AddNodePort mocks base method +func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 *types.ServiceInfo, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodePort indicates an expected call of AddNodePort +func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", 
reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) +} + +// AddNodePortRoute mocks base method +func (m *MockInterface) AddNodePortRoute(arg0 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodePortRoute", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodePortRoute indicates an expected call of AddNodePortRoute +func (mr *MockInterfaceMockRecorder) AddNodePortRoute(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePortRoute", reflect.TypeOf((*MockInterface)(nil).AddNodePortRoute), arg0) +} + // AddRoutes mocks base method func (m *MockInterface) AddRoutes(arg0 *net.IPNet, arg1 string, arg2, arg3 net.IP) error { m.ctrl.T.Helper() @@ -77,6 +106,20 @@ func (mr *MockInterfaceMockRecorder) AddSNATRule(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSNATRule", reflect.TypeOf((*MockInterface)(nil).AddSNATRule), arg0, arg1) } +// DeleteNodePort mocks base method +func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 *types.ServiceInfo, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteNodePort indicates an expected call of DeleteNodePort +func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) +} + // DeleteRoutes mocks base method func (m *MockInterface) DeleteRoutes(arg0 *net.IPNet) error { m.ctrl.T.Helper() @@ -147,6 +190,20 @@ func (mr *MockInterfaceMockRecorder) Reconcile(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockInterface)(nil).Reconcile), arg0) } +// ReconcileNodePort mocks base method +func (m *MockInterface) 
ReconcileNodePort(arg0 []net.IP, arg1 []*types.ServiceInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReconcileNodePort", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReconcileNodePort indicates an expected call of ReconcileNodePort +func (mr *MockInterfaceMockRecorder) ReconcileNodePort(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReconcileNodePort", reflect.TypeOf((*MockInterface)(nil).ReconcileNodePort), arg0, arg1) +} + // Run mocks base method func (m *MockInterface) Run(arg0 <-chan struct{}) { m.ctrl.T.Helper() diff --git a/pkg/agent/util/ipset/ipset.go b/pkg/agent/util/ipset/ipset.go index 4bfd933f43a..6e02a3214ec 100644 --- a/pkg/agent/util/ipset/ipset.go +++ b/pkg/agent/util/ipset/ipset.go @@ -28,6 +28,8 @@ const ( // The lookup time grows linearly with the number of the different prefix values added to the set. HashNet SetType = "hash:net" HashIP SetType = "hash:ip" + // The hash:ip,port set type uses a hash to store IP address and protocol-port pairs in it. + HashIPPort SetType = "hash:ip,port" ) // memberPattern is used to match the members part of ipset list result. diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 086a8c04f85..680f45dd1ea 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -38,6 +38,7 @@ const ( MasqueradeTarget = "MASQUERADE" MarkTarget = "MARK" ConnTrackTarget = "CT" + DNATTarget = "DNAT" NoTrackTarget = "NOTRACK" SNATTarget = "SNAT" @@ -131,8 +132,9 @@ func (c *Client) ChainExists(table string, chain string) (bool, error) { return false, nil } -// EnsureRule checks if target rule already exists, appends it if not. -func (c *Client) EnsureRule(table string, chain string, ruleSpec []string) error { +// EnsureRule checks if target rule already exists, add it if not. If prepend is true, the rule will be added to the top +// of the chain. 
Otherwise, the rule will be appended to the chain. +func (c *Client) EnsureRule(table string, chain string, ruleSpec []string, prepend bool) error { for idx := range c.ipts { ipt := c.ipts[idx] exist, err := ipt.Exists(table, chain, ruleSpec...) @@ -142,7 +144,13 @@ func (c *Client) EnsureRule(table string, chain string, ruleSpec []string) error if exist { return nil } - if err := ipt.Append(table, chain, ruleSpec...); err != nil { + var f func() error + if !prepend { + f = func() error { return ipt.Append(table, chain, ruleSpec...) } + } else { + f = func() error { return ipt.Insert(table, chain, 1, ruleSpec...) } + } + if err := f(); err != nil { return fmt.Errorf("error appending rule %v to table %s chain %s: %v", ruleSpec, table, chain, err) } } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 6ebef62eb7c..c3c2ae31ff6 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -46,6 +46,10 @@ const ( // Service traffic. AntreaProxy featuregate.Feature = "AntreaProxy" + // alpha: v0.14 + // Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort featuregate.Feature = "AntreaProxyNodePort" + // alpha: v0.8 // beta: v0.11 // Allows to trace path from a generated packet. @@ -84,6 +88,7 @@ var ( AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, Egress: {Default: false, PreRelease: featuregate.Alpha}, EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, + AntreaProxyNodePort: {Default: false, PreRelease: featuregate.Alpha}, Traceflow: {Default: true, PreRelease: featuregate.Beta}, FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, @@ -101,6 +106,7 @@ var ( // can have different FeatureSpecs between Linux and Windows, we should // still define a separate defaultAntreaFeatureGates map for Windows. 
unsupportedFeaturesOnWindows = map[featuregate.Feature]struct{}{ + AntreaProxyNodePort: {}, NodePortLocal: {}, Egress: {}, } diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index caceb7f2bd5..cd2d1cdfb77 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -26,12 +26,14 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" "github.com/vmware-tanzu/antrea/pkg/agent/apiserver/handlers/podinterface" "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/openflow/cookie" "github.com/vmware-tanzu/antrea/pkg/clusteridentity" + "github.com/vmware-tanzu/antrea/pkg/features" ) // TestDeploy is a "no-op" test that simply performs setup and teardown. @@ -354,7 +356,11 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo continue } route := Route{} - if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil { + m1 := matches[1] + if !strings.Contains(m1, "/") { + m1 = m1 + "/32" + } + if _, route.peerPodCIDR, err = net.ParseCIDR(m1); err != nil { return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1]) } if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil { @@ -372,6 +378,12 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo } else if encapMode == config.TrafficEncapModeHybrid { expectedRtNumMin = 1 } + agentFeatures, err := data.GetAgentFeatures(antreaNamespace) + require.NoError(t, err) + if agentFeatures.Enabled(features.AntreaProxy) && agentFeatures.Enabled(features.AntreaProxyNodePort) { + expectedRtNumMin += 1 + expectedRtNumMax += 1 + } t.Logf("Retrieving gateway routes on Node '%s'", nodeName) var routes []Route diff --git a/test/e2e/framework.go b/test/e2e/framework.go index efce44872e8..df31f14e617 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1324,6 +1324,11 @@ func (data *TestData) 
createNginxClusterIPService(name string, affinity bool, ip return data.createService(name, 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeClusterIP, ipFamily) } +// createNginxNodePortService create a NodePort nginx service with the given name. +func (data *TestData) createNginxNodePortService(affinity bool, ipFamily *corev1.IPFamily) (*corev1.Service, error) { + return data.createService("nginx", 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeNodePort, ipFamily) +} + func (data *TestData) createNginxLoadBalancerService(affinity bool, ingressIPs []string, ipFamily *corev1.IPFamily) (*corev1.Service, error) { svc, err := data.createService(nginxLBService, 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeLoadBalancer, ipFamily) if err != nil { diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index ef9e31968c6..3eff6dcb5c3 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -42,6 +42,46 @@ func skipIfProxyDisabled(t *testing.T, data *TestData) { } } +func TestProxyNodePortService(t *testing.T) { + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + skipIfProxyDisabled(t, data) + skipIfNumNodesLessThan(t, 2) + + nodeName := nodeName(1) + require.NoError(t, data.createNginxPod("nginx", nodeName)) + _, err = data.podWaitForIPs(defaultTimeout, "nginx", testNamespace) + require.NoError(t, err) + require.NoError(t, data.podWaitForRunning(defaultTimeout, "nginx", testNamespace)) + ipProctol := corev1.IPv4Protocol + svc, err := data.createNginxNodePortService(true, &ipProctol) + require.NoError(t, err) + require.NoError(t, data.createBusyboxPodOnNode("busybox", nodeName)) + require.NoError(t, data.podWaitForRunning(defaultTimeout, "busybox", testNamespace)) + var nodePort string + for _, port := range svc.Spec.Ports { + if port.NodePort != 0 { + nodePort = fmt.Sprint(port.NodePort) + break + } + } + 
busyboxPod, err := data.podWaitFor(defaultTimeout, "busybox", testNamespace, func(pod *corev1.Pod) (bool, error) { + return pod.Status.Phase == corev1.PodRunning, nil + }) + require.NoError(t, err) + require.NotNil(t, busyboxPod.Status) + _, _, err = data.runCommandFromPod(testNamespace, "busybox", busyboxContainerName, []string{"wget", "-O", "-", net.JoinHostPort(busyboxPod.Status.HostIP, nodePort), "-T", "1"}) + require.NoError(t, err, "Service NodePort should be able to be connected from Pod") + _, _, _, err = RunCommandOnNode(controlPlaneNodeName(), strings.Join([]string{"wget", "-O", "-", net.JoinHostPort(busyboxPod.Status.HostIP, nodePort), "-T", "1"}, " ")) + require.NoError(t, err, "Service NodePort should be able to be connected from Node IP address on Node which does not have Endpoint") + _, _, _, err = RunCommandOnNode(controlPlaneNodeName(), strings.Join([]string{"wget", "-O", "-", net.JoinHostPort("127.0.0.1", nodePort), "-T", "1"}, " ")) + require.NoError(t, err, "Service NodePort should be able to be connected from loopback address on Node which does not have Endpoint") +} + func TestProxyServiceSessionAffinity(t *testing.T) { skipIfProviderIs(t, "kind", "#881 Does not work in Kind, needs to be investigated.") data, err := setupTest(t) @@ -192,19 +232,41 @@ func testProxyEndpointLifeCycle(ipFamily *corev1.IPFamily, data *TestData, t *te keywords := make(map[int]string) keywords[42] = fmt.Sprintf("nat(dst=%s)", net.JoinHostPort(nginxIP, "80")) // endpointNATTable + var groupKeywords []string + if *ipFamily == corev1.IPv6Protocol { + groupKeywords = append(groupKeywords, fmt.Sprintf("set_field:0x%s->xxreg3", strings.TrimPrefix(hex.EncodeToString(*nginxIPs.ipv6), "0"))) + } else { + groupKeywords = append(groupKeywords, fmt.Sprintf("0x%s->NXM_NX_REG3[]", strings.TrimPrefix(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"))) + } + for tableID, keyword := range keywords { tableOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, 
"antrea-agent", []string{"ovs-ofctl", "dump-flows", defaultBridgeName, fmt.Sprintf("table=%d", tableID)}) require.NoError(t, err) require.Contains(t, tableOutput, keyword) } + groupOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName}) + require.NoError(t, err) + for _, k := range groupKeywords { + require.Contains(t, groupOutput, k) + } + require.NoError(t, data.deletePodAndWait(defaultTimeout, nginx)) + // Wait for one second to make sure the pipeline to be updated. + time.Sleep(time.Second) + for tableID, keyword := range keywords { tableOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-flows", defaultBridgeName, fmt.Sprintf("table=%d", tableID)}) require.NoError(t, err) require.NotContains(t, tableOutput, keyword) } + + groupOutput, _, err = data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName}) + require.NoError(t, err) + for _, k := range groupKeywords { + require.NotContains(t, groupOutput, k) + } } func TestProxyServiceLifeCycle(t *testing.T) { diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index b368494b3a9..05fc34e7d53 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -47,11 +47,12 @@ import ( ) var ( - br = "br01" - c ofClient.Client - roundInfo = types.RoundInfo{RoundNum: 0, PrevRoundNum: nil} - ovsCtlClient = ovsctl.NewClient(br) - bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, br) + br = "br01" + c ofClient.Client + roundInfo = types.RoundInfo{RoundNum: 0, PrevRoundNum: nil} + ovsCtlClient = ovsctl.NewClient(br) + bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, br) + nodePortVirtualIP = net.ParseIP("169.254.169.110") ) const ( @@ -110,7 +111,7 @@ func TestConnectivityFlows(t 
*testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false, true) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -137,7 +138,7 @@ func TestConnectivityFlows(t *testing.T) { } func TestReplayFlowsConnectivityFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -164,7 +165,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { } func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -343,7 +344,7 @@ func TestNetworkPolicyFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -453,7 +454,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, 
bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false, true) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -485,7 +486,7 @@ type svcConfig struct { } func TestProxyServiceFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1360,7 +1361,7 @@ func prepareSNATFlows(snatIP net.IP, mark, podOFPort, podOFPortRemote uint32, vM } func TestSNATFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, false, false, true) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, nodePortVirtualIP, nil, false, false, true) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index b194acd43ff..1d26c241ac3 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -137,7 +137,7 @@ func TestInitialize(t *testing.T) { for _, tc := range tcs { t.Logf("Running Initialize test with mode %s node config %s", tc.networkConfig.TrafficEncapMode, nodeConfig) - routeClient, err := route.NewClient(serviceCIDR, tc.networkConfig, tc.noSNAT) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, tc.networkConfig, tc.noSNAT, false) assert.NoError(t, err) var xtablesReleasedTime, initializedTime time.Time @@ -241,7 +241,7 @@ func TestIpTablesSync(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - 
routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false) + routeClient, err := route.NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false) assert.Nil(t, err) inited := make(chan struct{}) @@ -346,7 +346,7 @@ func TestAddAndDeleteRoutes(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false, false) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -446,7 +446,7 @@ func TestReconcile(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s added routes %v desired routes %v", tc.mode, tc.addedRoutes, tc.desiredPeerCIDRs) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false, false) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -484,9 +484,9 @@ func TestRouteTablePolicyOnly(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly}, false, false) assert.NoError(t, err) - err = routeClient.Initialize(nodeConfig, func() {}) + 
err = routeClient.Initialize(nodeConfig, func() {})
	assert.NoError(t, err)
	// Verify gw IP
	gwName := nodeConfig.GatewayConfig.Name
@@ -543,7 +543,7 @@ func TestIPv6RoutesAndNeighbors(t *testing.T) {
	gwLink := createDummyGW(t)
	defer netlink.LinkDel(gwLink)
-	routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false)
+	routeClient, err := route.NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false)
	assert.Nil(t, err)
	_, ipv6Subnet, _ := net.ParseCIDR("fd74:ca9b:172:19::/64")
	gwIPv6 := net.ParseIP("fd74:ca9b:172:19::1")