Skip to content

Commit

Permalink
Merge pull request #1816 from ClickHouse/clientv2_buffer_optimization
Browse files Browse the repository at this point in the history
[client-v2] reader buffer optimization
  • Loading branch information
chernser authored Sep 10, 2024
2 parents f3a4ab8 + ba490d5 commit dfb161b
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 232 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: |
mvn --batch-mode -DclickhouseVersion=$PREFERRED_LTS_VERSION \
mvn --batch-mode -DclickhouseVersion=$PREFERRED_LTS_VERSION -Dclient.tests.useNewImplementation=true \
-Panalysis verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar
continue-on-error: true
54 changes: 44 additions & 10 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.data_formats.internal.MapBackedRecord;
import com.clickhouse.client.api.data_formats.internal.ProcessParser;
import com.clickhouse.client.api.enums.Protocol;
Expand All @@ -20,6 +21,7 @@
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.insert.POJOSerializer;
import com.clickhouse.client.api.insert.SerializerNotFoundException;
import com.clickhouse.client.api.internal.BasicObjectsPool;
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -753,6 +756,22 @@ public Builder setMaxRetries(int maxRetries) {
return this;
}

/**
 * Configures the client to reuse allocated byte buffers for numbers. It affects how the binary format reader works.
 * If set to {@code true} then {@link Client#newBinaryFormatReader(QueryResponse)} will construct a reader that
 * reuses buffers for numbers. It improves performance for large datasets by reducing the number of allocations
 * (and therefore GC pressure).
 * Enabling this feature is safe because each reader is supposed to be used by a single thread and readers are not reused.
 *
 * Default is {@code false}.
 * @param reuse - whether to reuse buffers for numbers
 * @return this builder instance, for call chaining
 */
public Builder allowBinaryReaderToReuseBuffers(boolean reuse) {
// stored as a plain string flag; read back via MapUtils.getFlag in newBinaryFormatReader
this.configuration.put("client_allow_binary_reader_to_reuse_buffers", String.valueOf(reuse));
return this;
}

public Client build() {
setDefaults();

Expand Down Expand Up @@ -866,6 +885,10 @@ private void setDefaults() {
if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) {
setMaxRetries(3);
}

if (!configuration.containsKey("client_allow_binary_reader_to_reuse_buffers")) {
allowBinaryReaderToReuseBuffers(false);
}
}
}

Expand Down Expand Up @@ -1442,10 +1465,10 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
settings.waitEndOfQuery(true); // we rely on the summery

final QuerySettings finalSettings = settings;
return query(sqlQuery, settings).thenApply(response -> {
try {
return new Records(response, finalSettings);

return new Records(response, newBinaryFormatReader(response));
} catch (Exception e) {
throw new ClientException("Failed to get query response", e);
}
Expand All @@ -1462,13 +1485,14 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
public List<GenericRecord> queryAll(String sqlQuery) {
try {
int operationTimeout = getOperationTimeout();
QuerySettings settings = new QuerySettings().waitEndOfQuery(true);
QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.waitEndOfQuery(true);
try (QueryResponse response = operationTimeout == 0 ? query(sqlQuery, settings).get() :
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
List<GenericRecord> records = new ArrayList<>();
if (response.getResultRows() > 0) {
RowBinaryWithNamesAndTypesFormatReader reader =
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
(RowBinaryWithNamesAndTypesFormatReader) newBinaryFormatReader(response);

Map<String, Object> record;
while (reader.readRecord((record = new LinkedHashMap<>()))) {
Expand Down Expand Up @@ -1569,28 +1593,38 @@ public CompletableFuture<CommandResponse> execute(String sql) {
* @param schema
* @return
*/
public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response, TableSchema schema) {
ClickHouseBinaryFormatReader reader = null;
// Using caching buffer allocator is risky so this parameter is not exposed to the user
boolean useCachingBufferAllocator = MapUtils.getFlag(configuration, "client_allow_binary_reader_to_reuse_buffers");
BinaryStreamReader.ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
new BinaryStreamReader.CachingByteBufferAllocator() :
new BinaryStreamReader.DefaultByteBufferAllocator();

switch (response.getFormat()) {
case Native:
reader = new NativeFormatReader(response.getInputStream(), response.getSettings());
reader = new NativeFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool);
break;
case RowBinaryWithNamesAndTypes:
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());
reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings(),
byteBufferPool);
break;
case RowBinaryWithNames:
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema);
reader = new RowBinaryWithNamesFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool);
break;
case RowBinary:
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema);
reader = new RowBinaryFormatReader(response.getInputStream(), response.getSettings(), schema,
byteBufferPool);
break;
default:
throw new IllegalArgumentException("Unsupported format: " + response.getFormat());
}
return reader;
}

public static ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
public ClickHouseBinaryFormatReader newBinaryFormatReader(QueryResponse response) {
return newBinaryFormatReader(response, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Map;
import java.util.UUID;

public interface ClickHouseBinaryFormatReader {
public interface ClickHouseBinaryFormatReader extends AutoCloseable {

/**
* Reads a single value from the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ public class NativeFormatReader extends AbstractBinaryFormatReader {

private int blockRowIndex;

public NativeFormatReader(InputStream inputStream) {
this(inputStream, null);
}

public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
super(inputStream, settings, null);
public NativeFormatReader(InputStream inputStream, QuerySettings settings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, settings, null, byteBufferAllocator);
readNextRecord();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.clickhouse.client.api.data_formats;

import com.clickhouse.client.api.data_formats.internal.AbstractBinaryFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;
Expand All @@ -12,12 +13,9 @@

public class RowBinaryFormatReader extends AbstractBinaryFormatReader {

public RowBinaryFormatReader(InputStream inputStream, TableSchema schema) {
this(inputStream, null, schema);
}

public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
super(inputStream, querySettings, schema);
public RowBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, schema, byteBufferAllocator);
readNextRecord();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -17,8 +16,9 @@

public class RowBinaryWithNamesAndTypesFormatReader extends AbstractBinaryFormatReader implements Iterator<Map<String, Object>> {

public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings) {
super(inputStream, querySettings, null);
public RowBinaryWithNamesAndTypesFormatReader(InputStream inputStream, QuerySettings querySettings,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, null, byteBufferAllocator);
readSchema();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseColumn;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class RowBinaryWithNamesFormatReader extends AbstractBinaryFormatReader {

private List<String> columns = null;

public RowBinaryWithNamesFormatReader(InputStream inputStream, TableSchema schema) {
this(inputStream, null, schema);
readNextRecord();
}

public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
super(inputStream, querySettings, schema);
public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
super(inputStream, querySettings, schema, byteBufferAllocator);
int nCol = 0;
try {
nCol = BinaryStreamReader.readVarInt(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.internal.BasicObjectsPool;
import com.clickhouse.client.api.internal.MapUtils;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.query.NullValueException;
Expand Down Expand Up @@ -41,6 +42,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {

Expand All @@ -58,7 +60,8 @@ public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryForm

private volatile boolean hasNext = true;

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema) {
protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,
BinaryStreamReader.ByteBufferAllocator byteBufferAllocator) {
this.input = inputStream;
this.settings = querySettings == null ? Collections.emptyMap() : new HashMap<>(querySettings.getAllSettings());
boolean useServerTimeZone = (boolean) this.settings.get(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey());
Expand All @@ -67,7 +70,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
if (timeZone == null) {
throw new ClientException("Time zone is not set. (useServerTimezone:" + useServerTimeZone + ")");
}
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG);
this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator);
setSchema(schema);
}

Expand Down Expand Up @@ -133,12 +136,12 @@ protected void readNextRecord() {
try {
nextRecordEmpty.set(true);
if (!readRecord(nextRecord)) {
hasNext = false;
endReached();
} else {
nextRecordEmpty.compareAndSet(true, false);
}
} catch (IOException e) {
hasNext = false;
endReached();
throw new ClientException("Failed to read next row", e);
}
}
Expand All @@ -165,7 +168,7 @@ public Map<String, Object> next() {
return null;
}
} catch (IOException e) {
hasNext = false;
endReached();
throw new ClientException("Failed to read row", e);
}
}
Expand Down Expand Up @@ -621,4 +624,9 @@ public LocalDateTime getLocalDateTime(int index) {
}
return (LocalDateTime) value;
}

/**
 * Closes this reader by closing the underlying input stream.
 * @throws Exception if closing the underlying stream fails
 */
@Override
public void close() throws Exception {
input.close();
}
}
Loading

0 comments on commit dfb161b

Please sign in to comment.