Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add L7 Antrea native NetworkPolicy datapath support #4410

Merged
merged 1 commit into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func run(o *Options) error {

ovsDatapathType := ovsconfig.OVSDatapathType(o.config.OVSDatapathType)
ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, ovsDatapathType, ovsdbConnection)
ovsCtlClient := ovsctl.NewClient(o.config.OVSBridge)
ovsBridgeMgmtAddr := ofconfig.GetMgmtAddress(o.config.OVSRunDir, o.config.OVSBridge)
multicastEnabled := features.DefaultFeatureGate.Enabled(features.Multicast)
ofClient := openflow.NewClient(o.config.OVSBridge, ovsBridgeMgmtAddr,
Expand Down Expand Up @@ -236,6 +237,7 @@ func run(o *Options) error {
k8sClient,
crdClient,
ovsBridgeClient,
ovsCtlClient,
ofClient,
routeClient,
ifaceStore,
Expand Down Expand Up @@ -666,7 +668,7 @@ func run(o *Options) error {
tcController := trafficcontrol.NewTrafficControlController(ofClient,
ifaceStore,
ovsBridgeClient,
ovsctl.NewClient(o.config.OVSBridge),
ovsCtlClient,
trafficControlInformer,
localPodInformer,
namespaceInformer,
Expand Down
12 changes: 11 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Initializer struct {
client clientset.Interface
crdClient versioned.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ovsCtlClient ovsctl.OVSCtlClient
ofClient openflow.Client
routeClient route.Interface
wireGuardClient wireguard.Interface
Expand All @@ -107,6 +108,7 @@ type Initializer struct {
wireGuardConfig *config.WireGuardConfig
egressConfig *config.EgressConfig
serviceConfig *config.ServiceConfig
l7NetworkPolicyConfig *config.L7NetworkPolicyConfig
enableProxy bool
connectUplinkToBridge bool
// networkReadyCh should be closed once the Node's network is ready.
Expand All @@ -122,6 +124,7 @@ func NewInitializer(
k8sClient clientset.Interface,
crdClient versioned.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ovsCtlClient ovsctl.OVSCtlClient,
ofClient openflow.Client,
routeClient route.Interface,
ifaceStore interfacestore.InterfaceStore,
Expand All @@ -142,6 +145,7 @@ func NewInitializer(
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
Expand All @@ -154,6 +158,7 @@ func NewInitializer(
wireGuardConfig: wireGuardConfig,
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
networkReadyCh: networkReadyCh,
stopCh: stopCh,
nodeType: nodeType,
Expand Down Expand Up @@ -386,6 +391,11 @@ func (i *Initializer) Initialize() error {
return err
}

// prepareL7NetworkPolicyInterfaces must be executed after setupOVSBridge since it requires interfaceStore.
if err := i.prepareL7NetworkPolicyInterfaces(); err != nil {
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// initializeWireGuard must be executed after setupOVSBridge as it requires gateway addresses on the OVS bridge.
if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeWireGuard {
if err := i.initializeWireGuard(); err != nil {
Expand Down Expand Up @@ -511,7 +521,7 @@ func (i *Initializer) initOpenFlowPipeline() error {
roundInfo := getRoundInfo(i.ovsBridgeClient)

// Set up all basic flows.
ofConnCh, err := i.ofClient.Initialize(roundInfo, i.nodeConfig, i.networkConfig, i.egressConfig, i.serviceConfig)
ofConnCh, err := i.ofClient.Initialize(roundInfo, i.nodeConfig, i.networkConfig, i.egressConfig, i.serviceConfig, i.l7NetworkPolicyConfig)
if err != nil {
klog.Errorf("Failed to initialize openflow client: %v", err)
return err
Expand Down
55 changes: 55 additions & 0 deletions pkg/agent/agent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,58 @@ func (i *Initializer) prepareOVSBridgeForVM() error {
func (i *Initializer) installVMInitialFlows() error {
return nil
}

// prepareL7NetworkPolicyInterfaces creates two OVS internal ports. An application-aware engine will connect to OVS
// through these two ports.
func (i *Initializer) prepareL7NetworkPolicyInterfaces() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a

trafficControlPortExternalIDs := map[string]interface{}{
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTrafficControl,
}

for _, portName := range []string{config.L7NetworkPolicyTargetPortName, config.L7NetworkPolicyReturnPortName} {
_, exists := i.ifaceStore.GetInterface(portName)
if exists {
continue
}

portUUID, err := i.ovsBridgeClient.CreateInternalPort(portName, 0, "", trafficControlPortExternalIDs)
if err != nil {
return err
}
if pollErr := wait.PollImmediate(time.Second, 5*time.Second, func() (bool, error) {
_, _, err := util.SetLinkUp(portName)
if err == nil {
return true, nil
}
if _, ok := err.(util.LinkNotFound); ok {
return false, nil
}
return false, err
}); pollErr != nil {
return pollErr
}

ofPort, err := i.ovsBridgeClient.GetOFPort(portName, false)
if err != nil {
return err
}

itf := interfacestore.NewTrafficControlInterface(portName)
itf.OVSPortConfig = &interfacestore.OVSPortConfig{PortUUID: portUUID, OFPort: ofPort}
i.ifaceStore.AddInterface(itf)
}

targetPort, _ := i.ifaceStore.GetInterfaceByName(config.L7NetworkPolicyTargetPortName)
returnPort, _ := i.ifaceStore.GetInterfaceByName(config.L7NetworkPolicyReturnPortName)
i.l7NetworkPolicyConfig.TargetOFPort = uint32(targetPort.OFPort)
i.l7NetworkPolicyConfig.ReturnOFPort = uint32(returnPort.OFPort)
// Set the ports with no-flood to reject ARP flood packets.
if err := i.ovsCtlClient.SetPortNoFlood(int(targetPort.OFPort)); err != nil {
return fmt.Errorf("failed to set port %s with no-flood config: %w", config.L7NetworkPolicyTargetPortName, err)
}
if err := i.ovsCtlClient.SetPortNoFlood(int(returnPort.OFPort)); err != nil {
return fmt.Errorf("failed to set port %s with no-flood config: %w", config.L7NetworkPolicyReturnPortName, err)
}

return nil
}
4 changes: 4 additions & 0 deletions pkg/agent/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,3 +479,7 @@ func (i *Initializer) installVMInitialFlows() error {
}
return nil
}

func (i *Initializer) prepareL7NetworkPolicyInterfaces() error {
return nil
}
11 changes: 11 additions & 0 deletions pkg/agent/config/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const (
IPv6ExtraOverhead = 20
)

const (
L7NetworkPolicyTargetPortName = "antrea-l7-tap0"
L7NetworkPolicyReturnPortName = "antrea-l7-tap1"
)

var (
// VirtualServiceIPv4 or VirtualServiceIPv6 is used in the following scenarios:
// - The IP is used to perform SNAT for packets of Service sourced from Antrea gateway and destined for external
Expand Down Expand Up @@ -233,3 +238,9 @@ type ServiceConfig struct {
NodePortAddressesIPv4 []net.IP
NodePortAddressesIPv6 []net.IP
}

// L7NetworkPolicyConfig includes target and return ofPorts for L7 NetworkPolicy.
type L7NetworkPolicyConfig struct {
TargetOFPort uint32 // Matched L7 NetworkPolicy traffic is forwarded to an application-aware engine via this ofPort.
ReturnOFPort uint32 // Scanned L7 NetworkPolicy traffic is returned from an application-aware engine via this ofPort.
}
60 changes: 60 additions & 0 deletions pkg/agent/controller/networkpolicy/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,63 @@ func (a *idAllocator) release(id uint32) error {
a.availableSlice = append(a.availableSlice, id)
return nil
}

// l7VlanIDAllocator provides interfaces to allocate and release VLAN IDs for L7 rules. It also caches the mapping of
// rule IDs to released VLAN IDs and provides an interface for L7 rule to query its allocated VLAN ID.
type l7VlanIDAllocator struct {
sync.RWMutex

idCounter uint32
recycled []uint32
ruleIDToVlanID map[string]uint32
}

func newL7VlanIDAllocator() *l7VlanIDAllocator {
return &l7VlanIDAllocator{
ruleIDToVlanID: make(map[string]uint32),
}
}

func (l *l7VlanIDAllocator) allocate(ruleID string) uint32 {
l.Lock()
defer l.Unlock()

if vlanID, ok := l.ruleIDToVlanID[ruleID]; ok {
return vlanID
}

var vlanID uint32
if len(l.recycled) != 0 {
vlanID = l.recycled[len(l.recycled)-1]
l.recycled = l.recycled[:len(l.recycled)-1]
} else {
l.idCounter += 1
vlanID = l.idCounter
}
l.ruleIDToVlanID[ruleID] = vlanID
return vlanID
}

func (l *l7VlanIDAllocator) release(ruleID string) {
l.Lock()
defer l.Unlock()

vlanID, ok := l.ruleIDToVlanID[ruleID]
if !ok {
return
}

l.recycled = append(l.recycled, vlanID)
delete(l.ruleIDToVlanID, ruleID)
}

func (l *l7VlanIDAllocator) query(ruleID string) uint32 {
l.RLock()
defer l.RUnlock()

vlanID, ok := l.ruleIDToVlanID[ruleID]
if ok {
return vlanID
}
return 0
}
28 changes: 28 additions & 0 deletions pkg/agent/controller/networkpolicy/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,31 @@ func TestIdAllocatorWorker(t *testing.T) {
})
}
}

func TestVlanIDAllocator(t *testing.T) {
vlanIDAllocator := newL7VlanIDAllocator()
ruleID1 := "rule1"
ruleID2 := "rule2"
ruleID3 := "rule3"
ruleID4 := "rule4"

vlanID1 := vlanIDAllocator.allocate(ruleID1)
assert.Equal(t, vlanID1, vlanIDAllocator.query(ruleID1))

vlanID2 := vlanIDAllocator.allocate(ruleID2)
assert.Equal(t, vlanID2, vlanIDAllocator.query(ruleID2))

vlanIDAllocator.release(ruleID1)
assert.Equal(t, uint32(0), vlanIDAllocator.query(ruleID1))

vlanIDAllocator.release(ruleID2)
assert.Equal(t, uint32(0), vlanIDAllocator.query(ruleID2))

vlanID3 := vlanIDAllocator.allocate(ruleID3)
assert.Equal(t, vlanID3, vlanIDAllocator.query(ruleID3))
assert.Equal(t, vlanID2, vlanID3)

vlanID4 := vlanIDAllocator.allocate(ruleID4)
assert.Equal(t, vlanID4, vlanIDAllocator.query(ruleID4))
assert.Equal(t, vlanID1, vlanID4)
}
5 changes: 4 additions & 1 deletion pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type rule struct {
// Protocols and Ports of this rule.
Services []v1beta.Service
// Layer 7 protocols of this rule.
L7Protocols []v1beta.Protocol
L7Protocols []v1beta.L7Protocol
// Name of this rule. Empty for k8s NetworkPolicy.
Name string
// Action of this rule. nil for k8s NetworkPolicy.
Expand Down Expand Up @@ -149,6 +149,8 @@ type CompletedRule struct {
ToAddresses v1beta.GroupMemberSet
// Target GroupMembers of this rule.
TargetMembers v1beta.GroupMemberSet
// Vlan ID allocated for this rule if this rule is for L7 NetworkPolicy.
L7RuleVlanID *uint32
}

// String returns the string representation of the CompletedRule.
Expand Down Expand Up @@ -676,6 +678,7 @@ func toRule(r *v1beta.NetworkPolicyRule, policy *v1beta.NetworkPolicy, maxPriori
From: r.From,
To: r.To,
Services: r.Services,
L7Protocols: r.L7Protocols,
Action: r.Action,
Priority: r.Priority,
PolicyPriority: policy.Priority,
Expand Down
Loading