diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 5141e8d3dcb..f88486251b2 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1312,6 +1312,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1434,6 +1437,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1499,7 +1506,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-mc9bkf47f4 + name: antrea-config-hdm8fm2tkk namespace: kube-system --- apiVersion: v1 @@ -1619,7 +1626,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-mc9bkf47f4 + name: antrea-config-hdm8fm2tkk name: antrea-config - name: antrea-controller-tls secret: @@ -1883,7 +1890,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-mc9bkf47f4 + name: antrea-config-hdm8fm2tkk name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 422756bfbb3..fdd7642e2f3 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1312,6 +1312,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. 
# Traceflow: true @@ -1434,6 +1437,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. + #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1499,7 +1506,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-mc9bkf47f4 + name: antrea-config-hdm8fm2tkk namespace: kube-system --- apiVersion: v1 @@ -1619,7 +1626,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-mc9bkf47f4 + name: antrea-config-hdm8fm2tkk name: antrea-config - name: antrea-controller-tls secret: @@ -1885,7 +1892,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-mc9bkf47f4 + name: antrea-config-hdm8fm2tkk name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 4ed94b242db..b9e847cc682 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1312,6 +1312,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1434,6 +1437,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1499,7 +1506,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-bkc6dfdhgt + name: antrea-config-f7b6g9b8k2 namespace: kube-system --- apiVersion: v1 @@ -1619,7 +1626,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-bkc6dfdhgt + name: antrea-config-f7b6g9b8k2 name: antrea-config - name: antrea-controller-tls secret: @@ -1886,7 +1893,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-bkc6dfdhgt + name: antrea-config-f7b6g9b8k2 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index f5782ccbdab..f6af91c6e4d 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1312,6 +1312,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1439,6 +1442,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1504,7 +1511,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-ftckkg2dc8 + name: antrea-config-4g9mg9ckm8 namespace: kube-system --- apiVersion: v1 @@ -1633,7 +1640,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-ftckkg2dc8 + name: antrea-config-4g9mg9ckm8 name: antrea-config - name: antrea-controller-tls secret: @@ -1932,7 +1939,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-ftckkg2dc8 + name: antrea-config-4g9mg9ckm8 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 43ef7ca6f95..e80036ae48b 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1312,6 +1312,9 @@ data: # this flag will not take effect. # EndpointSlice: false + # Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1439,6 +1442,10 @@ data: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + + # A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + # (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ #nodePortAddresses: [] antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -1504,7 +1511,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-md64tc85t9 + name: antrea-config-dgbk4m9gfb namespace: kube-system --- apiVersion: v1 @@ -1624,7 +1631,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-md64tc85t9 + name: antrea-config-dgbk4m9gfb name: antrea-config - name: antrea-controller-tls secret: @@ -1888,7 +1895,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-md64tc85t9 + name: antrea-config-dgbk4m9gfb name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 307138a004a..5caba3e8f2d 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -10,6 +10,9 @@ featureGates: # this flag will not take effect. # EndpointSlice: false +# Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort: true + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -137,3 +140,7 @@ featureGates: # TLS min version from: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. #tlsMinVersion: + +# A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks +# (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+#nodePortAddresses: [] diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index dfd5e74897d..a9f60c5f5f1 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -94,6 +94,8 @@ func run(o *Options) error { ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, ovsDatapathType, ovsdbConnection) ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge) ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr, ovsDatapathType, + o.nodePortVirtualIP, + o.nodePortVirtualIPv6, features.DefaultFeatureGate.Enabled(features.AntreaProxy), features.DefaultFeatureGate.Enabled(features.AntreaPolicy)) @@ -110,7 +112,7 @@ func run(o *Options) error { TrafficEncapMode: encapMode, EnableIPSecTunnel: o.config.EnableIPSecTunnel} - routeClient, err := route.NewClient(serviceCIDRNet, networkConfig, o.config.NoSNAT) + routeClient, err := route.NewClient(o.nodePortVirtualIP, o.nodePortVirtualIPv6, serviceCIDRNet, networkConfig, o.config.NoSNAT, features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort)) if err != nil { return fmt.Errorf("error creating route client: %v", err) } @@ -192,15 +194,27 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.AntreaProxy) { v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode) v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode) + var nodePortAddresses []*net.IPNet + nodePortSupport := features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort) + if nodePortSupport { + for _, nodePortAddress := range o.config.NodePortAddresses { + _, ipNet, _ := net.ParseCIDR(nodePortAddress) + nodePortAddresses = append(nodePortAddresses, ipNet) + } + } + var err error switch { case v4Enabled && v6Enabled: - proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient) + proxier, err = proxy.NewDualStackProxier(o.nodePortVirtualIP, o.nodePortVirtualIPv6, nodePortAddresses, 
nodeConfig.Name, nodeConfig.PodIPv4CIDR, nodeConfig.PodIPv6CIDR, informerFactory, ofClient, routeClient, nodePortSupport) case v4Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false) + proxier, err = proxy.NewProxier(o.nodePortVirtualIP, nodePortAddresses, nodeConfig.Name, nodeConfig.PodIPv4CIDR, informerFactory, ofClient, routeClient, v6Enabled, nodePortSupport) case v6Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true) + proxier, err = proxy.NewProxier(o.nodePortVirtualIPv6, nodePortAddresses, nodeConfig.Name, nodeConfig.PodIPv4CIDR, informerFactory, ofClient, routeClient, v6Enabled, nodePortSupport) default: - return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") + err = fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") + } + if err != nil { + return fmt.Errorf("error when creating Antrea Proxy: %w", err) } } diff --git a/cmd/antrea-agent/config.go b/cmd/antrea-agent/config.go index ae8f8a3640d..68625fc041f 100644 --- a/cmd/antrea-agent/config.go +++ b/cmd/antrea-agent/config.go @@ -134,4 +134,7 @@ type AgentConfig struct { TLSCipherSuites string `yaml:"tlsCipherSuites,omitempty"` // TLS min version. TLSMinVersion string `yaml:"tlsMinVersion,omitempty"` + // A string slice of values which specify the addresses to use for NodePorts. Values may be valid IP blocks + // (e.g. 1.2.3.0/24, 1.2.3.4/32). The default empty string slice ([]) means to use all local addresses. 
+ NodePortAddresses []string `yaml:"nodePortAddresses,omitempty"` } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index b494f3353a9..b9da6b6da4c 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -43,6 +43,8 @@ const ( defaultFlowPollInterval = 5 * time.Second defaultFlowExportFrequency = 12 defaultNPLPortRange = "40000-41000" + defaultNodePortVirtualIP = "169.254.169.110" + defaultNodePortVirtualIPv6 = "fec0::ffee:ddcc:bbaa" ) type Options struct { @@ -56,10 +58,14 @@ type Options struct { flowCollectorProto string // Flow exporter poll interval pollInterval time.Duration + // The virtual IP for NodePort Service support. + nodePortVirtualIP, nodePortVirtualIPv6 net.IP } func newOptions() *Options { return &Options{ + nodePortVirtualIP: net.ParseIP(defaultNodePortVirtualIP), + nodePortVirtualIPv6: net.ParseIP(defaultNodePortVirtualIPv6), config: &AgentConfig{ EnablePrometheusMetrics: true, EnableTLSToFlowAggregator: true, @@ -141,6 +147,9 @@ func (o *Options) validate(args []string) error { // (but SNAT can be done by the primary CNI). 
o.config.NoSNAT = true } + if err := o.validateAntreaProxyConfig(); err != nil { + return fmt.Errorf("proxy config is invalid: %w", err) + } if err := o.validateFlowExporterConfig(); err != nil { return fmt.Errorf("failed to validate flow exporter config: %v", err) } @@ -208,6 +217,17 @@ func (o *Options) setDefaults() { } } +func (o *Options) validateAntreaProxyConfig() error { + if features.DefaultFeatureGate.Enabled(features.AntreaProxyNodePort) { + for _, nodePortAddress := range o.config.NodePortAddresses { + if _, _, err := net.ParseCIDR(nodePortAddress); err != nil { + return fmt.Errorf("NodePortAddress is not valid, can not parse `%s`: %w", nodePortAddress, err) + } + } + } + return nil +} + func (o *Options) validateFlowExporterConfig() error { if features.DefaultFeatureGate.Enabled(features.FlowExporter) { host, port, proto, err := flowexport.ParseFlowCollectorAddr(o.config.FlowCollectorAddr, defaultFlowCollectorPort, defaultFlowCollectorTransport) diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index a00daa013e1..4681ac97cc8 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -241,6 +241,7 @@ fi if $ALLFEATURES; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaProxyNodePort[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaProxyNodePort: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*FlowExporter[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ FlowExporter: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*NetworkPolicyStats[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ NetworkPolicyStats: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*EndpointSlice[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ EndpointSlice: true/" antrea-agent.conf diff --git a/pkg/agent/nodeportlocal/rules/iptable_rule.go 
b/pkg/agent/nodeportlocal/rules/iptable_rule.go index cccb810952a..50b164cd741 100644 --- a/pkg/agent/nodeportlocal/rules/iptable_rule.go +++ b/pkg/agent/nodeportlocal/rules/iptable_rule.go @@ -29,15 +29,15 @@ import ( const NodePortLocalChain = "ANTREA-NODE-PORT-LOCAL" // IPTableRules provides a client to perform IPTABLES operations -type iptablesRules struct { +type IPTableRules struct { name string table *iptables.Client } -// NewIPTableRules retruns a new instance of IPTableRules -func NewIPTableRules() *iptablesRules { +// NewIPTableRules returns a new instance of IPTableRules +func NewIPTableRules() *IPTableRules { iptInstance, _ := iptables.New(true, false) - iptRule := iptablesRules{ + iptRule := IPTableRules{ name: "NPL", table: iptInstance, } @@ -45,13 +45,17 @@ func NewIPTableRules() *iptablesRules { } // Init initializes IPTABLES rules for NPL. Currently it deletes existing rules to ensure that no stale entries are present. -func (ipt *iptablesRules) Init() error { +func (ipt *IPTableRules) Init() error { + err := ipt.DeleteAllRules() + if err != nil { + return err + } return ipt.CreateChains() } // CreateChains creates the chain NodePortLocalChain in NAT table. // All DNAT rules for NPL would be added in this chain. 
-func (ipt *iptablesRules) CreateChains() error { +func (ipt *IPTableRules) CreateChains() error { err := ipt.table.EnsureChain(iptables.NATTable, NodePortLocalChain) if err != nil { return fmt.Errorf("IPTABLES chain creation in NAT table failed for NPL with error: %v", err) @@ -59,7 +63,7 @@ func (ipt *iptablesRules) CreateChains() error { ruleSpec := []string{ "-p", "tcp", "-j", NodePortLocalChain, } - err = ipt.table.EnsureRule(iptables.NATTable, iptables.PreRoutingChain, ruleSpec) + err = ipt.table.EnsureRule(iptables.NATTable, iptables.PreRoutingChain, ruleSpec, false) if err != nil { return fmt.Errorf("IPTABLES rule creation in NAT table failed for NPL with error: %v", err) } @@ -67,12 +71,12 @@ func (ipt *iptablesRules) CreateChains() error { } // AddRule appends a DNAT rule in NodePortLocalChain chain of NAT table -func (ipt *iptablesRules) AddRule(port int, podIP string) error { +func (ipt *IPTableRules) AddRule(port int, podIP string) error { ruleSpec := []string{ "-p", "tcp", "-m", "tcp", "--dport", fmt.Sprint(port), "-j", "DNAT", "--to-destination", podIP, } - err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec) + err := ipt.table.EnsureRule(iptables.NATTable, NodePortLocalChain, ruleSpec, false) if err != nil { return fmt.Errorf("IPTABLES rule creation failed for NPL with error: %v", err) } @@ -82,7 +86,7 @@ func (ipt *iptablesRules) AddRule(port int, podIP string) error { // AddAllRules constructs a list of iptables rules for the NPL chain and performs a // iptables-restore on this chain. It uses --no-flush to keep the previous rules intact. 
-func (ipt *iptablesRules) AddAllRules(nplList []PodNodePort) error { +func (ipt *IPTableRules) AddAllRules(nplList []PodNodePort) error { iptablesData := bytes.NewBuffer(nil) writeLine(iptablesData, "*nat") writeLine(iptablesData, iptables.MakeChainLine(NodePortLocalChain)) @@ -105,7 +109,7 @@ func (ipt *iptablesRules) AddAllRules(nplList []PodNodePort) error { } // DeleteRule deletes a specific NPL rule from NodePortLocalChain chain -func (ipt *iptablesRules) DeleteRule(port int, podip string) error { +func (ipt *IPTableRules) DeleteRule(port int, podip string) error { klog.Infof("Deleting rule with port %v and podip %v", port, podip) ruleSpec := []string{ "-p", "tcp", "-m", "tcp", "--dport", @@ -119,7 +123,7 @@ func (ipt *iptablesRules) DeleteRule(port int, podip string) error { } // DeleteAllRules deletes all NPL rules programmed in the node -func (ipt *iptablesRules) DeleteAllRules() error { +func (ipt *IPTableRules) DeleteAllRules() error { exists, err := ipt.table.ChainExists(iptables.NATTable, NodePortLocalChain) if err != nil { return fmt.Errorf("failed to check if NodePortLocal chain exists in NAT table: %v", err) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 4848b8682b8..a3faffff5a8 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -532,6 +532,15 @@ func (c *client) InstallGatewayFlows() error { // Add flow to ensure the liveness check packet could be forwarded correctly. flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...) flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...) 
+ if c.enableProxy { + flows = append(flows, c.arpNodePortVirtualResponderFlow()) + if gatewayConfig.IPv6 != nil { + flows = append(flows, c.serviceGatewayFlow(true)) + } + if gatewayConfig.IPv4 != nil { + flows = append(flows, c.serviceGatewayFlow(false)) + } + } // In NoEncap , no traffic from tunnel port if c.encapMode.SupportsEncap() { flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 94fae4ba65f..b09f067a980 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -38,7 +38,10 @@ import ( const bridgeName = "dummy-br" -var bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName) +var ( + bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName) + nodePortVirtualIP = net.ParseIP("169.254.169.110") +) func installNodeFlows(ofClient Client, cacheKey string) (int, error) { hostName := cacheKey @@ -91,7 +94,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -122,7 +125,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -166,7 
+169,7 @@ func TestFlowInstallationFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -203,7 +206,7 @@ func TestConcurrentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -469,7 +472,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) { } func prepareTraceflowFlow(ctrl *gomock.Controller) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nodePortVirtualIP, nil, true, true) c := ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.nodeConfig = &config.NodeConfig{} @@ -487,7 +490,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { } func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, nil, nil, true, true) c := ofClient.(*client) mac, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") c.nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{MAC: mac}} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 
e71c5af2b97..308bfeb955c 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -362,7 +362,9 @@ type client struct { encapMode config.TrafficEncapModeType gatewayOFPort uint32 // ovsDatapathType is the type of the datapath used by the bridge. - ovsDatapathType ovsconfig.OVSDatapathType + ovsDatapathType ovsconfig.OVSDatapathType + nodePortVirtualIPv4 net.IP // The virtual IPv4 used for host forwarding, it can be nil if NodePort support is not enabled. + nodePortVirtualIPv6 net.IP // The virtual IPv6 used for host forwarding, it can be nil if NodePort support is not enabled. // packetInHandlers stores handler to process PacketIn event. Each packetin reason can have multiple handlers registered. // When a packetin arrives, openflow send packet to registered handlers in this map. packetInHandlers map[uint8]map[string]PacketInHandler @@ -997,6 +999,22 @@ func (c *client) arpResponderFlow(peerGatewayIP net.IP, category cookie.Category Done() } +func (c *client) arpNodePortVirtualResponderFlow() binding.Flow { + return c.pipeline[arpResponderTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolARP). + MatchARPOp(1). + MatchARPTpa(c.nodePortVirtualIPv4). + Action().Move(binding.NxmFieldSrcMAC, binding.NxmFieldDstMAC). + Action().SetSrcMAC(globalVirtualMAC). + Action().LoadARPOperation(2). + Action().Move(binding.NxmFieldARPSha, binding.NxmFieldARPTha). + Action().SetARPSha(globalVirtualMAC). + Action().Move(binding.NxmFieldARPSpa, binding.NxmFieldARPTpa). + Action().SetARPSpa(c.nodePortVirtualIPv4). + Action().OutputInPort(). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()). + Done() +} + // arpResponderStaticFlow generates ARP reply for any ARP request with the same global virtual MAC. // This flow is used in policy-only mode, where traffic are routed via IP not MAC. 
func (c *client) arpResponderStaticFlow(category cookie.Category) binding.Flow { @@ -1910,6 +1928,22 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf return group } +// serviceGatewayFlow replies Service packets back to external addresses without entering Gateway CTZone. +func (c *client) serviceGatewayFlow(isIPv6 bool) binding.Flow { + builder := c.pipeline[conntrackCommitTable].BuildFlow(priorityHigh). + MatchCTMark(ServiceCTMark, nil). + MatchCTStateNew(true). + MatchCTStateTrk(true). + MatchRegRange(int(marksReg), markTrafficFromGateway, binding.Range{0, 15}). + Action().GotoTable(L2ForwardingOutTable). + Cookie(c.cookieAllocator.Request(cookie.Service).Raw()) + + if isIPv6 { + return builder.MatchProtocol(binding.ProtocolIPv6).Done() + } + return builder.MatchProtocol(binding.ProtocolIP).Done() +} + // decTTLFlows decrements TTL by one for the packets forwarded across Nodes. // The TTL decrement should be skipped for the packets which enter OVS pipeline // from the gateway interface, as the host IP stack should have decremented the @@ -1994,7 +2028,7 @@ func (c *client) generatePipeline() { } // NewClient is the constructor of the Client interface. 
-func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapathType, enableProxy, enableAntreaPolicy bool) Client { +func NewClient(bridgeName string, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapathType, nodePortVirtualIPv4 net.IP, nodePortVirtualIPv6 net.IP, enableProxy, enableAntreaPolicy bool) Client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) policyCache := cache.NewIndexer( policyConjKeyFunc, @@ -2002,6 +2036,8 @@ func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapat ) c := &client{ bridge: bridge, + nodePortVirtualIPv4: nodePortVirtualIPv4, + nodePortVirtualIPv6: nodePortVirtualIPv6, enableProxy: enableProxy, enableAntreaPolicy: enableAntreaPolicy, nodeFlowCache: newFlowCategoryCache(), diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index a87a5004a81..56d31c71569 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2020 Antrea Authors +// Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 744f5134e68..9776a2646c7 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -32,6 +32,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/querier" + "github.com/vmware-tanzu/antrea/pkg/agent/route" "github.com/vmware-tanzu/antrea/pkg/features" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" @@ -39,15 +40,18 @@ import ( ) const ( - resyncPeriod = time.Minute - componentName = "antrea-agent-proxy" + resyncPeriod = time.Minute + componentName = "antrea-agent-proxy" + nodePortLocalLabel = "NodePortLocal" ) type proxier struct { - once sync.Once - endpointSliceConfig *config.EndpointSliceConfig - endpointsConfig *config.EndpointsConfig - serviceConfig *config.ServiceConfig + onceRun sync.Once + onceInitializedReconcile sync.Once + once sync.Once + endpointSliceConfig *config.EndpointSliceConfig + endpointsConfig *config.EndpointsConfig + serviceConfig *config.ServiceConfig // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since last syncProxyRules call. For a single object, // changes are accumulated. 
Once both endpointsChanges and serviceChanges @@ -75,7 +79,11 @@ type proxier struct { stopChan <-chan struct{} agentQuerier querier.AgentQuerier ofClient openflow.Client + routeClient route.Interface + nodeIPs []net.IP + virtualNodePortIP net.IP isIPv6 bool + nodePortSupport bool enableEndpointSlice bool } @@ -110,14 +118,32 @@ func (p *proxier) removeStaleServices() { } } } - groupID, _ := p.groupCounter.Get(svcPortName) + groupID, _ := p.groupCounter.Get(svcPortName, "") if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err) continue } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + if svcInfo.OnlyNodeLocalEndpoints() { + nGroupID, _ := p.groupCounter.Get(svcPortName, nodePortLocalLabel) + if err := p.ofClient.UninstallServiceGroup(nGroupID); err != nil { + klog.Errorf("Failed to remove flows of Service NodePort %v: %v", svcPortName, err) + continue + } + p.groupCounter.Recycle(svcPortName, nodePortLocalLabel) + } + if err := p.ofClient.UninstallServiceFlows(p.virtualNodePortIP, uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + klog.Errorf("Failed to remove Service NodePort flows: %v", err) + continue + } + if err := p.routeClient.DeleteNodePort(p.nodeIPs, svcInfo, p.isIPv6); err != nil { + klog.Errorf("Failed to remove Service NodePort rules: %v", err) + continue + } + } delete(p.serviceInstalledMap, svcPortName) p.deleteServiceByIP(svcInfo.String()) - p.groupCounter.Recycle(svcPortName) + p.groupCounter.Recycle(svcPortName, "") } } @@ -187,7 +213,8 @@ func (p *proxier) removeStaleEndpoints() { func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { return svcInfo.ClusterIP().String() != pSvcInfo.ClusterIP().String() || svcInfo.Port() != pSvcInfo.Port() || - svcInfo.OFProtocol != pSvcInfo.OFProtocol + svcInfo.OFProtocol != pSvcInfo.OFProtocol || + svcInfo.NodePort() != pSvcInfo.NodePort() } // smallSliceDifference builds a slice which 
includes all the strings from s1 @@ -214,7 +241,7 @@ func smallSliceDifference(s1, s2 []string) []string { func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) - groupID, _ := p.groupCounter.Get(svcPortName) + groupID, _ := p.groupCounter.Get(svcPortName, "") endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { p.endpointsInstalledMap[svcPortName] = map[string]k8sproxy.Endpoint{} @@ -284,6 +311,21 @@ func (p *proxier) installServices() { klog.Errorf("Error when installing Endpoints groups: %v", err) continue } + // Install another group for local type NodePort service. + if p.nodePortSupport && svcInfo.NodePort() > 0 && svcInfo.OnlyNodeLocalEndpoints() { + nGroupID, _ := p.groupCounter.Get(svcPortName, nodePortLocalLabel) + var localEndpointList []k8sproxy.Endpoint + for _, ed := range endpointUpdateList { + if !ed.GetIsLocal() { + continue + } + localEndpointList = append(localEndpointList, ed) + } + if err := p.ofClient.InstallServiceGroup(nGroupID, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil { + klog.Errorf("Error when installing Group for Service NodePort local: %v", err) + continue + } + } for _, e := range endpointUpdateList { // If the Endpoint is newly installed, add a reference. 
if _, ok := endpointsInstalled[e.String()]; !ok { @@ -301,6 +343,18 @@ func (p *proxier) installServices() { klog.Errorf("Failed to remove flows of Service %v: %v", svcPortName, err) continue } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + err := p.ofClient.UninstallServiceFlows(p.virtualNodePortIP, uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol) + if err != nil { + klog.Errorf("Error when removing NodePort Service flows: %v", err) + continue + } + err = p.routeClient.DeleteNodePort(p.nodeIPs, pSvcInfo, p.isIPv6) + if err != nil { + klog.Errorf("Error when removing NodePort Service entries in IPSet: %v", err) + continue + } + } } if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds())); err != nil { klog.Errorf("Error when installing Service flows: %v", err) @@ -335,6 +389,20 @@ func (p *proxier) installServices() { } } } + if p.nodePortSupport && svcInfo.NodePort() > 0 { + nGroupID := groupID + if svcInfo.OnlyNodeLocalEndpoints() { + nGroupID, _ = p.groupCounter.Get(svcPortName, nodePortLocalLabel) + } + if err := p.ofClient.InstallServiceFlows(nGroupID, p.virtualNodePortIP, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds())); err != nil { + klog.Errorf("Error when installing Service NodePort flow: %v", err) + continue + } + if err := p.routeClient.AddNodePort(p.nodeIPs, svcInfo, p.isIPv6); err != nil { + klog.Errorf("Error when installing Service NodePort rules: %v", err) + continue + } + } } p.serviceInstalledMap[svcPortName] = svcPort @@ -497,7 +565,12 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { } func (p *proxier) Run(stopCh <-chan struct{}) { - p.once.Do(func() { + p.onceRun.Do(func() { + if p.nodePortSupport { + if err := p.routeClient.AddNodePortRoute(p.isIPv6); err != nil { + panic(err) + } + } go p.serviceConfig.Run(stopCh) if p.enableEndpointSlice { go p.endpointSliceConfig.Run(stopCh) @@ -509,11 
+582,76 @@ func (p *proxier) Run(stopCh <-chan struct{}) { }) } +// getLocalAddrs returns a list of all network addresses on the local system. +func getLocalAddrs() ([]net.IP, error) { + var localAddrs []net.IP + + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil, err + } + + for _, addr := range addrs { + ip, _, err := net.ParseCIDR(addr.String()) + if err != nil { + return nil, err + } + localAddrs = append(localAddrs, ip) + } + + return localAddrs, nil +} + +func filterIPFamily(isV6 bool, ips ...net.IP) []net.IP { + var result []net.IP + for _, ip := range ips { + if !isV6 && !utilnet.IsIPv6(ip) { + result = append(result, ip) + } else if isV6 && utilnet.IsIPv6(ip) { + result = append(result, ip) + } + } + return result +} + +func getAvailableAddresses(nodePortAddresses []*net.IPNet, podCIDR *net.IPNet, ipv6 bool) ([]net.IP, error) { + localAddresses, err := getLocalAddrs() + if err != nil { + return nil, err + } + var nodeIPs []net.IP + for _, nodeIP := range filterIPFamily(ipv6, localAddresses...) 
{ + if podCIDR.Contains(nodeIP) { + continue + } + var contains bool + for _, nodePortAddress := range nodePortAddresses { + if nodePortAddress.Contains(nodeIP) { + contains = true + break + } + } + if len(nodePortAddresses) == 0 || contains { + nodeIPs = append(nodeIPs, nodeIP) + } + } + if len(nodeIPs) == 0 { + klog.Warningln("No qualified node IP found.") + } + return nodeIPs, nil +} + func NewProxier( + virtualNodePortIP net.IP, + nodePortAddresses []*net.IPNet, hostname string, + podCIDR *net.IPNet, informerFactory informers.SharedInformerFactory, ofClient openflow.Client, - isIPv6 bool) *proxier { + routeClient route.Interface, + isIPv6 bool, + nodePortSupport bool, +) (*proxier, error) { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, @@ -537,7 +675,19 @@ func NewProxier( serviceStringMap: map[string]k8sproxy.ServicePortName{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + routeClient: routeClient, + virtualNodePortIP: virtualNodePortIP, isIPv6: isIPv6, + nodePortSupport: nodePortSupport, + } + + if nodePortSupport { + nodeIPs, err := getAvailableAddresses(nodePortAddresses, podCIDR, isIPv6) + if err != nil { + return nil, err + } + klog.Infof("Proxy NodePort Services on addresses: %v", nodeIPs) + p.nodeIPs = nodeIPs } p.serviceConfig.RegisterEventHandler(p) p.endpointsConfig.RegisterEventHandler(p) @@ -550,20 +700,36 @@ func NewProxier( p.endpointsConfig.RegisterEventHandler(p) } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, 0, 30*time.Second, -1) - return p + return p, nil } func NewDualStackProxier( - hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) k8sproxy.Provider { - - // Create an ipv4 instance of the single-stack proxier - ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false) + virtualNodePortIP net.IP, + virtualNodePortIPv6 net.IP, + nodePortAddresses 
[]*net.IPNet, + hostname string, + podIPv4CIDR *net.IPNet, + podIPv6CIDR *net.IPNet, + informerFactory informers.SharedInformerFactory, + ofClient openflow.Client, + routeClient route.Interface, + nodePortSupport bool, +) (k8sproxy.Provider, error) { + + // Create an ipv4 instance of the single-stack proxier. + ipv4Proxier, err := NewProxier(virtualNodePortIP, nodePortAddresses, hostname, podIPv4CIDR, informerFactory, ofClient, routeClient, false, nodePortSupport) + if err != nil { + return nil, err + } - // Create an ipv6 instance of the single-stack proxier - ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true) + // Create an ipv6 instance of the single-stack proxier. + ipv6Proxier, err := NewProxier(virtualNodePortIPv6, nodePortAddresses, hostname, podIPv6CIDR, informerFactory, ofClient, routeClient, true, nodePortSupport) + if err != nil { + return nil, err + } // Return a meta-proxier that dispatch calls between the two - // single-stack proxier instances + // single-stack proxier instances. 
metaProxier := k8sproxy.NewMetaProxier(ipv4Proxier, ipv6Proxier) - return metaProxier + return metaProxier, nil } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 4f37b7dfbb9..f4f5b30691d 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -33,10 +33,14 @@ import ( ofmock "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" + "github.com/vmware-tanzu/antrea/pkg/agent/route" + routetesting "github.com/vmware-tanzu/antrea/pkg/agent/route/testing" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) +var virtualNodePortIP = net.ParseIP("169.254.169.110") + func makeNamespaceName(namespace, name string) apimachinerytypes.NamespacedName { return apimachinerytypes.NamespacedName{Namespace: namespace, Name: name} } @@ -80,7 +84,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) return ept } -func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { +func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, isIPv6, nodeportSupport bool) *proxier { hostname := "localhost" eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder( @@ -97,8 +101,12 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { endpointsMap: types.EndpointsMap{}, groupCounter: types.NewGroupCounter(), ofClient: ofClient, + routeClient: routeClient, serviceStringMap: map[string]k8sproxy.ServicePortName{}, isIPv6: isIPv6, + nodeIPs: []net.IP{net.ParseIP("192.168.0.1")}, + virtualNodePortIP: virtualNodePortIP, + nodePortSupport: nodeportSupport, } return p } @@ -107,7 +115,8 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - 
fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -141,7 +150,7 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { }), ) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP if isIPv6 { @@ -157,7 +166,8 @@ func TestLoadbalancer(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, false) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, true) svcIPv4 := net.ParseIP("10.20.30.41") svcPort := 80 @@ -198,7 +208,7 @@ func TestLoadbalancer(t *testing.T) { }), ) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any(), false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) @@ -208,6 +218,73 @@ func TestLoadbalancer(t *testing.T) { fp.syncProxyRules() } +func TestExternalNameToNodePort(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient := ofmock.NewMockClient(ctrl) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, true) + + svcIPv4 := net.ParseIP("10.20.30.41") + svcPort := 80 + svcNodePort := 31000 + svcPortName := k8sproxy.ServicePortName{ + NamespacedName: makeNamespaceName("ns1", "svc1"), + Port: "80", + Protocol: corev1.ProtocolTCP, + } + makeServiceMap(fp, + 
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() // Should be ignored. + svc.Spec.Type = corev1.ServiceTypeExternalName + svc.Spec.ExternalName = "a.b.c.com" + svc.Spec.Ports = []corev1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + fp.syncProxyRules() + + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() + svc.Spec.Type = corev1.ServiceTypeNodePort + svc.Spec.Ports = []corev1.ServicePort{{ + NodePort: int32(svcNodePort), + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + epIP := net.ParseIP("10.180.0.1") + epFunc := func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: epIP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + } + ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) + makeEndpointsMap(fp, ep) + groupID, _ := fp.groupCounter.Get(svcPortName, "") + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + bindingProtocol := binding.ProtocolTCP + mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, virtualNodePortIP, uint16(svcNodePort), bindingProtocol, uint16(0)).Times(1) + mockRouteClient.EXPECT().AddNodePort(gomock.Any(), gomock.Any(), false).Times(1) + + fp.syncProxyRules() +} + func TestClusterIPv4(t *testing.T) { testClusterIP(t, net.ParseIP("10.20.30.41"), net.ParseIP("10.180.0.1"), false) } @@ -220,7 +297,8 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, 
epIP net.IP, isIPv6 bool) ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, true) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -257,7 +335,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) } ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) makeEndpointsMap(fp, ep) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) @@ -284,7 +362,8 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -321,7 +400,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -385,8 +465,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP } makeEndpointsMap(fp, ep, epUDP) - groupID, _ := fp.groupCounter.Get(svcPortName) - groupIDUDP, _ := fp.groupCounter.Get(svcPortNameUDP) + groupID, _ 
:= fp.groupCounter.Get(svcPortName, "") + groupIDUDP, _ := fp.groupCounter.Get(svcPortNameUDP, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any(), isIPv6).Times(1) @@ -412,7 +492,8 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcPortName := k8sproxy.ServicePortName{ @@ -448,7 +529,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv bindingProtocol = binding.ProtocolTCPv6 } makeEndpointsMap(fp, ep) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(2) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0)).Times(1) @@ -471,7 +552,8 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -484,7 +566,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { - svc.Spec.Type = "NodePort" + svc.Spec.Type = 
corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP @@ -519,7 +601,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 } - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, true, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(corev1.DefaultClientIPServiceAffinitySeconds)).Times(1) @@ -539,7 +621,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + fp := NewFakeProxier(nil, mockOFClient, isIPv6, false) svcPort := 80 svcNodePort := 3001 @@ -552,7 +634,7 @@ func testSessionAffinity(t *testing.T, svcExternalIPs net.IP, svcIP net.IP, isIP makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { - svc.Spec.Type = "NodePort" + svc.Spec.Type = corev1.ServiceTypeNodePort svc.Spec.ClusterIP = svcIP.String() svc.Spec.ExternalIPs = []string{svcExternalIPs.String()} svc.Spec.SessionAffinity = corev1.ServiceAffinityClientIP @@ -586,7 +668,8 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, isIPv6) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, isIPv6, false) svcPort1 := 80 svcPort2 := 8080 @@ -625,7 +708,7 @@ func testPortChange(t *testing.T, svcIP net.IP, epIP net.IP, 
isIPv6 bool) { } ep := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc) makeEndpointsMap(fp, ep) - groupID, _ := fp.groupCounter.Get(svcPortName) + groupID, _ := fp.groupCounter.Get(svcPortName, "") mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any(), isIPv6).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort1), bindingProtocol, uint16(0)) @@ -659,7 +742,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOFClient := ofmock.NewMockClient(ctrl) - fp := NewFakeProxier(mockOFClient, false) + mockRouteClient := routetesting.NewMockInterface(ctrl) + fp := NewFakeProxier(mockRouteClient, mockOFClient, false, false) epIP := net.ParseIP("10.50.60.71") svcIP1 := net.ParseIP("10.180.30.41") svcIP2 := net.ParseIP("10.180.30.42") @@ -710,8 +794,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { ep1 := epMapFactory(svcPortName1, epIP.String()) ep2 := epMapFactory(svcPortName2, epIP.String()) - groupID1, _ := fp.groupCounter.Get(svcPortName1) - groupID2, _ := fp.groupCounter.Get(svcPortName2) + groupID1, _ := fp.groupCounter.Get(svcPortName1, "") + groupID2, _ := fp.groupCounter.Get(svcPortName2, "") mockOFClient.EXPECT().InstallServiceGroup(groupID1, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID2, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP diff --git a/pkg/agent/proxy/types/groupcounter.go b/pkg/agent/proxy/types/groupcounter.go index ef41a539f42..cd9083d800b 100644 --- a/pkg/agent/proxy/types/groupcounter.go +++ b/pkg/agent/proxy/types/groupcounter.go @@ -15,6 +15,7 @@ package types import ( + "fmt" "sync" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" @@ -27,10 +28,10 @@ type GroupCounter interface { // If the group ID of the service has been generated, then return the // prior one. 
The bool return value indicates whether the groupID is newly // generated. - Get(svcPortName k8sproxy.ServicePortName) (binding.GroupIDType, bool) + Get(svcPortName k8sproxy.ServicePortName, label string) (binding.GroupIDType, bool) // Recycle removes a Service Group ID mapping. The recycled groupID can be // reused. - Recycle(svcPortName k8sproxy.ServicePortName) bool + Recycle(svcPortName k8sproxy.ServicePortName, label string) bool } type groupCounter struct { @@ -38,37 +39,46 @@ type groupCounter struct { groupIDCounter binding.GroupIDType recycled []binding.GroupIDType - groupMap map[k8sproxy.ServicePortName]binding.GroupIDType + groupMap map[string]binding.GroupIDType } func NewGroupCounter() *groupCounter { - return &groupCounter{groupMap: map[k8sproxy.ServicePortName]binding.GroupIDType{}} + return &groupCounter{groupMap: map[string]binding.GroupIDType{}} } -func (c *groupCounter) Get(svcPortName k8sproxy.ServicePortName) (binding.GroupIDType, bool) { +func keyString(svcPortName k8sproxy.ServicePortName, label string) string { + key := svcPortName.String() + if len(key) > 0 { + key = fmt.Sprintf("%s/%s", key, label) + } + return key +} + +func (c *groupCounter) Get(svcPortName k8sproxy.ServicePortName, label string) (binding.GroupIDType, bool) { c.mu.Lock() defer c.mu.Unlock() - - if id, ok := c.groupMap[svcPortName]; ok { + key := keyString(svcPortName, label) + if id, ok := c.groupMap[key]; ok { return id, false } else if len(c.recycled) != 0 { id = c.recycled[len(c.recycled)-1] c.recycled = c.recycled[:len(c.recycled)-1] - c.groupMap[svcPortName] = id + c.groupMap[key] = id return id, true } else { c.groupIDCounter += 1 - c.groupMap[svcPortName] = c.groupIDCounter + c.groupMap[key] = c.groupIDCounter return c.groupIDCounter, true } } -func (c *groupCounter) Recycle(svcPortName k8sproxy.ServicePortName) bool { +func (c *groupCounter) Recycle(svcPortName k8sproxy.ServicePortName, label string) bool { c.mu.Lock() defer c.mu.Unlock() - if id, ok := 
c.groupMap[svcPortName]; ok { - delete(c.groupMap, svcPortName) + key := keyString(svcPortName, label) + if id, ok := c.groupMap[key]; ok { + delete(c.groupMap, key) + c.recycled = append(c.recycled, id) return true } diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index df3a8eb6f5a..1b5b79d6e6f 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -18,6 +18,7 @@ import ( "net" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" ) // Interface is the interface for routing container packets in host network. @@ -38,6 +39,20 @@ type Interface interface { // It should do nothing if the routes don't exist, without error. DeleteRoutes(podCIDR *net.IPNet) error + // AddNodePortRoute should add the route to the NodePort virtual IP. + AddNodePortRoute(isIPv6 bool) error + + // ReconcileNodePort should remove orphaned NodePort ipset entries. + ReconcileNodePort(nodeIPs []net.IP, svcEntries []*types.ServiceInfo) error + + // AddNodePort should add entries of the NodePort Service to the ipset. + // It should have no effect if the entry already exists, without error. + AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error + + // DeleteNodePort should delete entries of the NodePort Service from ipset. + // It should do nothing if entries of the NodePort Service don't exist, without error. + DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error + + // MigrateRoutesToGw should move routes from device linkname to local gateway.
MigrateRoutesToGw(linkName string) error diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index bb9c82e8cf4..6bd5bc81460 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -17,9 +17,11 @@ package route import ( "bytes" "fmt" + "io" "net" "reflect" "strconv" + "strings" "sync" "time" @@ -31,6 +33,7 @@ import ( "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" + proxytypes "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/types" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/agent/util/ipset" @@ -48,13 +51,23 @@ const ( antreaPodIPSet = "ANTREA-POD-IP" // antreaPodIP6Set contains all IPv6 Pod CIDRs of this cluster. antreaPodIP6Set = "ANTREA-POD-IP6" + // antreaNodePortClusterSet contains all Cluster type NodePort Services Addresses. + antreaNodePortClusterSet = "ANTREA-SVC-NP-CLUSTER" + antreaNodePortClusterSet6 = "ANTREA-SVC-NP-CLUSTER6" + antreaNodePortLocalSet = "ANTREA-SVC-NP-LOCAL" + antreaNodePortLocalSet6 = "ANTREA-SVC-NP-LOCAL6" // Antrea managed iptables chains. - antreaForwardChain = "ANTREA-FORWARD" - antreaPreRoutingChain = "ANTREA-PREROUTING" - antreaPostRoutingChain = "ANTREA-POSTROUTING" - antreaOutputChain = "ANTREA-OUTPUT" - antreaMangleChain = "ANTREA-MANGLE" + antreaForwardChain = "ANTREA-FORWARD" + antreaPreRoutingChain = "ANTREA-PREROUTING" + antreaNodePortServicesChain = "ANTREA-SVC-NP" + antreaServicesMasqChain = "ANTREA-SVC-MASQ" + antreaPostRoutingChain = "ANTREA-POSTROUTING" + antreaOutputChain = "ANTREA-OUTPUT" + antreaMangleChain = "ANTREA-MANGLE" + + localNodePortMark = "0xf0" + clusterNodePortMark = "0xf1" ) // Client implements Interface. @@ -79,7 +92,9 @@ type Client struct { // nodeRoutes caches ip routes to remote Pods. It's a map of podCIDR to routes. 
nodeRoutes sync.Map // nodeNeighbors caches IPv6 Neighbors to remote host gateway - nodeNeighbors sync.Map + nodeNeighbors sync.Map + nodePortVirtualIP, nodePortVirtualIPv6 net.IP + nodeportSupport bool // iptablesInitialized is used to notify when iptables initialization is done. iptablesInitialized chan struct{} } @@ -87,11 +102,21 @@ type Client struct { // NewClient returns a route client. // TODO: remove param serviceCIDR after kube-proxy is replaced by Antrea Proxy. This param is not used in this file; // leaving it here is to be compatible with the implementation on Windows. -func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSNAT bool) (*Client, error) { +func NewClient( + nodePortVirtualIP net.IP, + nodePortVirtualIPv6 net.IP, + serviceCIDR *net.IPNet, + networkConfig *config.NetworkConfig, + noSNAT bool, + nodeportSupport bool, +) (*Client, error) { return &Client{ - serviceCIDR: serviceCIDR, - networkConfig: networkConfig, - noSNAT: noSNAT, + nodePortVirtualIP: nodePortVirtualIP, + nodePortVirtualIPv6: nodePortVirtualIPv6, + serviceCIDR: serviceCIDR, + networkConfig: networkConfig, + noSNAT: noSNAT, + nodeportSupport: nodeportSupport, }, nil } @@ -155,6 +180,20 @@ func (c *Client) syncIPInfra() { // syncIPSet ensures that the required ipset exists and it has the initial members. func (c *Client) syncIPSet() error { + if c.nodeportSupport { + if err := ipset.CreateIPSet(antreaNodePortClusterSet, ipset.HashIPPort, false); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortLocalSet, ipset.HashIPPort, false); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortClusterSet6, ipset.HashIPPort, true); err != nil { + return err + } + if err := ipset.CreateIPSet(antreaNodePortLocalSet6, ipset.HashIPPort, true); err != nil { + return err + } + } // In policy-only mode, Node Pod CIDR is undefined. 
if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { return nil @@ -221,20 +260,35 @@ func (c *Client) syncIPTables() error { // Create the antrea managed chains and link them to built-in chains. // We cannot use iptables-restore for these jump rules because there // are non antrea managed rules in built-in chains. - jumpRules := []struct{ table, srcChain, dstChain, comment string }{ - {iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules"}, - {iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, - {iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules"}, - {iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules"}, - {iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules"}, // TODO: unify the chain naming style - {iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules"}, + jumpRules := []struct { + need bool + table, srcChain, dstChain, comment string + prepend bool + }{ + {true, iptables.RawTable, iptables.PreRoutingChain, antreaPreRoutingChain, "Antrea: jump to Antrea prerouting rules", false}, + {true, iptables.RawTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {true, iptables.FilterTable, iptables.ForwardChain, antreaForwardChain, "Antrea: jump to Antrea forwarding rules", false}, + {true, iptables.NATTable, iptables.PostRoutingChain, antreaPostRoutingChain, "Antrea: jump to Antrea postrouting rules", false}, + {true, iptables.MangleTable, iptables.PreRoutingChain, antreaMangleChain, "Antrea: jump to Antrea mangle rules", false}, + {true, iptables.MangleTable, iptables.OutputChain, antreaOutputChain, "Antrea: jump to Antrea output rules", false}, + {c.nodeportSupport, iptables.NATTable, 
iptables.PreRoutingChain, antreaNodePortServicesChain, "Antrea: jump to Antrea NodePort Service rules", true}, + {c.nodeportSupport, iptables.NATTable, iptables.OutputChain, antreaNodePortServicesChain, "Antrea: jump to Antrea NodePort Service rules", true}, + {c.nodeportSupport, iptables.NATTable, iptables.PostRoutingChain, antreaServicesMasqChain, "Antrea: jump to Antrea NodePort Service masquerade rules", true}, } for _, rule := range jumpRules { + ruleSpec := []string{ + "-j", rule.dstChain, + "-m", "comment", "--comment", rule.comment, + } + if !rule.need { + _ = c.ipt.DeleteChain(rule.table, rule.dstChain) + _ = c.ipt.DeleteRule(rule.table, rule.srcChain, ruleSpec) + continue + } if err := c.ipt.EnsureChain(rule.table, rule.dstChain); err != nil { return err } - ruleSpec := []string{"-j", rule.dstChain, "-m", "comment", "--comment", rule.comment} - if err := c.ipt.EnsureRule(rule.table, rule.srcChain, ruleSpec); err != nil { + if err := c.ipt.EnsureRule(rule.table, rule.srcChain, ruleSpec, rule.prepend); err != nil { return err } } @@ -259,11 +313,67 @@ func (c *Client) syncIPTables() error { return nil } +func (c *Client) restoreNodePortIptablesData(hostGateway string, isIPv6 bool) *bytes.Buffer { + localSet := antreaNodePortLocalSet + clusterSet := antreaNodePortClusterSet + loopbackAddr := "127.0.0.1" + nodePortVirtualIP := c.nodePortVirtualIP.String() + if isIPv6 { + localSet = antreaNodePortLocalSet6 + clusterSet = antreaNodePortClusterSet6 + loopbackAddr = net.IPv6loopback.String() + nodePortVirtualIP = c.nodePortVirtualIPv6.String() + } + iptablesData := new(bytes.Buffer) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", localSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", localNodePortMark, + }...) 
+ writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "set", "--match-set", clusterSet, "dst,dst", + "-j", iptables.MarkTarget, "--set-mark", clusterNodePortMark, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", localNodePortMark, + "-j", iptables.DNATTarget, "--to-destination", nodePortVirtualIP, + }...) + writeLine(iptablesData, []string{ + "-A", antreaNodePortServicesChain, + "-m", "mark", "--mark", clusterNodePortMark, + "-j", iptables.DNATTarget, "--to-destination", nodePortVirtualIP, + }...) + writeLine(iptablesData, + "-A", antreaServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets with a loopback address"`, + "-s", loopbackAddr, + "-d", nodePortVirtualIP, + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + writeLine(iptablesData, + "-A", antreaServicesMasqChain, + "-m", "comment", "--comment", `"Antrea: Masquerade NodePort packets which target Service with Local externalTrafficPolicy"`, + "-m", "mark", "--mark", clusterNodePortMark, + "-d", nodePortVirtualIP, + "-o", hostGateway, + "-j", iptables.MasqueradeTarget, + ) + return iptablesData +} + func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet string) *bytes.Buffer { // Create required rules in the antrea chains. // Use iptables-restore as it flushes the involved chains and creates the desired rules // with a single call, instead of string matching to clean up stale rules. iptablesData := bytes.NewBuffer(nil) + // Determined which version of IP is being processed by podCIDR. + isIPv6 := true + if podCIDR.IP.To4() != nil { + isIPv6 = false + } // Write head lines anyway so the undesired rules can be deleted when changing encap mode. 
writeLine(iptablesData, "*raw") writeLine(iptablesData, iptables.MakeChainLine(antreaPreRoutingChain)) @@ -348,6 +458,11 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet string) *bytes "-j", iptables.MasqueradeTarget, }...) } + if c.nodeportSupport { + writeLine(iptablesData, iptables.MakeChainLine(antreaNodePortServicesChain)) + writeLine(iptablesData, iptables.MakeChainLine(antreaServicesMasqChain)) + io.Copy(iptablesData, c.restoreNodePortIptablesData(hostGateway, isIPv6)) + } writeLine(iptablesData, "COMMIT") return iptablesData } @@ -363,6 +478,22 @@ func (c *Client) initIPRoutes() error { return nil } +func generateNodePortIPSETEntries(nodeIP net.IP, isIPv6 bool, svcInfos []*proxytypes.ServiceInfo) sets.String { + stringSet := sets.NewString() + for _, svcInfo := range svcInfos { + if svcInfo.NodePort() > 0 { + protocolPort := fmt.Sprintf("%s:%d", strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort()) + stringSet.Insert(fmt.Sprintf("%s,%s", nodeIP.String(), protocolPort)) + if isIPv6 { + stringSet.Insert(fmt.Sprintf("%s,%s", net.IPv6loopback.String(), protocolPort)) + } else { + stringSet.Insert(fmt.Sprintf("127.0.0.1,%s", protocolPort)) + } + } + } + return stringSet +} + // Reconcile removes orphaned podCIDRs from ipset and removes routes to orphaned podCIDRs // based on the desired podCIDRs. 
func (c *Client) Reconcile(podCIDRs []string) error { @@ -403,7 +534,10 @@ func (c *Client) Reconcile(podCIDRs []string) error { if reflect.DeepEqual(route.Dst, c.nodeConfig.PodIPv4CIDR) || reflect.DeepEqual(route.Dst, c.nodeConfig.PodIPv6CIDR) { continue } - if desiredPodCIDRs.Has(route.Dst.String()) { + if c.nodeportSupport && route.Dst != nil && (route.Dst.Contains(c.nodePortVirtualIP) || route.Dst.Contains(c.nodePortVirtualIPv6)) { + continue + } + if route.Dst != nil && desiredPodCIDRs.Has(route.Dst.String()) { continue } klog.Infof("Deleting unknown route %v", route) @@ -427,6 +561,9 @@ func (c *Client) Reconcile(podCIDRs []string) error { if desiredGWs.Has(neighIP) { continue } + if c.nodeportSupport && c.nodePortVirtualIPv6 != nil && c.nodePortVirtualIPv6.String() == neighIP { + continue + } klog.V(4).Infof("Deleting orphaned IPv6 neighbor %v", actualNeigh) if err := netlink.NeighDel(actualNeigh); err != nil { return err @@ -657,3 +794,145 @@ func (c *Client) UnMigrateRoutesFromGw(route *net.IPNet, linkName string) error } return nil } + +func (c *Client) ReconcileNodePort(nodeIPs []net.IP, svcEntries []*proxytypes.ServiceInfo) error { + var cluster, local []*proxytypes.ServiceInfo + for _, entry := range svcEntries { + if entry.OnlyNodeLocalEndpoints() { + local = append(local, entry) + } else { + cluster = append(cluster, entry) + } + } + reconcile := func(setName string, isIPv6 bool, desiredSvcEntries []*proxytypes.ServiceInfo) error { + existEntries, err := ipset.ListEntries(setName) + if err != nil { + return err + } + desiredEntries := sets.NewString() + for _, nodeIP := range nodeIPs { + // BUGFIX(review): must build the desired set from this set's own entries + // (desiredSvcEntries), not the full svcEntries list; otherwise entries that + // moved between the local and cluster sets are never cleaned up. + desiredEntries.Insert(generateNodePortIPSETEntries(nodeIP, isIPv6, desiredSvcEntries).List()...) 
+ } + for _, entry := range existEntries { + if desiredEntries.Has(entry) { + continue + } + klog.Infof("Deleting an orphaned NodePort Service entry %s from ipset", entry) + if err := ipset.DelEntry(setName, entry); err != nil { + return err + } + } + return nil + } + if err := reconcile(antreaNodePortLocalSet, false, local); err != nil { + return err + } + if err := reconcile(antreaNodePortClusterSet, false, cluster); err != nil { + return err + } + if err := reconcile(antreaNodePortLocalSet6, true, local); err != nil { + return err + } + if err := reconcile(antreaNodePortClusterSet6, true, cluster); err != nil { + return err + } + return nil +} + +func (c *Client) AddNodePortRoute(isIPv6 bool) error { + var route *netlink.Route + var nodeportIP *net.IP + if !isIPv6 { + nodeportIP = &c.nodePortVirtualIP + route = &netlink.Route{ + Dst: &net.IPNet{ + IP: *nodeportIP, + Mask: net.IPv4Mask(255, 255, 255, 255), + }, + Gw: *nodeportIP, + Flags: int(netlink.FLAG_ONLINK), + } + route.LinkIndex = c.nodeConfig.GatewayConfig.LinkIndex + } else { + nodeportIP = &c.nodePortVirtualIPv6 + route = &netlink.Route{ + Dst: &net.IPNet{ + IP: *nodeportIP, + Mask: net.IPMask{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + } + } + if err := netlink.RouteReplace(route); err != nil { + return fmt.Errorf("failed to install NodePort route: %w", err) + } + if isIPv6 { + neigh := &netlink.Neigh{ + LinkIndex: c.nodeConfig.GatewayConfig.LinkIndex, + Family: netlink.FAMILY_V6, + State: netlink.NUD_PERMANENT, + IP: *nodeportIP, + HardwareAddr: globalVMAC, + } + if err := netlink.NeighSet(neigh); err != nil { + return fmt.Errorf("failed to add nodeport neighbor %v to gw %s: %v", neigh, c.nodeConfig.GatewayConfig.Name, err) + } + c.nodeNeighbors.Store(nodeportIP.String(), neigh) + } + c.nodeRoutes.Store(nodeportIP.String(), route) + return nil +} + +func (c *Client) AddNodePort(nodeIPs 
[]net.IP, svcInfo *proxytypes.ServiceInfo, isIPv6 bool) error { + setName := antreaNodePortClusterSet + if isIPv6 { + setName = antreaNodePortClusterSet6 + } + if svcInfo.OnlyNodeLocalEndpoints() { + if isIPv6 { + setName = antreaNodePortLocalSet6 + } else { + setName = antreaNodePortLocalSet + } + } + for _, nodeIP := range nodeIPs { + if err := ipset.AddEntry(setName, fmt.Sprintf("%s,%s:%d", nodeIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when adding NodePort to ipset %s: %v", setName, err) + } + } + loopbackIP := "127.0.0.1" + if isIPv6 { + loopbackIP = net.IPv6loopback.String() + } + if err := ipset.AddEntry(setName, fmt.Sprintf("%s,%s:%d", loopbackIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when adding NodePort to ipset %s: %v", setName, err) + } + return nil +} + +func (c *Client) DeleteNodePort(nodeIPs []net.IP, svcInfo *proxytypes.ServiceInfo, isIPv6 bool) error { + setName := antreaNodePortClusterSet + if isIPv6 { + setName = antreaNodePortClusterSet6 + } + if svcInfo.OnlyNodeLocalEndpoints() { + if isIPv6 { + setName = antreaNodePortLocalSet6 + } else { + setName = antreaNodePortLocalSet + } + } + for _, nodeIP := range nodeIPs { + if err := ipset.DelEntry(setName, fmt.Sprintf("%s,%s:%d", nodeIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when removing NodePort from ipset %s: %v", setName, err) + } + } + loopbackIP := "127.0.0.1" + if isIPv6 { + loopbackIP = net.IPv6loopback.String() + } + if err := ipset.DelEntry(setName, fmt.Sprintf("%s,%s:%d", loopbackIP, strings.ToLower(string(svcInfo.Protocol())), svcInfo.NodePort())); err != nil { + klog.Errorf("Error when removing NodePort from ipset %s: %v", setName, err) + } + return nil +} diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 8ab8487f136..c56ff3067fa 100644 --- 
a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/util" "github.com/vmware-tanzu/antrea/pkg/agent/util/winfirewall" ) @@ -35,6 +36,9 @@ const ( outboundFirewallRuleName = "Antrea: accept packets to local Pods" ) +// Client implements Interface. +var _ Interface = &Client{} + type Client struct { nr netroute.Interface nodeConfig *config.NodeConfig @@ -43,9 +47,32 @@ type Client struct { fwClient *winfirewall.Client } +func (c *Client) AddNodePortRoute(isIPv6 bool) error { + return nil +} + +func (c *Client) AddNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + return nil +} + +func (c *Client) DeleteNodePort(nodeIPs []net.IP, svcInfo *types.ServiceInfo, isIPv6 bool) error { + return nil +} + +func (c *Client) ReconcileNodePort(nodeIPs []net.IP, ports []*types.ServiceInfo) error { + return nil +} + // NewClient returns a route client. // Todo: remove param serviceCIDR after kube-proxy is replaced by Antrea Proxy completely. 
-func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSNAT bool) (*Client, error) { +func NewClient( + nodePortVirtualIP net.IP, + nodePortVirtualIPv6 net.IP, + serviceCIDR *net.IPNet, + networkConfig *config.NetworkConfig, + noSNAT bool, + nodeportSupport bool, +) (*Client, error) { nr := netroute.New() return &Client{ nr: nr, diff --git a/pkg/agent/route/route_windows_test.go b/pkg/agent/route/route_windows_test.go index f83840b75f6..ae5267aef93 100644 --- a/pkg/agent/route/route_windows_test.go +++ b/pkg/agent/route/route_windows_test.go @@ -53,7 +53,7 @@ func TestRouteOperation(t *testing.T) { nr := netroute.New() defer nr.Exit() - client, err := NewClient(serviceCIDR, &config.NetworkConfig{}, false) + client, err := NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{}, false, false) require.Nil(t, err) nodeConfig := &config.NodeConfig{ GatewayConfig: &config.GatewayConfig{ diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 4585d358148..0fdfad8098f 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -22,6 +22,7 @@ package testing import ( gomock "github.com/golang/mock/gomock" config "github.com/vmware-tanzu/antrea/pkg/agent/config" + types "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" net "net" reflect "reflect" ) @@ -49,6 +50,34 @@ func (m *MockInterface) EXPECT() *MockInterfaceMockRecorder { return m.recorder } +// AddNodePort mocks base method +func (m *MockInterface) AddNodePort(arg0 []net.IP, arg1 *types.ServiceInfo, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodePort", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodePort indicates an expected call of AddNodePort +func (mr *MockInterfaceMockRecorder) AddNodePort(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePort", 
reflect.TypeOf((*MockInterface)(nil).AddNodePort), arg0, arg1, arg2) +} + +// AddNodePortRoute mocks base method +func (m *MockInterface) AddNodePortRoute(arg0 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddNodePortRoute", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddNodePortRoute indicates an expected call of AddNodePortRoute +func (mr *MockInterfaceMockRecorder) AddNodePortRoute(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNodePortRoute", reflect.TypeOf((*MockInterface)(nil).AddNodePortRoute), arg0) +} + // AddRoutes mocks base method func (m *MockInterface) AddRoutes(arg0 *net.IPNet, arg1, arg2 net.IP) error { m.ctrl.T.Helper() @@ -63,6 +92,20 @@ func (mr *MockInterfaceMockRecorder) AddRoutes(arg0, arg1, arg2 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRoutes", reflect.TypeOf((*MockInterface)(nil).AddRoutes), arg0, arg1, arg2) } +// DeleteNodePort mocks base method +func (m *MockInterface) DeleteNodePort(arg0 []net.IP, arg1 *types.ServiceInfo, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteNodePort", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteNodePort indicates an expected call of DeleteNodePort +func (mr *MockInterfaceMockRecorder) DeleteNodePort(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNodePort", reflect.TypeOf((*MockInterface)(nil).DeleteNodePort), arg0, arg1, arg2) +} + // DeleteRoutes mocks base method func (m *MockInterface) DeleteRoutes(arg0 *net.IPNet) error { m.ctrl.T.Helper() @@ -119,6 +162,20 @@ func (mr *MockInterfaceMockRecorder) Reconcile(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockInterface)(nil).Reconcile), arg0) } +// ReconcileNodePort mocks base method +func (m *MockInterface) 
ReconcileNodePort(arg0 []net.IP, arg1 []*types.ServiceInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReconcileNodePort", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReconcileNodePort indicates an expected call of ReconcileNodePort +func (mr *MockInterfaceMockRecorder) ReconcileNodePort(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReconcileNodePort", reflect.TypeOf((*MockInterface)(nil).ReconcileNodePort), arg0, arg1) +} + // Run mocks base method func (m *MockInterface) Run(arg0 <-chan struct{}) { m.ctrl.T.Helper() diff --git a/pkg/agent/util/ipset/ipset.go b/pkg/agent/util/ipset/ipset.go index 4bfd933f43a..6e02a3214ec 100644 --- a/pkg/agent/util/ipset/ipset.go +++ b/pkg/agent/util/ipset/ipset.go @@ -28,6 +28,8 @@ const ( // The lookup time grows linearly with the number of the different prefix values added to the set. HashNet SetType = "hash:net" HashIP SetType = "hash:ip" + // The hash:ip,port set type uses a hash to store IP address and protocol-port pairs in it. + HashIPPort SetType = "hash:ip,port" ) // memberPattern is used to match the members part of ipset list result. diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 1b70345eb82..ef9eb8ba37b 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -38,6 +38,7 @@ const ( MasqueradeTarget = "MASQUERADE" MarkTarget = "MARK" ConnTrackTarget = "CT" + DNATTarget = "DNAT" NoTrackTarget = "NOTRACK" PreRoutingChain = "PREROUTING" @@ -122,8 +123,9 @@ func (c *Client) ChainExists(table string, chain string) (bool, error) { return false, nil } -// EnsureRule checks if target rule already exists, appends it if not. -func (c *Client) EnsureRule(table string, chain string, ruleSpec []string) error { +// EnsureRule checks if target rule already exists, add it if not. If prepend is true, the rule will be added to the top +// of the chain. 
Otherwise, the rule will be appended to the chain. +func (c *Client) EnsureRule(table string, chain string, ruleSpec []string, prepend bool) error { for idx := range c.ipts { ipt := c.ipts[idx] exist, err := ipt.Exists(table, chain, ruleSpec...) @@ -133,7 +135,13 @@ func (c *Client) EnsureRule(table string, chain string, ruleSpec []string) error if exist { return nil } - if err := ipt.Append(table, chain, ruleSpec...); err != nil { + var f func() error + if !prepend { + f = func() error { return ipt.Append(table, chain, ruleSpec...) } + } else { + f = func() error { return ipt.Insert(table, chain, 1, ruleSpec...) } + } + if err := f(); err != nil { return fmt.Errorf("error appending rule %v to table %s chain %s: %v", ruleSpec, table, chain, err) } } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index a8e4aca2370..8ed516dc839 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -45,6 +45,10 @@ const ( // Service traffic. AntreaProxy featuregate.Feature = "AntreaProxy" + // alpha: v0.14 + // Enable NodePort Service support in AntreaProxy in antrea-agent. + AntreaProxyNodePort featuregate.Feature = "AntreaProxyNodePort" + // alpha: v0.8 // beta: v0.11 // Allows to trace path from a generated packet. @@ -75,13 +79,14 @@ var ( // To add a new feature, define a key for it above and add it here. The features will be // available throughout Antrea binaries. 
defaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - AntreaPolicy: {Default: false, PreRelease: featuregate.Alpha}, - AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, - EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, - Traceflow: {Default: true, PreRelease: featuregate.Beta}, - FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, - NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, - NodePortLocal: {Default: false, PreRelease: featuregate.Alpha}, + AntreaPolicy: {Default: false, PreRelease: featuregate.Alpha}, + AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, + EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, + AntreaProxyNodePort: {Default: false, PreRelease: featuregate.Alpha}, + Traceflow: {Default: true, PreRelease: featuregate.Beta}, + FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, + NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, + NodePortLocal: {Default: false, PreRelease: featuregate.Alpha}, } // UnsupportedFeaturesOnWindows records the features not supported on @@ -95,7 +100,8 @@ var ( // can have different FeatureSpecs between Linux and Windows, we should // still define a separate defaultAntreaFeatureGates map for Windows. 
unsupportedFeaturesOnWindows = map[featuregate.Feature]struct{}{ - NodePortLocal: {}, + AntreaProxyNodePort: {}, + NodePortLocal: {}, } ) diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index 612dc4c140d..8121c9cce5a 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -24,11 +24,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" "github.com/vmware-tanzu/antrea/pkg/agent/apiserver/handlers/podinterface" "github.com/vmware-tanzu/antrea/pkg/agent/config" "github.com/vmware-tanzu/antrea/pkg/agent/openflow/cookie" + "github.com/vmware-tanzu/antrea/pkg/features" ) // TestDeploy is a "no-op" test that simply performs setup and teardown. @@ -351,7 +353,11 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo continue } route := Route{} - if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil { + m1 := matches[1] + if !strings.Contains(m1, "/") { + m1 = m1 + "/32" + } + if _, route.peerPodCIDR, err = net.ParseCIDR(m1); err != nil { return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1]) } if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil { @@ -369,6 +375,12 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo } else if encapMode == config.TrafficEncapModeHybrid { expectedRtNumMin = 1 } + agentFeatures, err := data.GetAgentFeatures(antreaNamespace) + require.NoError(t, err) + if agentFeatures.Enabled(features.AntreaProxy) && agentFeatures.Enabled(features.AntreaProxyNodePort) { + expectedRtNumMin += 1 + expectedRtNumMax += 1 + } t.Logf("Retrieving gateway routes on Node '%s'", nodeName) var routes []Route diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 2e0e6b2845e..1f0071990d2 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1233,6 +1233,11 @@ func (data *TestData) createNginxClusterIPService(name string, affinity bool, ip return data.createService(name, 
80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeClusterIP, ipFamily) } +// createNginxNodePortService creates a NodePort nginx Service with the fixed name "nginx". +func (data *TestData) createNginxNodePortService(affinity bool, ipFamily *corev1.IPFamily) (*corev1.Service, error) { + return data.createService("nginx", 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeNodePort, ipFamily) +} + func (data *TestData) createNginxLoadBalancerService(affinity bool, ingressIPs []string, ipFamily *corev1.IPFamily) (*corev1.Service, error) { svc, err := data.createService(nginxLBService, 80, 80, map[string]string{"app": "nginx"}, affinity, corev1.ServiceTypeLoadBalancer, ipFamily) if err != nil { diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index ef9e31968c6..3eff6dcb5c3 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -42,6 +42,46 @@ func skipIfProxyDisabled(t *testing.T, data *TestData) { } } +func TestProxyNodePortService(t *testing.T) { + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + skipIfProxyDisabled(t, data) + skipIfNumNodesLessThan(t, 2) + + nodeName := nodeName(1) + require.NoError(t, data.createNginxPod("nginx", nodeName)) + _, err = data.podWaitForIPs(defaultTimeout, "nginx", testNamespace) + require.NoError(t, err) + require.NoError(t, data.podWaitForRunning(defaultTimeout, "nginx", testNamespace)) + ipProtocol := corev1.IPv4Protocol + svc, err := data.createNginxNodePortService(true, &ipProtocol) + require.NoError(t, err) + require.NoError(t, data.createBusyboxPodOnNode("busybox", nodeName)) + require.NoError(t, data.podWaitForRunning(defaultTimeout, "busybox", testNamespace)) + var nodePort string + for _, port := range svc.Spec.Ports { + if port.NodePort != 0 { + nodePort = fmt.Sprint(port.NodePort) + break + } + } + busyboxPod, err := data.podWaitFor(defaultTimeout, "busybox", testNamespace, func(pod 
*corev1.Pod) (bool, error) { + return pod.Status.Phase == corev1.PodRunning, nil + }) + require.NoError(t, err) + require.NotNil(t, busyboxPod.Status) + _, _, err = data.runCommandFromPod(testNamespace, "busybox", busyboxContainerName, []string{"wget", "-O", "-", net.JoinHostPort(busyboxPod.Status.HostIP, nodePort), "-T", "1"}) + require.NoError(t, err, "Service NodePort should be able to be connected from Pod") + _, _, _, err = RunCommandOnNode(controlPlaneNodeName(), strings.Join([]string{"wget", "-O", "-", net.JoinHostPort(busyboxPod.Status.HostIP, nodePort), "-T", "1"}, " ")) + require.NoError(t, err, "Service NodePort should be able to be connected from Node IP address on Node which does not have Endpoint") + _, _, _, err = RunCommandOnNode(controlPlaneNodeName(), strings.Join([]string{"wget", "-O", "-", net.JoinHostPort("127.0.0.1", nodePort), "-T", "1"}, " ")) + require.NoError(t, err, "Service NodePort should be able to be connected from loopback address on Node which does not have Endpoint") +} + func TestProxyServiceSessionAffinity(t *testing.T) { skipIfProviderIs(t, "kind", "#881 Does not work in Kind, needs to be investigated.") data, err := setupTest(t) @@ -192,19 +232,41 @@ func testProxyEndpointLifeCycle(ipFamily *corev1.IPFamily, data *TestData, t *te keywords := make(map[int]string) keywords[42] = fmt.Sprintf("nat(dst=%s)", net.JoinHostPort(nginxIP, "80")) // endpointNATTable + var groupKeywords []string + if *ipFamily == corev1.IPv6Protocol { + groupKeywords = append(groupKeywords, fmt.Sprintf("set_field:0x%s->xxreg3", strings.TrimPrefix(hex.EncodeToString(*nginxIPs.ipv6), "0"))) + } else { + groupKeywords = append(groupKeywords, fmt.Sprintf("0x%s->NXM_NX_REG3[]", strings.TrimPrefix(hex.EncodeToString(nginxIPs.ipv4.To4()), "0"))) + } + for tableID, keyword := range keywords { tableOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-flows", defaultBridgeName, 
fmt.Sprintf("table=%d", tableID)}) require.NoError(t, err) require.Contains(t, tableOutput, keyword) } + groupOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName}) + require.NoError(t, err) + for _, k := range groupKeywords { + require.Contains(t, groupOutput, k) + } + require.NoError(t, data.deletePodAndWait(defaultTimeout, nginx)) + // Wait for one second to make sure the pipeline to be updated. + time.Sleep(time.Second) + for tableID, keyword := range keywords { tableOutput, _, err := data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-flows", defaultBridgeName, fmt.Sprintf("table=%d", tableID)}) require.NoError(t, err) require.NotContains(t, tableOutput, keyword) } + + groupOutput, _, err = data.runCommandFromPod(metav1.NamespaceSystem, agentName, "antrea-agent", []string{"ovs-ofctl", "dump-groups", defaultBridgeName}) + require.NoError(t, err) + for _, k := range groupKeywords { + require.NotContains(t, groupOutput, k) + } } func TestProxyServiceLifeCycle(t *testing.T) { diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index e5c1e890ab3..c92e651a8b0 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -108,7 +108,7 @@ func TestConnectivityFlows(t *testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -135,7 +135,7 @@ func TestConnectivityFlows(t *testing.T) { } func TestReplayFlowsConnectivityFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, 
true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -162,7 +162,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { } func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -335,7 +335,7 @@ func TestNetworkPolicyFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -445,7 +445,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { // Initialize ovs metrics (Prometheus) to test them metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, nil, nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -476,7 +476,7 @@ type svcConfig struct { } func TestProxyServiceFlows(t *testing.T) { - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, net.ParseIP("169.254.169.110"), nil, true, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, 
fmt.Sprintf("Failed to prepare OVS bridge %s", br)) diff --git a/test/integration/agent/route_test.go b/test/integration/agent/route_test.go index 327d6d9680b..4b0f0e9df34 100644 --- a/test/integration/agent/route_test.go +++ b/test/integration/agent/route_test.go @@ -137,7 +137,7 @@ func TestInitialize(t *testing.T) { for _, tc := range tcs { t.Logf("Running Initialize test with mode %s node config %s", tc.networkConfig.TrafficEncapMode, nodeConfig) - routeClient, err := route.NewClient(serviceCIDR, tc.networkConfig, tc.noSNAT) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, tc.networkConfig, tc.noSNAT, false) assert.NoError(t, err) var xtablesReleasedTime, initializedTime time.Time @@ -241,7 +241,7 @@ func TestIpTablesSync(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false) + routeClient, err := route.NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false) assert.Nil(t, err) inited := make(chan struct{}) @@ -304,7 +304,7 @@ func TestAddAndDeleteRoutes(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false, false) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -400,7 +400,7 @@ func TestReconcile(t *testing.T) { for _, tc := range tcs { t.Logf("Running test with mode %s added routes %v desired routes %v", tc.mode, tc.addedRoutes, tc.desiredPeerCIDRs) - routeClient, err := route.NewClient(serviceCIDR, 
&config.NetworkConfig{TrafficEncapMode: tc.mode}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false, false) assert.NoError(t, err) err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) @@ -438,9 +438,9 @@ func TestRouteTablePolicyOnly(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly}, false) + routeClient, err := route.NewClient(net.ParseIP("169.254.169.110"), nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeNetworkPolicyOnly}, false, false) assert.NoError(t, err) - err = routeClient.Initialize(nodeConfig, func() {}) + err = routeClient.Initialize(nodeConfig, func() {}) assert.NoError(t, err) // Verify gw IP gwName := nodeConfig.GatewayConfig.Name @@ -497,7 +497,7 @@ func TestIPv6RoutesAndNeighbors(t *testing.T) { gwLink := createDummyGW(t) defer netlink.LinkDel(gwLink) - routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false) + routeClient, err := route.NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, false, false) assert.Nil(t, err) _, ipv6Subnet, _ := net.ParseCIDR("fd74:ca9b:172:19::/64") gwIPv6 := net.ParseIP("fd74:ca9b:172:19::1")