Skip to content

Commit

Permalink
code polish
Browse files Browse the repository at this point in the history
  • Loading branch information
hongliangl committed Jun 16, 2021
1 parent 4a5645e commit 6bf06af
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
20 changes: 13 additions & 7 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ type proxier struct {
ofClient openflow.Client
routeClient route.Interface
nodePortIPMap map[int][]net.IP
virtualNodePortIP net.IP
hostGateWay string
isIPv6 bool
nodePortEnabled bool
Expand Down Expand Up @@ -409,19 +408,25 @@ func (p *proxier) installServices() {
continue
}
if p.nodePortEnabled && svcInfo.NodePort() > 0 {
err := p.ofClient.UninstallServiceFlows(p.virtualNodePortIP, uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol)
if err != nil {
klog.Errorf("Error when removing NodePort Service flows: %v", err)
continue
for _, nodeIPs := range p.nodePortIPMap {
for _, nodeIP := range nodeIPs {
err := p.ofClient.UninstallServiceFlows(nodeIP, uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol)
if err != nil {
klog.Errorf("Error when removing NodePort Service flows: %v", err)
continue
}
}
}
err = p.routeClient.DeleteNodePort(p.nodePortIPMap, pSvcInfo)

err := p.routeClient.DeleteNodePort(p.nodePortIPMap, pSvcInfo)
if err != nil {
klog.Errorf("Error when removing NodePort Service entries in IPSet: %v", err)
continue
}
}
}
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(svcInfo.StickyMaxAgeSeconds()), false); err != nil {
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol,
uint16(svcInfo.StickyMaxAgeSeconds()), false); err != nil {
klog.Errorf("Error when installing Service flows: %v", err)
continue
}
Expand Down Expand Up @@ -461,6 +466,7 @@ func (p *proxier) installServices() {
}

for _, nodeIPs := range p.nodePortIPMap {
fmt.Println(nodeIPs)
for _, nodeIP := range nodeIPs {
err := p.ofClient.InstallServiceFlows(nGroupID, nodeIP, uint16(svcInfo.NodePort()), svcInfo.OFProtocol,
uint16(svcInfo.StickyMaxAgeSeconds()), svcInfo.NodeLocalExternal())
Expand Down
25 changes: 5 additions & 20 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ import (
k8sproxy "antrea.io/antrea/third_party/proxy"
)

var virtualNodePortIP = net.ParseIP("169.254.169.110")

func makeNamespaceName(namespace, name string) apimachinerytypes.NamespacedName {
return apimachinerytypes.NamespacedName{Namespace: namespace, Name: name}
}
Expand Down Expand Up @@ -111,8 +109,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, isIPv
routeClient: routeClient,
serviceStringMap: map[string]k8sproxy.ServicePortName{},
isIPv6: isIPv6,
nodePortIPMap: map[int][]net.IP{2: {net.ParseIP("192.168.0.1")}},
virtualNodePortIP: virtualNodePortIP,
nodePortIPMap: map[int][]net.IP{2: {net.ParseIP("192.168.77.100")}},
nodePortEnabled: nodeportEnabled,
}
p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2)
Expand Down Expand Up @@ -226,34 +223,22 @@ func TestLoadbalancer(t *testing.T) {
fp.syncProxyRules()
}

func TestExternalNameToNodePort(t *testing.T) {
func TestNodePort(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFClient := ofmock.NewMockClient(ctrl)
mockRouteClient := routemock.NewMockInterface(ctrl)
fp := NewFakeProxier(mockRouteClient, mockOFClient, false, false)
fp := NewFakeProxier(mockRouteClient, mockOFClient, false, true)

svcIPv4 := net.ParseIP("10.20.30.41")
svcPort := 80
svcNodePortIPv4 := net.ParseIP("192.168.77.100")
svcNodePort := 31000
svcPortName := k8sproxy.ServicePortName{
NamespacedName: makeNamespaceName("ns1", "svc1"),
Port: "80",
Protocol: corev1.ProtocolTCP,
}
makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
svc.Spec.ClusterIP = svcIPv4.String() // Should be ignored.
svc.Spec.Type = corev1.ServiceTypeExternalName
svc.Spec.ExternalName = "a.b.c.com"
svc.Spec.Ports = []corev1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: corev1.ProtocolTCP,
}}
}),
)
fp.syncProxyRules()

makeServiceMap(fp,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) {
Expand Down Expand Up @@ -287,7 +272,7 @@ func TestExternalNameToNodePort(t *testing.T) {
bindingProtocol := binding.ProtocolTCP
mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), bindingProtocol, uint16(0), false).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(groupID, virtualNodePortIP, uint16(svcNodePort), bindingProtocol, uint16(0), false).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(groupID, svcNodePortIPv4, uint16(svcNodePort), bindingProtocol, uint16(0), false).Times(1)
mockRouteClient.EXPECT().AddNodePort(gomock.Any(), gomock.Any()).Times(1)

fp.syncProxyRules()
Expand Down
15 changes: 12 additions & 3 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,11 +777,16 @@ func (c *Client) DeleteSNATRule(mark uint32) error {
}

func (c *Client) InitNodePort(nodePortIPMap map[int][]net.IP) error {
// Get the index of Antrea gateway interface.
gatewayIfIndex := getIndexByName(c.nodeConfig.GatewayConfig.Name)
// Clean left Linux TC config.
qdiscsClean(nodePortIPMap, gatewayIfIndex)

// Add qdisc to interfaces that have available NodePort IP addresses, not including Antrea gateway.
// Add a ingress qdisc to every interface which has available NodePort IP addresses. When a NodePort Service is
// created, for every interface which has available NodePort IP addresses, a filter matching destination IP address
// (NodePort IP address) and destination protocol/port(NodePort protocol/port) will be created and attached to
// the ingress qdisc, and its action is redirecting matched traffic to antrea gateway's egress. Note that, this
// filter is used to match NodePort request traffic.
for ifIndex := range nodePortIPMap {
qdisc := qdiscIngress
if ifIndex == loopbackIfIndex {
Expand All @@ -793,13 +798,17 @@ func (c *Client) InitNodePort(nodePortIPMap map[int][]net.IP) error {
}
}

// Add qdisc to Antrea gateway as its index is not in nodePortIPMap.
err := qdiscAdd(qdiscIngress, gatewayIfIndex)
if err != nil {
return err
}

// Add basic filter to Antrea gateway. These filters can distribute packets to different filter chains according to their
// source IPs.
// The design of filter for Antrea gateway is hierarchic. Here add basic filters to Antrea gateway. Note that, these
// filters are used to match NodePort response traffic. These filters are used to distribute packets to different sub filter
// chains according to their source IP Addresses(NodePort IP addresses). When a NodePort Service is created, a filter
// matching source IP address (NodePort IP address) and source protocol/port(NodePort protocol/port) will created and
// attached to every sub filter chain, and its action is redirecting matched traffic to an interface's egress.
for ifIndex, addrs := range nodePortIPMap {
err = gatewayBaseFilterAdd(ifIndex, addrs, c.nodeConfig.GatewayConfig.Name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/route/route_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestRouteOperation(t *testing.T) {
nr := netroute.New()
defer nr.Exit()

client, err := NewClient(nil, nil, serviceCIDR, &config.NetworkConfig{}, false)
client, err := NewClient(serviceCIDR, &config.NetworkConfig{}, false)
require.Nil(t, err)
nodeConfig := &config.NodeConfig{
OVSBridge: "Loopback Pseudo-Interface 1",
Expand Down

0 comments on commit 6bf06af

Please sign in to comment.