Skip to content

Commit

Permalink
[Feat] Append the url's database and parameters to newly created conn…
Browse files Browse the repository at this point in the history
…ection #142

[Enhance] Append the url's database and parameters to newly created connection #142
  • Loading branch information
czy006 authored Aug 27, 2024
2 parents 279b6ac + 6f409ba commit 7e34551
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public ClickHouseCatalog(
checkArgument(!isNullOrWhitespaceOnly(username), "username cannot be null or empty");
checkArgument(!isNullOrWhitespaceOnly(password), "password cannot be null or empty");

this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.baseUrl = baseUrl;
this.username = username;
this.password = password;
this.ignorePrimaryKey =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -38,7 +36,7 @@
import java.util.Properties;

import static java.util.stream.Collectors.toList;
import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.getActualHttpPort;
import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.getClusterSpec;

/** ClickHouse connection provider. Use ClickHouseDriver to create a connection. */
public class ClickHouseConnectionProvider implements Serializable {
Expand All @@ -47,9 +45,6 @@ public class ClickHouseConnectionProvider implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(ClickHouseConnectionProvider.class);

private static final String QUERY_CLUSTER_INFO_SQL =
"SELECT shard_num, host_address, port FROM system.clusters WHERE cluster = ? ORDER BY shard_num, replica_num ASC";

private final ClickHouseConnectionOptions options;

private final Properties connectionProperties;
Expand All @@ -74,17 +69,19 @@ public boolean isConnectionValid() throws SQLException {

public synchronized ClickHouseConnection getOrCreateConnection() throws SQLException {
if (connection == null) {
connection = createConnection(options.getUrl(), options.getDatabaseName());
connection = createConnection(options.getUrl());
}
return connection;
}

public synchronized Map<Integer, ClickHouseConnection> createShardConnections(
ClusterSpec clusterSpec, String defaultDatabase) throws SQLException {
Map<Integer, ClickHouseConnection> connectionMap = new HashMap<>();
String urlSuffix = options.getUrlSuffix();
for (ShardSpec shardSpec : clusterSpec.getShards()) {
String shardUrl = shardSpec.getJdbcUrls() + urlSuffix;
ClickHouseConnection connection =
createAndStoreShardConnection(shardSpec.getJdbcUrls(), defaultDatabase);
createAndStoreShardConnection(shardUrl, defaultDatabase);
connectionMap.put(shardSpec.getNum(), connection);
}

Expand All @@ -97,35 +94,29 @@ public synchronized ClickHouseConnection createAndStoreShardConnection(
shardConnections = new ArrayList<>();
}

ClickHouseConnection connection = createConnection(url, database);
ClickHouseConnection connection = createConnection(url);
shardConnections.add(connection);
return connection;
}

public List<String> getShardUrls(String remoteCluster) throws SQLException {
Map<Long, List<String>> shardsMap = new HashMap<>();
Map<Integer, String> shardsMap = new HashMap<>();
ClickHouseConnection conn = getOrCreateConnection();
try (PreparedStatement stmt = conn.prepareStatement(QUERY_CLUSTER_INFO_SQL)) {
stmt.setString(1, remoteCluster);
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String host = rs.getString("host_address");
int port = getActualHttpPort(host, rs.getInt("port"));
List<String> shardUrls =
shardsMap.computeIfAbsent(
rs.getLong("shard_num"), k -> new ArrayList<>());
shardUrls.add(host + ":" + port);
}
}
ClusterSpec clusterSpec = getClusterSpec(conn, remoteCluster);
String urlSuffix = options.getUrlSuffix();
for (ShardSpec shardSpec : clusterSpec.getShards()) {
String shardUrl = shardSpec.getJdbcUrls() + urlSuffix;
shardsMap.put(shardSpec.getNum(), shardUrl);
}

return shardsMap.values().stream()
.map(urls -> "jdbc:ch://" + String.join(",", urls))
return shardsMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(toList());
}

private ClickHouseConnection createConnection(String url, String database) throws SQLException {
LOG.info("connecting to {}, database {}", url, database);
private ClickHouseConnection createConnection(String url) throws SQLException {
LOG.info("connecting to {}", url);
Properties configuration = new Properties();
configuration.putAll(connectionProperties);
if (options.getUsername().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@

package org.apache.flink.connector.clickhouse.internal.options;

import org.apache.flink.annotation.VisibleForTesting;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.EMPTY;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/** ClickHouse connection options. */
public class ClickHouseConnectionOptions implements Serializable {

private static final long serialVersionUID = 1L;

public static final Pattern URL_PATTERN =
Pattern.compile("[^/]+//[^/?]+(/(?<database>[^?]*))?(\\?(?<param>\\S+))?");

private final String url;

private final String username;
Expand All @@ -37,6 +47,12 @@ public class ClickHouseConnectionOptions implements Serializable {

private final String tableName;

// For testing.
@VisibleForTesting
public ClickHouseConnectionOptions(String url) {
this(url, null, null, null, null);
}

protected ClickHouseConnectionOptions(
String url,
@Nullable String username,
Expand All @@ -50,6 +66,23 @@ protected ClickHouseConnectionOptions(
this.tableName = tableName;
}

/**
* The format of the URL suffix is as follows: {@code
* [/<database>][?param1=value1&param2=value2]}.
*/
public String getUrlSuffix() {
Matcher matcher = URL_PATTERN.matcher(url);
if (!matcher.find()) {
return EMPTY;
}

String database = matcher.group("database");
String param = matcher.group("param");
database = isNullOrWhitespaceOnly(database) ? EMPTY : "/" + database;
param = isNullOrWhitespaceOnly(param) ? EMPTY : "?" + param;
return database + param;
}

public String getUrl() {
return this.url;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.connector.clickhouse;

import org.apache.flink.connector.clickhouse.internal.options.ClickHouseConnectionOptions;
import org.apache.flink.connector.clickhouse.internal.partitioner.ValuePartitioner;
import org.apache.flink.connector.clickhouse.internal.schema.ClusterSpec;
import org.apache.flink.connector.clickhouse.internal.schema.ShardSpec;
Expand Down Expand Up @@ -218,4 +219,23 @@ public void parseEngineFullTest() {
requireNonNull(parseShardingKey(matcher.group("shardingKey"))).explain());
}
}

@Test
public void parseJdbcUriTest() {
String[] urls = {
"jdbc:ch://localhost:8123",
"jdbc:ch://localhost:8123?",
"jdbc:ch://localhost:8123?ssl=true&sslmode=STRICT",
"jdbc:ch://localhost:8123/",
"jdbc:ch://localhost:8123/?ssl=true&sslmode=STRICT",
"jdbc:ch://localhost:8123/default?ssl=true&sslmode=STRICT",
"jdbc:ch://localhost:8123,127.0.0.1:8123/default?ssl=true&sslmode=STRICT"
};

for (String url : urls) {
String urlSuffix = new ClickHouseConnectionOptions(url).getUrlSuffix();
String urlPrefix = url.substring(0, url.lastIndexOf(urlSuffix));
assertEquals(url, urlPrefix + urlSuffix);
}
}
}

0 comments on commit 7e34551

Please sign in to comment.