test: Try flow matching from progressive offsets in case of failures
Retry flow matching if not all flow requirements can be met, starting
the new round from the flow following the previous first match. This
effectively retries flow matching from different L4 connections, as the
ephemeral source port found by the first match is fixed for the
remaining requirements. By skipping the previous first match, the
matching can succeed on a different L4 connection.

Merge the flow validation results for each flow requirement before
returning. Without this, flow validation was reported as successful
when at least one flow validation requirement was met, hiding failures
in later requirements.

For example, if a successful DNS reply from the proxy is required, it
may arrive on a different UDP or TCP port than the first DNS resolution
exchange, which may have failed.

This capability was inadvertently removed in #319.

Fixes: #319
Signed-off-by: Jarno Rajahalme <[email protected]>
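
The retry strategy described in the message above can be seen in isolation in the sketch below. It is illustrative only and not part of this commit or of cilium-cli: the binding that the real matcher keeps in its filters.FlowContext (the ephemeral source port picked up by the first match) is modeled here as a plain port field, and flows are reduced to simple events.

```go
package main

import "fmt"

// event stands in for a captured flow; port models the ephemeral source
// port that the real matcher binds via its FlowContext.
type event struct {
	kind string
	port int
}

// matchSet looks for an anchoring "first" event and then requires the
// remaining kinds on the same bound port. If the full set cannot be
// satisfied, it retries from the event following the previous first match.
func matchSet(events []event, firstKind string, restKinds []string) (int, bool) {
	for offset := 0; offset < len(events); {
		// Find the anchor at or after the current offset and bind its port.
		firstIdx, port := -1, 0
		for i := offset; i < len(events); i++ {
			if events[i].kind == firstKind {
				firstIdx, port = i, events[i].port
				break
			}
		}
		if firstIdx < 0 {
			return -1, false // no anchor left to try
		}

		// The remaining requirements must match on the bound port.
		pos, allFound := firstIdx+1, true
		for _, kind := range restKinds {
			found := false
			for ; pos < len(events); pos++ {
				if events[pos].kind == kind && events[pos].port == port {
					found = true
					pos++
					break
				}
			}
			if !found {
				allFound = false
				break
			}
		}
		if allFound {
			return firstIdx, true
		}

		// Retry from the flow following the previous first match.
		offset = firstIdx + 1
	}
	return -1, false
}

func main() {
	events := []event{
		{"dns-request", 50000}, // first attempt, never answered
		{"dns-request", 50001}, // retry on a new ephemeral port
		{"dns-response", 50001},
	}
	fmt.Println(matchSet(events, "dns-request", []string{"dns-response"}))
	// Output: 1 true (only the second connection satisfies both requirements)
}
```

Because the remaining requirements are bound to the port of the anchoring match, retrying from a later anchor is the only way a different L4 connection can satisfy the whole set.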
jrajahalme authored and tklauser committed Sep 1, 2021
1 parent b764b09 commit 088eb3f
Showing 1 changed file: connectivity/check/action.go (47 additions, 41 deletions)
@@ -260,26 +260,20 @@ func (a *Action) printFlows(peer TestPeer) {
 	a.Log()
 }
 
-func (a *Action) matchFlowRequirements(ctx context.Context, flows flowsSet, offset int, req *filters.FlowSetRequirement) FlowRequirementResults {
-	r := FlowRequirementResults{
-		Matched:    MatchMap{},
-		FirstMatch: -1,
-		LastMatch:  -1,
-	}
-
-	// Skip 'offset' amount of flows.
-	flows = flows[offset:]
-	flowCtx := filters.NewFlowContext()
+func (a *Action) matchFlowRequirements(ctx context.Context, flows flowsSet, req *filters.FlowSetRequirement) FlowRequirementResults {
+	var offset int
+	r := FlowRequirementResults{Failures: -1} // -1 to get the loop started
 
 	match := func(expect bool, f filters.FlowRequirement, fc *filters.FlowContext) (int, bool, *flow.Flow) {
-		index, match, flow := flows.Contains(f.Filter, fc)
+		index, match, flow := flows[offset:].Contains(f.Filter, fc)
+		index += offset
 
 		if match {
-			r.Matched[offset+index] = expect
+			r.Matched[index] = expect
 
 			// Update last match index and timestamp
-			if r.LastMatch < offset+index {
-				r.LastMatch = offset + index
+			if r.LastMatch < index {
+				r.LastMatch = index
 				if t := flow.GetTime(); t != nil && t.IsValid() {
 					r.LastMatchTimestamp = t.AsTime()
 				}
@@ -289,7 +283,7 @@ func (a *Action) matchFlowRequirements(ctx context.Context, flows flowsSet, offs
 		if match != expect {
 			msgSuffix := "not found"
 			if match {
-				msgSuffix = fmt.Sprintf("found at %d", offset+index)
+				msgSuffix = fmt.Sprintf("found at %d", index)
 			}
 
 			a.Infof("%s %s %s", f.Msg, f.Filter.String(fc), msgSuffix)
@@ -299,7 +293,7 @@ func (a *Action) matchFlowRequirements(ctx context.Context, flows flowsSet, offs
 		} else {
 			msgSuffix := "not found"
 			if match {
-				msgSuffix = fmt.Sprintf("found at %d", offset+index)
+				msgSuffix = fmt.Sprintf("found at %d", index)
 			}
 
 			a.Logf("✅ %s %s", f.Msg, msgSuffix)
@@ -308,32 +302,49 @@ func (a *Action) matchFlowRequirements(ctx context.Context, flows flowsSet, offs
 		return index, match, flow
 	}
 
-	index, matched, _ := match(true, req.First, &flowCtx)
-	if !matched {
-		r.NeedMoreFlows = true
-		// No point trying to match more if First does not match.
-		return r
-	}
-
-	r.FirstMatch = offset + index
-
-	for _, f := range req.Middle {
-		if f.SkipOnAggregation && a.test.ctx.FlowAggregation() {
-			continue
-		}
-		match(true, f, &flowCtx)
-	}
-
-	if !(req.Last.SkipOnAggregation && a.test.ctx.FlowAggregation()) {
-		if _, match, _ := match(true, req.Last, &flowCtx); !match {
-			r.NeedMoreFlows = true
-		}
-	}
-
-	for _, f := range req.Except {
-		match(false, f, &flowCtx)
-	}
-
+	for r.NeedMoreFlows || r.Failures != 0 {
+		// reinit for the new round
+		r.NeedMoreFlows = false
+		r.Failures = 0
+		r.Matched = MatchMap{}
+		r.FirstMatch = -1
+		r.LastMatch = -1
+
+		flowCtx := filters.NewFlowContext()
+
+		index, matched, _ := match(true, req.First, &flowCtx)
+		if !matched {
+			r.NeedMoreFlows = true
+			// No point trying to match more if First does not match.
+			break
+		}
+		r.FirstMatch = index
+
+		for _, f := range req.Middle {
+			if f.SkipOnAggregation && a.test.ctx.FlowAggregation() {
+				continue
+			}
+			match(true, f, &flowCtx)
+		}
+
+		if !(req.Last.SkipOnAggregation && a.test.ctx.FlowAggregation()) {
+			if _, match, _ := match(true, req.Last, &flowCtx); !match {
+				r.NeedMoreFlows = true
+			}
+		}
+
+		for _, f := range req.Except {
+			match(false, f, &flowCtx)
+		}
+
+		// Check if successfully fully matched
+		if !r.NeedMoreFlows && r.Failures == 0 {
+			break
+		}
+
+		// Try if some other flow instance would find both first and last required flows
+		offset = r.FirstMatch + 1
+	}
 	return r
 }

@@ -674,16 +685,11 @@ func (a *Action) matchAllFlowRequirements(ctx context.Context, reqs []filters.Fl
 		out.NeedMoreFlows = true
 		return out
 	}
 
 	a.flowsMu.Lock()
 	defer a.flowsMu.Unlock()
 
 	for _, req := range reqs {
-		res := a.matchFlowRequirements(ctx, a.flows, 0, &req)
-		if res.NeedMoreFlows || res.FirstMatch == -1 {
-			break
-		}
-
+		res := a.matchFlowRequirements(ctx, a.flows, &req)
 		//TODO(timo): The matcher should probably take in all requirements
 		// and return its verdict in a single struct.
 		out.Merge(&res)
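
The out.Merge(&res) call above folds each requirement's results into a single verdict, but the Merge method itself is not part of this diff. The sketch below is an assumed shape, based only on the FlowRequirementResults fields visible in the hunks above (Failures, NeedMoreFlows, Matched, FirstMatch, LastMatch, LastMatchTimestamp); the merge rules are illustrative, not the cilium-cli implementation, and are written to give the one property the commit message calls for: a failure or missing flow in any merged requirement survives into the combined result.

```go
package main

import (
	"fmt"
	"time"
)

// MatchMap and FlowRequirementResults mirror the fields used in the diff
// above; this is a standalone sketch, not the cilium-cli definitions.
type MatchMap map[int]bool

type FlowRequirementResults struct {
	FirstMatch         int
	LastMatch          int
	Matched            MatchMap
	Failures           int
	NeedMoreFlows      bool
	LastMatchTimestamp time.Time
}

// Merge folds another requirement's results into r so that a later success
// cannot hide an earlier failure (or vice versa).
func (r *FlowRequirementResults) Merge(from *FlowRequirementResults) {
	if r.FirstMatch < 0 || (from.FirstMatch >= 0 && from.FirstMatch < r.FirstMatch) {
		r.FirstMatch = from.FirstMatch
	}
	if from.LastMatch > r.LastMatch {
		r.LastMatch = from.LastMatch
		r.LastMatchTimestamp = from.LastMatchTimestamp
	}
	if r.Matched == nil {
		r.Matched = MatchMap{}
	}
	for k, v := range from.Matched {
		r.Matched[k] = v
	}
	r.Failures += from.Failures
	r.NeedMoreFlows = r.NeedMoreFlows || from.NeedMoreFlows
}

func main() {
	out := FlowRequirementResults{FirstMatch: -1, LastMatch: -1}
	ok := FlowRequirementResults{FirstMatch: 2, LastMatch: 5, Matched: MatchMap{2: true, 5: true}}
	bad := FlowRequirementResults{FirstMatch: -1, LastMatch: -1, Failures: 1, NeedMoreFlows: true}
	out.Merge(&ok)
	out.Merge(&bad)
	fmt.Printf("failures=%d needMoreFlows=%v firstMatch=%d\n",
		out.Failures, out.NeedMoreFlows, out.FirstMatch)
	// failures=1 needMoreFlows=true firstMatch=2: the failed requirement is not hidden
}
```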
