diff --git a/build/yamls/flow-visibility-e2e.yml b/build/yamls/flow-visibility-e2e.yml
index 87bbe91ae51..9e2524768cb 100644
--- a/build/yamls/flow-visibility-e2e.yml
+++ b/build/yamls/flow-visibility-e2e.yml
@@ -90,6 +90,7 @@ data:
         throughputFromDestinationNode UInt64,
         reverseThroughputFromSourceNode UInt64,
         reverseThroughputFromDestinationNode UInt64,
+        clusterUUID String,
         trusted UInt8 DEFAULT 0
     ) engine=MergeTree
     ORDER BY (timeInserted, flowEndSeconds)
diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go
index dd9776da769..14ca353eb13 100644
--- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go
+++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go
@@ -81,9 +81,10 @@ const (
         throughputFromSourceNode,
         throughputFromDestinationNode,
         reverseThroughputFromSourceNode,
-        reverseThroughputFromDestinationNode)
+        reverseThroughputFromDestinationNode,
+        clusterUUID)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
-        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
+        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
 )

 // PrepareClickHouseConnection is used for unit testing
@@ -118,7 +119,8 @@ type ClickHouseExportProcess struct {
     commitTicker         *time.Ticker
     exportProcessRunning bool
     // mutex protects configuration state from concurrent access
-    mutex sync.Mutex
+    mutex       sync.Mutex
+    clusterUUID string
 }

 type ClickHouseInput struct {
@@ -156,7 +158,7 @@ func (ci *ClickHouseInput) GetDataSourceName() (string, error) {
     return sb.String(), nil
 }

-func NewClickHouseClient(input ClickHouseInput) (*ClickHouseExportProcess, error) {
+func NewClickHouseClient(input ClickHouseInput, clusterUUID string) (*ClickHouseExportProcess, error) {
     dsn, connect, err := PrepareClickHouseConnection(input)
     if err != nil {
         return nil, err
@@ -168,6 +170,7 @@ func NewClickHouseClient(input ClickHouseInput) (*ClickHouseExportProcess, error
         deque:          deque.New(),
         queueSize:      maxQueueSize,
         commitInterval: input.CommitInterval,
+        clusterUUID:    clusterUUID,
     }
     return chClient, nil
 }
@@ -343,7 +346,8 @@ func (ch *ClickHouseExportProcess) batchCommitAll(ctx context.Context) (int, err
             record.ThroughputFromSourceNode,
             record.ThroughputFromDestinationNode,
             record.ReverseThroughputFromSourceNode,
-            record.ReverseThroughputFromDestinationNode)
+            record.ReverseThroughputFromDestinationNode,
+            ch.clusterUUID)

         if err != nil {
             klog.ErrorS(err, "Error when adding record")
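Note on the hunk above: the `VALUES` clause must stay in sync with the column list, 47 `?` placeholders before this patch and 48 after `clusterUUID` is added. A minimal regression-test sketch, not part of the patch; `insertQuery` is an assumed name for the constant holding the statement in the `const (` block:

```go
package clickhouseclient

import (
	"strings"
	"testing"
)

// TestInsertQueryPlaceholderCount guards the invariant that the number of "?"
// placeholders matches the number of arguments passed to Exec in
// batchCommitAll: 48 once the clusterUUID column is added.
func TestInsertQueryPlaceholderCount(t *testing.T) {
	if got := strings.Count(insertQuery, "?"); got != 48 {
		t.Errorf("insert statement has %d placeholders, want 48", got)
	}
}
```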
diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
index 36cebfefd20..fa83e328198 100644
--- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
+++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
@@ -25,6 +25,7 @@ import (
     "github.com/DATA-DOG/go-sqlmock"
     "github.com/gammazero/deque"
     "github.com/golang/mock/gomock"
+    "github.com/google/uuid"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
@@ -40,6 +41,8 @@ func init() {
     registry.LoadRegistry()
 }

+var fakeClusterUUID = uuid.New().String()
+
 func TestGetDataSourceName(t *testing.T) {
     chInput := ClickHouseInput{
         Username: "username",
@@ -103,9 +106,10 @@ func TestBatchCommitAll(t *testing.T) {
     defer db.Close()

     chExportProc := ClickHouseExportProcess{
-        db:        db,
-        deque:     deque.New(),
-        queueSize: maxQueueSize,
+        db:          db,
+        deque:       deque.New(),
+        queueSize:   maxQueueSize,
+        clusterUUID: fakeClusterUUID,
     }

     recordRow := flowrecordtesting.PrepareTestFlowRecord()
@@ -161,7 +165,8 @@ func TestBatchCommitAll(t *testing.T) {
         15902813473,
         15902813474,
         12381345,
-        12381346).
+        12381346,
+        fakeClusterUUID).
         WillReturnResult(sqlmock.NewResult(0, 1))
     mock.ExpectCommit()

@@ -183,7 +188,7 @@ func TestBatchCommitAllMultiRecord(t *testing.T) {
         queueSize: maxQueueSize,
     }
     recordRow := flowrecord.FlowRecord{}
-    fieldCount := reflect.TypeOf(recordRow).NumField()
+    fieldCount := reflect.TypeOf(recordRow).NumField() + 1
     argList := make([]driver.Value, fieldCount)
     for i := 0; i < len(argList); i++ {
         argList[i] = sqlmock.AnyArg()
@@ -216,7 +221,7 @@ func TestBatchCommitAllError(t *testing.T) {
     }
     recordRow := flowrecord.FlowRecord{}
     chExportProc.deque.PushBack(&recordRow)
-    fieldCount := reflect.TypeOf(recordRow).NumField()
+    fieldCount := reflect.TypeOf(recordRow).NumField() + 1
    argList := make([]driver.Value, fieldCount)
     for i := 0; i < len(argList); i++ {
         argList[i] = sqlmock.AnyArg()
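The `NumField() + 1` edits above deserve a note: `clusterUUID` is not a field of `flowrecord.FlowRecord`, it is passed to `Exec` as one extra trailing argument, so the expected argument count is the struct's field count (47) plus one. A sketch of the invariant, assuming `flowrecord` resolves to `antrea.io/antrea/pkg/flowaggregator/flowrecord` as elsewhere in this patch:

```go
package clickhouseclient

import (
	"database/sql/driver"
	"reflect"
	"testing"

	"github.com/DATA-DOG/go-sqlmock"

	"antrea.io/antrea/pkg/flowaggregator/flowrecord"
)

// Sketch: the mock must expect one argument per FlowRecord field, plus one
// trailing argument for clusterUUID, which is not part of the struct.
func TestExpectedArgCount(t *testing.T) {
	fieldCount := reflect.TypeOf(flowrecord.FlowRecord{}).NumField() + 1
	argList := make([]driver.Value, fieldCount)
	for i := range argList {
		argList[i] = sqlmock.AnyArg() // match any value, including the UUID string
	}
	if len(argList) != 48 {
		t.Errorf("got %d expected args, want 48", len(argList))
	}
}
```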
diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go
index 80faa98c300..cdb9b948567 100644
--- a/pkg/flowaggregator/exporter/clickhouse.go
+++ b/pkg/flowaggregator/exporter/clickhouse.go
@@ -18,6 +18,7 @@ import (
     "os"

     ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
+    "k8s.io/client-go/kubernetes"
     "k8s.io/klog/v2"

     "antrea.io/antrea/pkg/flowaggregator/clickhouseclient"
@@ -41,10 +42,14 @@ func buildClickHouseInput(opt *options.Options) clickhouseclient.ClickHouseInput
     }
 }

-func NewClickHouseExporter(opt *options.Options) (*ClickHouseExporter, error) {
+func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) {
     chInput := buildClickHouseInput(opt)
     klog.InfoS("ClickHouse configuration", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval)
-    chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput)
+    clusterUUID, err := getClusterUUID(k8sClient)
+    if err != nil {
+        return nil, err
+    }
+    chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, clusterUUID.String())
     if err != nil {
         return nil, err
     }
diff --git a/pkg/flowaggregator/exporter/clickhouse_test.go b/pkg/flowaggregator/exporter/clickhouse_test.go
index f4530e1fdf2..e9d816c622d 100644
--- a/pkg/flowaggregator/exporter/clickhouse_test.go
+++ b/pkg/flowaggregator/exporter/clickhouse_test.go
@@ -20,6 +20,7 @@ import (
     "testing"
     "time"

+    "github.com/google/uuid"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"

@@ -54,8 +55,10 @@ func TestClickHouse_UpdateOptions(t *testing.T) {
         },
         ClickHouseCommitInterval: 8 * time.Second,
     }
-    clickHouseExporter, err := NewClickHouseExporter(opt)
+    chInput := buildClickHouseInput(opt)
+    chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, uuid.New().String())
     require.NoError(t, err)
+    clickHouseExporter := ClickHouseExporter{chInput: &chInput, chExportProcess: chExportProcess}
     clickHouseExporter.Start()
     assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "tcp://clickhouse-clickhouse.flow-visibility.svc:9000?username=default&password=default&database=default&debug=true&compress=false")
     assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "8s")
diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go
index 9e19af90199..4b6253ac2ec 100644
--- a/pkg/flowaggregator/exporter/ipfix.go
+++ b/pkg/flowaggregator/exporter/ipfix.go
@@ -18,17 +18,14 @@ import (
     "fmt"
     "hash/fnv"
     "sync"
-    "time"

     "github.com/google/uuid"
     ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
     "github.com/vmware/go-ipfix/pkg/exporter"
     ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"
     "k8s.io/klog/v2"

-    "antrea.io/antrea/pkg/clusteridentity"
     "antrea.io/antrea/pkg/flowaggregator/infoelements"
     "antrea.io/antrea/pkg/flowaggregator/options"
     "antrea.io/antrea/pkg/ipfix"
@@ -59,31 +56,11 @@ type IPFIXExporter struct {
 // genObservationDomainID generates an IPFIX Observation Domain ID when one is not provided by the
 // user through the flow aggregator configuration. It will first try to generate one
 // deterministically based on the cluster UUID (if available, with a timeout of 10s). Otherwise, it
-// will generate a random one. The cluster UUID should be available if Antrea is deployed to the
-// cluster ahead of the flow aggregator, which is the expectation since when deploying flow
-// aggregator as a Pod, networking needs to be configured by the CNI plugin.
+// will generate a random one.
 func genObservationDomainID(k8sClient kubernetes.Interface) uint32 {
-    const retryInterval = time.Second
-    const timeout = 10 * time.Second
-    const defaultAntreaNamespace = "kube-system"
-
-    clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
-        defaultAntreaNamespace,
-        clusteridentity.DefaultClusterIdentityConfigMapName,
-        k8sClient,
-    )
-    var clusterUUID uuid.UUID
-    if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
-        clusterIdentity, _, err := clusterIdentityProvider.Get()
-        if err != nil {
-            return false, nil
-        }
-        clusterUUID = clusterIdentity.UUID
-        return true, nil
-    }); err != nil {
-        klog.InfoS(
-            "Unable to retrieve cluster UUID; will generate a random observation domain ID", "timeout", timeout, "ConfigMapNameSpace", defaultAntreaNamespace, "ConfigMapName", clusteridentity.DefaultClusterIdentityConfigMapName,
-        )
+    clusterUUID, err := getClusterUUID(k8sClient)
+    if err != nil {
+        klog.ErrorS(err, "Error when retrieving cluster UUID; will generate a random observation domain ID")
         clusterUUID = uuid.New()
     }
     h := fnv.New32()
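For reference, the observation domain ID remains a 32-bit FNV hash of the cluster UUID, so it is stable across flow aggregator restarts. A standalone sketch of the derivation; the hunk above ends at `h := fnv.New32()`, so exactly which bytes are fed to the hash is an assumption here:

```go
package main

import (
	"fmt"
	"hash/fnv"

	"github.com/google/uuid"
)

func main() {
	// Example value; in the real code this comes from getClusterUUID.
	clusterUUID := uuid.MustParse("123e4567-e89b-12d3-a456-426614174000")
	h := fnv.New32()
	h.Write(clusterUUID[:]) // assumed: the hashed bytes are not shown in the hunk
	fmt.Printf("observation domain ID: %d\n", h.Sum32())
}
```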
diff --git a/pkg/flowaggregator/exporter/s3.go b/pkg/flowaggregator/exporter/s3.go
index 32c7b888881..9713687a1cc 100644
--- a/pkg/flowaggregator/exporter/s3.go
+++ b/pkg/flowaggregator/exporter/s3.go
@@ -16,6 +16,7 @@ package exporter

 import (
     ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
+    "k8s.io/client-go/kubernetes"
     "k8s.io/klog/v2"

     "antrea.io/antrea/pkg/flowaggregator/options"
@@ -34,10 +35,14 @@ func buildS3Input(opt *options.Options) s3uploader.S3Input {
     }
 }

-func NewS3Exporter(opt *options.Options) (*S3Exporter, error) {
+func NewS3Exporter(k8sClient kubernetes.Interface, opt *options.Options) (*S3Exporter, error) {
     s3Input := buildS3Input(opt)
     klog.InfoS("S3Uploader configuration", "bucketName", s3Input.Config.BucketName, "bucketPrefix", s3Input.Config.BucketPrefix, "region", s3Input.Config.Region, "recordFormat", s3Input.Config.RecordFormat, "compress", *s3Input.Config.Compress, "maxRecordsPerFile", s3Input.Config.MaxRecordsPerFile, "uploadInterval", s3Input.UploadInterval)
-    s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input)
+    clusterUUID, err := getClusterUUID(k8sClient)
+    if err != nil {
+        return nil, err
+    }
+    s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, clusterUUID.String())
     if err != nil {
         return nil, err
     }
diff --git a/pkg/flowaggregator/exporter/s3_test.go b/pkg/flowaggregator/exporter/s3_test.go
index 6f66b96ff63..427b5340f71 100644
--- a/pkg/flowaggregator/exporter/s3_test.go
+++ b/pkg/flowaggregator/exporter/s3_test.go
@@ -18,11 +18,13 @@ import (
     "testing"
     "time"

+    "github.com/google/uuid"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"

     "antrea.io/antrea/pkg/config/flowaggregator"
     "antrea.io/antrea/pkg/flowaggregator/options"
+    "antrea.io/antrea/pkg/flowaggregator/s3uploader"
 )

 func TestS3_UpdateOptions(t *testing.T) {
@@ -40,8 +42,10 @@ func TestS3_UpdateOptions(t *testing.T) {
         },
         S3UploadInterval: 8 * time.Second,
     }
-    s3Exporter, err := NewS3Exporter(opt)
+    s3Input := buildS3Input(opt)
+    s3UploadProcess, err := s3uploader.NewS3UploadProcess(s3Input, uuid.New().String())
     require.NoError(t, err)
+    s3Exporter := S3Exporter{s3Input: &s3Input, s3UploadProcess: s3UploadProcess}
     s3Exporter.Start()
     assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketName(), "defaultBucketName")
     assert.Equal(t, s3Exporter.s3UploadProcess.GetBucketPrefix(), "defaultBucketPrefix")
diff --git a/pkg/flowaggregator/exporter/utils.go b/pkg/flowaggregator/exporter/utils.go
new file mode 100644
index 00000000000..ced42593a41
--- /dev/null
+++ b/pkg/flowaggregator/exporter/utils.go
@@ -0,0 +1,55 @@
+// Copyright 2022 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exporter
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/google/uuid"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/kubernetes"
+
+    "antrea.io/antrea/pkg/clusteridentity"
+)
+
+// getClusterUUID retrieves the cluster UUID (if available, with a timeout of 10s);
+// otherwise, it returns an empty cluster UUID and an error. The cluster UUID should
+// be available if Antrea is deployed to the cluster ahead of the flow aggregator,
+// which is the expectation, since when deploying the flow aggregator as a Pod,
+// networking needs to be configured by the CNI plugin.
+func getClusterUUID(k8sClient kubernetes.Interface) (uuid.UUID, error) {
+    const retryInterval = time.Second
+    const timeout = 10 * time.Second
+    const defaultAntreaNamespace = "kube-system"
+
+    clusterIdentityProvider := clusteridentity.NewClusterIdentityProvider(
+        defaultAntreaNamespace,
+        clusteridentity.DefaultClusterIdentityConfigMapName,
+        k8sClient,
+    )
+    var clusterUUID uuid.UUID
+    if err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
+        clusterIdentity, _, err := clusterIdentityProvider.Get()
+        if err != nil {
+            return false, nil
+        }
+        clusterUUID = clusterIdentity.UUID
+        return true, nil
+    }); err != nil {
+        return clusterUUID, fmt.Errorf("unable to retrieve cluster UUID from ConfigMap '%s/%s': timeout after %v", defaultAntreaNamespace, clusteridentity.DefaultClusterIdentityConfigMapName, timeout)
+    }
+    return clusterUUID, nil
+}
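A hypothetical in-package test exercising the error path of this new helper; note it takes the full 10s timeout to run, since the fake clientset has no antrea-cluster-identity ConfigMap:

```go
package exporter

import (
	"testing"

	"k8s.io/client-go/kubernetes/fake"
)

// With no cluster identity ConfigMap present, getClusterUUID polls for 10s and
// then returns an error. Callers differ in how they handle it: the IPFIX
// exporter falls back to a random UUID, while the ClickHouse and S3 exporter
// constructors propagate the error.
func TestGetClusterUUIDTimesOutWithoutConfigMap(t *testing.T) {
	k8sClient := fake.NewSimpleClientset()
	if _, err := getClusterUUID(k8sClient); err == nil {
		t.Fatal("expected an error when the cluster identity ConfigMap is absent")
	}
}
```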
diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go
index 3d79a80b21e..d14a231f072 100644
--- a/pkg/flowaggregator/flowaggregator.go
+++ b/pkg/flowaggregator/flowaggregator.go
@@ -93,11 +93,11 @@ var (
     newIPFIXExporter = func(k8sClient kubernetes.Interface, opt *options.Options, registry ipfix.IPFIXRegistry) exporter.Interface {
         return exporter.NewIPFIXExporter(k8sClient, opt, registry)
     }
-    newClickHouseExporter = func(opt *options.Options) (exporter.Interface, error) {
-        return exporter.NewClickHouseExporter(opt)
+    newClickHouseExporter = func(k8sClient kubernetes.Interface, opt *options.Options) (exporter.Interface, error) {
+        return exporter.NewClickHouseExporter(k8sClient, opt)
     }
-    newS3Exporter = func(opt *options.Options) (exporter.Interface, error) {
-        return exporter.NewS3Exporter(opt)
+    newS3Exporter = func(k8sClient kubernetes.Interface, opt *options.Options) (exporter.Interface, error) {
+        return exporter.NewS3Exporter(k8sClient, opt)
     }
 )

@@ -183,14 +183,14 @@ func NewFlowAggregator(
     }
     if opt.Config.ClickHouse.Enable {
         var err error
-        fa.clickHouseExporter, err = newClickHouseExporter(opt)
+        fa.clickHouseExporter, err = newClickHouseExporter(k8sClient, opt)
         if err != nil {
             return nil, fmt.Errorf("error when creating ClickHouse export process: %v", err)
         }
     }
     if opt.Config.S3Uploader.Enable {
         var err error
-        fa.s3Exporter, err = newS3Exporter(opt)
+        fa.s3Exporter, err = newS3Exporter(k8sClient, opt)
         if err != nil {
             return nil, fmt.Errorf("error when creating S3 export process: %v", err)
         }
@@ -586,7 +586,7 @@ func (fa *flowAggregator) updateFlowAggregator(opt *options.Options) {
         if fa.clickHouseExporter == nil {
             klog.InfoS("Enabling ClickHouse")
             var err error
-            fa.clickHouseExporter, err = newClickHouseExporter(opt)
+            fa.clickHouseExporter, err = newClickHouseExporter(fa.k8sClient, opt)
             if err != nil {
                 klog.ErrorS(err, "Error when creating ClickHouse export process")
                 return
@@ -608,7 +608,7 @@ func (fa *flowAggregator) updateFlowAggregator(opt *options.Options) {
         if fa.s3Exporter == nil {
             klog.InfoS("Enabling S3Uploader")
             var err error
-            fa.s3Exporter, err = newS3Exporter(opt)
+            fa.s3Exporter, err = newS3Exporter(fa.k8sClient, opt)
             if err != nil {
                 klog.ErrorS(err, "Error when creating S3 export process")
                 return
diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go
index ec3b5dcbe32..802b435183a 100644
--- a/pkg/flowaggregator/flowaggregator_test.go
+++ b/pkg/flowaggregator/flowaggregator_test.go
@@ -264,10 +264,10 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {
     newIPFIXExporter = func(kubernetes.Interface, *options.Options, ipfix.IPFIXRegistry) exporter.Interface {
         return mockIPFIXExporter
     }
-    newClickHouseExporter = func(*options.Options) (exporter.Interface, error) {
+    newClickHouseExporter = func(kubernetes.Interface, *options.Options) (exporter.Interface, error) {
         return mockClickHouseExporter, nil
     }
-    newS3Exporter = func(*options.Options) (exporter.Interface, error) {
+    newS3Exporter = func(kubernetes.Interface, *options.Options) (exporter.Interface, error) {
         return mockS3Exporter, nil
     }

@@ -405,10 +405,10 @@ func TestFlowAggregator_Run(t *testing.T) {
     newIPFIXExporter = func(kubernetes.Interface, *options.Options, ipfix.IPFIXRegistry) exporter.Interface {
         return mockIPFIXExporter
     }
-    newClickHouseExporter = func(*options.Options) (exporter.Interface, error) {
+    newClickHouseExporter = func(kubernetes.Interface, *options.Options) (exporter.Interface, error) {
         return mockClickHouseExporter, nil
     }
-    newS3Exporter = func(*options.Options) (exporter.Interface, error) {
+    newS3Exporter = func(kubernetes.Interface, *options.Options) (exporter.Interface, error) {
         return mockS3Exporter, nil
     }
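The constructors above are package-level `var`s rather than plain functions precisely so the tests can swap them for mock-returning closures, as the hunks in flowaggregator_test.go show. The idiom in isolation, as a sketch (the helper name is illustrative; `mock` would be a gomock-generated `exporter.Interface`):

```go
package flowaggregator

import (
	"k8s.io/client-go/kubernetes"

	"antrea.io/antrea/pkg/flowaggregator/exporter"
	"antrea.io/antrea/pkg/flowaggregator/options"
)

// swapClickHouseExporterForTest overrides the package-level constructor var
// and returns a restore func, so the test never opens a real ClickHouse
// connection or performs the cluster UUID lookup.
func swapClickHouseExporterForTest(mock exporter.Interface) (restore func()) {
	saved := newClickHouseExporter
	newClickHouseExporter = func(kubernetes.Interface, *options.Options) (exporter.Interface, error) {
		return mock, nil
	}
	return func() { newClickHouseExporter = saved }
}
```

A caller would typically write `defer swapClickHouseExporterForTest(mockClickHouseExporter)()` at the top of the test.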
"github.com/golang/mock/gomock" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing" "github.com/vmware/go-ipfix/pkg/registry" @@ -35,12 +36,14 @@ import ( flowaggregatortesting "antrea.io/antrea/pkg/flowaggregator/testing" ) -const ( - seed = 1 - recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346" - recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346" +var ( + fakeClusterUUID = uuid.New().String() + recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID + recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID ) +const seed = 1 + type mockS3Uploader struct { testReader *bytes.Buffer testReaderMutex sync.Mutex @@ -83,6 +86,7 @@ func TestCacheRecord(t *testing.T) { maxRecordPerFile: 2, currentBuffer: &bytes.Buffer{}, bufferQueue: make([]*bytes.Buffer, 0, maxNumBuffersPendingUpload), + clusterUUID: 
diff --git a/pkg/flowaggregator/s3uploader/s3uploader_test.go b/pkg/flowaggregator/s3uploader/s3uploader_test.go
index 40e3cec0e38..a98016bd02d 100644
--- a/pkg/flowaggregator/s3uploader/s3uploader_test.go
+++ b/pkg/flowaggregator/s3uploader/s3uploader_test.go
@@ -26,6 +26,7 @@ import (
     s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
     "github.com/aws/aws-sdk-go-v2/service/s3"
     "github.com/golang/mock/gomock"
+    "github.com/google/uuid"
     "github.com/stretchr/testify/assert"
     ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
     "github.com/vmware/go-ipfix/pkg/registry"
@@ -35,12 +36,14 @@ import (
     flowaggregatortesting "antrea.io/antrea/pkg/flowaggregator/testing"
 )

-const (
-    seed          = 1
-    recordStrIPv4 = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346"
-    recordStrIPv6 = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346"
+var (
+    fakeClusterUUID = uuid.New().String()
+    recordStrIPv4   = "1637706961,1637706973,1637706974,1637706975,3,10.10.0.79,10.10.0.80,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,10.10.1.10,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID
+    recordStrIPv6   = "1637706961,1637706973,1637706974,1637706975,3,2001:0:3238:dfe1:63::fefb,2001:0:3238:dfe1:63::fefc,44752,5201,6,823188,30472817041,241333,8982624938,471111,24500996,136211,7083284,perftest-a,antrea-test,k8s-node-control-plane,perftest-b,antrea-test-b,k8s-node-control-plane-b,2001:0:3238:dfe1:64::a,5202,perftest,test-flow-aggregator-networkpolicy-ingress-allow,antrea-test-ns,test-flow-aggregator-networkpolicy-rule,2,1,test-flow-aggregator-networkpolicy-egress-allow,antrea-test-ns-e,test-flow-aggregator-networkpolicy-rule-e,5,4,TIME_WAIT,11,{\"antrea-e2e\":\"perftest-a\",\"app\":\"perftool\"},{\"antrea-e2e\":\"perftest-b\",\"app\":\"perftool\"},15902813472,12381344,15902813473,15902813474,12381345,12381346," + fakeClusterUUID
 )

+const seed = 1
+
 type mockS3Uploader struct {
     testReader      *bytes.Buffer
     testReaderMutex sync.Mutex
@@ -83,6 +86,7 @@ func TestCacheRecord(t *testing.T) {
         maxRecordPerFile: 2,
         currentBuffer:    &bytes.Buffer{},
         bufferQueue:      make([]*bytes.Buffer, 0, maxNumBuffersPendingUpload),
+        clusterUUID:      fakeClusterUUID,
     }

     // First call, cache the record in currentBuffer.
@@ -115,6 +119,7 @@ func TestBatchUploadAll(t *testing.T) {
         buffersToUpload: make([]*bytes.Buffer, 0, maxNumBuffersPendingUpload),
         s3UploaderAPI:   mockS3Uploader,
         nameRand:        nameRand,
+        clusterUUID:     fakeClusterUUID,
     }
     testRecord := flowrecordtesting.PrepareTestFlowRecord()
     s3UploadProc.writeRecordToBuffer(testRecord)
@@ -173,6 +178,7 @@ func TestFlowRecordPeriodicCommit(t *testing.T) {
         buffersToUpload: make([]*bytes.Buffer, 0, maxNumBuffersPendingUpload),
         s3UploaderAPI:   mockS3Uploader,
         nameRand:        nameRand,
+        clusterUUID:     fakeClusterUUID,
     }
     testRecord := flowrecordtesting.PrepareTestFlowRecord()
     s3UploadProc.writeRecordToBuffer(testRecord)
@@ -209,6 +215,7 @@ func TestFlushCacheOnStop(t *testing.T) {
         buffersToUpload: make([]*bytes.Buffer, 0, maxNumBuffersPendingUpload),
         s3UploaderAPI:   mockS3Uploader,
         nameRand:        nameRand,
+        clusterUUID:     fakeClusterUUID,
     }
     testRecord := flowrecordtesting.PrepareTestFlowRecord()
     s3UploadProc.writeRecordToBuffer(testRecord)
diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go
index 62bb552601c..e111e15694e 100644
--- a/test/e2e/flowaggregator_test.go
+++ b/test/e2e/flowaggregator_test.go
@@ -1453,5 +1453,6 @@ type ClickHouseFullRow struct {
     ThroughputFromDestinationNode        uint64 `json:"throughputFromDestinationNode,string"`
     ReverseThroughputFromSourceNode      uint64 `json:"reverseThroughputFromSourceNode,string"`
     ReverseThroughputFromDestinationNode uint64 `json:"reverseThroughputFromDestinationNode,string"`
+    ClusterUUID                          string `json:"clusterUUID"`
     Trusted                              uint8  `json:"trusted"`
 }