Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] Query API #1580

Merged
merged 6 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
package com.clickhouse.data;

import java.io.Serializable;
import java.lang.reflect.Array;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;

import com.clickhouse.data.value.ClickHouseArrayValue;
import com.clickhouse.data.value.ClickHouseBigDecimalValue;
import com.clickhouse.data.value.ClickHouseBigIntegerValue;
Expand Down Expand Up @@ -46,6 +35,17 @@
import com.clickhouse.data.value.array.ClickHouseLongArrayValue;
import com.clickhouse.data.value.array.ClickHouseShortArrayValue;

import java.io.Serializable;
import java.lang.reflect.Array;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;

/**
* This class represents a column defined in database.
*/
Expand Down
32 changes: 25 additions & 7 deletions client-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@
<scope>provided</scope>
</dependency>

<!-- <dependency>-->
<!-- <groupId>${project.parent.groupId}</groupId>-->
<!-- <artifactId>clickhouse-client</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- <type>test-jar</type>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<!-- START: Temporary test dependencies -->
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>clickhouse-client</artifactId>
<version>${revision}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- END: Temporary test dependencies -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand All @@ -96,13 +99,28 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
43 changes: 43 additions & 0 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseParameterizedQuery;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.internal.TableSchemaParser;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseFormat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class Client {
public static final int TIMEOUT = 30_000;
Expand Down Expand Up @@ -92,4 +101,38 @@ public boolean ping(int timeout) {
ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
return clientPing.ping(getServerNode(), timeout);
}

/**
* Sends data query to the server and returns a reference to a result descriptor.
* Control is returned when server accepted the query and started processing it.
* <br/>
* The caller should use {@link ClickHouseParameterizedQuery} to render the `sqlQuery` with parameters.
*
*
* @param sqlQuery - complete SQL query.
* @param settings
* @return
*/
public Future<QueryResponse> query(String sqlQuery, Map<String, Object> qparams, QuerySettings settings) {
ClickHouseClient clientQuery = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseRequest request = clientQuery.read(getServerNode());
request.query(sqlQuery, settings.getQueryID());
// TODO: convert qparams to map[string, string]
request.params(qparams);
return CompletableFuture.completedFuture(new QueryResponse(clientQuery.execute(request)));
}

public TableSchema getTableSchema(String table, String database) {
try (ClickHouseClient clientQuery = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
ClickHouseRequest request = clientQuery.read(getServerNode());
// XML - because java has a built-in XML parser. Will consider CSV later.
request.query("DESCRIBE TABLE " + table + " FORMAT " + ClickHouseFormat.TSKV.name());
TableSchema tableSchema = new TableSchema();
try {
return new TableSchemaParser().createFromBinaryResponse(clientQuery.execute(request).get(), table, database);
} catch (Exception e) {
throw new RuntimeException("Failed to get table schema", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.clickhouse.client.api.data_formats;

import com.clickhouse.data.ClickHouseRecord;

import java.util.function.Consumer;

public interface RecordReader {

/**
* Check if the reader can read the specified data format.
*
* @param dataFormat
* @return true if the reader can read the specified data format, false otherwise
*/
boolean canRead(String dataFormat);

/**
* Read a batch of records from a stream.
*
* @param size the maximum number of records to read
* @param consumer the consumer to process the records
* @param errorHandler the consumer to handle exceptions
* @return true if there are more records to read, false otherwise
*/
boolean readBatch(int size, Consumer<ClickHouseRecord> consumer, Consumer<Exception> errorHandler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.clickhouse.client.api.data_formats;

import com.clickhouse.data.ClickHouseInputStream;
import com.clickhouse.data.ClickHouseRecord;

import java.util.function.Consumer;

public class RowBinaryReader implements RecordReader {

private final ClickHouseInputStream inputStream;

public RowBinaryReader(ClickHouseInputStream inputStream) {
this.inputStream = inputStream;
}

@Override
public boolean readBatch(int size, Consumer<ClickHouseRecord> consumer, Consumer<Exception> errorHandler) {
// TODO: implementation of record reader will get raw stream from response and will read records from it
return false;
}

@Override
public boolean canRead(String dataFormat) {
return "RowBinary".equalsIgnoreCase(dataFormat);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.clickhouse.client.api.internal;

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.api.metadata.TableSchema;

import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class TableSchemaParser {

public TableSchemaParser() {
}

public TableSchema createFromBinaryResponse(ClickHouseResponse response, String tableName, String databaseName) {
TableSchema schema = new TableSchema();
schema.setTableName(tableName);
schema.setDatabaseName(databaseName);
Properties p = new Properties();
response.records().forEach(record -> {
String values = record.getValue(0).asString().replaceAll("\t", "\n");
try {
p.clear();
p.load(new StringReader(values));
schema.addColumn(p.getProperty("name"), p.getProperty("type"));
} catch ( IOException e) {
throw new RuntimeException("Failed to parse table schema", e);
}
});
return schema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.clickhouse.client.api.metadata;

import com.clickhouse.data.ClickHouseColumn;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableSchema {

private String tableName = "";

private String databaseName = "";

private List<ClickHouseColumn> columns;

private Map<String, Map<String, Object>> metadata;

public TableSchema() {
this.metadata = new HashMap<>();
this.columns = new ArrayList<>();
}

/**
* Returns unmodifiable collection of columns.
*
* @return - collection of columns in the table
*/
public List<ClickHouseColumn> getColumns() {
return Collections.unmodifiableList(columns);
}

public String getDatabaseName() {
return databaseName;
}

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public void setDatabaseName(String databaseName) {
this.databaseName = databaseName;
}

public void addColumn(String name, String type) {
columns.add(ClickHouseColumn.of(name, type));
metadata.computeIfAbsent(name, k -> new HashMap<>()).put("type", type);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.clickhouse.client.api.query;

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.data.ClickHouseInputStream;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Response class provides interface to input stream of response data.
* <br/>
* It is used to read data from ClickHouse server.
* It is used to get response metadata like errors, warnings, etc.
*
* This class is for the following user cases:
* <ul>
* <li>Full read. User does conversion from record to custom object</li>
* <li>Full read. No conversion to custom object. List of generic records is returned. </li>
* <li>Iterative read. One record is returned at a time</li>
* </ul>
*
*
*/
public class QueryResponse {

private final Future<ClickHouseResponse> responseRef;

private long completeTimeout = TimeUnit.MINUTES.toMillis(1);

public QueryResponse(Future<ClickHouseResponse> responseRef) {
this.responseRef = responseRef;
}

public boolean isDone() {
return responseRef.isDone();
}

public void ensureDone() {
if (!isDone()) {
try {
responseRef.get(completeTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e); // TODO: handle exception
}
}
}

public ClickHouseInputStream getInputStream() {
ensureDone();
try {
return responseRef.get().getInputStream();
} catch (Exception e) {
throw new RuntimeException(e); // TODO: handle exception
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.clickhouse.client.api.query;


import com.clickhouse.client.config.ClickHouseClientOption;

import java.util.HashMap;
import java.util.Map;

public class QuerySettings {

private Map<String, Object> rawSettings;

public QuerySettings() {
this.rawSettings = new HashMap<>();
}

public QuerySettings setSetting(String key, Object value) {
rawSettings.put(key, value);
return this;
}
public Object getSetting(String key) {
return rawSettings.get(key);
}
public QuerySettings appendToSetting(String key, Object value) {
rawSettings.put(key, value);
return this;
}

public QuerySettings setFormat(String format) {
rawSettings.put(ClickHouseClientOption.FORMAT.getKey(), format);
return this;
}

public String getFormat() {
return (String) rawSettings.get(ClickHouseClientOption.FORMAT.getKey());
}

public QuerySettings setQueryID(String queryID) {
rawSettings.put("query_id", queryID);
return this;
}
public String getQueryID() {
return (String) rawSettings.get("query_id");
}
}
Loading
Loading