diff --git a/build/yamls/flow-visibility-e2e.yml b/build/yamls/flow-visibility-e2e.yml index 9e2524768cb..f548fee94a3 100644 --- a/build/yamls/flow-visibility-e2e.yml +++ b/build/yamls/flow-visibility-e2e.yml @@ -319,7 +319,7 @@ spec: spec: containers: - name: clickhouse - image: projects.registry.vmware.com/antrea/theia-clickhouse-server:21.11 + image: projects.registry.vmware.com/antrea/clickhouse-server:23.4 imagePullPolicy: IfNotPresent volumeMounts: - name: clickhouse-configmap-volume diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index a5f641d8ffd..71493301183 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -171,9 +171,9 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \ "projects.registry.vmware.com/antrea/perftool") FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.1" \ - "projects.registry.vmware.com/antrea/theia-clickhouse-operator:0.18.2" \ - "projects.registry.vmware.com/antrea/theia-metrics-exporter:0.18.2" \ - "projects.registry.vmware.com/antrea/theia-clickhouse-server:21.11") + "projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \ + "projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \ + "projects.registry.vmware.com/antrea/clickhouse-server:23.4") if $coverage; then manifest_args="$manifest_args --coverage" COMMON_IMAGES_LIST+=("antrea/antrea-ubuntu-coverage:latest") @@ -245,7 +245,10 @@ function run_test { $FLOWAGGREGATOR_YML_CMD | docker exec -i kind-control-plane dd of=/root/flow-aggregator.yml fi cat $FLOW_VISIBILITY_YML | docker exec -i kind-control-plane dd of=/root/flow-visibility.yml - curl -o $CH_OPERATOR_YML https://raw.githubusercontent.com/antrea-io/theia/main/build/charts/theia/crds/clickhouse-operator-install-bundle.yaml + curl -o $CH_OPERATOR_YML https://raw.githubusercontent.com/Altinity/clickhouse-operator/release-0.21.0/deploy/operator/clickhouse-operator-install-bundle.yaml + sed -i -e "s|\"image\": \"clickhouse/clickhouse-server:22.3\"|\"image\": \"projects.registry.vmware.com/antrea/clickhouse-server:23.4\"|g" $CH_OPERATOR_YML + sed -i -e "s|image: altinity/clickhouse-operator:0.21.0|image: projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0|g" $CH_OPERATOR_YML + sed -i -e "s|image: altinity/metrics-exporter:0.21.0|image: projects.registry.vmware.com/antrea/metrics-exporter:0.21.0|g" $CH_OPERATOR_YML cat $CH_OPERATOR_YML | docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yml fi diff --git a/go.mod b/go.mod index 5c79043db89..4eafb2c4098 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( antrea.io/libOpenflow v0.10.2 antrea.io/ofnet v0.7.1 - github.com/ClickHouse/clickhouse-go v1.5.4 + github.com/ClickHouse/clickhouse-go/v2 v2.6.1 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/Mellanox/sriovnet v1.1.0 github.com/Microsoft/go-winio v0.6.1 @@ -87,8 +87,10 @@ require ( ) require ( + github.com/ClickHouse/ch-go v0.51.2 // indirect github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/VividCortex/ewma v1.2.0 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.4 // indirect @@ -108,12 +110,11 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenk/hub v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.1.3 // indirect + github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/cenkalti/hub v1.0.1 // indirect github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect - github.com/containerd/cgroups v1.0.1 // indirect + github.com/containerd/cgroups v1.0.4 // indirect github.com/contiv/libovsdb v0.0.0-20170227191248-d0061a53e358 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect @@ -124,6 +125,8 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.14.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.6.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect @@ -149,6 +152,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/josharian/native v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.15.14 // indirect github.com/kr/fs v0.1.0 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect @@ -162,6 +166,8 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + github.com/paulmach/orb v0.8.0 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pion/dtls/v2 v2.2.4 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/transport/v2 v2.0.0 // indirect @@ -172,6 +178,8 @@ require ( github.com/rivo/uniseg v0.2.0 // indirect github.com/safchain/ethtool v0.0.0-20210803160452-9aa261dae9b1 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/shopspring/decimal v1.3.1 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect github.com/streamrail/concurrent-map v0.0.0-20160823150647-8bf1e9bacbf6 // indirect github.com/ti-mo/netfilter v0.3.1 // indirect @@ -183,16 +191,16 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.0 // indirect - go.opentelemetry.io/otel v1.10.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 // indirect - go.opentelemetry.io/otel/metric v0.31.0 // indirect - go.opentelemetry.io/otel/sdk v1.10.0 // indirect - go.opentelemetry.io/otel/trace v1.10.0 // indirect + go.opentelemetry.io/otel v1.11.2 // indirect + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 // indirect + go.opentelemetry.io/otel/metric v0.34.0 // indirect + go.opentelemetry.io/otel/sdk v1.11.2 // indirect + go.opentelemetry.io/otel/trace v1.11.2 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect - go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect golang.org/x/term v0.8.0 // indirect diff --git a/go.sum b/go.sum index 68f60a98337..b2b93fc8db0 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,10 @@ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbt github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= -github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/ClickHouse/ch-go v0.51.2 h1:PesdqjUImi21U61yPKsDhfer8wiQ3geTsjdjZzXd/3s= +github.com/ClickHouse/ch-go v0.51.2/go.mod h1:z+/hEezvvHvRMV/I00CaXBnxOx+td4zRe7HJpBYLwGU= +github.com/ClickHouse/clickhouse-go/v2 v2.6.1 h1:82UzCrD8cYEb/Bs/LOO3dlBZZyL+SlvvH/xwZF25BIU= +github.com/ClickHouse/clickhouse-go/v2 v2.6.1/go.mod h1:SvXuWqDsiHJE3VAn2+3+nz9W9exOSigyskcs4DAcxJQ= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Mellanox/sriovnet v1.1.0 h1:j3KnktNJMHWPTqWXlf27OzQG0ahRO+88NauMjlazyko= @@ -117,6 +119,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/alessio/shellescape v1.2.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves= github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= @@ -175,8 +179,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= -github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk= -github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= @@ -192,8 +194,8 @@ github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3k github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA= github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= -github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= -github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= +github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/hub v1.0.1-0.20140529221144-7be60e186e66/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg= github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs= @@ -218,8 +220,6 @@ github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX github.com/cilium/ebpf v0.4.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs= github.com/cilium/ebpf v0.6.2/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= -github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -244,8 +244,9 @@ github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59/go.mod h1:pA0z1 github.com/containerd/cgroups v0.0.0-20200710171044-318312a37340/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo= github.com/containerd/cgroups v0.0.0-20200824123100-0b889c03f102/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo= github.com/containerd/cgroups v0.0.0-20210114181951-8a68de567b68/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= -github.com/containerd/cgroups v1.0.1 h1:iJnMvco9XGvKUvNQkv88bE4uJXxRQH18efbKo9w5vHQ= github.com/containerd/cgroups v1.0.1/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU= +github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= +github.com/containerd/cgroups v1.0.4/go.mod h1:nLNQtsF7Sl2HxNebu77i1R0oDlhiTG+kO4JTrUzo6IA= github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE= @@ -380,8 +381,8 @@ github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNE github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik= github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= @@ -428,6 +429,10 @@ github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2H github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= +github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -497,7 +502,6 @@ github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/ github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4= -github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= @@ -677,7 +681,6 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/joefitzgerald/rainbow-reporter v0.1.0/go.mod h1:481CNgqmVHQZzdIbN52CupLJyoVwB10FQ/IQlF1pdL8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= @@ -719,6 +722,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= +github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -734,7 +739,6 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linuxkit/virtsock v0.0.0-20201010232012-f8cee7dfc7a3/go.mod h1:3r6x7q95whyfWQpmGZTu3gk3v2YkMi05HEzl7Tf7YEo= github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= @@ -763,7 +767,6 @@ github.com/mattn/go-runewidth v0.0.12 h1:Y41i/hVW3Pgwr8gV+J23B9YEY0zxjptBuCWEaxm github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= -github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -886,13 +889,16 @@ github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3 github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/paulmach/orb v0.8.0 h1:W5XAt5yNPNnhaMNEf0xNSkBMJ1LzOzdk2MRlB6EN0Vs= +github.com/paulmach/orb v0.8.0/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= -github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= -github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pion/dtls/v2 v2.2.4 h1:YSfYwDQgrxMYXLBc/m7PFY5BVtWlNm/DN4qoU2CbcWg= github.com/pion/dtls/v2 v2.2.4/go.mod h1:WGKfxqhrddne4Kg3p11FUMJrynkOY4lb25zHNO49wuw= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -972,7 +978,11 @@ github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= @@ -1119,31 +1129,33 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.3 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0/go.mod h1:h8TWwRAhQpOd0aM5nYsRD8+flnkj+526GEIVlarH7eY= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.0 h1:Ajldaqhxqw/gNzQA45IKFWLdG7jZuXX/wBW1d5qvbUI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.35.0/go.mod h1:9NiG9I2aHTKkcxqCILhjtyNA1QEiCjdBACv4IvrFQ+c= -go.opentelemetry.io/otel v1.10.0 h1:Y7DTJMR6zs1xkS/upamJYk0SxxN4C9AqRd77jmZnyY4= -go.opentelemetry.io/otel v1.10.0/go.mod h1:NbvWjCthWHKBEUMpf0/v8ZRZlni86PpGFEMA9pnQSnQ= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 h1:TaB+1rQhddO1sF71MpZOZAuSPW1klK2M8XxfrBMfK7Y= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0 h1:pDDYmo0QadUPal5fwXoY1pmMpFcdyhXOmL5drCrI3vU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.10.0/go.mod h1:Krqnjl22jUJ0HgMzw5eveuCvFDXY4nSYb4F8t5gdrag= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0 h1:KtiUEhQmj/Pa874bVYKGNVdq8NPKiacPbaRRtgXi+t4= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.10.0/go.mod h1:OfUCyyIiDvNXHWpcWgbF+MWvqPZiNa3YDEnivcnYsV0= -go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs= -go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A= -go.opentelemetry.io/otel/sdk v1.10.0 h1:jZ6K7sVn04kk/3DNUdJ4mqRlGDiXAVuIG+MMENpTNdY= -go.opentelemetry.io/otel/sdk v1.10.0/go.mod h1:vO06iKzD5baltJz1zarxMCNHFpUlUiOy4s65ECtn6kE= -go.opentelemetry.io/otel/trace v1.10.0 h1:npQMbR8o7mum8uF95yFbOEJffhs1sbCOfDh8zAJiH5E= -go.opentelemetry.io/otel/trace v1.10.0/go.mod h1:Sij3YYczqAdz+EhmGhE6TpTxUO5/F/AzrK+kxfGqySM= +go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0= +go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 h1:htgM8vZIF8oPSCxa341e3IZ4yr/sKxgu8KZYllByiVY= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2/go.mod h1:rqbht/LlhVBgn5+k3M5QK96K5Xb0DvXpMJ5SFQpY6uw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 h1:fqR1kli93643au1RKo0Uma3d2aPQKT+WBKfTSBaKbOc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2/go.mod h1:5Qn6qvgkMsLDX+sYK64rHb1FPhpn0UtxF+ouX1uhyJE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 h1:ERwKPn9Aer7Gxsc0+ZlutlH1bEEAUXAUhqm3Y45ABbk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2/go.mod h1:jWZUM2MWhWCJ9J9xVbRx7tzK1mXKpAlze4CeulycwVY= +go.opentelemetry.io/otel/metric v0.34.0 h1:MCPoQxcg/26EuuJwpYN1mZTeCYAUGx8ABxfW07YkjP8= +go.opentelemetry.io/otel/metric v0.34.0/go.mod h1:ZFuI4yQGNCupurTXCwkeD/zHBt+C2bR7bw5JqUm/AP8= +go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU= +go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU= +go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= +go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index 1e9865b81cb..2c40a354e7e 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -18,11 +18,11 @@ import ( "context" "database/sql" "fmt" - "strings" + "net/url" "sync" "time" - "github.com/ClickHouse/clickhouse-go" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/gammazero/deque" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" "k8s.io/apimachinery/pkg/util/wait" @@ -90,23 +90,27 @@ const ( // PrepareClickHouseConnection is used for unit testing var PrepareClickHouseConnection = prepareConnection +type protocol string + +const ( + protocolTCP = "tcp" + protocolUnknown = "" +) + type stopPayload struct { flushQueue bool } type ClickHouseExportProcess struct { // db holds sql connection struct to clickhouse db. - db *sql.DB - // dsn is data source name used for connection to clickhouse db. - dsn string + db *sql.DB + config ClickHouseConfig // deque buffers flows records between batch commits. deque *deque.Deque // dequeMutex is for concurrency between adding and removing records from deque. dequeMutex sync.Mutex // queueSize is the max size of deque queueSize int - // commitInterval is the interval between batch commits - commitInterval time.Duration // stopCh is the channel to receive stop message stopCh chan stopPayload // exportWg is to ensure that all messages have been flushed from the queue when we stop @@ -119,7 +123,7 @@ type ClickHouseExportProcess struct { clusterUUID string } -type ClickHouseInput struct { +type ClickHouseConfig struct { Username string Password string Database string @@ -129,44 +133,22 @@ type ClickHouseInput struct { CommitInterval time.Duration } -func (ci *ClickHouseInput) GetDataSourceName() (string, error) { - if len(ci.DatabaseURL) == 0 || len(ci.Username) == 0 || len(ci.Password) == 0 { - return "", fmt.Errorf("URL, Username or Password missing for clickhouse DSN") - } - var sb strings.Builder - sb.WriteString(fmt.Sprintf("%s?username=%s&password=%s", ci.DatabaseURL, ci.Username, ci.Password)) - - if len(ci.Database) > 0 { - sb.WriteString("&database=") - sb.WriteString(ci.Database) - } - if ci.Debug { - sb.WriteString("&debug=true") - } else { - sb.WriteString("&debug=false") - } - if *ci.Compress { - sb.WriteString("&compress=true") - } else { - sb.WriteString("&compress=false") +func NewClickHouseClient(config ClickHouseConfig, clusterUUID string) (*ClickHouseExportProcess, error) { + if len(config.DatabaseURL) == 0 || len(config.Username) == 0 || len(config.Password) == 0 { + return nil, fmt.Errorf("DatabaseURL, Username or Password missing in ClickHouse config") } - return sb.String(), nil -} - -func NewClickHouseClient(input ClickHouseInput, clusterUUID string) (*ClickHouseExportProcess, error) { - dsn, connect, err := PrepareClickHouseConnection(input) + connect, err := PrepareClickHouseConnection(config) if err != nil { return nil, err } chClient := &ClickHouseExportProcess{ - db: connect, - dsn: dsn, - deque: deque.New(), - queueSize: maxQueueSize, - commitInterval: input.CommitInterval, - clusterUUID: clusterUUID, + db: connect, + config: config, + deque: deque.New(), + queueSize: maxQueueSize, + clusterUUID: clusterUUID, } return chClient, nil } @@ -197,7 +179,7 @@ func (ch *ClickHouseExportProcess) startExportProcess() { return } ch.exportProcessRunning = true - ch.commitTicker = time.NewTicker(ch.commitInterval) + ch.commitTicker = time.NewTicker(ch.config.CommitInterval) ch.stopCh = make(chan stopPayload, 1) ch.exportWg.Add(1) go func() { @@ -376,14 +358,10 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco } } -func prepareConnection(input ClickHouseInput) (string, *sql.DB, error) { - dsn, err := input.GetDataSourceName() - if err != nil { - return "", nil, fmt.Errorf("error when parsing ClickHouse DSN: %v", err) - } - connect, err := ConnectClickHouse(dsn) +func prepareConnection(config ClickHouseConfig) (*sql.DB, error) { + connect, err := ConnectClickHouse(&config) if err != nil { - return "", nil, err + return nil, err } // Test open Transaction tx, err := connect.Begin() @@ -391,54 +369,47 @@ func prepareConnection(input ClickHouseInput) (string, *sql.DB, error) { _, err = tx.Prepare(insertQuery) } if err != nil { - return "", nil, fmt.Errorf("error when preparing insert statement, %v", err) + return nil, fmt.Errorf("error when preparing insert statement, %v", err) } _ = tx.Commit() - return dsn, connect, err -} - -func (ch *ClickHouseExportProcess) GetDsnMap() map[string]string { - parseURL := strings.Split(ch.dsn, "?") - m := make(map[string]string) - m["databaseURL"] = parseURL[0] - for _, v := range strings.Split(parseURL[1], "&") { - pair := strings.Split(v, "=") - m[pair[0]] = pair[1] - } - return m + return connect, err } -func (ch *ClickHouseExportProcess) UpdateCH(dsn string, connect *sql.DB) { +func (ch *ClickHouseExportProcess) UpdateCH(config ClickHouseConfig, connect *sql.DB) { ch.stopExportProcess(false) // do not flush the queue defer ch.startExportProcess() ch.mutex.Lock() defer ch.mutex.Unlock() - ch.dsn = dsn + ch.config = config ch.db = connect } func (ch *ClickHouseExportProcess) GetCommitInterval() time.Duration { ch.mutex.Lock() defer ch.mutex.Unlock() - return ch.commitInterval + return ch.config.CommitInterval } func (ch *ClickHouseExportProcess) SetCommitInterval(commitInterval time.Duration) { ch.mutex.Lock() defer ch.mutex.Unlock() - ch.commitInterval = commitInterval + ch.config.CommitInterval = commitInterval if ch.commitTicker != nil { - ch.commitTicker.Reset(ch.commitInterval) + ch.commitTicker.Reset(ch.config.CommitInterval) } } -func (ch *ClickHouseExportProcess) GetDsn() string { +func (ch *ClickHouseExportProcess) GetClickHouseConfig() ClickHouseConfig { ch.mutex.Lock() defer ch.mutex.Unlock() - return ch.dsn + return ch.config } -func ConnectClickHouse(url string) (*sql.DB, error) { +func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { + _, addr, err := parseDatabaseURL(config.DatabaseURL) + if err != nil { + return nil, err + } var connect *sql.DB var connErr error connRetryInterval := 1 * time.Second @@ -447,12 +418,23 @@ func ConnectClickHouse(url string) (*sql.DB, error) { // Connect to ClickHouse in a loop if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) { // Open the database and ping it - var err error - connect, err = sql.Open("clickhouse", url) - if err != nil { - connErr = fmt.Errorf("error when opening DB connection: %v", err) - return false, nil + opt := clickhouse.Options{ + Addr: []string{addr}, + Auth: clickhouse.Auth{ + Username: config.Username, + Password: config.Password, + Database: config.Database, + }, + } + var compression clickhouse.CompressionMethod + if *config.Compress { + compression = clickhouse.CompressionLZ4 + } + opt.Compression = &clickhouse.Compression{ + Method: compression, } + + connect = clickhouse.OpenDB(&opt) if err := connect.Ping(); err != nil { if exception, ok := err.(*clickhouse.Exception); ok { connErr = fmt.Errorf("failed to ping ClickHouse: %v", exception.Message) @@ -468,3 +450,20 @@ func ConnectClickHouse(url string) (*sql.DB, error) { } return connect, nil } + +func parseDatabaseURL(dbUrl string) (protocol, string, error) { + u, err := url.Parse(dbUrl) + if err != nil { + return protocolUnknown, "", fmt.Errorf("failed to parse ClickHouse database URL: %w", err) + } + if u.Path != "" || u.RawQuery != "" || u.User != nil { + return protocolUnknown, "", fmt.Errorf("invalid ClickHouse database URL '%s': path, query string or user info should not be set", dbUrl) + } + proto := u.Scheme + switch proto { + case protocolTCP: + return protocolTCP, u.Host, nil + default: + return protocolUnknown, "", fmt.Errorf("connection over %s transport protocol is not supported", proto) + } +} diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index a88f1fef89a..48fecfe3356 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -43,39 +43,6 @@ func init() { var fakeClusterUUID = uuid.New().String() -func TestGetDataSourceName(t *testing.T) { - chInput := ClickHouseInput{ - Username: "username", - Password: "password", - Database: "default", - DatabaseURL: "tcp://click-house-svc:9000", - Debug: true, - Compress: new(bool), - CommitInterval: 1 * time.Second, - } - *chInput.Compress = true - dsn := "tcp://click-house-svc:9000?username=username&password=password&database=default&debug=true&compress=true" - - chInputInvalid := ClickHouseInput{} - - testcases := []struct { - input ClickHouseInput - expectedDSN string - expectedErr bool - }{ - {chInput, dsn, false}, - {chInputInvalid, "", true}, - } - - for _, tc := range testcases { - dsn, err := tc.input.GetDataSourceName() - if tc.expectedErr { - assert.Errorf(t, err, "ClickHouseInput %v unexpectedly returns no error when getting DSN", tc.input) - } - assert.Equal(t, tc.expectedDSN, dsn) - } -} - func TestCacheRecord(t *testing.T) { ctrl := gomock.NewController(t) @@ -284,10 +251,10 @@ func TestFlushCacheOnStop(t *testing.T) { const commitInterval = time.Hour chExportProc := ClickHouseExportProcess{ - db: db, - deque: deque.New(), - queueSize: maxQueueSize, - commitInterval: commitInterval, + db: db, + config: ClickHouseConfig{CommitInterval: commitInterval}, + deque: deque.New(), + queueSize: maxQueueSize, } recordRow := flowrecordtesting.PrepareTestFlowRecord() @@ -319,10 +286,10 @@ func TestUpdateCH(t *testing.T) { const commitInterval = 100 * time.Millisecond chExportProc := ClickHouseExportProcess{ - db: db1, - deque: deque.New(), - queueSize: maxQueueSize, - commitInterval: commitInterval, + db: db1, + config: ClickHouseConfig{CommitInterval: commitInterval}, + deque: deque.New(), + queueSize: maxQueueSize, } recordRow := flowrecordtesting.PrepareTestFlowRecord() @@ -351,7 +318,7 @@ func TestUpdateCH(t *testing.T) { mock2.ExpectCommit() t.Logf("Calling UpdateCH to update DB connection") - chExportProc.UpdateCH("", db2) + chExportProc.UpdateCH(ClickHouseConfig{CommitInterval: commitInterval}, db2) func() { chExportProc.dequeMutex.Lock() @@ -364,3 +331,49 @@ func TestUpdateCH(t *testing.T) { return (err == nil), nil }), "timeout while waiting for second flow record to be committed (after DB connection update)") } + +func TestParseDatabaseURL(t *testing.T) { + testcases := []struct { + url string + expectedProto protocol + expectedAddr string + expectedErrMsg string + }{ + { + url: "tcp://127.0.0.1:9000", + expectedProto: protocolTCP, + expectedAddr: "127.0.0.1:9000", + expectedErrMsg: "", + }, + { + url: "abc://127.0.0.1:9000", + expectedErrMsg: "connection over abc transport protocol is not supported", + }, + { + url: "127.0.0.1:9000", + expectedErrMsg: "failed to parse ClickHouse database URL", + }, + { + url: "tcp://user:password@127.0.0.1:9000", + expectedErrMsg: "invalid ClickHouse database URL", + }, + { + url: "tcp://127.0.0.1:9000/path", + expectedErrMsg: "invalid ClickHouse database URL", + }, + { + url: "tcp://127.0.0.1:9000?key=value", + expectedErrMsg: "invalid ClickHouse database URL", + }, + } + for _, tc := range testcases { + proto, addr, err := parseDatabaseURL(tc.url) + if tc.expectedErrMsg != "" { + assert.Contains(t, err.Error(), tc.expectedErrMsg) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expectedProto, proto) + assert.Equal(t, tc.expectedAddr, addr) + } + } +} diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index cdb9b948567..3d42da9aa04 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -16,6 +16,7 @@ package exporter import ( "os" + "reflect" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" "k8s.io/client-go/kubernetes" @@ -26,12 +27,12 @@ import ( ) type ClickHouseExporter struct { - chInput *clickhouseclient.ClickHouseInput + chConfig *clickhouseclient.ClickHouseConfig chExportProcess *clickhouseclient.ClickHouseExportProcess } -func buildClickHouseInput(opt *options.Options) clickhouseclient.ClickHouseInput { - return clickhouseclient.ClickHouseInput{ +func buildClickHouseConfig(opt *options.Options) clickhouseclient.ClickHouseConfig { + return clickhouseclient.ClickHouseConfig{ Username: os.Getenv("CH_USERNAME"), Password: os.Getenv("CH_PASSWORD"), Database: opt.Config.ClickHouse.Database, @@ -43,18 +44,18 @@ func buildClickHouseInput(opt *options.Options) clickhouseclient.ClickHouseInput } func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) { - chInput := buildClickHouseInput(opt) - klog.InfoS("ClickHouse configuration", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval) + chConfig := buildClickHouseConfig(opt) + klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval) clusterUUID, err := getClusterUUID(k8sClient) if err != nil { return nil, err } - chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, clusterUUID.String()) + chExportProcess, err := clickhouseclient.NewClickHouseClient(chConfig, clusterUUID.String()) if err != nil { return nil, err } return &ClickHouseExporter{ - chInput: &chInput, + chConfig: &chConfig, chExportProcess: chExportProcess, }, nil } @@ -73,21 +74,24 @@ func (e *ClickHouseExporter) Stop() { } func (e *ClickHouseExporter) UpdateOptions(opt *options.Options) { - chInput := buildClickHouseInput(opt) - dsn, connect, err := clickhouseclient.PrepareClickHouseConnection(chInput) + chConfig := buildClickHouseConfig(opt) + connect, err := clickhouseclient.PrepareClickHouseConnection(chConfig) if err != nil { klog.ErrorS(err, "Error when checking new connection") return } - if dsn == e.chExportProcess.GetDsn() && chInput.CommitInterval == e.chExportProcess.GetCommitInterval() { + if reflect.DeepEqual(chConfig, e.chExportProcess.GetClickHouseConfig()) { return } klog.InfoS("Updating ClickHouse") - if chInput.CommitInterval != e.chExportProcess.GetCommitInterval() { - e.chExportProcess.SetCommitInterval(chInput.CommitInterval) + if chConfig.CommitInterval != e.chExportProcess.GetCommitInterval() { + e.chExportProcess.SetCommitInterval(chConfig.CommitInterval) } - if dsn != e.chExportProcess.GetDsn() { - e.chExportProcess.UpdateCH(dsn, connect) + // When a new commitInterval was updated through + // e.chExportProcess.SetCommitInterval, the following + // e.chExportProcess.UpdateCH will not be called. + if !reflect.DeepEqual(chConfig, e.chExportProcess.GetClickHouseConfig()) { + e.chExportProcess.UpdateCH(chConfig, connect) } - klog.InfoS("New ClickHouse configuration", "database", chInput.Database, "databaseURL", chInput.DatabaseURL, "debug", chInput.Debug, "compress", *chInput.Compress, "commitInterval", chInput.CommitInterval) + klog.InfoS("New ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval) } diff --git a/pkg/flowaggregator/exporter/clickhouse_test.go b/pkg/flowaggregator/exporter/clickhouse_test.go index e9d816c622d..75703b9c1e5 100644 --- a/pkg/flowaggregator/exporter/clickhouse_test.go +++ b/pkg/flowaggregator/exporter/clickhouse_test.go @@ -35,9 +35,8 @@ func TestClickHouse_UpdateOptions(t *testing.T) { defer os.Unsetenv("CH_USERNAME") defer os.Unsetenv("CH_PASSWORD") PrepareClickHouseConnectionSaved := clickhouseclient.PrepareClickHouseConnection - clickhouseclient.PrepareClickHouseConnection = func(input clickhouseclient.ClickHouseInput) (string, *sql.DB, error) { - dsn, _ := input.GetDataSourceName() - return dsn, nil, nil + clickhouseclient.PrepareClickHouseConnection = func(input clickhouseclient.ClickHouseConfig) (*sql.DB, error) { + return nil, nil } defer func() { clickhouseclient.PrepareClickHouseConnection = PrepareClickHouseConnectionSaved @@ -55,13 +54,14 @@ func TestClickHouse_UpdateOptions(t *testing.T) { }, ClickHouseCommitInterval: 8 * time.Second, } - chInput := buildClickHouseInput(opt) - chExportProcess, err := clickhouseclient.NewClickHouseClient(chInput, uuid.New().String()) + chConfig := buildClickHouseConfig(opt) + chExportProcess, err := clickhouseclient.NewClickHouseClient(chConfig, uuid.New().String()) require.NoError(t, err) - clickHouseExporter := ClickHouseExporter{chInput: &chInput, chExportProcess: chExportProcess} + clickHouseExporter := ClickHouseExporter{chConfig: &chConfig, chExportProcess: chExportProcess} clickHouseExporter.Start() - assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "tcp://clickhouse-clickhouse.flow-visibility.svc:9000?username=default&password=default&database=default&debug=true&compress=false") + assert.Equal(t, clickHouseExporter.chExportProcess.GetClickHouseConfig(), chConfig) assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "8s") + compress = true newOpt := &options.Options{ Config: &flowaggregator.FlowAggregatorConfig{ @@ -75,8 +75,9 @@ func TestClickHouse_UpdateOptions(t *testing.T) { }, ClickHouseCommitInterval: 5 * time.Second, } + newChConfig := buildClickHouseConfig(newOpt) clickHouseExporter.UpdateOptions(newOpt) - assert.Equal(t, clickHouseExporter.chExportProcess.GetDsn(), "databaseTestURL?username=default&password=default&database=databaseTest&debug=false&compress=true") + assert.Equal(t, clickHouseExporter.chExportProcess.GetClickHouseConfig(), newChConfig) assert.Equal(t, clickHouseExporter.chExportProcess.GetCommitInterval().String(), "5s") clickHouseExporter.Stop() } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 7b6395e6cd7..f5e296ae742 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -794,10 +794,18 @@ func (data *TestData) deployFlowVisibilityClickHouse() (string, error) { } // check clickhouse service http port for service connectivity - chSvc, err := data.GetService("flow-visibility", "clickhouse-clickhouse") - if err != nil { - return "", err + var chSvc *corev1.Service + if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (bool, error) { + chSvc, err = data.GetService(flowVisibilityNamespace, "clickhouse-clickhouse") + if err != nil { + return false, nil + } else { + return true, nil + } + }); err != nil { + return "", fmt.Errorf("timeout waiting for ClickHouse Service: %v", err) } + if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (bool, error) { rc, stdout, stderr, err := testData.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("curl -Ss %s:%s", chSvc.Spec.ClusterIP, clickHouseHTTPPort))