From 0ea9daf078c52d4fc6f7cf5a347c58b215e87415 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Sun, 7 Apr 2024 19:21:51 +0800 Subject: [PATCH] Fix that AntreaProxy could unintentionally delete conntrack entries in zone 0 This is a subsequent PR for #5112. As mentioned in #5112: > Due to the restriction of the go library 'netlink', there is no API to specify a target zone. As a result, when deleting a stale conntrack entry with a destination port (such as NodePort), not only will the conntrack entry whose destination port is the port added by AntreaProxy be deleted, but also the conntrack entry that is not added by AntreaProxy will be deleted. This behavior is unexpected, as only the conntrack entries added by AntreaProxy should be deleted. This PR resolves the issue by integrating a CT zone filter, now available in the latest Go library `netlink`. Leveraging this feature, AntreaProxy can accurately delete stale UDP conntrack entries. Signed-off-by: Hongliang Liu --- go.mod | 2 +- go.sum | 5 +- pkg/agent/route/route_linux.go | 27 +++++--- pkg/agent/route/route_linux_test.go | 85 ++++++++++++++++++++++-- test/integration/agent/cniserver_test.go | 3 +- 5 files changed, 102 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index 867aa6fe97d..1dd70a1e769 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 github.com/ti-mo/conntrack v0.5.1 - github.com/vishvananda/netlink v1.2.1-beta.2 + github.com/vishvananda/netlink v1.2.1-beta.2.0.20240523162130-1e68b2710dc3 github.com/vmware/go-ipfix v0.9.0 go.uber.org/mock v0.4.0 golang.org/x/crypto v0.23.0 diff --git a/go.sum b/go.sum index 7145b7b165b..1177647e37a 100644 --- a/go.sum +++ b/go.sum @@ -718,8 +718,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= -github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= -github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240523162130-1e68b2710dc3 h1:ZmyAuEEjihUtb1eFIcXJyXqwIj15S0e2YtkEGb2EcVw= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240523162130-1e68b2710dc3/go.mod h1:whJevzBpTrid75eZy99s3DqCmy05NfibNaF2Ol5Ox5A= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= @@ -933,6 +933,7 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 31890415aa5..4eb8a7fe22b 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -33,6 +33,7 @@ import ( utilnet "k8s.io/utils/net" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/servicecidr" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util/ipset" @@ -278,7 +279,7 @@ func (c *Client) syncRoute() error { routeKeys := sets.New[routeKey]() for i := range routeList { r := &routeList[i] - if r.Dst == nil { + if r.Dst == nil || r.Dst.IP.IsUnspecified() { continue } routeKeys.Insert(routeKey{ @@ -1918,29 +1919,37 @@ func (c *Client) DeleteRouteForLink(cidr *net.IPNet, linkIndex int) error { func (c *Client) ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, endpointIP net.IP, protocol binding.Protocol) error { var protoVar uint8 - var ipFamily netlink.InetFamily + var ipFamilyVar int + var zone uint16 switch protocol { case binding.ProtocolTCP: - ipFamily = unix.AF_INET + ipFamilyVar = unix.AF_INET protoVar = unix.IPPROTO_TCP + zone = openflow.CtZone case binding.ProtocolTCPv6: - ipFamily = unix.AF_INET6 + ipFamilyVar = unix.AF_INET6 protoVar = unix.IPPROTO_TCP + zone = openflow.CtZoneV6 case binding.ProtocolUDP: - ipFamily = unix.AF_INET + ipFamilyVar = unix.AF_INET protoVar = unix.IPPROTO_UDP + zone = openflow.CtZone case binding.ProtocolUDPv6: - ipFamily = unix.AF_INET6 + ipFamilyVar = unix.AF_INET6 protoVar = unix.IPPROTO_UDP + zone = openflow.CtZoneV6 case binding.ProtocolSCTP: - ipFamily = unix.AF_INET + ipFamilyVar = unix.AF_INET protoVar = unix.IPPROTO_SCTP + zone = openflow.CtZone case binding.ProtocolSCTPv6: - ipFamily = unix.AF_INET6 + ipFamilyVar = unix.AF_INET6 protoVar = unix.IPPROTO_SCTP + zone = openflow.CtZoneV6 } filter := &netlink.ConntrackFilter{} filter.AddProtocol(protoVar) + filter.AddZone(zone) if svcIP != nil { filter.AddIP(netlink.ConntrackOrigDstIP, svcIP) } @@ -1950,7 +1959,7 @@ func (c *Client) ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, end if endpointIP != nil { filter.AddIP(netlink.ConntrackReplySrcIP, endpointIP) } - _, err := c.netlink.ConntrackDeleteFilter(netlink.ConntrackTable, ipFamily, filter) + _, err := c.netlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(ipFamilyVar), filter) return err } diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 631129e426f..5104f81c587 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -23,9 +23,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/vishvananda/netlink" "go.uber.org/mock/gomock" + "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/util/sets" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util/ipset" @@ -33,7 +35,7 @@ import ( "antrea.io/antrea/pkg/agent/util/iptables" iptablestest "antrea.io/antrea/pkg/agent/util/iptables/testing" netlinktest "antrea.io/antrea/pkg/agent/util/netlink/testing" - "antrea.io/antrea/pkg/ovs/openflow" + binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/ip" ) @@ -1318,7 +1320,7 @@ func TestAddNodePort(t *testing.T) { name string nodePortAddresses []net.IP port uint16 - protocol openflow.Protocol + protocol binding.Protocol expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) }{ { @@ -1328,7 +1330,7 @@ func TestAddNodePort(t *testing.T) { net.ParseIP("1.1.2.2"), }, port: 30000, - protocol: openflow.ProtocolTCP, + protocol: binding.ProtocolTCP, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.AddEntry(antreaNodePortIPSet, "1.1.1.1,tcp:30000") ipset.AddEntry(antreaNodePortIPSet, "1.1.2.2,tcp:30000") @@ -1341,7 +1343,7 @@ func TestAddNodePort(t *testing.T) { net.ParseIP("fd00:1234:5678:dead:beaf::2"), }, port: 30001, - protocol: openflow.ProtocolUDPv6, + protocol: binding.ProtocolUDPv6, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.AddEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::1,udp:30001") ipset.AddEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::2,udp:30001") @@ -1364,7 +1366,7 @@ func TestDeleteNodePort(t *testing.T) { name string nodePortAddresses []net.IP port uint16 - protocol openflow.Protocol + protocol binding.Protocol expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) }{ { @@ -1374,7 +1376,7 @@ func TestDeleteNodePort(t *testing.T) { net.ParseIP("1.1.2.2"), }, port: 30000, - protocol: openflow.ProtocolTCP, + protocol: binding.ProtocolTCP, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.DelEntry(antreaNodePortIPSet, "1.1.1.1,tcp:30000") ipset.DelEntry(antreaNodePortIPSet, "1.1.2.2,tcp:30000") @@ -1387,7 +1389,7 @@ func TestDeleteNodePort(t *testing.T) { net.ParseIP("fd00:1234:5678:dead:beaf::2"), }, port: 30001, - protocol: openflow.ProtocolUDPv6, + protocol: binding.ProtocolUDPv6, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.DelEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::1,udp:30001") ipset.DelEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::2,udp:30001") @@ -2143,3 +2145,72 @@ COMMIT }) } } + +func TestClearConntrackEntryForService(t *testing.T) { + testCases := []struct { + name string + svcIP net.IP + svcPort uint16 + endpointIP net.IP + protocol binding.Protocol + expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) + }{ + { + name: "TCPv4 with all parameters", + svcIP: net.ParseIP("192.168.1.1"), + svcPort: 80, + endpointIP: net.ParseIP("10.10.0.2"), + protocol: binding.ProtocolTCP, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(unix.IPPROTO_TCP) + filter.AddZone(openflow.CtZone) + filter.AddIP(netlink.ConntrackOrigDstIP, net.ParseIP("192.168.1.1")) + filter.AddPort(netlink.ConntrackOrigDstPort, 80) + filter.AddIP(netlink.ConntrackReplySrcIP, net.ParseIP("10.10.0.2")) + mockNetlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(unix.AF_INET), filter).Times(1) + }, + }, + { + name: "UDPv4 with svcIP and svcPort", + svcIP: net.ParseIP("192.168.1.1"), + svcPort: 53, + endpointIP: nil, + protocol: binding.ProtocolUDP, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(unix.IPPROTO_UDP) + filter.AddZone(openflow.CtZone) + filter.AddIP(netlink.ConntrackOrigDstIP, net.ParseIP("192.168.1.1")) + filter.AddPort(netlink.ConntrackOrigDstPort, 53) + mockNetlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(unix.AF_INET), filter).Times(1) + }, + }, + { + name: "SCTPv6 with endpointIP", + svcIP: nil, + svcPort: 0, + endpointIP: net.ParseIP("fec0::2"), + protocol: binding.ProtocolSCTPv6, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(unix.IPPROTO_SCTP) + filter.AddZone(openflow.CtZoneV6) + filter.AddIP(netlink.ConntrackReplySrcIP, net.ParseIP("fec0::2")) + mockNetlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(unix.AF_INET6), filter).Times(1) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockNetlink := netlinktest.NewMockInterface(ctrl) + c := &Client{ + netlink: mockNetlink, + } + tc.expectedCalls(mockNetlink.EXPECT()) + assert.NoError(t, c.ClearConntrackEntryForService(tc.svcIP, tc.svcPort, tc.endpointIP, tc.protocol)) + }) + } +} diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 290eb3cdbeb..5956b6ab343 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -336,7 +336,8 @@ func matchRoute(expectedCIDR string, routes []netlink.Route) (*netlink.Route, er return nil, err } for _, route := range routes { - if route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP) { + // For default route, `Dst` is 0.0.0.0/0 or ::/0, rather than nil. + if route.Dst != nil && route.Dst.IP.IsUnspecified() && route.Src == nil && route.Gw.Equal(gwIP) { return &route, nil } }