From 2eb85c8894edf3afae2611aa16933bcf363e67a1 Mon Sep 17 00:00:00 2001 From: zyiou Date: Tue, 20 Oct 2020 14:45:08 -0700 Subject: [PATCH] add tls/dtls support --- cmd/collector/collector.go | 10 +- go.mod | 3 +- go.sum | 31 ++- pkg/collector/process.go | 47 +++- pkg/collector/process_test.go | 233 +++++++++++++++++++- pkg/collector/tcp.go | 30 ++- pkg/collector/udp.go | 99 +++++++-- pkg/exporter/process.go | 74 +++++-- pkg/exporter/process_test.go | 281 +++++++++++++++++++++++- pkg/intermediate/aggregate.go | 22 +- pkg/intermediate/aggregate_test.go | 50 ++++- pkg/test/collector_intermediate_test.go | 31 ++- pkg/test/exporter_collector_test.go | 148 +++++++++++-- pkg/util/util.go | 4 +- 14 files changed, 953 insertions(+), 110 deletions(-) diff --git a/cmd/collector/collector.go b/cmd/collector/collector.go index da55ab88..3d5d2d9d 100644 --- a/cmd/collector/collector.go +++ b/cmd/collector/collector.go @@ -131,7 +131,15 @@ func run() error { return fmt.Errorf("input given ipfix.transport flag is not supported or valid") } // Initialize collecting process - cp, err := collector.InitCollectingProcess(netAddr, 65535, 0) + cpInput := collector.CollectorInput{ + Address: netAddr, + MaxBufferSize: 65535, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, err := collector.InitCollectingProcess(cpInput) if err != nil { return err } diff --git a/go.mod b/go.mod index fe1a8338..68e118d2 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,10 @@ go 1.15 require ( github.com/golang/mock v1.4.3 + github.com/pion/dtls/v2 v2.0.3 github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.5.1 + github.com/stretchr/testify v1.6.1 k8s.io/apimachinery v0.18.4 k8s.io/component-base v0.18.4 k8s.io/klog v1.0.0 diff --git a/go.sum b/go.sum index bcb8265a..00bda3f7 100644 --- a/go.sum +++ b/go.sum @@ -23,7 +23,6 @@ github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -73,7 +72,6 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -83,10 +81,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -107,6 +103,15 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14= +github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE= +github.com/pion/transport v0.10.1 h1:2W+yJT+0mOQ160ThZYUx5Zp2skzshiNgxrNE9GUfhJM= +github.com/pion/transport v0.10.1/go.mod h1:PBis1stIILMiis0PewDw91WJeLJkyIMcEk+DwKOzf4A= +github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI= +github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -136,8 +141,9 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -146,6 +152,9 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -160,6 +169,9 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -178,6 +190,7 @@ golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -193,6 +206,8 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -202,18 +217,16 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.18.4/go.mod h1:lOIQAKYgai1+vz9J7YcDZwC26Z0zQewYOGWdyIPUUQ4= diff --git a/pkg/collector/process.go b/pkg/collector/process.go index 71b141b9..0182c05c 100644 --- a/pkg/collector/process.go +++ b/pkg/collector/process.go @@ -47,6 +47,20 @@ type CollectingProcess struct { messageChan chan *entities.Message // maps each client to its client handler (required channels) clients map[string]*clientHandler + // isEncrypted indicates whether to use TLS/DTLS for communication + isEncrypted bool + // serverCert and serverKey are for storing encryption info when using TLS/DTLS + serverCert []byte + serverKey []byte +} + +type CollectorInput struct { + Address net.Addr + MaxBufferSize uint16 + TemplateTTL uint32 + IsEncrypted bool + ServerCert []byte + ServerKey []byte } type clientHandler struct { @@ -54,16 +68,19 @@ type clientHandler struct { errChan chan bool } -func InitCollectingProcess(address net.Addr, maxBufferSize uint16, templateTTL uint32) (*CollectingProcess, error) { +func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) { collectProc := &CollectingProcess{ templatesMap: make(map[uint32]map[uint16][]*entities.InfoElement), mutex: sync.RWMutex{}, - templateTTL: templateTTL, - address: address, - maxBufferSize: maxBufferSize, + templateTTL: input.TemplateTTL, + address: input.Address, + maxBufferSize: input.MaxBufferSize, stopChan: make(chan bool), messageChan: make(chan *entities.Message), clients: make(map[string]*clientHandler), + isEncrypted: input.IsEncrypted, + serverCert: input.ServerCert, + serverKey: input.ServerKey, } return collectProc, nil } @@ -80,6 +97,12 @@ func (cp *CollectingProcess) Stop() { cp.stopChan <- true } +func (cp *CollectingProcess) GetAddress() net.Addr { + cp.mutex.RLock() + defer cp.mutex.RUnlock() + return cp.address +} + func (cp *CollectingProcess) GetMsgChan() chan *entities.Message { return cp.messageChan } @@ -146,12 +169,12 @@ func (cp *CollectingProcess) decodePacket(packetBuffer *bytes.Buffer, exportAddr if setID == entities.TemplateSetID { set, err = cp.decodeTemplateSet(packetBuffer, obsDomainID) if err != nil { - return nil, fmt.Errorf("Error in decoding message: %v", err) + return nil, fmt.Errorf("error in decoding message: %v", err) } } else { set, err = cp.decodeDataSet(packetBuffer, obsDomainID, setID) if err != nil { - return nil, fmt.Errorf("Error in decoding message: %v", err) + return nil, fmt.Errorf("error in decoding message: %v", err) } } message.AddSet(set) @@ -229,7 +252,7 @@ func (cp *CollectingProcess) decodeDataSet(dataBuffer *bytes.Buffer, obsDomainID // make sure template exists template, err := cp.getTemplate(obsDomainID, templateID) if err != nil { - return nil, fmt.Errorf("Template %d with obsDomainID %d does not exist", templateID, obsDomainID) + return nil, fmt.Errorf("template %d with obsDomainID %d does not exist", templateID, obsDomainID) } dataSet := entities.NewSet(entities.Data, templateID, true) @@ -289,7 +312,7 @@ func (cp *CollectingProcess) getTemplate(obsDomainID uint32, templateID uint16) if elements, exists := cp.templatesMap[obsDomainID][templateID]; exists { return elements, nil } else { - return nil, fmt.Errorf("Template %d with obsDomainID %d does not exist.", templateID, obsDomainID) + return nil, fmt.Errorf("template %d with obsDomainID %d does not exist", templateID, obsDomainID) } } @@ -299,6 +322,12 @@ func (cp *CollectingProcess) deleteTemplate(obsDomainID uint32, templateID uint1 delete(cp.templatesMap[obsDomainID], templateID) } +func (cp *CollectingProcess) updateAddress(address net.Addr) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + cp.address = address +} + // getMessageLength returns buffer length by decoding the header func getMessageLength(msgBuffer *bytes.Buffer) (int, error) { var version, msgLen, setID, setLen uint16 @@ -307,7 +336,7 @@ func getMessageLength(msgBuffer *bytes.Buffer) (int, error) { // that decodes header based on the offset. err := util.Decode(msgBuffer, binary.BigEndian, &version, &msgLen, &exportTime, &sequencNum, &obsDomainID, &setID, &setLen) if err != nil { - return 0, fmt.Errorf("Cannot decode message: %v", err) + return 0, fmt.Errorf("cannot decode message: %v", err) } return int(msgLen), nil } diff --git a/pkg/collector/process_test.go b/pkg/collector/process_test.go index a7db3e22..610801ac 100644 --- a/pkg/collector/process_test.go +++ b/pkg/collector/process_test.go @@ -16,11 +16,14 @@ package collector import ( "bytes" + "crypto/tls" + "crypto/x509" "net" "sync" "testing" "time" + "github.com/pion/dtls/v2" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/wait" @@ -30,6 +33,81 @@ import ( var validTemplatePacket = []byte{0, 10, 0, 40, 95, 154, 107, 127, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 101, 255, 255, 0, 0, 220, 186} var validDataPacket = []byte{0, 10, 0, 33, 95, 154, 108, 18, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 17, 1, 2, 3, 4, 5, 6, 7, 8, 4, 112, 111, 100, 49} + +const ( + fakeKey = `-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCvTekfTcktH3bp +sB+pRW9B9OqtjmXumWKLsKJq0MxA0gUuRfKr3dc5uKexk2HDM/gTCEMhDSe+SrAF +PNE6oIb69us8V53XB1AxCQM1G2gZB277Glaw/3o0fxSOXxGYnYO7ac44rrjudqMl +Tp7DPoQaa0rp00G6eBuzOewUmSxj/i5p5t+i8s5kj5ny014NcXAoVGeec0lI35qp ++/gda3u+E70BgKxCxaF9bE0DQmE0GClzSKULclV+UBCuoCCgU2iyajVMsUNapelt +vJC+qjHEpsTGGzSsb0LTCktjSQRooYYkMccmafLpTDhEa0Qmt2L8ilwlxg6c1PRv +XE25qncPAgMBAAECggEBAJE/z6GFVOPTRza3HHSnOFkA8hVdgC2i31j4wIoaeLJY +kbxWboxiofqMej2S7RTNEYXLebt/5+cugQvF6WJXMZ/tSNlVi01oHNSUMBknnSfn +1deuahf7hijLBqA0OyMll8mIEDs84bOLjv/RVZBWUySEs6xrwvEapXDp1Cb5ByPN +T1iGZ3chcOgGPX6MTq9+P4yREREQXjPZ9uKSiLqQg2rVg/j4sC/iPgiE/nSShPIk +gpOW3kgUuiYGsTQSJ2YIyr81MEgudmUCnJbu/5P8dqtHiqmHOW1psirwVB7xCow3 +h8JBuxz2jHTqnsAfXwWdmvZyXvAycR+9/t9CCGwee3kCgYEA1ozhdC5h6MfyaagP +9Hl0i8Jlh6r1WVMXLpPy0pQGPnw1JJUHHiEIU4Yp/tzO+DHOSe2mvKLGrsNIRH89 +Vh0maStI26brPyiw7w5hjelxrJ/zH0UdWzWxbZ8HRNh8F3WGoXkGoaLRMQUfYvOI +lT/HlOSmyl9UCByzU7sq5bkIU50CgYEA0SwFyGX/rpBC7YWpe1VsLBF8GSat9SUc +UAXn0/6x4eOvLtdPk67HrnU3FIvV376HuTY5hCC2sQTJ+cxzhAj3cpbJjOpjlJZj +nAYrVNAQHmgynKjCNP8v2W8LQbi39UPE5Zf6dphFbpgQgqYqMQV0iIWRv4WKJKAD +w3GMwB6pA5sCgYEAlHT/PAksLorMLlfgUmYIQvzMjEe7ZYedLtmo2BUdDPedPibw +ueRZgpH/VR8tB4hPGdCb40Mu/5aY1uzEYGXjQjp1O6gQd6+MXp4w2qWBxtUWwbht +S8OndhboTLcPhpwIAItiD04+OhE1Wp7xD3UGgPyGfNnhp4tUese0MykJnfECgYEA +ok8MtbIgMq6SoIjFOITSiWeP6lxPRBhl3dqXR7MtCOGKQEim4SwQmlkuQm03qoTI +AHoJK3PPD5FtwL5bLKtgh7Rl9UizuMrxxFItMYS53T5xd4qkGEekM46tJ3RUmqbZ +lGbX3UrPJcAtn5Oczak0AfPTYtAWn9Di2rezxiiEcd0CgYA0RSCk8XgtZxAoPQJC +Y2PJ6FHlSLMtDhsAsUtD+mXlt8+o+tyMG7ZysQZKHsjDMzEZZRK7F8W9+xzzl1fa +Ok+B9v1BFakMXRc5zcA8XH1ng9Ml2DfVYPXxwmaMsGPnwPZsftUJPNbArS60vJJh +w9ajWgCA6SGtD17ZpHfgIiMvhA== +-----END PRIVATE KEY----- +` + fakeCert = `-----BEGIN CERTIFICATE----- +MIIDhjCCAm6gAwIBAgIJAP3U+C7liWf8MA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV +BAYTAlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2Vs +Zi1zaWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1z +aWduZWQgY2VydGlmaWNhdGUwHhcNMjAxMTA1MDU0NjUyWhcNMjIxMTA1MDU0NjUy +WjB4MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAe +BgNVBAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4w +LjE6IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAr03pH03JLR926bAfqUVvQfTqrY5l7plii7CiatDMQNIFLkXy +q93XObinsZNhwzP4EwhDIQ0nvkqwBTzROqCG+vbrPFed1wdQMQkDNRtoGQdu+xpW +sP96NH8Ujl8RmJ2Du2nOOK647najJU6ewz6EGmtK6dNBungbsznsFJksY/4uaebf +ovLOZI+Z8tNeDXFwKFRnnnNJSN+aqfv4HWt7vhO9AYCsQsWhfWxNA0JhNBgpc0il +C3JVflAQrqAgoFNosmo1TLFDWqXpbbyQvqoxxKbExhs0rG9C0wpLY0kEaKGGJDHH +Jmny6Uw4RGtEJrdi/IpcJcYOnNT0b1xNuap3DwIDAQABoxMwETAPBgNVHREECDAG +hwQAAAAAMA0GCSqGSIb3DQEBCwUAA4IBAQAE6/mSUMVerL8B3Xs2+3YVmhd94Ql5 +ZKLwmEhsvOhP/3KRSncA8bIr4ZGCyvyEgsJqktjHJ4OYUIw3auYOBZgnUe3kM4NI +H7SS1JEtMu7okoXL/zHZcNrGHslFoEnIzvtoooSTQglcHclo8NWnGng6nJkSsY7w +DivAX9M7xtyKvGFgh6HuKYSZ3Yd6DeCkpnL2aOXf7cmFk4FT3SIbrtLNsLetbPl3 +rsA9pUDwTYRP8PDOLC3BKyDl84Dpb8JScqVpBMDRBW1dre0emORlh17JllyhA+9b +fKNX/D1XinAd/OftM5gYBWs7M6uZTm7JxMCvA2kckoN7B+BdrzisxTUR +-----END CERTIFICATE----- +` + fakeKey2 = `-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQg1h0K9jGfyBQMttaz +ija4rnsXfTQf1KvXl2o9SABhtvmhRANCAAQnICXGTyc72J2mpIgbZz3mvgmqUzGJ +FaU0IQHwImuqwIjbsJtnj6XgozycBwTPGPkuQeyKp3k3ADE7UOCqsSOH +-----END PRIVATE KEY----- +` + fakeCert2 = `-----BEGIN CERTIFICATE----- +MIIB+jCCAaCgAwIBAgIJALfqenQRnGoHMAoGCCqGSM49BAMCMHgxCzAJBgNVBAYT +AlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2VsZi1z +aWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1zaWdu +ZWQgY2VydGlmaWNhdGUwHhcNMjAxMTA4MDgwNjQ2WhcNMjIxMTA4MDgwNjQ2WjB4 +MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAeBgNV +BAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4wLjE6 +IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcD +QgAEJyAlxk8nO9idpqSIG2c95r4JqlMxiRWlNCEB8CJrqsCI27CbZ4+l4KM8nAcE +zxj5LkHsiqd5NwAxO1DgqrEjh6MTMBEwDwYDVR0RBAgwBocEfwAAATAKBggqhkjO +PQQDAgNIADBFAiEAzUT2hG3WChJh8cBo7EMQan2eJiF96OlSB+rWKKMaoGACIGOp +RVaPKj9ad0Z/3GiwaxtW+74bvc2vF3JS9cRU6DhY +-----END CERTIFICATE----- +` +) + var elementsWithValue = []*entities.InfoElementWithValue{ {Element: &entities.InfoElement{Name: "sourceIPv4Address", ElementId: 8, DataType: 18, EnterpriseId: 0, Len: 4}, Value: nil}, {Element: &entities.InfoElement{Name: "destinationIPv4Address", ElementId: 12, DataType: 18, EnterpriseId: 0, Len: 4}, Value: nil}, @@ -45,7 +123,15 @@ func TestTCPCollectingProcess_ReceiveTemplateRecord(t *testing.T) { if err != nil { t.Error(err) } - cp, err := InitCollectingProcess(address, 1024, 0) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, err := InitCollectingProcess(input) if err != nil { t.Fatalf("TCP Collecting Process does not start correctly: %v", err) } @@ -72,7 +158,15 @@ func TestUDPCollectingProcess_ReceiveTemplateRecord(t *testing.T) { if err != nil { t.Error(err) } - cp, err := InitCollectingProcess(address, 1024, 0) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, err := InitCollectingProcess(input) if err != nil { t.Fatalf("UDP Collecting Process does not start correctly: %v", err) } @@ -104,7 +198,15 @@ func TestTCPCollectingProcess_ReceiveDataRecord(t *testing.T) { if err != nil { t.Error(err) } - cp, err := InitCollectingProcess(address, 1024, 0) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, err := InitCollectingProcess(input) // Add the templates before sending data record cp.addTemplate(uint32(1), uint16(256), elementsWithValue) if err != nil { @@ -133,7 +235,15 @@ func TestUDPCollectingProcess_ReceiveDataRecord(t *testing.T) { if err != nil { t.Error(err) } - cp, err := InitCollectingProcess(address, 1024, 0) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, err := InitCollectingProcess(input) // Add the templates before sending data record cp.addTemplate(uint32(1), uint16(256), elementsWithValue) if err != nil { @@ -165,7 +275,15 @@ func TestTCPCollectingProcess_ConcurrentClient(t *testing.T) { if err != nil { t.Error(err) } - cp, _ := InitCollectingProcess(address, 1024, 0) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, _ := InitCollectingProcess(input) go func() { // wait until collector is ready waitForCollectorReady(t, address) @@ -193,7 +311,15 @@ func TestUDPCollectingProcess_ConcurrentClient(t *testing.T) { if err != nil { t.Error(err) } - cp, _ := InitCollectingProcess(address, 1024, 0) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, _ := InitCollectingProcess(input) go cp.Start() // wait until collector is ready waitForCollectorReady(t, address) @@ -311,7 +437,15 @@ func TestUDPCollectingProcess_TemplateExpire(t *testing.T) { if err != nil { t.Error(err) } - cp, err := InitCollectingProcess(address, 1024, 1) + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 1, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, err := InitCollectingProcess(input) if err != nil { t.Fatalf("UDP Collecting Process does not start correctly: %v", err) } @@ -344,6 +478,91 @@ func TestUDPCollectingProcess_TemplateExpire(t *testing.T) { assert.NotNil(t, err, "Template should be deleted after 5 seconds.") } +func TestTLSCollectingProcess(t *testing.T) { + address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4739") + if err != nil { + t.Error(err) + } + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: true, + ServerCert: []byte(fakeCert), + ServerKey: []byte(fakeKey), + } + cp, err := InitCollectingProcess(input) + if err != nil { + t.Fatalf("Collecting Process does not initiate correctly: %v", err) + } + go cp.Start() + // wait until collector is ready + waitForCollectorReady(t, address) + go func() { + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM([]byte(fakeCert)) + if !ok { + t.Error("Failed to parse root certificate") + } + config := &tls.Config{RootCAs: roots} + + conn, err := tls.Dial("tcp", address.String(), config) + if err != nil { + t.Error(err) + return + } + defer conn.Close() + _, err = conn.Write(validTemplatePacket) + assert.NoError(t, err) + }() + <-cp.GetMsgChan() + cp.Stop() + assert.NotNil(t, cp.templatesMap[1], "TLS Collecting Process should receive and store the received template.") +} + +func TestDTLSCollectingProcess(t *testing.T) { + address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4740") + if err != nil { + t.Error(err) + } + input := CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: true, + ServerCert: []byte(fakeCert2), + ServerKey: []byte(fakeKey2), + } + cp, err := InitCollectingProcess(input) + if err != nil { + t.Fatalf("DTLS Collecting Process does not initiate correctly: %v", err) + } + go cp.Start() + // wait until collector is ready + waitForCollectorReady(t, address) + go func() { + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM([]byte(fakeCert2)) + if !ok { + t.Error("Failed to parse root certificate") + } + config := &dtls.Config{RootCAs: roots, + ExtendedMasterSecret: dtls.RequireExtendedMasterSecret} + + conn, err := dtls.Dial("udp", address, config) + if err != nil { + t.Error(err) + return + } + defer conn.Close() + _, err = conn.Write(validTemplatePacket) + assert.NoError(t, err) + }() + <-cp.GetMsgChan() + cp.Stop() + assert.NotNil(t, cp.templatesMap[1], "DTLS Collecting Process should receive and store the received template.") +} + func waitForCollectorReady(t *testing.T, address net.Addr) { checkConn := func() (bool, error) { if _, err := net.Dial(address.Network(), address.String()); err != nil { diff --git a/pkg/collector/tcp.go b/pkg/collector/tcp.go index 9789451b..afbc4f30 100644 --- a/pkg/collector/tcp.go +++ b/pkg/collector/tcp.go @@ -2,6 +2,7 @@ package collector import ( "bytes" + "crypto/tls" "io" "net" "sync" @@ -10,13 +11,32 @@ import ( ) func (cp *CollectingProcess) startTCPServer() { - listener, err := net.Listen("tcp", cp.address.String()) - if err != nil { - klog.Errorf("Cannot start collecting process on %s: %v", cp.address.String(), err) - return + var listener net.Listener + var err error + if cp.isEncrypted { // use TLS + cer, err := tls.X509KeyPair(cp.serverCert, cp.serverKey) + if err != nil { + klog.Error(err) + return + } + config := &tls.Config{Certificates: []tls.Certificate{cer}} + listener, err = tls.Listen("tcp", cp.address.String(), config) + if err != nil { + klog.Errorf("Cannot start tls collecting process on %s: %v", cp.address.String(), err) + return + } + cp.updateAddress(listener.Addr()) + klog.Infof("Start tls collecting process on %s", cp.address.String()) + } else { + listener, err = net.Listen("tcp", cp.address.String()) + if err != nil { + klog.Errorf("Cannot start collecting process on %s: %v", cp.address.String(), err) + return + } + cp.updateAddress(listener.Addr()) + klog.Infof("Start %s collecting process on %s", cp.address.Network(), cp.address.String()) } - klog.Infof("Start %s collecting process on %s", cp.address.Network(), cp.address.String()) var wg sync.WaitGroup go func() { defer listener.Close() diff --git a/pkg/collector/udp.go b/pkg/collector/udp.go index ae8a7d57..1f4453da 100644 --- a/pkg/collector/udp.go +++ b/pkg/collector/udp.go @@ -16,45 +16,102 @@ package collector import ( "bytes" + "crypto/tls" + "crypto/x509" "net" "sync" "time" + "github.com/pion/dtls/v2" "k8s.io/klog" "github.com/vmware/go-ipfix/pkg/entities" ) func (cp *CollectingProcess) startUDPServer() { - s, err := net.ResolveUDPAddr("udp", cp.address.String()) - if err != nil { - klog.Error(err) - return - } - conn, err := net.ListenUDP("udp", s) + var listener net.Listener + var err error + var conn net.Conn + var wg sync.WaitGroup + address, err := net.ResolveUDPAddr(cp.address.Network(), cp.address.String()) if err != nil { klog.Error(err) return } - klog.Infof("Start %s collecting process on %s", cp.address.Network(), cp.address.String()) - var wg sync.WaitGroup - defer conn.Close() - go func() { - for { - buff := make([]byte, cp.maxBufferSize) - size, address, err := conn.ReadFromUDP(buff) - if err != nil { - if size == 0 { // received stop collector message + if cp.isEncrypted { // use DTLS + cert, err := tls.X509KeyPair(cp.serverCert, cp.serverKey) + if err != nil { + klog.Error(err) + return + } + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM(cp.serverCert) + config := &dtls.Config{ + Certificates: []tls.Certificate{cert}, + ExtendedMasterSecret: dtls.RequireExtendedMasterSecret, + ClientCAs: certPool, + } + listener, err = dtls.Listen("udp", address, config) + if err != nil { + klog.Error(err) + return + } + cp.updateAddress(listener.Addr()) + klog.Infof("Start dtls collecting process on %s", cp.address.String()) + conn, err = listener.Accept() + if err != nil { + klog.Error(err) + return + } + defer conn.Close() + go func() { + for { + buff := make([]byte, cp.maxBufferSize) + size, err := conn.Read(buff) + if err != nil { + if size == 0 { // received stop collector message + return + } + klog.Errorf("Error in collecting process: %v", err) + return + } + address, err = net.ResolveUDPAddr(conn.LocalAddr().Network(), conn.LocalAddr().String()) + if err != nil { + klog.Errorf("Error in dtls collecting process: %v", err) return } - klog.Errorf("Error in collecting process: %v", err) - return + klog.V(2).Infof("Receiving %d bytes from %s", size, address.String()) + cp.handleUDPClient(address, &wg) + cp.clients[address.String()].packetChan <- bytes.NewBuffer(buff[0:size]) } - klog.V(2).Infof("Receiving %d bytes from %s", size, address.String()) - cp.handleUDPClient(address, &wg) - cp.clients[address.String()].packetChan <- bytes.NewBuffer(buff[0:size]) + }() + } else { // use udp + conn, err := net.ListenUDP("udp", address) + if err != nil { + klog.Error(err) + return } - }() + cp.updateAddress(conn.LocalAddr()) + klog.Infof("Start %s collecting process on %s", cp.address.Network(), cp.address.String()) + var wg sync.WaitGroup + defer conn.Close() + go func() { + for { + buff := make([]byte, cp.maxBufferSize) + size, address, err := conn.ReadFromUDP(buff) + if err != nil { + if size == 0 { // received stop collector message + return + } + klog.Errorf("Error in udp collecting process: %v", err) + return + } + klog.V(2).Infof("Receiving %d bytes from %s", size, address.String()) + cp.handleUDPClient(address, &wg) + cp.clients[address.String()].packetChan <- bytes.NewBuffer(buff[0:size]) + } + }() + } <-cp.stopChan // stop all the workers before closing collector cp.closeAllClients() diff --git a/pkg/exporter/process.go b/pkg/exporter/process.go index 33bd1d05..622227ac 100644 --- a/pkg/exporter/process.go +++ b/pkg/exporter/process.go @@ -15,11 +15,14 @@ package exporter import ( + "crypto/tls" + "crypto/x509" "fmt" "net" "sync" "time" + "github.com/pion/dtls/v2" "k8s.io/klog" "github.com/vmware/go-ipfix/pkg/entities" @@ -49,6 +52,15 @@ type ExportingProcess struct { mutex sync.Mutex } +type ExporterInput struct { + CollectorAddr net.Addr + ObservationDomainID uint32 + TempRefTimeout uint32 + PathMTU int + IsEncrypted bool + Cert []byte +} + // InitExportingProcess takes in collector address(net.Addr format), obsID(observation ID) // and tempRefTimeout(template refresh timeout). tempRefTimeout is applicable only // for collectors listening over UDP; unit is seconds. For TCP, you can pass any @@ -58,34 +70,69 @@ type ExportingProcess struct { // 0 or a value more than 1500, we consider a default value of 512B as per RFC7011. // PathMTU is optional for TCP as we use max socket buffer size of 65535. It can // be provided as 0. -func InitExportingProcess(collectorAddr net.Addr, obsID uint32, tempRefTimeout uint32, pathMTU int) (*ExportingProcess, error) { - conn, err := net.Dial(collectorAddr.Network(), collectorAddr.String()) - if err != nil { - klog.Errorf("Cannot the create the connection to configured ExportingProcess %s: %v", collectorAddr.String(), err) - return nil, err - } +func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { + var conn net.Conn + var err error + if input.IsEncrypted { + if input.CollectorAddr.Network() == "tcp" { // use TLS + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(input.Cert) + if !ok { + return nil, fmt.Errorf("Failed to parse root certificate") + } + config := &tls.Config{RootCAs: roots} + conn, err = tls.Dial(input.CollectorAddr.Network(), input.CollectorAddr.String(), config) + if err != nil { + klog.Errorf("Cannot the create the tls connection to configured ExportingProcess %s: %v", input.CollectorAddr.String(), err) + return nil, err + } + } else if input.CollectorAddr.Network() == "udp" { // use DTLS + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM(input.Cert) + if !ok { + return nil, fmt.Errorf("Failed to parse root certificate") + } + config := &dtls.Config{RootCAs: roots, + ExtendedMasterSecret: dtls.RequireExtendedMasterSecret} + address, err := net.ResolveUDPAddr(input.CollectorAddr.Network(), input.CollectorAddr.String()) + if err != nil { + return nil, fmt.Errorf("Cannot resolve udp address %s", input.CollectorAddr.String()) + } + conn, err = dtls.Dial(address.Network(), address, config) + if err != nil { + klog.Errorf("Cannot the create the dtls connection to configured ExportingProcess %s: %v", address.String(), err) + return nil, err + } + } + } else { + conn, err = net.Dial(input.CollectorAddr.Network(), input.CollectorAddr.String()) + if err != nil { + klog.Errorf("Cannot the create the connection to configured ExportingProcess %s: %v", input.CollectorAddr.String(), err) + return nil, err + } + } expProc := &ExportingProcess{ connToCollector: conn, - obsDomainID: obsID, + obsDomainID: input.ObservationDomainID, seqNumber: 0, templateID: startTemplateID, - pathMTU: pathMTU, + pathMTU: input.PathMTU, templatesMap: make(map[uint16]templateValue), templateRefCh: make(chan struct{}), } - // Template refresh logic and pathMTU check is only required for UDP transport. - if collectorAddr.Network() == "udp" { + // Template refresh logic is only for UDP transport. + if input.CollectorAddr.Network() == "udp" { if expProc.pathMTU == 0 || expProc.pathMTU > entities.MaxUDPMsgSize { expProc.pathMTU = entities.DefaultUDPMsgSize } - if tempRefTimeout == 0 { + if input.TempRefTimeout == 0 { // Default value - tempRefTimeout = entities.TemplateRefreshTimeOut + input.TempRefTimeout = entities.TemplateRefreshTimeOut } go func() { - ticker := time.NewTicker(time.Duration(tempRefTimeout) * time.Second) + ticker := time.NewTicker(time.Duration(input.TempRefTimeout) * time.Second) defer ticker.Stop() for { select { @@ -102,7 +149,6 @@ func InitExportingProcess(collectorAddr net.Addr, obsID uint32, tempRefTimeout u } }() } - return expProc, nil } diff --git a/pkg/exporter/process_test.go b/pkg/exporter/process_test.go index 68dde9f9..31f71579 100644 --- a/pkg/exporter/process_test.go +++ b/pkg/exporter/process_test.go @@ -15,16 +15,92 @@ package exporter import ( + "crypto/tls" "net" "testing" "time" + "github.com/pion/dtls/v2" "github.com/stretchr/testify/assert" "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" ) +const ( + fakeKey = `-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCvTekfTcktH3bp +sB+pRW9B9OqtjmXumWKLsKJq0MxA0gUuRfKr3dc5uKexk2HDM/gTCEMhDSe+SrAF +PNE6oIb69us8V53XB1AxCQM1G2gZB277Glaw/3o0fxSOXxGYnYO7ac44rrjudqMl +Tp7DPoQaa0rp00G6eBuzOewUmSxj/i5p5t+i8s5kj5ny014NcXAoVGeec0lI35qp ++/gda3u+E70BgKxCxaF9bE0DQmE0GClzSKULclV+UBCuoCCgU2iyajVMsUNapelt +vJC+qjHEpsTGGzSsb0LTCktjSQRooYYkMccmafLpTDhEa0Qmt2L8ilwlxg6c1PRv +XE25qncPAgMBAAECggEBAJE/z6GFVOPTRza3HHSnOFkA8hVdgC2i31j4wIoaeLJY +kbxWboxiofqMej2S7RTNEYXLebt/5+cugQvF6WJXMZ/tSNlVi01oHNSUMBknnSfn +1deuahf7hijLBqA0OyMll8mIEDs84bOLjv/RVZBWUySEs6xrwvEapXDp1Cb5ByPN +T1iGZ3chcOgGPX6MTq9+P4yREREQXjPZ9uKSiLqQg2rVg/j4sC/iPgiE/nSShPIk +gpOW3kgUuiYGsTQSJ2YIyr81MEgudmUCnJbu/5P8dqtHiqmHOW1psirwVB7xCow3 +h8JBuxz2jHTqnsAfXwWdmvZyXvAycR+9/t9CCGwee3kCgYEA1ozhdC5h6MfyaagP +9Hl0i8Jlh6r1WVMXLpPy0pQGPnw1JJUHHiEIU4Yp/tzO+DHOSe2mvKLGrsNIRH89 +Vh0maStI26brPyiw7w5hjelxrJ/zH0UdWzWxbZ8HRNh8F3WGoXkGoaLRMQUfYvOI +lT/HlOSmyl9UCByzU7sq5bkIU50CgYEA0SwFyGX/rpBC7YWpe1VsLBF8GSat9SUc +UAXn0/6x4eOvLtdPk67HrnU3FIvV376HuTY5hCC2sQTJ+cxzhAj3cpbJjOpjlJZj +nAYrVNAQHmgynKjCNP8v2W8LQbi39UPE5Zf6dphFbpgQgqYqMQV0iIWRv4WKJKAD +w3GMwB6pA5sCgYEAlHT/PAksLorMLlfgUmYIQvzMjEe7ZYedLtmo2BUdDPedPibw +ueRZgpH/VR8tB4hPGdCb40Mu/5aY1uzEYGXjQjp1O6gQd6+MXp4w2qWBxtUWwbht +S8OndhboTLcPhpwIAItiD04+OhE1Wp7xD3UGgPyGfNnhp4tUese0MykJnfECgYEA +ok8MtbIgMq6SoIjFOITSiWeP6lxPRBhl3dqXR7MtCOGKQEim4SwQmlkuQm03qoTI +AHoJK3PPD5FtwL5bLKtgh7Rl9UizuMrxxFItMYS53T5xd4qkGEekM46tJ3RUmqbZ +lGbX3UrPJcAtn5Oczak0AfPTYtAWn9Di2rezxiiEcd0CgYA0RSCk8XgtZxAoPQJC +Y2PJ6FHlSLMtDhsAsUtD+mXlt8+o+tyMG7ZysQZKHsjDMzEZZRK7F8W9+xzzl1fa +Ok+B9v1BFakMXRc5zcA8XH1ng9Ml2DfVYPXxwmaMsGPnwPZsftUJPNbArS60vJJh +w9ajWgCA6SGtD17ZpHfgIiMvhA== +-----END PRIVATE KEY----- +` + fakeCert = `-----BEGIN CERTIFICATE----- +MIIDhjCCAm6gAwIBAgIJAP3U+C7liWf8MA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV +BAYTAlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2Vs +Zi1zaWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1z +aWduZWQgY2VydGlmaWNhdGUwHhcNMjAxMTA1MDU0NjUyWhcNMjIxMTA1MDU0NjUy +WjB4MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAe +BgNVBAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4w +LjE6IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAr03pH03JLR926bAfqUVvQfTqrY5l7plii7CiatDMQNIFLkXy +q93XObinsZNhwzP4EwhDIQ0nvkqwBTzROqCG+vbrPFed1wdQMQkDNRtoGQdu+xpW +sP96NH8Ujl8RmJ2Du2nOOK647najJU6ewz6EGmtK6dNBungbsznsFJksY/4uaebf +ovLOZI+Z8tNeDXFwKFRnnnNJSN+aqfv4HWt7vhO9AYCsQsWhfWxNA0JhNBgpc0il +C3JVflAQrqAgoFNosmo1TLFDWqXpbbyQvqoxxKbExhs0rG9C0wpLY0kEaKGGJDHH +Jmny6Uw4RGtEJrdi/IpcJcYOnNT0b1xNuap3DwIDAQABoxMwETAPBgNVHREECDAG +hwQAAAAAMA0GCSqGSIb3DQEBCwUAA4IBAQAE6/mSUMVerL8B3Xs2+3YVmhd94Ql5 +ZKLwmEhsvOhP/3KRSncA8bIr4ZGCyvyEgsJqktjHJ4OYUIw3auYOBZgnUe3kM4NI +H7SS1JEtMu7okoXL/zHZcNrGHslFoEnIzvtoooSTQglcHclo8NWnGng6nJkSsY7w +DivAX9M7xtyKvGFgh6HuKYSZ3Yd6DeCkpnL2aOXf7cmFk4FT3SIbrtLNsLetbPl3 +rsA9pUDwTYRP8PDOLC3BKyDl84Dpb8JScqVpBMDRBW1dre0emORlh17JllyhA+9b +fKNX/D1XinAd/OftM5gYBWs7M6uZTm7JxMCvA2kckoN7B+BdrzisxTUR +-----END CERTIFICATE----- +` + fakeKey2 = `-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQg1h0K9jGfyBQMttaz +ija4rnsXfTQf1KvXl2o9SABhtvmhRANCAAQnICXGTyc72J2mpIgbZz3mvgmqUzGJ +FaU0IQHwImuqwIjbsJtnj6XgozycBwTPGPkuQeyKp3k3ADE7UOCqsSOH +-----END PRIVATE KEY----- +` + fakeCert2 = `-----BEGIN CERTIFICATE----- +MIIB+jCCAaCgAwIBAgIJALfqenQRnGoHMAoGCCqGSM49BAMCMHgxCzAJBgNVBAYT +AlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2VsZi1z +aWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1zaWdu +ZWQgY2VydGlmaWNhdGUwHhcNMjAxMTA4MDgwNjQ2WhcNMjIxMTA4MDgwNjQ2WjB4 +MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAeBgNV +BAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4wLjE6 +IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcD +QgAEJyAlxk8nO9idpqSIG2c95r4JqlMxiRWlNCEB8CJrqsCI27CbZ4+l4KM8nAcE +zxj5LkHsiqd5NwAxO1DgqrEjh6MTMBEwDwYDVR0RBAgwBocEfwAAATAKBggqhkjO +PQQDAgNIADBFAiEAzUT2hG3WChJh8cBo7EMQan2eJiF96OlSB+rWKKMaoGACIGOp +RVaPKj9ad0Z/3GiwaxtW+74bvc2vF3JS9cRU6DhY +-----END CERTIFICATE----- +` +) + func init() { registry.LoadRegistry() } @@ -59,7 +135,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) { }() // Create exporter using local server info - exporter, err := InitExportingProcess(listener.Addr(), 1, 0, 0) + input := ExporterInput{ + CollectorAddr: listener.Addr(), + ObservationDomainID: 1, + TempRefTimeout: 0, + PathMTU: 0, + IsEncrypted: false, + Cert: nil, + } + exporter, err := InitExportingProcess(input) if err != nil { t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) } @@ -129,7 +213,15 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) { }() // Create exporter using local server info - exporter, err := InitExportingProcess(conn.LocalAddr(), 1, 1, 0) + input := ExporterInput{ + CollectorAddr: conn.LocalAddr(), + ObservationDomainID: 1, + TempRefTimeout: 1, + PathMTU: 0, + IsEncrypted: false, + Cert: nil, + } + exporter, err := InitExportingProcess(input) if err != nil { t.Fatalf("Got error when connecting to local server %s: %v", conn.LocalAddr().String(), err) } @@ -203,7 +295,15 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { }() // Create exporter using local server info - exporter, err := InitExportingProcess(listener.Addr(), 1, 0, 0) + input := ExporterInput{ + CollectorAddr: listener.Addr(), + ObservationDomainID: 1, + TempRefTimeout: 0, + PathMTU: 0, + IsEncrypted: false, + Cert: nil, + } + exporter, err := InitExportingProcess(input) if err != nil { t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err) } @@ -260,7 +360,7 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) { err := dataSet.AddRecord(elements, templateID) assert.NoError(t, err) } - bytesSent, err = exporter.SendSet(dataSet) + _, err = exporter.SendSet(dataSet) assert.Error(t, err) exporter.CloseConnToCollector() @@ -293,7 +393,15 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) { }() // Create exporter using local server info - exporter, err := InitExportingProcess(conn.LocalAddr(), 1, 0, 0) + input := ExporterInput{ + CollectorAddr: conn.LocalAddr(), + ObservationDomainID: 1, + TempRefTimeout: 0, + PathMTU: 0, + IsEncrypted: false, + Cert: nil, + } + exporter, err := InitExportingProcess(input) if err != nil { t.Fatalf("Got error when connecting to local server %s: %v", conn.LocalAddr().String(), err) } @@ -349,8 +457,169 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) { for i := 0; i < 100; i++ { dataSet.AddRecord(elements, templateID) } - bytesSent, err = exporter.SendSet(dataSet) + _, err = exporter.SendSet(dataSet) assert.Error(t, err) exporter.CloseConnToCollector() } + +func TestExportingProcessWithTLS(t *testing.T) { + // Create local server for testing + address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4830") + if err != nil { + t.Fatalf("Got error when resolving tcp address: %v", err) + } + cer, err := tls.X509KeyPair([]byte(fakeCert), []byte(fakeKey)) + if err != nil { + t.Error(err) + return + } + config := &tls.Config{Certificates: []tls.Certificate{cer}} + listener, err := tls.Listen("tcp", address.String(), config) + if err != nil { + t.Errorf("Cannot start tls collecting process on %s: %v", listener.Addr().String(), err) + return + } + + buffCh := make(chan []byte) + go func() { + defer listener.Close() + conn, err := listener.Accept() + if err != nil { + return + } + defer conn.Close() + t.Log("Accept the connection from exporter") + buff := make([]byte, 32) + _, err = conn.Read(buff) + if err != nil { + t.Error(err) + } + // Compare only template record part. Remove message header and set header. + buffCh <- buff[20:] + return + }() + + // Create exporter using local server info + input := ExporterInput{ + CollectorAddr: address, + ObservationDomainID: 1, + TempRefTimeout: 0, + IsEncrypted: true, + Cert: []byte(fakeCert), + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local tls server %s: %v", address, err) + } + t.Logf("Created exporter connecting to local tls server with address: %s", address) + + // Create template record with two fields + templateID := exporter.NewTemplateID() + templateSet := entities.NewSet(entities.Template, templateID, false) + elements := make([]*entities.InfoElementWithValue, 0) + element, err := registry.GetInfoElement("sourceIPv4Address", registry.IANAEnterpriseID) + if err != nil { + t.Errorf("Did not find the element with name sourceIPv4Address") + } + ie := entities.NewInfoElementWithValue(element, nil) + elements = append(elements, ie) + element, err = registry.GetInfoElement("destinationIPv4Address", registry.IANAEnterpriseID) + if err != nil { + t.Errorf("Did not find the element with name destinationIPv4Address") + } + ie = entities.NewInfoElementWithValue(element, nil) + elements = append(elements, ie) + templateSet.AddRecord(elements, templateID) + + bytesSent, err := exporter.SendSet(templateSet) + if err != nil { + t.Fatalf("Got error when sending record: %v", err) + } + // 32 is the size of the IPFIX message including all headers + assert.Equal(t, 32, bytesSent) + assert.Equal(t, uint32(0), exporter.seqNumber) + exporter.CloseConnToCollector() +} + +func TestExportingProcessWithDTLS(t *testing.T) { + // Create local server for testing + address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4831") + if err != nil { + t.Fatalf("Got error when resolving udp address: %v", err) + } + cert, err := tls.X509KeyPair([]byte(fakeCert2), []byte(fakeKey2)) + if err != nil { + t.Error(err) + return + } + config := &dtls.Config{ + Certificates: []tls.Certificate{cert}, + ExtendedMasterSecret: dtls.RequireExtendedMasterSecret, + } + listener, err := dtls.Listen("udp", address, config) + if err != nil { + t.Errorf("Cannot start dtls collecting process on 0.0.0.0:4739: %v", err) + return + } + + buffCh := make(chan []byte) + go func() { + defer listener.Close() + conn, err := listener.Accept() + if err != nil { + return + } + defer conn.Close() + t.Log("Accept the connection from exporter") + buff := make([]byte, 32) + _, err = conn.Read(buff) + if err != nil { + t.Error(err) + } + // Compare only template record part. Remove message header and set header. + buffCh <- buff[20:] + return + }() + + // Create exporter using local server info + input := ExporterInput{ + CollectorAddr: address, + ObservationDomainID: 1, + TempRefTimeout: 0, + IsEncrypted: true, + Cert: []byte(fakeCert2), + } + exporter, err := InitExportingProcess(input) + if err != nil { + t.Fatalf("Got error when connecting to local dtls server %s: %v", address, err) + } + t.Logf("Created exporter connecting to local dtls server with address: %s", address) + + // Create template record with two fields + templateID := exporter.NewTemplateID() + templateSet := entities.NewSet(entities.Template, templateID, false) + elements := make([]*entities.InfoElementWithValue, 0) + element, err := registry.GetInfoElement("sourceIPv4Address", registry.IANAEnterpriseID) + if err != nil { + t.Errorf("Did not find the element with name sourceIPv4Address") + } + ie := entities.NewInfoElementWithValue(element, nil) + elements = append(elements, ie) + element, err = registry.GetInfoElement("destinationIPv4Address", registry.IANAEnterpriseID) + if err != nil { + t.Errorf("Did not find the element with name destinationIPv4Address") + } + ie = entities.NewInfoElementWithValue(element, nil) + elements = append(elements, ie) + templateSet.AddRecord(elements, templateID) + + bytesSent, err := exporter.SendSet(templateSet) + if err != nil { + t.Fatalf("Got error when sending record: %v", err) + } + // 32 is the size of the IPFIX message including all headers + assert.Equal(t, 32, bytesSent) + assert.Equal(t, uint32(0), exporter.seqNumber) + exporter.CloseConnToCollector() +} diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index 56fec47f..22de017f 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -37,23 +37,29 @@ type FlowKey struct { DestinationPort uint16 } +type AggregationInput struct { + MessageChan chan *entities.Message + WorkerNum int + CorrelateFields []string +} + type FlowKeyRecordMapCallBack func(key FlowKey, records []entities.Record) error // InitAggregationProcess takes in message channel (e.g. from collector) as input channel, workerNum(number of workers to process message) // and correlateFields (fields to be correlated and filled). -func InitAggregationProcess(messageChan chan *entities.Message, workerNum int, correlateFields []string) (*AggregationProcess, error) { - if messageChan == nil { - return nil, fmt.Errorf("Cannot create AggregationProcess process without message channel.") - } else if workerNum <= 0 { - return nil, fmt.Errorf("Worker number cannot be <= 0.") +func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error) { + if input.MessageChan == nil { + return nil, fmt.Errorf("cannot create AggregationProcess process without message channel") + } else if input.WorkerNum <= 0 { + return nil, fmt.Errorf("worker number cannot be <= 0") } return &AggregationProcess{ make(map[FlowKey][]entities.Record), sync.RWMutex{}, - messageChan, - workerNum, + input.MessageChan, + input.WorkerNum, make([]*worker, 0), - correlateFields, + input.CorrelateFields, make(chan bool), }, nil } diff --git a/pkg/intermediate/aggregate_test.go b/pkg/intermediate/aggregate_test.go index 1b7b4dce..db77d05c 100644 --- a/pkg/intermediate/aggregate_test.go +++ b/pkg/intermediate/aggregate_test.go @@ -162,25 +162,45 @@ func createMsgwithDataSetIPv6() *entities.Message { return message } +func init() { + registry.LoadRegistry() +} + func TestInitAggregationProcess(t *testing.T) { - aggregationProcess, err := InitAggregationProcess(nil, 2, fields) + input := AggregationInput{ + MessageChan: nil, + WorkerNum: 2, + CorrelateFields: fields, + } + aggregationProcess, err := InitAggregationProcess(input) assert.NotNil(t, err) assert.Nil(t, aggregationProcess) messageChan := make(chan *entities.Message) - aggregationProcess, err = InitAggregationProcess(messageChan, 2, fields) + input.MessageChan = messageChan + aggregationProcess, err = InitAggregationProcess(input) assert.Nil(t, err) assert.Equal(t, 2, aggregationProcess.workerNum) } func TestGetTupleRecordMap(t *testing.T) { messageChan := make(chan *entities.Message) - aggregationProcess, _ := InitAggregationProcess(messageChan, 2, fields) + input := AggregationInput{ + MessageChan: messageChan, + WorkerNum: 2, + CorrelateFields: fields, + } + aggregationProcess, _ := InitAggregationProcess(input) assert.Equal(t, aggregationProcess.flowKeyRecordMap, aggregationProcess.flowKeyRecordMap) } func TestAggregateMsgByFlowKey(t *testing.T) { messageChan := make(chan *entities.Message) - aggregationProcess, _ := InitAggregationProcess(messageChan, 2, fields) + input := AggregationInput{ + MessageChan: messageChan, + WorkerNum: 2, + CorrelateFields: fields, + } + aggregationProcess, _ := InitAggregationProcess(input) // Template records should be ignored message := createMsgwithTemplateSet() err := aggregationProcess.AggregateMsgByFlowKey(message) @@ -215,7 +235,12 @@ func TestAggregateMsgByFlowKey(t *testing.T) { func TestAggregationProcess(t *testing.T) { messageChan := make(chan *entities.Message) - aggregationProcess, _ := InitAggregationProcess(messageChan, 2, fields) + input := AggregationInput{ + MessageChan: messageChan, + WorkerNum: 2, + CorrelateFields: fields, + } + aggregationProcess, _ := InitAggregationProcess(input) dataMsg := createMsgwithDataSet1() go func() { messageChan <- createMsgwithTemplateSet() @@ -235,7 +260,6 @@ func TestAggregationProcess(t *testing.T) { } func TestAddOriginalExporterInfo(t *testing.T) { - registry.LoadRegistry() // Test message with template set message := createMsgwithTemplateSet() err := addOriginalExporterInfo(message) @@ -261,7 +285,12 @@ func TestAddOriginalExporterInfo(t *testing.T) { func TestCorrelateRecords(t *testing.T) { registry.LoadRegistry() messageChan := make(chan *entities.Message) - aggregationProcess, _ := InitAggregationProcess(messageChan, 2, fields) + input := AggregationInput{ + MessageChan: messageChan, + WorkerNum: 2, + CorrelateFields: fields, + } + aggregationProcess, _ := InitAggregationProcess(input) record1 := createMsgwithDataSet1().GetSet().GetRecords()[0] flowKey1, _ := getFlowKeyFromRecord(record1) record2 := createMsgwithDataSet2().GetSet().GetRecords()[0] @@ -285,7 +314,12 @@ func TestCorrelateRecords(t *testing.T) { func TestDeleteTupleFromMap(t *testing.T) { messageChan := make(chan *entities.Message) message := createMsgwithDataSet1() - aggregationProcess, _ := InitAggregationProcess(messageChan, 2, fields) + input := AggregationInput{ + MessageChan: messageChan, + WorkerNum: 2, + CorrelateFields: fields, + } + aggregationProcess, _ := InitAggregationProcess(input) flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678} flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678} aggregationProcess.flowKeyRecordMap[flowKey1] = message.GetSet().GetRecords() diff --git a/pkg/test/collector_intermediate_test.go b/pkg/test/collector_intermediate_test.go index 812f9d03..6e6cf4ed 100644 --- a/pkg/test/collector_intermediate_test.go +++ b/pkg/test/collector_intermediate_test.go @@ -19,10 +19,12 @@ package test import ( "fmt" "net" + "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/vmware/go-ipfix/pkg/collector" "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/intermediate" @@ -47,10 +49,24 @@ func TestCollectorToIntermediate(t *testing.T) { t.Error(err) } // Initialize aggregation process and collecting process - cp, _ := collector.InitCollectingProcess(address, 1024, 0) - ap, _ := intermediate.InitAggregationProcess(cp.GetMsgChan(), 2, fields) + cpInput := collector.CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: false, + ServerCert: nil, + ServerKey: nil, + } + cp, _ := collector.InitCollectingProcess(cpInput) + + apInput := intermediate.AggregationInput{ + MessageChan: cp.GetMsgChan(), + WorkerNum: 2, + CorrelateFields: fields, + } + ap, _ := intermediate.InitAggregationProcess(apInput) go cp.Start() - waitForCollectorReady(t, address) + waitForCollectorReady(t, cp) go func() { conn, err := net.DialUDP("udp", nil, address) if err != nil { @@ -85,15 +101,18 @@ func copyFlowKeyRecordMap(key intermediate.FlowKey, records []entities.Record) e return nil } -func waitForCollectorReady(t *testing.T, address net.Addr) { +func waitForCollectorReady(t *testing.T, cp *collector.CollectingProcess) { checkConn := func() (bool, error) { - if _, err := net.Dial(address.Network(), address.String()); err != nil { + if strings.Split(cp.GetAddress().String(), ":")[1] == "0" { + return false, fmt.Errorf("random port is not resolved") + } + if _, err := net.Dial(cp.GetAddress().Network(), cp.GetAddress().String()); err != nil { return false, err } return true, nil } if err := wait.Poll(100*time.Millisecond, 500*time.Millisecond, checkConn); err != nil { - t.Errorf("Cannot establish connection to %s", address.String()) + t.Errorf("Cannot establish connection to %s", cp.GetAddress().String()) } } diff --git a/pkg/test/exporter_collector_test.go b/pkg/test/exporter_collector_test.go index f5cd5e3e..cb830cc1 100644 --- a/pkg/test/exporter_collector_test.go +++ b/pkg/test/exporter_collector_test.go @@ -34,49 +34,171 @@ func init() { registry.LoadRegistry() } +const ( + fakeKey = `-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDz3PD8TwjDfQg2 +IiBVZMZT1RJLZwZZXWYkukWfhx7Zp1iJeAX1ExSEcnq1b9Hc1cP0PXsw0eoqXomH +tAimX/k0d6IqLHTiB7fFJF2cuAjQbMBoGxN5v+0elJOBCwAaY8E0iwPwuKa4VFua +6rSqSfkhQ4FQqaUqhy6eqp3vD/ycHhje1VX3S66hwGMaDLi2trnm88fjm81I1A0d +bwPRPO0246m2loG7xB8RYfMLgCLzhOQLz2Uow0HfjbJQ+IlDQ3uuWdzr46nqQaes +T6ULLz8N560+rjQD0Dg3GGeqaHEP7OaLmVQBqtsj0GroD7xscDKkgq9fnW9VuGjZ +2tKElip5AgMBAAECggEBAO/NrnSOS7Hg+/gvqtgOVzEM8AaR8x5hyBYJznlHaEDk +XR4hlsoezyhHYv+UTCz7UMyWwNOLONgdSuTVV0Q0UF0V37PVL8Mtj7sfPablGlXK ++5HkPkyVPVm7BSn6ZUmOGunOYjuPePL+kW5PqwVh5MifF0T47eBaOq/wW4pAkEn7 +KzHcVEy08VnaK3XIC7lS9f5o/L9KSlZQU15TUAjFZ/kn4phzsR/z5462qKl9obYQ +gOqtK5ZA4ZqTX0nbzaqM20rhZEv+FkDWmSzcAaGfjvPtjFUmrChhpVSRqtcxlgEK +qwVM01i8t/E8p0Y4/2+GSDs4yKFeJNVuQ9AsLdhpJRkCgYEA//C+d/ltVhQtKaoA +dJy+AESTaVWS5IUmcE2x4fu9vyDR/U6FyPUmU4imwO4wS0O8jLdExjtb2kaIiGth +b3cmoUe77Xpi+iLSIAVk27QYWlnAQY/VkAZZZtY3JS75jdo4TShiafYq4vb0Gux7 +jDkh+VW2mTRpp0if88y/61HPH8cCgYEA8+t6ON3CY3U19wtoQEA01kLk13B182ps +7beRidxW37ndOVXGj27xeEuzHlj/ry+kh+AOkHRbiz1RvhdX2POotgwX0Tdq/SXv +CIYsL02ZS1m33GwrvoVfsTOpvKrQMlX7WpXYpghagf2y6TEJiCtWlSROJhje9K2I +eL5jwV8g478CgYEAiYNEExoE0NcOXPBmRkFhJKuzuEiuH/IacQSNqqmjjWmI6dyi +rRJqgT9OuSJA+G9wgvqFDS0fcOust/9Z3pXaP5VXN4UmYNcMpv++7PyaiRDn51Hs +oPGIX2SBRI00sC6rSWmFVwFYkZG2HjEpQHIB+wE+lpo+mg6/QjKkez79VkkCgYAP +BnFX8WkZAU5asmwwkQPwMtyv3LCXVvXwyr7/VABR9bwH3R3HFhlvxJH7C5Zsby3e +ZNHg2hoNgLB5WizCI3hABoytCZHgmCaaStGL9Ga9+n/V5x/ms4aKftk00vzSLPO3 +x8U5rQgOO9d6f9fLeIfz1fGubRfG0K24alnwvnBjNwKBgDtu5OUrOtAW4pZVj5rs +YK1g7EZMreBtU1ox80vhU7QT707EI2LfI7J9MGfEHcstjl868Z8JBIyjyEmAJxYv +08oooxSfn83rnXzn+XFaQKSuAYzbQXulKh3DGucZY1n8tPT9I+jbtGvGBj94FuBQ +SuTFwBux90EAGPE4SyuCHgYo +-----END PRIVATE KEY----- +` + fakeCert = `-----BEGIN CERTIFICATE----- +MIIDhjCCAm6gAwIBAgIJAJPXwf3JizlIMA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV +BAYTAlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2Vs +Zi1zaWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1z +aWduZWQgY2VydGlmaWNhdGUwHhcNMjAxMjAxMDQyNjA0WhcNMjIxMjAxMDQyNjA0 +WjB4MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAe +BgNVBAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4w +LjE6IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEA89zw/E8Iw30INiIgVWTGU9USS2cGWV1mJLpFn4ce2adYiXgF +9RMUhHJ6tW/R3NXD9D17MNHqKl6Jh7QIpl/5NHeiKix04ge3xSRdnLgI0GzAaBsT +eb/tHpSTgQsAGmPBNIsD8LimuFRbmuq0qkn5IUOBUKmlKocunqqd7w/8nB4Y3tVV +90uuocBjGgy4tra55vPH45vNSNQNHW8D0TztNuOptpaBu8QfEWHzC4Ai84TkC89l +KMNB342yUPiJQ0N7rlnc6+Op6kGnrE+lCy8/DeetPq40A9A4NxhnqmhxD+zmi5lU +AarbI9Bq6A+8bHAypIKvX51vVbho2drShJYqeQIDAQABoxMwETAPBgNVHREECDAG +hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQBninXJdEqqsJtqJKwog5X/EY7+Qgxh +f4Ye7cI3JvGuH1InmZkLmgJAtRy5MDo64GZmPjKJsr4a5M+DARFcRnoFhqo7uXoS +M3D5l9hbUUrXDrQzTww+6ga2wwZEFaOkzNLaTZ2bw01vVM3Xpqn5y0N4+7fTw6Ka +M2xhaFYgJzDy9nQjmTn2F3G8s27xQ3ebAy/KcNGJJwf3vFC76i76sa78jWYmGOO2 +hyHgFXOE2+9q4J364PTipSwLgFr7lz4CF+OCzttpyFqB34c54atCQ7ste/NXysIO +F90n06ISWkonCXiZ09r3ooH5py6GBFf15ZKrBzetZN+aSJcBjHXasQrA +-----END CERTIFICATE----- +` + fakeKey2 = `-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgnOocXNDRcH6BZ86v +4GwZF6JiqYbF7bxrssfJ/Ge0jbihRANCAARX5PJ6Za+ZkYvliOtKO2fCbJG07/Pw +nrBDHZzPbrdW0TJNZ9psQjj0dgG15/Jimn1YnnSYF0g153EEtFmeTk72 +-----END PRIVATE KEY----- +` + fakeCert2 = `-----BEGIN CERTIFICATE----- +MIIB+jCCAaCgAwIBAgIJAOtRkOrJBEY0MAoGCCqGSM49BAMCMHgxCzAJBgNVBAYT +AlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2VsZi1z +aWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1zaWdu +ZWQgY2VydGlmaWNhdGUwHhcNMjAxMjAxMDQzMTU1WhcNMjIxMjAxMDQzMTU1WjB4 +MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAeBgNV +BAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4wLjE6 +IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcD +QgAEV+TyemWvmZGL5YjrSjtnwmyRtO/z8J6wQx2cz263VtEyTWfabEI49HYBtefy +Ypp9WJ50mBdINedxBLRZnk5O9qMTMBEwDwYDVR0RBAgwBocEfwAAATAKBggqhkjO +PQQDAgNIADBFAiEA+g3X1x27qV+LRx81AudIagHdvcVvLVbRJh0eXNFfPzUCIFHg +JSnRKkDuZ/d5wYR59eIld9FsJPFWPCQth2cKnBsM +-----END CERTIFICATE----- +` +) + func TestSingleRecordUDPTransport(t *testing.T) { - address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4739") + address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { t.Error(err) } - testExporterToCollector(address, false, t) + testExporterToCollector(address, false, false, t) } func TestSingleRecordTCPTransport(t *testing.T) { - address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4739") + address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") if err != nil { t.Error(err) } - testExporterToCollector(address, false, t) + testExporterToCollector(address, false, false, t) } func TestMultipleRecordUDPTransport(t *testing.T) { - address, err := net.ResolveUDPAddr("udp", "0.0.0.0:4738") + address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { t.Error(err) } - testExporterToCollector(address, true, t) + testExporterToCollector(address, true, false, t) } func TestMultipleRecordTCPTransport(t *testing.T) { - address, err := net.ResolveTCPAddr("tcp", "0.0.0.0:4738") + address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + if err != nil { + t.Error(err) + } + testExporterToCollector(address, true, false, t) +} + +func TestTLSTransport(t *testing.T) { + address, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") + if err != nil { + t.Error(err) + } + testExporterToCollector(address, false, true, t) +} + +func TestDTLSTransport(t *testing.T) { + address, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") if err != nil { t.Error(err) } - testExporterToCollector(address, true, t) + testExporterToCollector(address, false, true, t) } -func testExporterToCollector(address net.Addr, isMultipleRecord bool, t *testing.T) { +func testExporterToCollector(address net.Addr, isMultipleRecord bool, isEncrypted bool, t *testing.T) { // Initialize collecting process messages := make([]*entities.Message, 0) - cp, _ := collector.InitCollectingProcess(address, 1024, 0) + cpInput := collector.CollectorInput{ + Address: address, + MaxBufferSize: 1024, + TemplateTTL: 0, + IsEncrypted: isEncrypted, + ServerCert: nil, + ServerKey: nil, + } + if isEncrypted { + if address.Network() == "tcp" { + cpInput.ServerCert = []byte(fakeCert) + cpInput.ServerKey = []byte(fakeKey) + } else if address.Network() == "udp" { + cpInput.ServerCert = []byte(fakeCert2) + cpInput.ServerKey = []byte(fakeKey2) + } + } + cp, _ := collector.InitCollectingProcess(cpInput) // Start collecting process go cp.Start() go func() { // Start exporting process in go routine - waitForCollectorReady(t, address) - export, err := exporter.InitExportingProcess(address, 1, 0, 0) + waitForCollectorReady(t, cp) + epInput := exporter.ExporterInput{ + CollectorAddr: cp.GetAddress(), + ObservationDomainID: 1, + TempRefTimeout: 0, + PathMTU: 0, + IsEncrypted: isEncrypted, + Cert: nil, + } + if isEncrypted { + if address.Network() == "tcp" { // use TLS + epInput.Cert = []byte(fakeCert) + } else if address.Network() == "udp" { // use DTLS + epInput.Cert = []byte(fakeCert2) + } + } + export, err := exporter.InitExportingProcess(epInput) if err != nil { - klog.Fatalf("Got error when connecting to %s", address.String()) + klog.Fatalf("Got error when connecting to %s", cp.GetAddress().String()) } // Create template record with 5 fields diff --git a/pkg/util/util.go b/pkg/util/util.go index 271e2c09..10b8c436 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -26,7 +26,7 @@ func Encode(buff io.Writer, byteOrder binary.ByteOrder, inputs ...interface{}) e for _, in := range inputs { err = binary.Write(buff, byteOrder, in) if err != nil { - return fmt.Errorf("Error in encoding data %v: %v", in, err) + return fmt.Errorf("error in encoding data %v: %v", in, err) } } return nil @@ -45,7 +45,7 @@ func Decode(buffer io.Reader, byteOrder binary.ByteOrder, outputs ...interface{} for _, out := range outputs { err = binary.Read(buffer, byteOrder, out) if err != nil { - return fmt.Errorf("Error in decoding data: %v", err) + return fmt.Errorf("error in decoding data: %v", err) } } return nil