Skip to content

Commit

Permalink
Flexible pipeline 7 Misc
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl committed Dec 7, 2021
1 parent e6073d8 commit a748070
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 68 deletions.
1 change: 0 additions & 1 deletion pkg/agent/apiserver/handlers/ovsflows/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func getPodFlows(aq agentquerier.AgentQuerier, podName, namespace string) ([]Res

flowKeys := aq.GetOpenflowClient().GetPodFlowKeys(interfaces[0].InterfaceName)
return dumpMatchedFlows(aq, flowKeys)

}

func getServiceFlows(aq agentquerier.AgentQuerier, serviceName, namespace string) ([]Response, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/apiserver/handlers/ovsflows/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (

"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/openflow"
oftest "antrea.io/antrea/pkg/agent/openflow/testing"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
agentquerier "antrea.io/antrea/pkg/agent/querier"
aqtest "antrea.io/antrea/pkg/agent/querier/testing"
cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing"
"antrea.io/antrea/pkg/querier"
queriertest "antrea.io/antrea/pkg/querier/testing"
Expand Down Expand Up @@ -131,6 +133,11 @@ func TestServiceFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

// Create openflow.Client to ensure the OVS tables are added into the cache.
bridgeName := "testbr"
bridgeMgmtAddr := binding.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeName)
openflow.NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false)

testcases := []testCase{
{
test: "Existing Service",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
ofclient "antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
agenttypes "antrea.io/antrea/pkg/agent/types"
cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
Expand Down Expand Up @@ -83,7 +82,7 @@ var (
Priority: nil,
Name: "",
FlowID: uint32(0),
TableID: ofclient.IngressRuleTable.GetID(),
TableID: uint8(10),
PolicyRef: &np1,
EnableLogging: false,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (exp *FlowExporter) findFlowType(conn flowexporter.Connection) uint8 {
return 0
}
if exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.SourceAddress) {
if conn.Mark == openflow.ServiceCTMark.GetValue() || exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.DestinationAddress) {
if conn.Mark&openflow.ServiceCTMark.GetRange().ToNXRange().ToUint32Mask() == openflow.ServiceCTMark.GetValue() || exp.nodeRouteController.IPInPodSubnets(conn.FlowKey.DestinationAddress) {
if conn.SourcePodName == "" || conn.DestinationPodName == "" {
return ipfixregistry.FlowTypeInterNode
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/openflow/client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func (c *client) InstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPor
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol))
flows = append(flows, c.featureService.loadBalancerServiceFromOutsideFlow(svcIP, svcPort, protocol))
cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort)
return c.addFlows(c.serviceFlowCache, cacheKey, flows)
return c.addFlows(c.featureService.serviceFlowCache, cacheKey, flows)
}

func (c *client) UninstallLoadBalancerServiceFromOutsideFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
cacheKey := fmt.Sprintf("L%s%s%x", svcIP, protocol, svcPort)
return c.deleteFlows(c.serviceFlowCache, cacheKey)
return c.deleteFlows(c.featureService.serviceFlowCache, cacheKey)
}
6 changes: 3 additions & 3 deletions pkg/agent/openflow/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
package openflow

import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"fmt"
"k8s.io/client-go/tools/cache"
"net"
"strconv"
"strings"
"sync"

"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
Expand Down
5 changes: 0 additions & 5 deletions pkg/agent/openflow/pipeline_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ import (
binding "antrea.io/antrea/pkg/ovs/openflow"
)

// TODO: refactor
func (c *client) snatMarkFlows(snatIP net.IP, mark uint32) []binding.Flow {
return []binding.Flow{c.snatIPFromTunnelFlow(snatIP, mark)}
}

// Feature: PodConnectivity
// Stage: ClassifierStage
// Tables: ClassifierTable
Expand Down
53 changes: 0 additions & 53 deletions pkg/agent/openflow/pipeline_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,62 +22,9 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
binding "antrea.io/antrea/pkg/ovs/openflow"
)

const (
// ctZoneSNAT is only used on Windows and only when AntreaProxy is enabled.
// When a Pod access a ClusterIP Service, and the IP of the selected endpoint
// is not in "cluster-cidr". The request packets need to be SNAT'd(set src IP to local Node IP)
// after have been DNAT'd(set dst IP to endpoint IP).
// For example, the endpoint Pod may run in hostNetwork mode and the IP of the endpoint
// will be the current Node IP.
// We need to use a different ct_zone to track the SNAT'd connection because OVS
// does not support doing both DNAT and SNAT in the same ct_zone.
//
// An example of the connection is a Pod accesses kubernetes API service:
// Pod --> DNAT(CtZone) --> SNAT(ctZoneSNAT) --> Endpoint(API server NodeIP)
// Pod <-- unDNAT(CtZone) <-- unSNAT(ctZoneSNAT) <-- Endpoint(API server NodeIP)
ctZoneSNAT = 0xffdc
)

var (
// snatCTMark indicates SNAT is performed for packets of the connection.
snatCTMark = binding.NewCTMark(0x40, 0, 31)
)

// TODO: refactor
func (c *client) snatMarkFlows(snatIP net.IP, mark uint32) []binding.Flow {
snatIPRange := &binding.IPRange{StartIP: snatIP, EndIP: snatIP}
nextTable := ConntrackCommitTable.GetNext()
flows := []binding.Flow{
c.snatIPFromTunnelFlow(snatIP, mark),
ConntrackCommitTable.BuildFlow(priorityNormal).
MatchProtocol(binding.ProtocolIP).
MatchCTStateNew(true).MatchCTStateTrk(true).MatchCTStateDNAT(false).
MatchPktMark(mark, &types.SNATIPMarkMask).
Action().CT(true, nextTable, CtZone).
SNAT(snatIPRange, nil).
LoadToCtMark(snatCTMark).CTDone().
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done(),
}

if c.enableProxy {
flows = append(flows, ConntrackCommitTable.BuildFlow(priorityNormal).
MatchProtocol(binding.ProtocolIP).
MatchCTStateNew(true).MatchCTStateTrk(true).MatchCTStateDNAT(true).
MatchPktMark(mark, &types.SNATIPMarkMask).
Action().CT(true, nextTable, ctZoneSNAT).
SNAT(snatIPRange, nil).
LoadToCtMark(snatCTMark).CTDone().
Cookie(c.cookieAllocator.Request(cookie.SNAT).Raw()).
Done())
}
return flows
}

// Feature: PodConnectivity
// Stage: ClassifierStage
// Tables: ClassifierTable
Expand Down

0 comments on commit a748070

Please sign in to comment.