Skip to content

Commit

Permalink
fix(minipipeline): include probe/control resolved addrs (#1405)
Browse files Browse the repository at this point in the history
This information is useful for debugging. Also, I noticed an issue in
how I compute DNS consistency, which suggests that I need to refactor
that computation. Having the control/probe resolved addrs is required
to do so.

Part of ooni/probe#2634
  • Loading branch information
bassosimone authored Nov 30, 2023
1 parent cc808b5 commit 2a8aeac
Show file tree
Hide file tree
Showing 72 changed files with 6,007 additions and 525 deletions.
44 changes: 38 additions & 6 deletions internal/cmd/minipipeline/testdata/observations.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"DNSLookupFailure": "dns_no_answer",
"DNSQueryType": "AAAA",
"DNSEngine": "udp",
"DNSResolvedAddrs": null,
"IPAddress": null,
"IPAddressASN": null,
"IPAddressBogon": null,
Expand All @@ -29,6 +30,9 @@
"TagFetchBody": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": null,
"MatchWithControlIPAddress": null,
"MatchWithControlIPAddressASN": null,
Expand All @@ -45,6 +49,7 @@
"DNSLookupFailure": "dns_no_answer",
"DNSQueryType": "AAAA",
"DNSEngine": "doh",
"DNSResolvedAddrs": null,
"IPAddress": null,
"IPAddressASN": null,
"IPAddressBogon": null,
Expand All @@ -68,6 +73,9 @@
"TagFetchBody": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": null,
"MatchWithControlIPAddress": null,
"MatchWithControlIPAddressASN": null,
Expand All @@ -86,6 +94,9 @@
"DNSLookupFailure": "",
"DNSQueryType": "A",
"DNSEngine": "udp",
"DNSResolvedAddrs": [
"130.192.16.171"
],
"IPAddress": "130.192.16.171",
"IPAddressASN": 137,
"IPAddressBogon": false,
Expand All @@ -107,8 +118,11 @@
"HTTPResponseIsFinal": null,
"TagDepth": null,
"TagFetchBody": null,
"ControlDNSDomain": null,
"ControlDNSLookupFailure": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": null,
"MatchWithControlIPAddress": null,
"MatchWithControlIPAddressASN": null,
Expand All @@ -125,6 +139,9 @@
"DNSLookupFailure": "",
"DNSQueryType": "ANY",
"DNSEngine": "getaddrinfo",
"DNSResolvedAddrs": [
"130.192.16.171"
],
"IPAddress": "130.192.16.171",
"IPAddressASN": 137,
"IPAddressBogon": false,
Expand All @@ -146,8 +163,11 @@
"HTTPResponseIsFinal": null,
"TagDepth": null,
"TagFetchBody": null,
"ControlDNSDomain": null,
"ControlDNSLookupFailure": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": null,
"MatchWithControlIPAddress": null,
"MatchWithControlIPAddressASN": null,
Expand All @@ -164,6 +184,9 @@
"DNSLookupFailure": "",
"DNSQueryType": "A",
"DNSEngine": "doh",
"DNSResolvedAddrs": [
"130.192.16.171"
],
"IPAddress": "130.192.16.171",
"IPAddressASN": 137,
"IPAddressBogon": false,
Expand All @@ -185,8 +208,11 @@
"HTTPResponseIsFinal": null,
"TagDepth": null,
"TagFetchBody": null,
"ControlDNSDomain": null,
"ControlDNSLookupFailure": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": null,
"MatchWithControlIPAddress": null,
"MatchWithControlIPAddressASN": null,
Expand All @@ -205,6 +231,9 @@
"DNSLookupFailure": "",
"DNSQueryType": null,
"DNSEngine": null,
"DNSResolvedAddrs": [
"130.192.16.171"
],
"IPAddress": "130.192.16.171",
"IPAddressASN": 137,
"IPAddressBogon": false,
Expand Down Expand Up @@ -243,6 +272,9 @@
"TagFetchBody": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": "",
"MatchWithControlIPAddress": true,
"MatchWithControlIPAddressASN": true,
Expand Down
16 changes: 14 additions & 2 deletions internal/cmd/minipipeline/testdata/observations_classic.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"DNSLookupFailure": "",
"DNSQueryType": "ANY",
"DNSEngine": "getaddrinfo",
"DNSResolvedAddrs": [
"130.192.16.171"
],
"IPAddress": "130.192.16.171",
"IPAddressASN": 137,
"IPAddressBogon": false,
Expand All @@ -28,8 +31,11 @@
"HTTPResponseIsFinal": null,
"TagDepth": null,
"TagFetchBody": null,
"ControlDNSDomain": null,
"ControlDNSLookupFailure": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": null,
"MatchWithControlIPAddress": null,
"MatchWithControlIPAddressASN": null,
Expand All @@ -48,6 +54,9 @@
"DNSLookupFailure": "",
"DNSQueryType": null,
"DNSEngine": null,
"DNSResolvedAddrs": [
"130.192.16.171"
],
"IPAddress": "130.192.16.171",
"IPAddressASN": 137,
"IPAddressBogon": false,
Expand Down Expand Up @@ -86,6 +95,9 @@
"TagFetchBody": null,
"ControlDNSDomain": "nexa.polito.it",
"ControlDNSLookupFailure": "",
"ControlDNSResolvedAddrs": [
"130.192.16.171"
],
"ControlTCPConnectFailure": "",
"MatchWithControlIPAddress": true,
"MatchWithControlIPAddressASN": true,
Expand Down
34 changes: 29 additions & 5 deletions internal/minipipeline/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type WebObservation struct {
// DNSEngine is the DNS engine that we're using (e.g., "getaddrinfo").
DNSEngine optional.Value[string]

// DNSResolvedAddrs contains the list of DNS-resolved addrs.
DNSResolvedAddrs optional.Value[Set[string]]

// The following fields are optional.Some in these cases:
//
// 1. when you process successful DNS lookup events from OONI measurements;
Expand Down Expand Up @@ -196,6 +199,9 @@ type WebObservation struct {
// ControlDNSLookupFailure is the corresponding control DNS lookup failure.
ControlDNSLookupFailure optional.Value[string]

// ControlDNSResolvedAddrs contains the list of addrs DNS-resolved by the control.
ControlDNSResolvedAddrs optional.Value[Set[string]]

// ControlTCPConnectFailure is the control's TCP connect failure.
ControlTCPConnectFailure optional.Value[string]

Expand Down Expand Up @@ -297,14 +303,16 @@ func (c *WebObservationsContainer) ingestDNSLookupSuccesses(evs ...*model.Archiv
}

// walk through the answers
utilsForEachIPAddress(ev.Answers, func(ipAddr string) {
addrs := NewSet(utilsResolvedAddresses(ev.Answers)...)
for _, ipAddr := range addrs.Keys() {
// create the record
obs := &WebObservation{
DNSTransactionID: optional.Some(ev.TransactionID),
DNSDomain: optional.Some(ev.Hostname),
DNSLookupFailure: optional.Some(""),
DNSQueryType: optional.Some(ev.QueryType),
DNSEngine: optional.Some(ev.Engine),
DNSResolvedAddrs: optional.Some(addrs),
IPAddress: optional.Some(ipAddr),
IPAddressASN: utilsGeoipxLookupASN(ipAddr),
IPAddressBogon: optional.Some(netxlite.IsBogon(ipAddr)),
Expand All @@ -318,7 +326,7 @@ func (c *WebObservationsContainer) ingestDNSLookupSuccesses(evs ...*model.Archiv
if _, found := c.knownIPAddresses[ipAddr]; !found {
c.knownIPAddresses[ipAddr] = obs
}
})
}
}
}

Expand All @@ -345,6 +353,7 @@ func (c *WebObservationsContainer) IngestTCPConnectEvents(evs ...*model.Archival
DNSTransactionID: obs.DNSTransactionID,
DNSDomain: obs.DNSDomain,
DNSLookupFailure: obs.DNSLookupFailure,
DNSResolvedAddrs: obs.DNSResolvedAddrs,
IPAddress: obs.IPAddress,
IPAddressASN: obs.IPAddressASN,
IPAddressBogon: obs.IPAddressBogon,
Expand Down Expand Up @@ -426,7 +435,10 @@ func (c *WebObservationsContainer) IngestControlMessages(req *model.THRequest, r
}

func (c *WebObservationsContainer) controlXrefDNSQueries(inputDomain string, resp *model.THResponse) {
for _, obs := range c.DNSLookupFailures {
var observations []*WebObservation
observations = append(observations, c.DNSLookupFailures...)
observations = append(observations, c.DNSLookupSuccesses...)
for _, obs := range observations {
// skip cases where the domain is different
if obs.DNSDomain.Unwrap() != inputDomain {
continue
Expand All @@ -435,8 +447,14 @@ func (c *WebObservationsContainer) controlXrefDNSQueries(inputDomain string, res
// register the corresponding DNS domain used by the control
obs.ControlDNSDomain = optional.Some(inputDomain)

// register the corresponding DNS lookup failure
// register the corresponding DNS lookup failure and skip in such a case
obs.ControlDNSLookupFailure = optional.Some(utilsStringPointerToString(resp.DNS.Failure))
if resp.DNS.Failure != nil {
continue
}

// register the resolved IP addresses
obs.ControlDNSResolvedAddrs = optional.Some(NewSet(resp.DNS.Addrs...))
}
}

Expand Down Expand Up @@ -467,6 +485,9 @@ func (c *WebObservationsContainer) controlMatchDNSLookupResults(inputDomain stri
// handle the case in which the IP address has been provided by the control, which
// is a case where the domain is empty and the IP address is in thAddrMap
if domain == "" && thAddrMap[addr] {
obs.ControlDNSDomain = optional.Some(inputDomain)
obs.ControlDNSLookupFailure = optional.Some(utilsStringPointerToString(resp.DNS.Failure))
obs.ControlDNSResolvedAddrs = optional.Some(NewSet(resp.DNS.Addrs...))
obs.MatchWithControlIPAddress = optional.Some(true)
obs.MatchWithControlIPAddressASN = optional.Some(true)
continue
Expand All @@ -478,14 +499,17 @@ func (c *WebObservationsContainer) controlMatchDNSLookupResults(inputDomain stri
}

// register the control DNS domain
obs.ControlDNSDomain = optional.Some(domain)
obs.ControlDNSDomain = optional.Some(inputDomain)

// register whether the control failed and skip in such a case
obs.ControlDNSLookupFailure = optional.Some(utilsStringPointerToString(resp.DNS.Failure))
if resp.DNS.Failure != nil {
continue
}

// register the resolved IP addresses
obs.ControlDNSResolvedAddrs = optional.Some(NewSet(resp.DNS.Addrs...))

// compute whether also the TH observed this addr
obs.MatchWithControlIPAddress = optional.Some(thAddrMap[addr])

Expand Down
79 changes: 79 additions & 0 deletions internal/minipipeline/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package minipipeline

import (
"encoding/json"
"sort"
)

// Set is a set containing keys with pretty JSON serialization
// and deserialization rules and a valid zero value.
//
// The keys must have string or int64 as their underlying type. The set
// is serialized to JSON as the array of its keys in ascending order (see
// MarshalJSON and Keys) and is deserialized from a JSON array of keys
// (see UnmarshalJSON). The zero value is an empty set: Add allocates
// the underlying map on first use.
type Set[T ~string | ~int64] struct {
// state maps each key in the set to true; it is nil until the first Add.
state map[T]bool
}

// Compile-time assertions that Set implements the encoding/json interfaces:
// the value type must be a json.Marshaler and the pointer type must be
// a json.Unmarshaler.
var _ json.Marshaler = Set[int64]{}

var _ json.Unmarshaler = (*Set[int64])(nil)

// NewSet returns a set holding each of the given keys; duplicated keys
// are stored once. The underlying map is always allocated, even when
// no keys are given.
func NewSet[T ~string | ~int64](keys ...T) Set[T] {
	state := make(map[T]bool, len(keys))
	for _, key := range keys {
		state[key] = true
	}
	return Set[T]{state: state}
}

// Add inserts each of the given keys into the set. The underlying map
// is allocated on first use, which is what makes the zero value of
// Set ready to use. Adding a key that is already present is a no-op.
func (sx *Set[T]) Add(keys ...T) {
	if sx.state == nil {
		sx.state = make(map[T]bool)
	}
	for idx := range keys {
		sx.state[keys[idx]] = true
	}
}

// Len reports how many distinct keys the set holds; it is zero
// for the zero-value set.
func (sx Set[T]) Len() int {
	size := len(sx.state)
	return size
}

// Remove deletes each of the given keys from the set. Keys that are not
// in the set are ignored, and calling Remove on a zero-value set is
// a no-op.
func (sx Set[T]) Remove(keys ...T) {
	// a value receiver is enough here: the copy shares the same map
	for idx := range keys {
		delete(sx.state, keys[idx])
	}
}

// Keys returns the keys of the set as a newly allocated slice sorted in
// ascending order. The returned slice is never nil, even when the
// set is empty.
func (sx Set[T]) Keys() []T {
	out := make([]T, 0, len(sx.state))
	for key := range sx.state {
		out = append(out, key)
	}
	// map iteration order is not deterministic, so sort before returning
	sort.Slice(out, func(a, b int) bool {
		return out[a] < out[b]
	})
	return out
}

// MarshalJSON implements json.Marshaler by encoding the set as the
// JSON array of its keys, in the order returned by Keys.
func (sx Set[T]) MarshalJSON() ([]byte, error) {
	keys := sx.Keys()
	return json.Marshal(keys)
}

// UnmarshalJSON implements json.Unmarshaler by decoding a JSON array of
// keys and adding each of them to the set. Keys already in the set are
// kept, so decoding merges into the existing content. On a decoding
// error the set is left untouched and the error is returned.
func (sx *Set[T]) UnmarshalJSON(data []byte) error {
	var decoded []T
	err := json.Unmarshal(data, &decoded)
	if err != nil {
		return err
	}
	sx.Add(decoded...)
	return nil
}

// Contains returns whether the set contains a key. It returns false
// for every key when called on the zero-value set.
//
// Contains only reads the set, so it uses a value receiver like Len and
// Keys do. This makes it callable on non-addressable Set values, such
// as the direct result of a function call, while remaining callable
// through a *Set.
func (sx Set[T]) Contains(key T) bool {
	_, found := sx.state[key]
	return found
}
Loading

0 comments on commit 2a8aeac

Please sign in to comment.