Skip to content

Commit

Permalink
Fix #2092
Browse files Browse the repository at this point in the history
  • Loading branch information
hongliangl committed Apr 16, 2021
1 parent 599d663 commit be8ebcb
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 20 deletions.
11 changes: 7 additions & 4 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,13 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
group := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
if err := group.Add(); err != nil {
groups := c.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)

if err := c.bridge.AddOFEntriesInBundle(groups[0:1], nil, nil, groups[1:]); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group: %w", err)
}
c.groupCache.Store(groupID, group)

c.groupCache.Store(groupID, groups)
return nil
}

Expand Down Expand Up @@ -754,7 +756,8 @@ func (c *client) ReplayFlows() {
}

c.groupCache.Range(func(id, gEntry interface{}) bool {
if err := gEntry.(binding.Group).Add(); err != nil {
groups := gEntry.([]binding.OFEntry)
if err := c.bridge.AddOFEntriesInBundle(groups[0:1], nil, nil, groups[1:]); err != nil {
klog.Errorf("Error when replaying cached group %d: %v", id, err)
}
return true
Expand Down
35 changes: 27 additions & 8 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ const (
ipv6MulticastAddr = "FF00::/8"
// IPv6 link-local prefix
ipv6LinkLocalAddr = "FE80::/10"

// Insert-bucket number
insertBucketNum = 3
fakeGroupId = 0xffffffff
)

var (
Expand Down Expand Up @@ -476,7 +480,7 @@ func (c *client) AddOFEntries(ofEntries []binding.OFEntry) error {
d := time.Since(startTime)
metrics.OVSFlowOpsLatency.WithLabelValues("add").Observe(float64(d.Milliseconds()))
}()
if err := c.bridge.AddOFEntriesInBundle(ofEntries, nil, nil); err != nil {
if err := c.bridge.AddOFEntriesInBundle(ofEntries, nil, nil, nil); err != nil {
metrics.OVSFlowOpsErrorCount.WithLabelValues("add").Inc()
return err
}
Expand All @@ -490,7 +494,7 @@ func (c *client) DeleteOFEntries(ofEntries []binding.OFEntry) error {
d := time.Since(startTime)
metrics.OVSFlowOpsLatency.WithLabelValues("delete").Observe(float64(d.Milliseconds()))
}()
if err := c.bridge.AddOFEntriesInBundle(nil, nil, ofEntries); err != nil {
if err := c.bridge.AddOFEntriesInBundle(nil, nil, ofEntries, nil); err != nil {
metrics.OVSFlowOpsErrorCount.WithLabelValues("delete").Inc()
return err
}
Expand Down Expand Up @@ -1971,8 +1975,8 @@ func (c *client) hairpinSNATFlow(endpointIP net.IP) binding.Flow {
// serviceLBTable to trigger the learn flow, the learn flow will then send packets
// to endpointDNATTable. Otherwise, buckets will resubmit packets to
// endpointDNATTable directly.
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := c.bridge.CreateGroup(groupID).ResetBuckets()
func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) []binding.OFEntry {
var groups []binding.OFEntry
var resubmitTableID binding.TableIDType
var lbResultMark uint32
if withSessionAffinity {
Expand All @@ -1982,15 +1986,30 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
resubmitTableID = endpointDNATTable
lbResultMark = marksRegServiceSelected
}
endpointsNum := len(endpoints)

group := c.bridge.CreateGroup(groupID).ResetBuckets()
groups = append(groups, group)

for i := insertBucketNum; i < endpointsNum; i += insertBucketNum {
group := c.bridge.CreateGroup(fakeGroupId).ResetBuckets()
c.bridge.DeleteGroup(fakeGroupId)
group.SetGroupId(groupID)
groups = append(groups, group)
}
if groups == nil {
return groups
}

for _, endpoint := range endpoints {
for index, endpoint := range endpoints {
endpointPort, _ := endpoint.Port()
endpointIP := net.ParseIP(endpoint.IP())
portVal := portToUint16(endpointPort)
ipProtocol := getIPProtocol(endpointIP)
if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(endpointIP.To4())
group = group.Bucket().Weight(100).

groups[index/insertBucketNum] = groups[index/insertBucketNum].(binding.Group).Bucket().Weight(100).
LoadReg(int(endpointIPReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
Expand All @@ -1999,7 +2018,7 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
groups[index/insertBucketNum] = groups[index/insertBucketNum].(binding.Group).Bucket().Weight(100).
LoadXXReg(int(endpointIPv6XXReg), ipVal).
LoadRegRange(int(endpointPortReg), uint32(portVal), endpointPortRegRange).
LoadRegRange(int(serviceLearnReg), lbResultMark, serviceLearnRegRange).
Expand All @@ -2008,7 +2027,7 @@ func (c *client) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAf
Done()
}
}
return group
return groups
}

// decTTLFlows decrements TTL by one for the packets forwarded across Nodes.
Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/ovs/openflow/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
AddMessage OFOperation = iota
ModifyMessage
DeleteMessage
InsertBucketMessage
)

// IPDSCPToSRange stores the DSCP bits in ToS field of IP header.
Expand All @@ -99,7 +100,7 @@ type Bridge interface {
// AddOFEntriesInBundle syncs multiple Openflow entries(including Flow and Group) in a single transaction. This
// operation could add new entries in "addEntries", modify entries in "modEntries", and remove entries in
// "delEntries" in the same bundle.
AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEntry, delEntries []OFEntry) error
AddOFEntriesInBundle(addEntries, modEntries, delEntries, insertBucketEntries []OFEntry) error
// Connect initiates connection to the OFSwitch. It will block until the connection is established. connectCh is used to
// send notification whenever the switch is connected or reconnected.
Connect(maxRetrySec int, connectCh chan struct{}) error
Expand Down Expand Up @@ -291,6 +292,7 @@ type Group interface {
OFEntry
ResetBuckets() Group
Bucket() BucketBuilder
SetGroupId(groupID GroupIDType)
}

type BucketBuilder interface {
Expand Down
7 changes: 4 additions & 3 deletions pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,9 @@ func (b *OFBridge) AddFlowsInBundle(addflows []Flow, modFlows []Flow, delFlows [
return nil
}

func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEntry, delEntries []OFEntry) error {
func (b *OFBridge) AddOFEntriesInBundle(addEntries, modEntries, delEntries, insertBucketEntries []OFEntry) error {
// If no Openflow entries are requested to be added or modified or deleted on the OVS bridge, return immediately.
if len(addEntries) == 0 && len(modEntries) == 0 && len(delEntries) == 0 {
if len(addEntries) == 0 && len(modEntries) == 0 && len(delEntries) == 0 && len(insertBucketEntries) == 0 {
klog.V(2).Info("No Openflow entries need to be synced to the OVS bridge, returning")
return nil
}
Expand Down Expand Up @@ -445,6 +445,7 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt
checkMessages(addEntries, AddMessage)
checkMessages(modEntries, ModifyMessage)
checkMessages(delEntries, DeleteMessage)
checkMessages(insertBucketEntries, InsertBucketMessage)

// Create a new transaction. Use ofctrl.Ordered to ensure the messages are realized on OVS in the order of adding
// messages. This type could ensure Group entry is realized on OVS in advance of Flow entry.
Expand Down Expand Up @@ -494,7 +495,7 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt
count, err := tx.Complete()
if err != nil {
return err
} else if count != len(addEntries)+len(modEntries)+len(delEntries) {
} else if count != len(addEntries)+len(modEntries)+len(delEntries)+len(insertBucketEntries) {
// This case should not be possible if all the calls to "tx.AddMessage" returned nil. This is just a sanity check.
tx.Abort()
return errors.New("failed to add all Openflow entries in one transaction, cancelling it")
Expand Down
10 changes: 8 additions & 2 deletions pkg/ovs/openflow/ofctrl_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (g *ofGroup) Bucket() BucketBuilder {
}
}

func (f *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) {
func (g *ofGroup) SetGroupId(groupID GroupIDType) {
g.ofctrl.ID = uint32(groupID)
}

func (g *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMessage, error) {
var operation int
switch entryOper {
case AddMessage:
Expand All @@ -66,8 +70,10 @@ func (f *ofGroup) GetBundleMessage(entryOper OFOperation) (ofctrl.OpenFlowModMes
operation = openflow13.OFPGC_MODIFY
case DeleteMessage:
operation = openflow13.OFPGC_DELETE
case InsertBucketMessage:
operation = openflow13.OFPGC_INSERT_BUCKET
}
message := f.ofctrl.GetBundleMessage(operation)
message := g.ofctrl.GetBundleMessage(operation)
return message, nil
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/ovs/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions test/integration/agent/openflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,66 @@ type svcConfig struct {
withSessionAffinity bool
}

func TestGroup(t *testing.T) {
c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false)
err := ofTestUtils.PrepareOVSBridge(br)
require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))

_, err = c.Initialize(roundInfo, &config1.NodeConfig{}, config1.TrafficEncapModeEncap)
require.Nil(t, err, "Failed to initialize OFClient")

defer func() {
err = c.Disconnect()
assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err))
//err = ofTestUtils.DeleteOVSBridge(br)
//assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err))
}()

endpoints := []k8sproxy.Endpoint{
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("1.1.1.1", "1"),
IsLocal: true,
}),
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("2.2.2.2", "2"),
IsLocal: false,
}),
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("3.3.3.3", "3"),
IsLocal: false,
}),
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("4.4.4.4", "4"),
IsLocal: false,
}),
}

stickyMaxAgeSeconds := uint16(30)

tcs := []struct {
svc svcConfig
gid uint32
endpoints []k8sproxy.Endpoint
stickyAge uint16
}{
{
svc: svcConfig{
protocol: ofconfig.ProtocolTCP,
ip: net.ParseIP("10.20.30.41"),
port: uint16(8000),
},
gid: 2,
endpoints: endpoints,
stickyAge: stickyMaxAgeSeconds,
},
}

groupID := ofconfig.GroupIDType(2)
err = c.InstallServiceGroup(groupID, tcs[0].svc.withSessionAffinity, tcs[0].endpoints)
assert.NoError(t, err, "no error should return when installing groups for Service")

}

func TestProxyServiceFlows(t *testing.T) {
c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false)
err := ofTestUtils.PrepareOVSBridge(br)
Expand Down
4 changes: 2 additions & 2 deletions test/integration/ovs/ofctrl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,12 @@ func TestBundleWithGroupAndFlow(t *testing.T) {
bucket0 := "weight:100,actions=load:0xa0a0002->NXM_NX_REG1[],load:0x35->NXM_NX_REG2[],load:0xfff1->NXM_NX_REG3[],resubmit(,3)"
bucket1 := "weight:100,actions=load:0xa0a0202->NXM_NX_REG1[],load:0x35->NXM_NX_REG2[],load:0xfff1->NXM_NX_REG3[],resubmit(,3)"
expectedGroupBuckets := []string{bucket0, bucket1}
err = bridge.AddOFEntriesInBundle([]binding.OFEntry{flow, group}, nil, nil)
err = bridge.AddOFEntriesInBundle([]binding.OFEntry{flow, group}, nil, nil, nil)
require.Nil(t, err)
CheckFlowExists(t, ovsCtlClient, uint8(table.GetID()), true, expectedFlows)
CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, true)

err = bridge.AddOFEntriesInBundle(nil, nil, []binding.OFEntry{flow, group})
err = bridge.AddOFEntriesInBundle(nil, nil, []binding.OFEntry{flow, group}, nil)
require.Nil(t, err)
CheckFlowExists(t, ovsCtlClient, uint8(table.GetID()), false, expectedFlows)
CheckGroupExists(t, ovsCtlClient, groupID, "select", expectedGroupBuckets, false)
Expand Down

0 comments on commit be8ebcb

Please sign in to comment.