Skip to content

Commit

Permalink
feat(reports) change reporting to TLS (#2089)
Browse files Browse the repository at this point in the history
Changes the reporting system to use TLS instead of UDP. Connections are
no longer part of the reporter, and are instead scoped to the individual
runs of send().

Add test utility scaffolding to run a TLS server.
  • Loading branch information
Travis Raines authored and rainest committed Jan 19, 2022
1 parent fa1b111 commit 69c525c
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 61 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
- [0.0.5](#005---20180602)
- [0.0.4 and prior](#004-and-prior)

## [1.3.3] - 2021/10/01
## [1.3.3]

> Release date: 2021/10/01
#### Fixed

Expand Down
47 changes: 22 additions & 25 deletions pkg/util/reports.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"crypto/tls"
"net"
"strconv"
"time"
Expand All @@ -10,8 +11,10 @@ import (

var (
reportsHost = "kong-hf.konghq.com"
reportsPort = 61829
reportsPort = 61833
pingInterval = 3600
tlsConf = tls.Config{MinVersion: tls.VersionTLS13}
dialer = net.Dialer{Timeout: time.Second * 30}
)

const (
Expand All @@ -34,12 +37,11 @@ type Reporter struct {
Info Info

serializedInfo string
conn *net.UDPConn

Logger logrus.FieldLogger
}

func (r *Reporter) once() error {
func (r *Reporter) once() {
var serializedInfo string
serializedInfo = serializedInfo + "v=" + r.Info.KICVersion + ";"
serializedInfo = serializedInfo + "k8sv=" + r.Info.KubernetesVersion + ";"
Expand All @@ -48,28 +50,11 @@ func (r *Reporter) once() error {
serializedInfo = serializedInfo + "id=" + r.Info.ID + ";"
serializedInfo = serializedInfo + "hn=" + r.Info.Hostname + ";"
r.serializedInfo = serializedInfo

addr, err := net.ResolveUDPAddr("udp", reportsHost+":"+
strconv.Itoa(reportsPort))
if err != nil {
return err
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return err
}
r.conn = conn
return nil
}

// Run starts the reporter. It will send reports until done is closed.
func (r Reporter) Run(done <-chan struct{}) {
err := r.once()
if err != nil {
r.Logger.Errorf("failed to initialize reporter: %s", err)
return
}
defer r.conn.Close()
r.once()

r.sendStart()
ticker := time.NewTicker(time.Duration(pingInterval) * time.Second)
Expand All @@ -85,20 +70,32 @@ func (r Reporter) Run(done <-chan struct{}) {
}
}

func (r Reporter) sendStart() {
func (r *Reporter) sendStart() {
signal := prd + "-start"
r.send(signal, 0)
}

func (r Reporter) sendPing(uptime int) {
func (r *Reporter) sendPing(uptime int) {
signal := prd + "-ping"
r.send(signal, uptime)
}

func (r Reporter) send(signal string, uptime int) {
func (r *Reporter) send(signal string, uptime int) {
message := "<14>signal=" + signal + ";uptime=" +
strconv.Itoa(uptime) + ";" + r.serializedInfo
_, err := r.conn.Write([]byte(message))
conn, err := tls.DialWithDialer(&dialer, "tcp", net.JoinHostPort(reportsHost,
strconv.FormatUint(uint64(reportsPort), 10)), &tlsConf)
if err != nil {
r.Logger.Errorf("failed to connect to reporting server: %s", err)
return
}
err = conn.SetDeadline(time.Now().Add(time.Minute))
if err != nil {
r.Logger.Errorf("failed to set report connection deadline: %s", err)
return
}
defer conn.Close()
_, err = conn.Write([]byte(message))
if err != nil {
r.Logger.Errorf("failed to send report: %s", err)
}
Expand Down
194 changes: 159 additions & 35 deletions pkg/util/reports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package util

import (
"bytes"
"context"
"crypto/tls"
"net"
"os"
"strconv"
Expand All @@ -12,9 +14,65 @@ import (
"github.com/stretchr/testify/assert"
)

type TLSPair struct {
Key, Cert string
}

var (
reportTestTLSCert = TLSPair{
Cert: `-----BEGIN CERTIFICATE-----
MIIC2DCCAcACCQC32eFOsWpKojANBgkqhkiG9w0BAQsFADAuMRcwFQYDVQQDDA5z
ZWN1cmUtZm9vLWJhcjETMBEGA1UECgwKa29uZ2hxLm9yZzAeFw0xODEyMTgyMTI4
MDBaFw0xOTEyMTgyMTI4MDBaMC4xFzAVBgNVBAMMDnNlY3VyZS1mb28tYmFyMRMw
EQYDVQQKDAprb25naHEub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
AQEAqhl/HSwV6PbMv+cMFU9X+HuM7QbNNPh39GKa4pkxzFgiAnuuJ4jw9V/bzsEy
S+ZIyjzo+QKB1LzmgdcX4vkdI22BjxUd9HPHdZxtv3XilbNmSk9UOl2Hh1fORJoS
7YH+VbvVwiz5lo7qKRepbg/jcKkbs6AUE0YWFygtDLTvhP2qkphQkxZ0m8qroW91
CWgI73Ar6U2W/YQBRI3+LwtsKo0p2ASDijvqxElQBgBIiyGIr0RZc5pkCJ1eQdDB
2F6XaMfpeEyBj0MxypNL4S9HHfchOt55J1KOzYnUPkQnSoxp6oEjef4Q/ZCj5BRL
EGZnTb3tbwzHZCxGtgl9KqO9pQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQAKQ5BX
kkBL+alERL31hsOgWgRiUMw+sPDtRS96ozUlPtVvAg9XFdpY4ldtWkxFcmBnhKzp
UewjrHkf9rR16NISwUTjlGIwaJu/ACQrY15v+r301Crq2DV+GjiUJFVuT495dp/l
0LZbt2Sh/uD+r3UNTcJpJ7jb1V0UP7FWXFj8oafsoFSgmxAPjpKQySTC54JK4AYb
QSnWu1nQLyohnrB9qLZhe2+jOQZnkKuCcWJQ5njvU6SxT3SOKE5XaOZCezEQ6IVL
U47YCCXsq+7wKWXBhKl4H2Ztk6x3HOC56l0noXWezsMfrou/kjwGuuViGnrjqelS
WQ7uVeNCUBY+l+qY
-----END CERTIFICATE-----`,
Key: `-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCqGX8dLBXo9sy/
5wwVT1f4e4ztBs00+Hf0YprimTHMWCICe64niPD1X9vOwTJL5kjKPOj5AoHUvOaB
1xfi+R0jbYGPFR30c8d1nG2/deKVs2ZKT1Q6XYeHV85EmhLtgf5Vu9XCLPmWjuop
F6luD+NwqRuzoBQTRhYXKC0MtO+E/aqSmFCTFnSbyquhb3UJaAjvcCvpTZb9hAFE
jf4vC2wqjSnYBIOKO+rESVAGAEiLIYivRFlzmmQInV5B0MHYXpdox+l4TIGPQzHK
k0vhL0cd9yE63nknUo7NidQ+RCdKjGnqgSN5/hD9kKPkFEsQZmdNve1vDMdkLEa2
CX0qo72lAgMBAAECggEADxMTYNJ3Xp4Ap0EioQDXGv5YDul7ZiZe+xmCAHLzJtjo
qq+rT3WjZRuJr1kPzAosiT+8pdTDDMdw5jDZvRO2sV0TDksgzHk2RAYI897OpdWw
SwWcwU9oo2X0sb+1zbang5GR8BNsSxt/RQUDzu05itJx0gltvgeIDaVR2L5wO6ja
USa8OVuj/92XtIIve9OtyK9jAzgR6LQOTFrCCEv89/vmy5Bykv4Uz8s8swZmTs3v
XJmAmruHGuSLMfXk8lBRp/gVyNTi3uMsdph5AJbVKnra5TZLguEozZKbLdNUYk0p
+aAc7rxDcH2sPqa/7DwRvei9dvd5oB3VJlxGVgC8AQKBgQDfznRSSKAD15hoSDzt
cKNyhLgWAL+MD0jhHKUy3x+Z9OCvf0DVnmru5HfQKq5UfT0t8VTRPGKmOtAMD4cf
LYjIurvMvpVzQGSJfhtHQuULZTh3dfsM7xivMqSV+9txklMAakM7vGQlOQxhrScM
21Mp5LWDU6+e2pFCrQPop0IPkQKBgQDCkVE+dou2yFuJx3uytCH1yKPSy9tkdhQH
dGF12B5dq8MZZozAz5P9YN/COa9WjsNKDqWbEgLEksEQUq4t8SBjHnSV/D3x7rEF
qgwii0GETYxax6gms8nueIqWZQf+0NbX7Gc5mTqeVb7v3TrhsKr0VNMFRXXQwP2E
M/pxJq8q1QKBgQC3rH7oXLP+Ez0AMHDYSL3LKULOw/RvpMeh/9lQA6+ysTaIsP3r
kuSdhCEUVULXEiVYhBug0FcBp3jAvSmem8cLPb0Mjkim2mzoLfeDJ1JEZODPoaLU
fZEbj4tlj9oLvhOiXpMo/jaOGeCgdPN8aK86zXlt+wtBao0WVFnF4SalEQKBgQC1
uLfi2SGgs/0a8B/ORoO5ZY3s4c2lRMtsMvyb7iBeaIAuByPLKZUVABe89deXxnsL
fiaacPX41wBO2IoqCp2vNdC6DP9mKQNZQPtYgCvPAAbo+rVIgH9HpXn7AZ24FyGy
RfAbUcv3+in9KelGxZTF4zu8HqXtNXMSuOFeMT1FiQKBgF0R+IFDGHhD4nudAQvo
hncXsgyzK6QUzak6HmFji/CMZ6EU9q6A67JkiEWrYoKqIAKZ2Og8+Eucr/rDdGWc
kqlmLPBJAJeUsP/9KidBjTE5mIbn/2n089VPMBvnlt2xIcuB6+zrf2NjvlcZEyKS
Gn+T2uCyOP4a1DTUoPyoNJXo
-----END PRIVATE KEY-----`,
}
)

func TestMain(m *testing.M) {
reportsHost = "localhost"
pingInterval = 1
tlsConf = tls.Config{InsecureSkipVerify: true} //nolint:gosec
os.Exit(m.Run())
}

Expand All @@ -32,7 +90,15 @@ func TestReporterOnce(t *testing.T) {
Info: info,
Logger: logrus.New(),
}
assert.Nil(reporter.once())
reqs := make(chan []byte)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
listener, err := getTLSListener()
assert.Nil(err)
defer listener.Close()
go runTestTLSServer(ctx, t, listener, reqs)

reporter.once()
want := "v=kic.version;k8sv=k8s.version;kv=kong.version;db=off;" +
"id=6acb7447-eedf-4815-a193-d714c5108f7b;hn=example.local;"
assert.Equal(want, reporter.serializedInfo)
Expand All @@ -52,24 +118,27 @@ func TestReporterSendStart(t *testing.T) {
Info: info,
Logger: logrus.New(),
}
assert.Nil(reporter.once())
addr, err := net.ResolveUDPAddr("udp", reportsHost+
":"+strconv.Itoa(reportsPort))
assert.Nil(err)
conn, err := net.ListenUDP("udp", addr)

reqs := make(chan []byte)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
listener, err := getTLSListener()
assert.Nil(err)
defer conn.Close()
defer listener.Close()
go runTestTLSServer(ctx, t, listener, reqs)

reporter.once()

reporter.sendStart()

buffer := make([]byte, 1024)
n, _, err := conn.ReadFromUDP(buffer)
serialized := "<14>signal=kic-start;uptime=0;v=kic.version;" +
"k8sv=k8s.version;kv=kong.version;db=off;" +
"id=6acb7447-eedf-4815-a193-d714c5108f7b;hn=example.local;"
assert.Equal(len(serialized), n)
assert.Nil(err)
assert.Equal(serialized, string(bytes.Trim(buffer, "\x00")))
received, ok := <-reqs
assert.True(ok)
short := string(bytes.Trim(received, "\x00"))
assert.Equal(len(serialized), len(short))
assert.Equal(serialized, short)
}

func TestReporterSendPing(t *testing.T) {
Expand All @@ -86,24 +155,27 @@ func TestReporterSendPing(t *testing.T) {
Info: info,
Logger: logrus.New(),
}
assert.Nil(reporter.once())
addr, err := net.ResolveUDPAddr("udp", reportsHost+
":"+strconv.Itoa(reportsPort))
assert.Nil(err)
conn, err := net.ListenUDP("udp", addr)

reqs := make(chan []byte)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
listener, err := getTLSListener()
assert.Nil(err)
defer conn.Close()
defer listener.Close()
go runTestTLSServer(ctx, t, listener, reqs)

reporter.once()

reporter.sendPing(42)

buffer := make([]byte, 1024)
n, _, err := conn.ReadFromUDP(buffer)
serialized := "<14>signal=kic-ping;uptime=42;v=kic.version;" +
"k8sv=k8s.version;kv=kong.version;db=off;" +
"id=6acb7447-eedf-4815-a193-d714c5108f7b;hn=example.local;"
assert.Equal(len(serialized), n)
assert.Nil(err)
assert.Equal(serialized, string(bytes.Trim(buffer, "\x00")))
received, ok := <-reqs
assert.True(ok)
short := string(bytes.Trim(received, "\x00"))
assert.Equal(len(serialized), len(short))
assert.Equal(serialized, short)
}

func TestReporterRun(t *testing.T) {
Expand All @@ -120,13 +192,16 @@ func TestReporterRun(t *testing.T) {
Info: info,
Logger: logrus.New(),
}
assert.Nil(reporter.once())
addr, err := net.ResolveUDPAddr("udp", reportsHost+
":"+strconv.Itoa(reportsPort))
assert.Nil(err)
conn, err := net.ListenUDP("udp", addr)

reqs := make(chan []byte)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
listener, err := getTLSListener()
assert.Nil(err)
defer conn.Close()
defer listener.Close()
go runTestTLSServer(ctx, t, listener, reqs)

reporter.once()
done := make(chan struct{})

var wg sync.WaitGroup
Expand All @@ -152,14 +227,63 @@ func TestReporterRun(t *testing.T) {
"id=6acb7447-eedf-4815-a193-d714c5108f7b;hn=example.local;",
}
for _, expect := range serializedContent {
buffer := make([]byte, 1024)
n, _, err := conn.ReadFromUDP(buffer)
assert.Equal(len(expect), n)
assert.Nil(err)
assert.Equal(expect, string(bytes.Trim(buffer, "\x00")))

received, ok := <-reqs
assert.True(ok)
short := string(bytes.Trim(received, "\x00"))
assert.Equal(len(expect), len(short))
assert.Equal(expect, short)
}
close(done)
}()
wg.Wait()
}

// getTLSListener builds a TLS listener using the test certificates
func getTLSListener() (net.Listener, error) {
testCertificate, err := tls.X509KeyPair([]byte(reportTestTLSCert.Cert), []byte(reportTestTLSCert.Key))
if err != nil {
return nil, err
}
conf := &tls.Config{
Certificates: []tls.Certificate{
testCertificate,
},
MinVersion: tls.VersionTLS13,
}
listen, err := tls.Listen("tcp", net.JoinHostPort(reportsHost, strconv.FormatUint(uint64(reportsPort), 10)), conf)
if err != nil {
return nil, err
}
return listen, nil
}

// runTLSServer creates a new test TLS server for the reporting system. It accepts connections using the provided
// listener and sends all requests it receives over the reqs channel
func runTestTLSServer(ctx context.Context, t *testing.T, listen net.Listener, reqs chan []byte) {
defer close(reqs)
for {
select {
case <-ctx.Done():
listen.Close()
return
default:
conn, err := listen.Accept()
if err != nil {
// we expect "use of closed network connection" when the test ends, since it will be blocked on accept
t.Logf("could not accept TLS connection: %v", err)
return
}
go handleConnection(reqs, conn)
}
}
}

func handleConnection(reqs chan []byte, conn net.Conn) {
defer conn.Close()
buffer := make([]byte, 1024)
_, err := conn.Read(buffer)
if err != nil {
close(reqs)
}
reqs <- buffer
}

0 comments on commit 69c525c

Please sign in to comment.