Skip to content

Commit

Permalink
Support capturing only the dropped packet in live-traffic Traceflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jianjuns committed Apr 7, 2021
1 parent 7a86c23 commit 14c6197
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 86 deletions.
7 changes: 7 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,11 @@ spec:
name: Live-Traffic
priority: 10
type: boolean
- description: Capture only the dropped packet.
jsonPath: .spec.droppedOnly
name: Dropped-Only
priority: 10
type: boolean
- description: Timeout in seconds.
jsonPath: .spec.timeout
name: Timeout
Expand All @@ -1317,6 +1322,8 @@ spec:
service:
type: string
type: object
droppedOnly:
type: boolean
liveTraffic:
type: boolean
packet:
Expand Down
7 changes: 7 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,11 @@ spec:
name: Live-Traffic
priority: 10
type: boolean
- description: Capture only the dropped packet.
jsonPath: .spec.droppedOnly
name: Dropped-Only
priority: 10
type: boolean
- description: Timeout in seconds.
jsonPath: .spec.timeout
name: Timeout
Expand All @@ -1317,6 +1322,8 @@ spec:
service:
type: string
type: object
droppedOnly:
type: boolean
liveTraffic:
type: boolean
packet:
Expand Down
7 changes: 7 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,11 @@ spec:
name: Live-Traffic
priority: 10
type: boolean
- description: Capture only the dropped packet.
jsonPath: .spec.droppedOnly
name: Dropped-Only
priority: 10
type: boolean
- description: Timeout in seconds.
jsonPath: .spec.timeout
name: Timeout
Expand All @@ -1317,6 +1322,8 @@ spec:
service:
type: string
type: object
droppedOnly:
type: boolean
liveTraffic:
type: boolean
packet:
Expand Down
7 changes: 7 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,11 @@ spec:
name: Live-Traffic
priority: 10
type: boolean
- description: Capture only the dropped packet.
jsonPath: .spec.droppedOnly
name: Dropped-Only
priority: 10
type: boolean
- description: Timeout in seconds.
jsonPath: .spec.timeout
name: Timeout
Expand All @@ -1317,6 +1322,8 @@ spec:
service:
type: string
type: object
droppedOnly:
type: boolean
liveTraffic:
type: boolean
packet:
Expand Down
7 changes: 7 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,11 @@ spec:
name: Live-Traffic
priority: 10
type: boolean
- description: Capture only the dropped packet.
jsonPath: .spec.droppedOnly
name: Dropped-Only
priority: 10
type: boolean
- description: Timeout in seconds.
jsonPath: .spec.timeout
name: Timeout
Expand All @@ -1317,6 +1322,8 @@ spec:
service:
type: string
type: object
droppedOnly:
type: boolean
liveTraffic:
type: boolean
packet:
Expand Down
7 changes: 7 additions & 0 deletions build/yamls/base/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ spec:
name: Live-Traffic
type: boolean
priority: 10
- jsonPath: .spec.droppedOnly
description: Capture only the dropped packet.
name: Dropped-Only
type: boolean
priority: 10
- jsonPath: .spec.timeout
description: Timeout in seconds.
name: Timeout
Expand Down Expand Up @@ -222,6 +227,8 @@ spec:
type: integer
liveTraffic:
type: boolean
droppedOnly:
type: boolean
timeout:
type: integer
status:
Expand Down
9 changes: 7 additions & 2 deletions pkg/agent/controller/traceflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
}
update := tf.DeepCopy()
update.Status.Results = append(update.Status.Results, *nodeResult)
update.Status.CapturedPacket = packet
if packet != nil {
update.Status.CapturedPacket = packet
}
_, err = c.traceflowClient.CrdV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, v1.UpdateOptions{})
if err != nil {
klog.Warningf("Update traceflow failed: %+v", err)
Expand Down Expand Up @@ -108,7 +110,10 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl
// Uninstall the OVS flows after receiving the first packet, to
// avoid capturing too many matched packets.
c.ofClient.UninstallTraceflowFlows(tag)
if tfState.isSender {
// Report the captured dropped packet, if the Traceflow is for
// the dropped packet only; otherwise only the sender reports
// the first captured packet.
if tfState.isSender || tfState.droppedOnly {
capturedPacket = parseCapturedPacket(pktIn)
}
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type traceflowState struct {
name string
tag uint8
liveTraffic bool
droppedOnly bool
isSender bool
// Agent received the first Traceflow packet from OVS.
receivedPacket bool
Expand Down Expand Up @@ -308,6 +309,9 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error {
srcOFPort = uint32(podInterfaces[0].OFPort)
// On the source Node, trace the first packet of the first
// connection that matches the Traceflow spec.
// TODO: support specifying only the Destination Pod for
// live-traffic Traceflow, which will trace the matched traffic
// to the destination Pod from any source.
if liveTraffic {
matchPacket = packet
}
Expand All @@ -316,7 +320,10 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error {

// Store Traceflow to cache.
c.runningTraceflowsMutex.Lock()
tfState := traceflowState{name: tf.Name, tag: tf.Status.DataplaneTag, liveTraffic: tf.Spec.LiveTraffic, isSender: isSender}
tfState := traceflowState{
name: tf.Name, tag: tf.Status.DataplaneTag,
liveTraffic: liveTraffic, droppedOnly: tf.Spec.DroppedOnly && liveTraffic,
isSender: isSender}
c.runningTraceflows[tfState.tag] = &tfState
c.runningTraceflowsMutex.Unlock()

Expand All @@ -326,7 +333,7 @@ func (c *Controller) startTraceflow(tf *crdv1alpha1.Traceflow) error {
if timeout == 0 {
timeout = crdv1alpha1.DefaultTraceflowTimeout
}
err = c.ofClient.InstallTraceflowFlows(tfState.tag, liveTraffic, matchPacket, srcOFPort, timeout)
err = c.ofClient.InstallTraceflowFlows(tfState.tag, liveTraffic, tfState.droppedOnly, matchPacket, srcOFPort, timeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -506,8 +513,8 @@ func (c *Controller) preparePacket(tf *crdv1alpha1.Traceflow, intf *interfacesto
}
}

if packet.IPProto == 0 || packet.IPProto == protocol.Type_ICMP || packet.IPProto == protocol.Type_IPv6ICMP {
// IPProto defaults to ICMP.
// Defaults to ICMP if not live-traffic Traceflow.
if packet.IPProto == 0 && !liveTraffic || packet.IPProto == protocol.Type_ICMP || packet.IPProto == protocol.Type_IPv6ICMP {
isICMP = true
}
if isICMP {
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type Client interface {
SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, inPort uint32, outPort int32) error

// InstallTraceflowFlows installs flows for a Traceflow request.
InstallTraceflowFlows(dataplaneTag uint8, liveTraffic bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error
InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error

// UninstallTraceflowFlows uninstalls flows for a Traceflow request.
UninstallTraceflowFlows(dataplaneTag uint8) error
Expand Down Expand Up @@ -860,11 +860,11 @@ func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet,
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error {
func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly bool, packet *binding.Packet, srcOFPort uint32, timeoutSeconds uint16) error {
cacheKey := fmt.Sprintf("%x", dataplaneTag)
flows := []binding.Flow{}
flows = append(flows, c.traceflowConnectionTrackFlows(dataplaneTag, packet, srcOFPort, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowL2ForwardOutputFlows(dataplaneTag, liveTraffic, droppedOnly, timeoutSeconds, cookie.Default)...)
flows = append(flows, c.traceflowNetworkPolicyFlows(dataplaneTag, timeoutSeconds, cookie.Default)...)
return c.addFlows(c.tfFlowCache, cacheKey, flows)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func Test_client_InstallTraceflowFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
c := tt.prepareFunc(ctrl)
if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, nil, 0, 300); (err != nil) != tt.wantErr {
if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, false, nil, 0, 300); (err != nil) != tt.wantErr {
t.Errorf("InstallTraceflowFlows() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
51 changes: 30 additions & 21 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,89 +936,98 @@ func (c *client) l2ForwardCalcFlow(dstMAC net.HardwareAddr, ofPort uint32, skipI

// traceflowL2ForwardOutputFlows generates Traceflow specific flows that outputs traceflow packets
// to OVS port and Antrea Agent after L2forwarding calculation.
func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, liveTraffic bool, timeout uint16, category cookie.Category) []binding.Flow {
func (c *client) traceflowL2ForwardOutputFlows(dataplaneTag uint8, liveTraffic, droppedOnly bool, timeout uint16, category cookie.Category) []binding.Flow {
flows := []binding.Flow{}
l2FwdOutTable := c.pipeline[L2ForwardingOutTable]
for _, ipProtocol := range []binding.Protocol{binding.ProtocolIP, binding.ProtocolIPv6} {
if c.encapMode.SupportsEncap() {
// SendToController and Output if output port is tunnel port.
flows = append(flows, l2FwdOutTable.BuildFlow(priorityNormal+3).
fb1 := l2FwdOutTable.BuildFlow(priorityNormal+3).
MatchReg(int(PortCacheReg), config.DefaultTunOFPort).
MatchIPDSCP(dataplaneTag).
SetHardTimeout(timeout).
MatchProtocol(ipProtocol).
MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange).
Action().OutputRegRange(int(PortCacheReg), ofPortRegRange).
Action().SendToController(uint8(PacketInReasonTF)).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
Cookie(c.cookieAllocator.Request(category).Raw())
// For injected packets, only SendToController if output port is local
// gateway. In encapMode, a Traceflow packet going out of the gateway
// port (i.e. exiting the overlay) essentially means that the Traceflow
// request is complete.
flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+2).
fb2 := l2FwdOutTable.BuildFlow(priorityNormal+2).
MatchReg(int(PortCacheReg), config.HostGatewayOFPort).
MatchIPDSCP(dataplaneTag).
SetHardTimeout(timeout).
MatchProtocol(ipProtocol).
MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange).
Action().SendToController(uint8(PacketInReasonTF)).
Cookie(c.cookieAllocator.Request(category).Raw())

// Do not send to controller if captures only dropped packet.
if !droppedOnly {
fb1 = fb1.Action().SendToController(uint8(PacketInReasonTF))
fb2 = fb2.Action().SendToController(uint8(PacketInReasonTF))
}
if liveTraffic {
// Clear the loaded DSCP bits before output.
flowBuilder = flowBuilder.Action().LoadIPDSCP(0).
fb2 = fb2.Action().LoadIPDSCP(0).
Action().OutputRegRange(int(PortCacheReg), ofPortRegRange)
}
flows = append(flows, flowBuilder.Done())
flows = append(flows, fb1.Done(), fb2.Done())
} else {
// SendToController and Output if output port is local gateway. Unlike in
// encapMode, inter-Node Pod-to-Pod traffic is expected to go out of the
// gateway port on the way to its destination.
flows = append(flows, l2FwdOutTable.BuildFlow(priorityNormal+2).
fb1 := l2FwdOutTable.BuildFlow(priorityNormal+2).
MatchReg(int(PortCacheReg), config.HostGatewayOFPort).
MatchIPDSCP(dataplaneTag).
SetHardTimeout(timeout).
MatchProtocol(ipProtocol).
MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange).
Action().OutputRegRange(int(PortCacheReg), ofPortRegRange).
Action().SendToController(uint8(PacketInReasonTF)).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
Cookie(c.cookieAllocator.Request(category).Raw())
if !droppedOnly {
fb1 = fb1.Action().SendToController(uint8(PacketInReasonTF))
}
flows = append(flows, fb1.Done())
}
// Only SendToController if output port is local gateway and destination IP is gateway.
gatewayIP := c.nodeConfig.GatewayConfig.IPv4
if ipProtocol == binding.ProtocolIPv6 {
gatewayIP = c.nodeConfig.GatewayConfig.IPv6
}
if gatewayIP != nil {
flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+3).
fb := l2FwdOutTable.BuildFlow(priorityNormal+3).
MatchReg(int(PortCacheReg), config.HostGatewayOFPort).
MatchDstIP(gatewayIP).
MatchIPDSCP(dataplaneTag).
SetHardTimeout(timeout).
MatchProtocol(ipProtocol).
MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange).
Action().SendToController(uint8(PacketInReasonTF)).
Cookie(c.cookieAllocator.Request(category).Raw())
if !droppedOnly {
fb = fb.Action().SendToController(uint8(PacketInReasonTF))
}
if liveTraffic {
flowBuilder = flowBuilder.Action().LoadIPDSCP(0).
fb = fb.Action().LoadIPDSCP(0).
Action().OutputRegRange(int(PortCacheReg), ofPortRegRange)
}
flows = append(flows, flowBuilder.Done())
flows = append(flows, fb.Done())
}
// Only SendToController if output port is Pod port.
flowBuilder := l2FwdOutTable.BuildFlow(priorityNormal+2).
fb := l2FwdOutTable.BuildFlow(priorityNormal+2).
MatchIPDSCP(dataplaneTag).
SetHardTimeout(timeout).
MatchProtocol(ipProtocol).
MatchRegRange(int(marksReg), portFoundMark, ofPortMarkRange).
Action().SendToController(uint8(PacketInReasonTF)).
Cookie(c.cookieAllocator.Request(category).Raw())
if !droppedOnly {
fb = fb.Action().SendToController(uint8(PacketInReasonTF))
}
if liveTraffic {
flowBuilder = flowBuilder.Action().LoadIPDSCP(0).
fb = fb.Action().LoadIPDSCP(0).
Action().OutputRegRange(int(PortCacheReg), ofPortRegRange)
}
flows = append(flows, flowBuilder.Done())
flows = append(flows, fb.Done())
}
return flows
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 14c6197

Please sign in to comment.