Skip to content

Commit

Permalink
Retrieve protocol from databaseURL
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <[email protected]>
  • Loading branch information
heanlan committed Jun 2, 2023
1 parent 5183762 commit 220b323
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build/charts/flow-aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Kubernetes: `>= 1.16.0-0`
| clickHouse.commitInterval | string | `"8s"` | CommitInterval is the periodical interval between batch commit of flow records to DB. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| clickHouse.compress | bool | `true` | Compress enables lz4 compression when committing flow records. |
| clickHouse.connectionSecret | object | `{"password":"clickhouse_operator_password","username":"clickhouse_operator"}` | Credentials to connect to ClickHouse. They will be stored in a Secret. |
| clickHouse.databaseURL | string | `"clickhouse-clickhouse.flow-visibility.svc:9000"` | DatabaseURL is the url to the database. Currently only TCP is supported. |
| clickHouse.databaseURL | string | `"tcp://clickhouse-clickhouse.flow-visibility.svc:9000"` | DatabaseURL is the url to the database. TCP protocol is required. |
| clickHouse.debug | bool | `false` | Debug enables debug logs from ClickHouse sql driver. |
| clickHouse.enable | bool | `false` | Determine whether to enable exporting flow records to ClickHouse. |
| flowAggregatorAddress | string | `""` | Provide an extra DNS name or IP address of flow aggregator for generating TLS certificate. |
Expand Down
2 changes: 1 addition & 1 deletion build/charts/flow-aggregator/conf/flow-aggregator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ clickHouse:
# Database is the name of database where Antrea "flows" table is created.
database: "default"

# DatabaseURL is the url to the database. Currently only TCP is supported.
# DatabaseURL is the url to the database. TCP protocol is required.
databaseURL: {{ .Values.clickHouse.databaseURL | quote }}

# Debug enables debug logs from ClickHouse sql driver.
Expand Down
4 changes: 2 additions & 2 deletions build/charts/flow-aggregator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ flowCollector:
clickHouse:
# -- Determine whether to enable exporting flow records to ClickHouse.
enable: false
# -- DatabaseURL is the url to the database. Currently only TCP is supported.
databaseURL: "clickhouse-clickhouse.flow-visibility.svc:9000"
# -- DatabaseURL is the url to the database. TCP protocol is required.
databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
# -- Debug enables debug logs from ClickHouse sql driver.
debug: false
# -- Compress enables lz4 compression when committing flow records.
Expand Down
4 changes: 2 additions & 2 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ data:
# Database is the name of database where Antrea "flows" table is created.
database: "default"
# DatabaseURL is the url to the database. Currently only TCP is supported.
databaseURL: "clickhouse-clickhouse.flow-visibility.svc:9000"
# DatabaseURL is the url to the database. TCP protocol is required.
databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
# Debug enables debug logs from ClickHouse sql driver.
debug: false
Expand Down
6 changes: 3 additions & 3 deletions docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ it is deployed following the [deployment steps](#deployment-steps-1), the
ClickHouse server is already exposed via a K8s Service, and no further
configuration is required. If a different FQDN or IP is desired, please use
the URL for `clickHouse.databaseURL` in the following format:
`<ClickHouse server FQDN or IP>:<ClickHouse TCP port>`.
`tcp://<ClickHouse server FQDN or IP>:<ClickHouse TCP port>`.

```yaml
flow-aggregator.conf: |
Expand Down Expand Up @@ -357,8 +357,8 @@ flow-aggregator.conf: |
# Database is the name of database where Antrea "flows" table is created.
database: "default"
# DatabaseURL is the url to the database. Currently only TCP is supported.
databaseURL: "clickhouse-clickhouse.flow-visibility.svc:9000"
# DatabaseURL is the url to the database. TCP protocol is required.
databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
# Debug enables debug logs from ClickHouse sql driver.
debug: false
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/flowaggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type ClickHouseConfig struct {
// Database is the name of database where Antrea "flows" table is created.
Database string `yaml:"database,omitempty"`
// DatabaseURL is the url to the database. TCP protocol is required.
// Defaults to "clickhouse-clickhouse.flow-visibility.svc:9000"
// Defaults to "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
DatabaseURL string `yaml:"databaseURL,omitempty"`
// Debug enables debug logs from ClickHouse sql driver. Defaults to false.
Debug bool `yaml:"debug,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
DefaultClickHouseDatabase = "default"
DefaultClickHouseCommitInterval = "8s"
MinClickHouseCommitInterval = 1 * time.Second
DefaultClickHouseDatabaseUrl = "clickhouse-clickhouse.flow-visibility.svc:9000"
DefaultClickHouseDatabaseUrl = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"

DefaultS3Region = "us-west-2"
DefaultS3RecordFormat = "CSV"
Expand Down
28 changes: 27 additions & 1 deletion pkg/flowaggregator/clickhouseclient/clickhouseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"net/url"
"sync"
"time"

Expand Down Expand Up @@ -89,6 +90,13 @@ const (
// PrepareClickHouseConnection is used for unit testing
var PrepareClickHouseConnection = prepareConnection

type protocol string

const (
protocolTCP = "tcp"
protocolUnknown = ""
)

type stopPayload struct {
flushQueue bool
}
Expand Down Expand Up @@ -398,6 +406,10 @@ func (ch *ClickHouseExportProcess) GetClickHouseConfig() ClickHouseConfig {
}

func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) {
_, addr, err := parseDatabaseURL(config.DatabaseURL)
if err != nil {
return nil, err
}
var connect *sql.DB
var connErr error
connRetryInterval := 1 * time.Second
Expand All @@ -407,7 +419,7 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) {
if err := wait.PollImmediate(connRetryInterval, connTimeout, func() (bool, error) {
// Open the database and ping it
opt := clickhouse.Options{
Addr: []string{config.DatabaseURL},
Addr: []string{addr},
Auth: clickhouse.Auth{
Username: config.Username,
Password: config.Password,
Expand Down Expand Up @@ -438,3 +450,17 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) {
}
return connect, nil
}

func parseDatabaseURL(dbUrl string) (protocol, string, error) {
u, err := url.Parse(dbUrl)
if err != nil {
return protocolUnknown, "", fmt.Errorf("failed to parse ClickHouse database URL: %v", err)
}
proto := u.Scheme
switch proto {
case protocolTCP:
return protocolTCP, u.Host, nil
default:
return protocolUnknown, "", fmt.Errorf("connection over %s transport protocol is not supported", proto)
}
}
38 changes: 38 additions & 0 deletions pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,41 @@ func TestUpdateCH(t *testing.T) {
return (err == nil), nil
}), "timeout while waiting for second flow record to be committed (after DB connection update)")
}

func TestParseDatabaseURL(t *testing.T) {
testcases := []struct {
url string
expectedProto protocol
expectedAddr string
expectedErr error
}{
{
url: "tcp://127.0.0.1:9000",
expectedProto: protocolTCP,
expectedAddr: "127.0.0.1:9000",
expectedErr: nil,
},
{
url: "abc://127.0.0.1:9000",
expectedProto: protocolUnknown,
expectedAddr: "",
expectedErr: fmt.Errorf("connection over abc transport protocol is not supported"),
},
{
url: "127.0.0.1:9000",
expectedProto: protocolUnknown,
expectedAddr: "",
expectedErr: fmt.Errorf("failed to parse ClickHouse database URL"),
},
}
for _, tc := range testcases {
proto, addr, err := parseDatabaseURL(tc.url)
if tc.expectedErr != nil {
assert.ErrorContains(t, err, tc.expectedErr.Error())
} else {
assert.Nil(t, err)
}
assert.Equal(t, tc.expectedProto, proto)
assert.Equal(t, tc.expectedAddr, addr)
}
}
2 changes: 1 addition & 1 deletion pkg/flowaggregator/exporter/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestClickHouse_UpdateOptions(t *testing.T) {
ClickHouse: flowaggregator.ClickHouseConfig{
Enable: true,
Database: "default",
DatabaseURL: "clickhouse-clickhouse.flow-visibility.svc:9000",
DatabaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000",
Debug: true,
Compress: &compress,
},
Expand Down

0 comments on commit 220b323

Please sign in to comment.