Skip to content

Commit

Permalink
Auto close idle connections (#999)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkaflik authored May 17, 2023
1 parent 78780a8 commit 807157d
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 9 deletions.
39 changes: 37 additions & 2 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ func Open(opt *Options) (driver.Conn, error) {
opt = &Options{}
}
o := opt.setDefaults()
return &clickhouse{
conn := &clickhouse{
opt: o,
idle: make(chan *connect, o.MaxIdleConns),
open: make(chan struct{}, o.MaxOpenConns),
}, nil
}
go conn.startAutoCloseIdleConnections()
return conn, nil
}

type clickhouse struct {
Expand Down Expand Up @@ -277,6 +279,39 @@ func (ch *clickhouse) acquire(ctx context.Context) (conn *connect, err error) {
return conn, nil
}

func (ch *clickhouse) startAutoCloseIdleConnections() {
ticker := time.NewTicker(ch.opt.ConnMaxLifetime)
defer ticker.Stop()

for {
select {
case <-ticker.C:
ch.closeIdleExpired()
}
}
}

func (ch *clickhouse) closeIdleExpired() {
cutoff := time.Now().Add(-ch.opt.ConnMaxLifetime)
for {
select {
case conn := <-ch.idle:
if conn.connectedAt.Before(cutoff) {
conn.close()
} else {
select {
case ch.idle <- conn:
default:
conn.close()
}
return
}
default:
return
}
}
}

func (ch *clickhouse) release(conn *connect, err error) {
if conn.released {
return
Expand Down
2 changes: 1 addition & 1 deletion tests/client_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestClientInfo(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
opts := clientOptionsFromEnv(env, clickhouse.Settings{})
opts := ClientOptionsFromEnv(env, clickhouse.Settings{})
opts.ClientInfo = testCase.clientInfo

conn, err := clickhouse.Open(&opts)
Expand Down
57 changes: 56 additions & 1 deletion tests/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"os"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -247,7 +248,7 @@ func TestConnCustomDialStrategy(t *testing.T) {
env, err := GetTestEnvironment(testSet)
require.NoError(t, err)

opts := clientOptionsFromEnv(env, clickhouse.Settings{})
opts := ClientOptionsFromEnv(env, clickhouse.Settings{})
validAddr := opts.Addr[0]
opts.Addr = []string{"invalid.host:9001"}

Expand Down Expand Up @@ -346,3 +347,57 @@ func TestCustomSettings(t *testing.T) {
require.Equal(t, "custom_query_value", setting)
})
}

func TestConnectionExpiresIdleConnection(t *testing.T) {
runInDocker, _ := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_DOCKER", "true"))
if !runInDocker {
t.Skip("Skip test in cloud environment. This test is not stable in cloud environment, due to race conditions.")
}

// given
ctx := context.Background()
testEnv, err := GetTestEnvironment(testSet)
require.NoError(t, err)

baseConn, err := TestClientWithDefaultSettings(testEnv)
require.NoError(t, err)

expectedConnections := getActiveConnections(t, baseConn)

// when the client is configured to expire idle connections after 1/10 of a second
opts := ClientOptionsFromEnv(testEnv, clickhouse.Settings{})
opts.MaxIdleConns = 20
opts.MaxOpenConns = 20
opts.ConnMaxLifetime = time.Second / 10
conn, err := clickhouse.Open(&opts)
require.NoError(t, err)

// run 1000 queries in parallel
var wg sync.WaitGroup
const selectToRunAtOnce = 1000
for i := 0; i < selectToRunAtOnce; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r, err := conn.Query(ctx, "SELECT 1")
require.NoError(t, err)

r.Close()
}()
}
wg.Wait()

// then we expect that all connections will be closed when they are idle
// retrying for 10 seconds to make sure that the connections are closed
assert.Eventuallyf(t, func() bool {
return getActiveConnections(t, baseConn) == expectedConnections
}, time.Second*10, opts.ConnMaxLifetime, "expected connections to be reset back to %d", expectedConnections)
}

func getActiveConnections(t *testing.T, client clickhouse.Conn) (conns int64) {
ctx := context.Background()
r := client.QueryRow(ctx, "SELECT sum(value) as conns FROM system.metrics WHERE metric LIKE '%Connection'")
require.NoError(t, r.Err())
require.NoError(t, r.Scan(&conns))
return conns
}
55 changes: 55 additions & 0 deletions tests/issues/957_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package issues

import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func Test957(t *testing.T) {
// given
ctx := context.Background()
testEnv, err := clickhouse_tests.GetTestEnvironment(testSet)
require.NoError(t, err)

// when the client is configured to use the test environment
opts := clickhouse_tests.ClientOptionsFromEnv(testEnv, clickhouse.Settings{})
opts.Debug = true
opts.Debugf = func(format string, v ...interface{}) {
t.Logf(format, v...)
}
// and the client is configured to have only 1 connection
opts.MaxIdleConns = 2
opts.MaxOpenConns = 1
// and the client is configured to have a connection lifetime of 1/10 of a second
opts.ConnMaxLifetime = time.Second / 10
conn, err := clickhouse.Open(&opts)
require.NoError(t, err)

// then the client should be able to execute queries for 1 second
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
_, err := conn.Query(ctx, "SELECT 1")
require.NoError(t, err)
}
}
2 changes: 1 addition & 1 deletion tests/query_parameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestQueryParameters(t *testing.T) {

env, err := GetTestEnvironment(testSet)
require.NoError(t, err)
client, err := testClientWithDefaultSettings(env)
client, err := TestClientWithDefaultSettings(env)
require.NoError(t, err)
defer client.Close()

Expand Down
2 changes: 1 addition & 1 deletion tests/read_only_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestReadOnlyUser(t *testing.T) {

env, err := GetTestEnvironment(testSet)
require.NoError(t, err)
client, err := testClientWithDefaultSettings(env)
client, err := TestClientWithDefaultSettings(env)
require.NoError(t, err)
defer client.Close()

Expand Down
6 changes: 3 additions & 3 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func GetExternalTestEnvironment(testSet string) (ClickHouseTestEnvironment, erro
return env, nil
}

func clientOptionsFromEnv(env ClickHouseTestEnvironment, settings clickhouse.Settings) clickhouse.Options {
func ClientOptionsFromEnv(env ClickHouseTestEnvironment, settings clickhouse.Settings) clickhouse.Options {
timeout, err := strconv.Atoi(GetEnv("CLICKHOUSE_DIAL_TIMEOUT", "10"))
if err != nil {
timeout = 10
Expand Down Expand Up @@ -284,11 +284,11 @@ func clientOptionsFromEnv(env ClickHouseTestEnvironment, settings clickhouse.Set
}

func testClientWithDefaultOptions(env ClickHouseTestEnvironment, settings clickhouse.Settings) (driver.Conn, error) {
opts := clientOptionsFromEnv(env, settings)
opts := ClientOptionsFromEnv(env, settings)
return clickhouse.Open(&opts)
}

func testClientWithDefaultSettings(env ClickHouseTestEnvironment) (driver.Conn, error) {
func TestClientWithDefaultSettings(env ClickHouseTestEnvironment) (driver.Conn, error) {
settings := clickhouse.Settings{}

if proto.CheckMinVersion(proto.Version{
Expand Down

0 comments on commit 807157d

Please sign in to comment.