Skip to content

Commit

Permalink
Merge 204f0da into f10ef36
Browse files Browse the repository at this point in the history
  • Loading branch information
luolanzone authored Mar 30, 2023
2 parents f10ef36 + 204f0da commit fd1b482
Show file tree
Hide file tree
Showing 29 changed files with 3,117 additions and 263 deletions.
5 changes: 3 additions & 2 deletions ci/jenkins/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ function deliver_antrea_windows {
sleep 5
# Some tests need us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13 image but it is not for windows/amd64 10.0.17763
# Use e2eteam/agnhost:2.13 instead
harbor_images=("sigwindowstools-kube-proxy:v1.18.0" "agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2")
antrea_images=("sigwindowstools/kube-proxy:v1.18.0" "e2eteam/agnhost:2.13" "us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "k8s.gcr.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "e2eteam/pause:3.2")
harbor_images=("sigwindowstools-kube-proxy:v1.18.0" "agnhost:2.13" "agnhost:2.13" "agnhost:2.13" "agnhost:2.29" "e2eteam-jessie-dnsutils:1.0" "e2eteam-jessie-dnsutils:1.0" "e2eteam-pause:3.2" "e2eteam-pause:3.2" "e2eteam-busybox:1.29-windows-amd64-1809")
antrea_images=("sigwindowstools/kube-proxy:v1.18.0" "e2eteam/agnhost:2.13" "us.gcr.io/k8s-artifacts-prod/e2e-test-images/agnhost:2.13" "k8sprow.azurecr.io/kubernetes-e2e-test-images/agnhost:2.13" "k8s.gcr.io/e2e-test-images/agnhost:2.29" "e2eteam/jessie-dnsutils:1.0" "gcr.io/kubernetes-e2e-test-images/jessie-dnsutils:1.0" "e2eteam/pause:3.2" "k8s.gcr.io/pause:3.2" "docker.io/library/busybox:1.29")
common_images=("mcr.microsoft.com/windows/servercore/iis:latest")
# Pull necessary images in advance to avoid transient error
for i in "${!harbor_images[@]}"; do
ssh -o StrictHostKeyChecking=no -n Administrator@${IP} "docker pull -q ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} && docker tag ${DOCKER_REGISTRY}/antrea/${harbor_images[i]} ${antrea_images[i]}" || true
Expand Down
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func run(o *Options) error {
k8sClient,
crdClient,
ovsBridgeClient,
ovsctl.NewClient(o.config.OVSBridge),
ofClient,
routeClient,
ifaceStore,
Expand Down Expand Up @@ -267,6 +268,7 @@ func run(o *Options) error {
k8sClient,
informerFactory,
ofClient,
ovsctl.NewClient(o.config.OVSBridge),
ovsBridgeClient,
routeClient,
ifaceStore,
Expand Down
39 changes: 25 additions & 14 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ function generate_mocks {
"pkg/agent/querier AgentQuerier testing"
"pkg/agent/route Interface testing"
"pkg/agent/ipassigner IPAssigner testing"
"pkg/agent/util/ipset Interface testing"
"pkg/agent/util/iptables Interface testing mock_iptables_linux.go" # Must specify linux.go suffix, otherwise compilation would fail on windows platform as source file has linux build tag.
"pkg/agent/util/netlink Interface testing mock_netlink_linux.go"
"pkg/antctl AntctlClient ."
"pkg/controller/networkpolicy EndpointQuerier testing"
"pkg/controller/querier ControllerQuerier testing"
Expand All @@ -68,21 +71,29 @@ function generate_mocks {
current_year=$(date +"%Y")
sed -i "s/YEAR/${current_year}/g" hack/boilerplate/license_header.raw.txt
for target in "${MOCKGEN_TARGETS[@]}"; do
read -r package interfaces mock_package <<<"${target}"
package_name=$(basename "${package}")
if [[ "${mock_package}" == "." ]]; then # generate mocks in same package as src
$GOPATH/bin/mockgen \
-copyright_file hack/boilerplate/license_header.raw.txt \
-destination "${package}/mock_${package_name}_test.go" \
-package="${package_name}" \
"${ANTREA_PKG}/${package}" "${interfaces}"
else # generate mocks in subpackage
$GOPATH/bin/mockgen \
-copyright_file hack/boilerplate/license_header.raw.txt \
-destination "${package}/${mock_package}/mock_${package_name}.go" \
-package="${mock_package}" \
"${ANTREA_PKG}/${package}" "${interfaces}"
read -r src_package interfaces dst_package_name dst_file_name <<<"${target}"
src_package_name=$(basename "${src_package}")
# Generate mocks in the same package as src if dst_file_name is ".", otherwise create a sub package.
if [[ "${dst_package_name}" == "." ]]; then
package="${src_package_name}"
if [ -n "${dst_file_name}" ]; then
destination="${src_package}/${dst_file_name}"
else
destination="${src_package}/mock_${src_package_name}_test.go"
fi
else
package="${dst_package_name}"
if [ -n "${dst_file_name}" ]; then
destination="${src_package}/${dst_package_name}/${dst_file_name}"
else
destination="${src_package}/${dst_package_name}/mock_${src_package_name}.go"
fi
fi
$GOPATH/bin/mockgen \
-copyright_file hack/boilerplate/license_header.raw.txt \
-destination "${destination}" \
-package "${package}" \
"${ANTREA_PKG}/${src_package}" "${interfaces}"
done
git checkout HEAD -- hack/boilerplate/license_header.raw.txt
}
Expand Down
65 changes: 51 additions & 14 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Initializer struct {
client clientset.Interface
crdClient versioned.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ovsCtlClient ovsctl.OVSCtlClient
ofClient openflow.Client
routeClient route.Interface
wireGuardClient wireguard.Interface
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewInitializer(
k8sClient clientset.Interface,
crdClient versioned.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ovsCtlClient ovsctl.OVSCtlClient,
ofClient openflow.Client,
routeClient route.Interface,
ifaceStore interfacestore.InterfaceStore,
Expand All @@ -142,6 +144,7 @@ func NewInitializer(
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
Expand Down Expand Up @@ -279,7 +282,6 @@ func (i *Initializer) initInterfaceStore() error {
return intf
}
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
ovsCtlClient := ovsctl.NewClient(i.ovsBridge)
for index := range ovsPorts {
port := &ovsPorts[index]
ovsPort := &interfacestore.OVSPortConfig{
Expand All @@ -297,6 +299,8 @@ func (i *Initializer) initInterfaceStore() error {
case interfacestore.AntreaUplink:
intf = parseUplinkInterfaceFunc(port, ovsPort)
case interfacestore.AntreaTunnel:
fallthrough
case interfacestore.AntreaIPsecTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
case interfacestore.AntreaHost:
if port.Name == i.ovsBridge {
Expand All @@ -314,9 +318,6 @@ func (i *Initializer) initInterfaceStore() error {
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true)
case interfacestore.AntreaTrafficControl:
intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort)
if err := ovsCtlClient.SetPortNoFlood(int(ovsPort.OFPort)); err != nil {
klog.ErrorS(err, "Failed to set port with no-flood config", "PortName", port.Name)
}
default:
klog.InfoS("Unknown Antrea interface type", "type", interfaceType)
}
Expand All @@ -340,7 +341,11 @@ func (i *Initializer) initInterfaceStore() error {
fallthrough
case port.IFType == ovsconfig.STTTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
antreaIFType = interfacestore.AntreaTunnel
if intf.Type == interfacestore.IPSecTunnelInterface {
antreaIFType = interfacestore.AntreaIPsecTunnel
} else {
antreaIFType = interfacestore.AntreaTunnel
}
case port.Name == i.ovsBridge:
intf = nil
antreaIFType = interfacestore.AntreaHost
Expand Down Expand Up @@ -368,6 +373,26 @@ func (i *Initializer) initInterfaceStore() error {
return nil
}

func (i *Initializer) restorePortConfigs() error {
interfaces := i.ifaceStore.ListInterfaces()
for _, intf := range interfaces {
switch intf.Type {
case interfacestore.IPSecTunnelInterface:
fallthrough
case interfacestore.TrafficControlInterface:
if intf.OFPort < 0 {
klog.InfoS("Skipped setting no-flood for port due to invalid ofPort", "port", intf.InterfaceName, "ofport", intf.OFPort)
continue
}
if err := i.ovsCtlClient.SetPortNoFlood(int(intf.OFPort)); err != nil {
return fmt.Errorf("failed to set no-flood for port %s: %w", intf.InterfaceName, err)
}
klog.InfoS("Set no-flood for port", "port", intf.InterfaceName)
}
}
return nil
}

// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
Expand All @@ -386,6 +411,10 @@ func (i *Initializer) Initialize() error {
return err
}

if err := i.restorePortConfigs(); err != nil {
return err
}

// initializeWireGuard must be executed after setupOVSBridge as it requires gateway addresses on the OVS bridge.
if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeWireGuard {
if err := i.initializeWireGuard(); err != nil {
Expand Down Expand Up @@ -487,14 +516,15 @@ func persistRoundNum(num uint64, bridgeClient ovsconfig.OVSBridgeClient, interva

// initOpenFlowPipeline sets up necessary Openflow entries, including pipeline, classifiers, conn_track, and gateway flows
// Every time the agent is (re)started, we go through the following sequence:
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
//
// The rationale for not persisting the new round number until after all previous flows have been
// deleted is to avoid a situation in which some stale flows are never deleted because of successive
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
Expand Down Expand Up @@ -552,6 +582,13 @@ func (i *Initializer) initOpenFlowPipeline() error {
i.ofClient.ReplayFlows()
klog.Info("Flow replay completed")

klog.InfoS("Restoring OF port configs to OVS bridge")
if err := i.restorePortConfigs(); err != nil {
klog.ErrorS(err, "Failed to restore OF port configs")
} else {
klog.InfoS("Port configs restoration completed")
}

if i.ovsBridgeClient.GetOVSDatapathType() == ovsconfig.OVSDatapathNetdev {
// we don't set flow-restore-wait when using the OVS netdev datapath
return
Expand All @@ -561,7 +598,7 @@ func (i *Initializer) initOpenFlowPipeline() error {
// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
// with a timeout that is longer time than ovsBridgeClient's maximum connecting retry interval (8 seconds)
// to ensure the flag can be removed successfully.
err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
err = wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if err := i.FlowRestoreComplete(); err != nil {
return false, nil
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/ip"
)
Expand Down Expand Up @@ -518,3 +519,78 @@ func mockConfigureLinkAddress(returnedErr error) func() {
configureLinkAddresses = originalConfigureLinkAddresses
}
}

func TestRestorePortConfigs(t *testing.T) {
ipsecTunnelInterface := interfacestore.NewIPSecTunnelInterface("antrea-ipsec1",
ovsconfig.GeneveTunnel,
"node1",
net.ParseIP("1.1.1.1"),
"abcdefg",
"node1")
ipsecTunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: 11, PortUUID: "uuid1"}
tunnelInterface := interfacestore.NewTunnelInterface(defaultTunInterfaceName,
ovsconfig.GeneveTunnel,
0,
net.ParseIP("1.1.1.10"),
true)
tunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: 12}
trafficControlInterface1 := interfacestore.NewTrafficControlInterface("antrea-tap1")
trafficControlInterface1.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: 13, PortUUID: "uuid3"}
trafficControlInterface2 := interfacestore.NewTrafficControlInterface("antrea-tap2")
trafficControlInterface2.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: -1, PortUUID: "uuid3"}

tests := []struct {
name string
existingInterfaces []*interfacestore.InterfaceConfig
expectedOVSCtlCalls func(client *ovsctltest.MockOVSCtlClientMockRecorder)
expectedErr string
}{
{
name: "success",
existingInterfaces: []*interfacestore.InterfaceConfig{
ipsecTunnelInterface,
tunnelInterface,
trafficControlInterface1,
trafficControlInterface2,
},
expectedOVSCtlCalls: func(client *ovsctltest.MockOVSCtlClientMockRecorder) {
client.SetPortNoFlood(11).Return(nil)
client.SetPortNoFlood(13).Return(nil)
},
},
{
name: "fail",
existingInterfaces: []*interfacestore.InterfaceConfig{
{
InterfaceName: "antrea-tap1",
Type: interfacestore.TrafficControlInterface,
OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: 10, PortUUID: "uuid3"},
},
},
expectedOVSCtlCalls: func(client *ovsctltest.MockOVSCtlClientMockRecorder) {
client.SetPortNoFlood(10).Return(fmt.Errorf("server unavailable"))
},
expectedErr: "failed to set no-flood for port antrea-tap1: server unavailable",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
controller := mock.NewController(t)
defer controller.Finish()
mockOVSCtlClient := ovsctltest.NewMockOVSCtlClient(controller)
ifaceStore := interfacestore.NewInterfaceStore()
initializer := &Initializer{
ifaceStore: ifaceStore,
ovsCtlClient: mockOVSCtlClient,
}
ifaceStore.Initialize(tt.existingInterfaces)
tt.expectedOVSCtlCalls(mockOVSCtlClient.EXPECT())
err := initializer.restorePortConfigs()
if tt.expectedErr == "" {
assert.NoError(t, err)
} else {
assert.ErrorContains(t, err, tt.expectedErr)
}
})
}
}
Loading

0 comments on commit fd1b482

Please sign in to comment.