Skip to content

Commit

Permalink
support for random port and add GetAddress for collector
Browse files Browse the repository at this point in the history
  • Loading branch information
zyiou committed Dec 1, 2020
1 parent 02e0484 commit 7ea9f81
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 71 deletions.
8 changes: 8 additions & 0 deletions pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (cp *CollectingProcess) Stop() {
}
}

func (cp *CollectingProcess) GetAddress() net.Addr {
return cp.address
}

func (cp *CollectingProcess) GetMsgChan() chan *entities.Message {
return cp.messageChan
}
Expand Down Expand Up @@ -290,6 +294,10 @@ func (cp *CollectingProcess) deleteTemplate(obsDomainID uint32, templateID uint1
delete(cp.templatesMap[obsDomainID], templateID)
}

func (cp *CollectingProcess) updateAddress(address net.Addr) {
cp.address = address
}

// getMessageLength returns buffer length by decoding the header
func getMessageLength(msgBuffer *bytes.Buffer) (int, error) {
packet := entities.Message{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ func (cp *CollectingProcess) startTCPServer() {
klog.Errorf("Cannot start tls collecting process on %s: %v", cp.address.String(), err)
return
}
cp.updateAddress(listener.Addr())
klog.Infof("Start tls collecting process on %s", cp.address.String())
} else {
listener, err = net.Listen("tcp", cp.address.String())
if err != nil {
klog.Errorf("Cannot start collecting process on %s: %v", cp.address.String(), err)
return
}
cp.updateAddress(listener.Addr())
klog.Infof("Start %s collecting process on %s", cp.address.Network(), cp.address.String())
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/collector/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (cp *CollectingProcess) startUDPServer() {
klog.Error(err)
return
}
cp.updateAddress(listener.Addr())
klog.Infof("Start dtls collecting process on %s", cp.address.String())
conn, err = listener.Accept()
if err != nil {
Expand Down Expand Up @@ -90,6 +91,7 @@ func (cp *CollectingProcess) startUDPServer() {
klog.Error(err)
return
}
cp.updateAddress(conn.LocalAddr())
klog.Infof("Start %s collecting process on %s", cp.address.Network(), cp.address.String())
var wg sync.WaitGroup
defer conn.Close()
Expand Down
37 changes: 27 additions & 10 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func (a *AggregationProcess) Stop() {

// AggregateMsgByFlowKey gets flow key from records in message and stores in cache
func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error {
addOriginalExporterInfo(message)
err := addOriginalExporterInfo(message)
if err != nil {
return err
}
if message.Set.GetSetType() == entities.Template { // skip template records
return nil
}
Expand Down Expand Up @@ -270,21 +273,35 @@ func addOriginalExporterInfo(message *entities.Message) error {
set := message.Set
records := set.GetRecords()
for _, record := range records {
var originalExporterIPv4Address, originalObservationDomainId *entities.InfoElementWithValue

// Add originalExporterIPv4Address
ie, err := registry.GetInfoElement("originalExporterIPv4Address", registry.IANAEnterpriseID)
if err != nil {
return fmt.Errorf("IANA Registry is not loaded correctly with originalExporterIPv4Address.")
var originalExporterAddress, originalObservationDomainId *entities.InfoElementWithValue
var ie *entities.InfoElement
var address net.IP
var err error
// Add original exporter address
if net.ParseIP(message.ExportAddress).To4() != nil { // IPv4
ie, err = registry.GetInfoElement("originalExporterIPv4Address", registry.IANAEnterpriseID)
if err != nil {
return fmt.Errorf("IANA Registry is not loaded correctly with originalExporterIPv4Address.")
}
address = net.ParseIP(message.ExportAddress).To4()
} else if net.ParseIP(message.ExportAddress).To16() != nil { // IPv6
ie, err = registry.GetInfoElement("originalExporterIPv6Address", registry.IANAEnterpriseID)
if err != nil {
return fmt.Errorf("IANA Registry is not loaded correctly with originalExporterIPv6Address.")
}
address = net.ParseIP(message.ExportAddress).To16()
} else {
return fmt.Errorf("original exporter address is not in correct format.")
}
if set.GetSetType() == entities.Template {
originalExporterIPv4Address = entities.NewInfoElementWithValue(ie, nil)
originalExporterAddress = entities.NewInfoElementWithValue(ie, nil)
} else if set.GetSetType() == entities.Data {
originalExporterIPv4Address = entities.NewInfoElementWithValue(ie, net.ParseIP(message.ExportAddress))
originalExporterAddress = entities.NewInfoElementWithValue(ie, address)
} else {
return fmt.Errorf("Set type %d is not supported.", set.GetSetType())
}
_, err = record.AddInfoElement(originalExporterIPv4Address, false)

_, err = record.AddInfoElement(originalExporterAddress, false)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func TestGetTupleRecordMap(t *testing.T) {
}

func TestAggregateMsgByFlowKey(t *testing.T) {
registry.LoadRegistry()
messageChan := make(chan *entities.Message)
aggregationProcess, _ := InitAggregationProcess(messageChan, 2, fields)
// Template records should be ignored
Expand Down
16 changes: 16 additions & 0 deletions pkg/test/collector_intermediate_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Copyright 2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build integration

package test

import (
Expand Down
123 changes: 62 additions & 61 deletions pkg/test/exporter_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,120 +32,120 @@ import (

const (
fakeKey = `-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCvTekfTcktH3bp
sB+pRW9B9OqtjmXumWKLsKJq0MxA0gUuRfKr3dc5uKexk2HDM/gTCEMhDSe+SrAF
PNE6oIb69us8V53XB1AxCQM1G2gZB277Glaw/3o0fxSOXxGYnYO7ac44rrjudqMl
Tp7DPoQaa0rp00G6eBuzOewUmSxj/i5p5t+i8s5kj5ny014NcXAoVGeec0lI35qp
+/gda3u+E70BgKxCxaF9bE0DQmE0GClzSKULclV+UBCuoCCgU2iyajVMsUNapelt
vJC+qjHEpsTGGzSsb0LTCktjSQRooYYkMccmafLpTDhEa0Qmt2L8ilwlxg6c1PRv
XE25qncPAgMBAAECggEBAJE/z6GFVOPTRza3HHSnOFkA8hVdgC2i31j4wIoaeLJY
kbxWboxiofqMej2S7RTNEYXLebt/5+cugQvF6WJXMZ/tSNlVi01oHNSUMBknnSfn
1deuahf7hijLBqA0OyMll8mIEDs84bOLjv/RVZBWUySEs6xrwvEapXDp1Cb5ByPN
T1iGZ3chcOgGPX6MTq9+P4yREREQXjPZ9uKSiLqQg2rVg/j4sC/iPgiE/nSShPIk
gpOW3kgUuiYGsTQSJ2YIyr81MEgudmUCnJbu/5P8dqtHiqmHOW1psirwVB7xCow3
h8JBuxz2jHTqnsAfXwWdmvZyXvAycR+9/t9CCGwee3kCgYEA1ozhdC5h6MfyaagP
9Hl0i8Jlh6r1WVMXLpPy0pQGPnw1JJUHHiEIU4Yp/tzO+DHOSe2mvKLGrsNIRH89
Vh0maStI26brPyiw7w5hjelxrJ/zH0UdWzWxbZ8HRNh8F3WGoXkGoaLRMQUfYvOI
lT/HlOSmyl9UCByzU7sq5bkIU50CgYEA0SwFyGX/rpBC7YWpe1VsLBF8GSat9SUc
UAXn0/6x4eOvLtdPk67HrnU3FIvV376HuTY5hCC2sQTJ+cxzhAj3cpbJjOpjlJZj
nAYrVNAQHmgynKjCNP8v2W8LQbi39UPE5Zf6dphFbpgQgqYqMQV0iIWRv4WKJKAD
w3GMwB6pA5sCgYEAlHT/PAksLorMLlfgUmYIQvzMjEe7ZYedLtmo2BUdDPedPibw
ueRZgpH/VR8tB4hPGdCb40Mu/5aY1uzEYGXjQjp1O6gQd6+MXp4w2qWBxtUWwbht
S8OndhboTLcPhpwIAItiD04+OhE1Wp7xD3UGgPyGfNnhp4tUese0MykJnfECgYEA
ok8MtbIgMq6SoIjFOITSiWeP6lxPRBhl3dqXR7MtCOGKQEim4SwQmlkuQm03qoTI
AHoJK3PPD5FtwL5bLKtgh7Rl9UizuMrxxFItMYS53T5xd4qkGEekM46tJ3RUmqbZ
lGbX3UrPJcAtn5Oczak0AfPTYtAWn9Di2rezxiiEcd0CgYA0RSCk8XgtZxAoPQJC
Y2PJ6FHlSLMtDhsAsUtD+mXlt8+o+tyMG7ZysQZKHsjDMzEZZRK7F8W9+xzzl1fa
Ok+B9v1BFakMXRc5zcA8XH1ng9Ml2DfVYPXxwmaMsGPnwPZsftUJPNbArS60vJJh
w9ajWgCA6SGtD17ZpHfgIiMvhA==
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDz3PD8TwjDfQg2
IiBVZMZT1RJLZwZZXWYkukWfhx7Zp1iJeAX1ExSEcnq1b9Hc1cP0PXsw0eoqXomH
tAimX/k0d6IqLHTiB7fFJF2cuAjQbMBoGxN5v+0elJOBCwAaY8E0iwPwuKa4VFua
6rSqSfkhQ4FQqaUqhy6eqp3vD/ycHhje1VX3S66hwGMaDLi2trnm88fjm81I1A0d
bwPRPO0246m2loG7xB8RYfMLgCLzhOQLz2Uow0HfjbJQ+IlDQ3uuWdzr46nqQaes
T6ULLz8N560+rjQD0Dg3GGeqaHEP7OaLmVQBqtsj0GroD7xscDKkgq9fnW9VuGjZ
2tKElip5AgMBAAECggEBAO/NrnSOS7Hg+/gvqtgOVzEM8AaR8x5hyBYJznlHaEDk
XR4hlsoezyhHYv+UTCz7UMyWwNOLONgdSuTVV0Q0UF0V37PVL8Mtj7sfPablGlXK
+5HkPkyVPVm7BSn6ZUmOGunOYjuPePL+kW5PqwVh5MifF0T47eBaOq/wW4pAkEn7
KzHcVEy08VnaK3XIC7lS9f5o/L9KSlZQU15TUAjFZ/kn4phzsR/z5462qKl9obYQ
gOqtK5ZA4ZqTX0nbzaqM20rhZEv+FkDWmSzcAaGfjvPtjFUmrChhpVSRqtcxlgEK
qwVM01i8t/E8p0Y4/2+GSDs4yKFeJNVuQ9AsLdhpJRkCgYEA//C+d/ltVhQtKaoA
dJy+AESTaVWS5IUmcE2x4fu9vyDR/U6FyPUmU4imwO4wS0O8jLdExjtb2kaIiGth
b3cmoUe77Xpi+iLSIAVk27QYWlnAQY/VkAZZZtY3JS75jdo4TShiafYq4vb0Gux7
jDkh+VW2mTRpp0if88y/61HPH8cCgYEA8+t6ON3CY3U19wtoQEA01kLk13B182ps
7beRidxW37ndOVXGj27xeEuzHlj/ry+kh+AOkHRbiz1RvhdX2POotgwX0Tdq/SXv
CIYsL02ZS1m33GwrvoVfsTOpvKrQMlX7WpXYpghagf2y6TEJiCtWlSROJhje9K2I
eL5jwV8g478CgYEAiYNEExoE0NcOXPBmRkFhJKuzuEiuH/IacQSNqqmjjWmI6dyi
rRJqgT9OuSJA+G9wgvqFDS0fcOust/9Z3pXaP5VXN4UmYNcMpv++7PyaiRDn51Hs
oPGIX2SBRI00sC6rSWmFVwFYkZG2HjEpQHIB+wE+lpo+mg6/QjKkez79VkkCgYAP
BnFX8WkZAU5asmwwkQPwMtyv3LCXVvXwyr7/VABR9bwH3R3HFhlvxJH7C5Zsby3e
ZNHg2hoNgLB5WizCI3hABoytCZHgmCaaStGL9Ga9+n/V5x/ms4aKftk00vzSLPO3
x8U5rQgOO9d6f9fLeIfz1fGubRfG0K24alnwvnBjNwKBgDtu5OUrOtAW4pZVj5rs
YK1g7EZMreBtU1ox80vhU7QT707EI2LfI7J9MGfEHcstjl868Z8JBIyjyEmAJxYv
08oooxSfn83rnXzn+XFaQKSuAYzbQXulKh3DGucZY1n8tPT9I+jbtGvGBj94FuBQ
SuTFwBux90EAGPE4SyuCHgYo
-----END PRIVATE KEY-----
`
fakeCert = `-----BEGIN CERTIFICATE-----
MIIDhjCCAm6gAwIBAgIJAP3U+C7liWf8MA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV
MIIDhjCCAm6gAwIBAgIJAJPXwf3JizlIMA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV
BAYTAlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2Vs
Zi1zaWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1z
aWduZWQgY2VydGlmaWNhdGUwHhcNMjAxMTA1MDU0NjUyWhcNMjIxMTA1MDU0NjUy
aWduZWQgY2VydGlmaWNhdGUwHhcNMjAxMjAxMDQyNjA0WhcNMjIxMjAxMDQyNjA0
WjB4MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAe
BgNVBAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4w
LjE6IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAr03pH03JLR926bAfqUVvQfTqrY5l7plii7CiatDMQNIFLkXy
q93XObinsZNhwzP4EwhDIQ0nvkqwBTzROqCG+vbrPFed1wdQMQkDNRtoGQdu+xpW
sP96NH8Ujl8RmJ2Du2nOOK647najJU6ewz6EGmtK6dNBungbsznsFJksY/4uaebf
ovLOZI+Z8tNeDXFwKFRnnnNJSN+aqfv4HWt7vhO9AYCsQsWhfWxNA0JhNBgpc0il
C3JVflAQrqAgoFNosmo1TLFDWqXpbbyQvqoxxKbExhs0rG9C0wpLY0kEaKGGJDHH
Jmny6Uw4RGtEJrdi/IpcJcYOnNT0b1xNuap3DwIDAQABoxMwETAPBgNVHREECDAG
hwQAAAAAMA0GCSqGSIb3DQEBCwUAA4IBAQAE6/mSUMVerL8B3Xs2+3YVmhd94Ql5
ZKLwmEhsvOhP/3KRSncA8bIr4ZGCyvyEgsJqktjHJ4OYUIw3auYOBZgnUe3kM4NI
H7SS1JEtMu7okoXL/zHZcNrGHslFoEnIzvtoooSTQglcHclo8NWnGng6nJkSsY7w
DivAX9M7xtyKvGFgh6HuKYSZ3Yd6DeCkpnL2aOXf7cmFk4FT3SIbrtLNsLetbPl3
rsA9pUDwTYRP8PDOLC3BKyDl84Dpb8JScqVpBMDRBW1dre0emORlh17JllyhA+9b
fKNX/D1XinAd/OftM5gYBWs7M6uZTm7JxMCvA2kckoN7B+BdrzisxTUR
AQ8AMIIBCgKCAQEA89zw/E8Iw30INiIgVWTGU9USS2cGWV1mJLpFn4ce2adYiXgF
9RMUhHJ6tW/R3NXD9D17MNHqKl6Jh7QIpl/5NHeiKix04ge3xSRdnLgI0GzAaBsT
eb/tHpSTgQsAGmPBNIsD8LimuFRbmuq0qkn5IUOBUKmlKocunqqd7w/8nB4Y3tVV
90uuocBjGgy4tra55vPH45vNSNQNHW8D0TztNuOptpaBu8QfEWHzC4Ai84TkC89l
KMNB342yUPiJQ0N7rlnc6+Op6kGnrE+lCy8/DeetPq40A9A4NxhnqmhxD+zmi5lU
AarbI9Bq6A+8bHAypIKvX51vVbho2drShJYqeQIDAQABoxMwETAPBgNVHREECDAG
hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQBninXJdEqqsJtqJKwog5X/EY7+Qgxh
f4Ye7cI3JvGuH1InmZkLmgJAtRy5MDo64GZmPjKJsr4a5M+DARFcRnoFhqo7uXoS
M3D5l9hbUUrXDrQzTww+6ga2wwZEFaOkzNLaTZ2bw01vVM3Xpqn5y0N4+7fTw6Ka
M2xhaFYgJzDy9nQjmTn2F3G8s27xQ3ebAy/KcNGJJwf3vFC76i76sa78jWYmGOO2
hyHgFXOE2+9q4J364PTipSwLgFr7lz4CF+OCzttpyFqB34c54atCQ7ste/NXysIO
F90n06ISWkonCXiZ09r3ooH5py6GBFf15ZKrBzetZN+aSJcBjHXasQrA
-----END CERTIFICATE-----
`
fakeKey2 = `-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQg1h0K9jGfyBQMttaz
ija4rnsXfTQf1KvXl2o9SABhtvmhRANCAAQnICXGTyc72J2mpIgbZz3mvgmqUzGJ
FaU0IQHwImuqwIjbsJtnj6XgozycBwTPGPkuQeyKp3k3ADE7UOCqsSOH
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgnOocXNDRcH6BZ86v
4GwZF6JiqYbF7bxrssfJ/Ge0jbihRANCAARX5PJ6Za+ZkYvliOtKO2fCbJG07/Pw
nrBDHZzPbrdW0TJNZ9psQjj0dgG15/Jimn1YnnSYF0g153EEtFmeTk72
-----END PRIVATE KEY-----
`
fakeCert2 = `-----BEGIN CERTIFICATE-----
MIIB+jCCAaCgAwIBAgIJALfqenQRnGoHMAoGCCqGSM49BAMCMHgxCzAJBgNVBAYT
MIIB+jCCAaCgAwIBAgIJAOtRkOrJBEY0MAoGCCqGSM49BAMCMHgxCzAJBgNVBAYT
AlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2VsZi1z
aWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1zaWdu
ZWQgY2VydGlmaWNhdGUwHhcNMjAxMTA4MDgwNjQ2WhcNMjIxMTA4MDgwNjQ2WjB4
ZWQgY2VydGlmaWNhdGUwHhcNMjAxMjAxMDQzMTU1WhcNMjIxMjAxMDQzMTU1WjB4
MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAeBgNV
BAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4wLjE6
IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcD
QgAEJyAlxk8nO9idpqSIG2c95r4JqlMxiRWlNCEB8CJrqsCI27CbZ4+l4KM8nAcE
zxj5LkHsiqd5NwAxO1DgqrEjh6MTMBEwDwYDVR0RBAgwBocEfwAAATAKBggqhkjO
PQQDAgNIADBFAiEAzUT2hG3WChJh8cBo7EMQan2eJiF96OlSB+rWKKMaoGACIGOp
RVaPKj9ad0Z/3GiwaxtW+74bvc2vF3JS9cRU6DhY
QgAEV+TyemWvmZGL5YjrSjtnwmyRtO/z8J6wQx2cz263VtEyTWfabEI49HYBtefy
Ypp9WJ50mBdINedxBLRZnk5O9qMTMBEwDwYDVR0RBAgwBocEfwAAATAKBggqhkjO
PQQDAgNIADBFAiEA+g3X1x27qV+LRx81AudIagHdvcVvLVbRJh0eXNFfPzUCIFHg
JSnRKkDuZ/d5wYR59eIld9FsJPFWPCQth2cKnBsM
-----END CERTIFICATE-----
`
)

func TestSingleRecordUDPTransport(t *testing.T) {
address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4630")
address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
testExporterToCollector(address, false, false, t)
}

func TestSingleRecordTCPTransport(t *testing.T) {
address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4631")
address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
testExporterToCollector(address, false, false, t)
}

func TestMultipleRecordUDPTransport(t *testing.T) {
address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4632")
address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
testExporterToCollector(address, true, false, t)
}

func TestMultipleRecordTCPTransport(t *testing.T) {
address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4633")
address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
testExporterToCollector(address, true, false, t)
}

func TestTLSTransport(t *testing.T) {
address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4634")
address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
testExporterToCollector(address, false, true, t)
}

func TestDTLSTransport(t *testing.T) {
address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4635")
address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
t.Error(err)
}
Expand All @@ -169,21 +169,22 @@ func testExporterToCollector(address net.Addr, isMultipleRecord bool, isEncrypte
}

go func() { // Start exporting process in go routine
time.Sleep(2 * time.Second) // wait for collector to be ready
time.Sleep(time.Second) // wait for collector to be ready
var export *exporter.ExportingProcess
var err error
collectorAddr := cp.GetAddress()
if isEncrypted {
if address.Network() == "tcp" { // use TLS
export, err = exporter.InitExportingProcess(address, 1, 0, true, []byte(fakeCert))
export, err = exporter.InitExportingProcess(collectorAddr, 1, 0, true, []byte(fakeCert))
} else if address.Network() == "udp" { // use DTLS
export, err = exporter.InitExportingProcess(address, 1, 0, true, []byte(fakeCert2))
export, err = exporter.InitExportingProcess(collectorAddr, 1, 0, true, []byte(fakeCert2))
}
} else {
export, err = exporter.InitExportingProcess(address, 1, 0, false, nil)
export, err = exporter.InitExportingProcess(collectorAddr, 1, 0, false, nil)
}

if err != nil {
klog.Fatalf("Got error when connecting to %s", address.String())
klog.Fatalf("Got error when connecting to %s", collectorAddr.String())
}

// Create template record with 5 fields
Expand Down Expand Up @@ -318,7 +319,7 @@ func testExporterToCollector(address net.Addr, isMultipleRecord bool, isEncrypte
klog.Fatalf("Got error when sending record: %v", err)
}
export.CloseConnToCollector() // Close exporting process
time.Sleep(2 * time.Second)
time.Sleep(time.Second)
cp.Stop() // Close collecting process
}()
go func() {
Expand Down

0 comments on commit 7ea9f81

Please sign in to comment.