Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved queryAll() to use less mem #1779

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1421,10 +1422,11 @@ public List<GenericRecord> queryAll(String sqlQuery) {
query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) {
List<GenericRecord> records = new ArrayList<>();
if (response.getResultRows() > 0) {
ClickHouseBinaryFormatReader reader =
RowBinaryWithNamesAndTypesFormatReader reader =
new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings());

Map<String, Object> record;
while ((record = reader.next()) != null) {
while (reader.readRecord((record = new LinkedHashMap<>()))) {
records.add(new MapBackedRecord(record, reader.getSchema()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public NativeFormatReader(InputStream inputStream, QuerySettings settings) {
}

@Override
protected boolean readRecord(Map<String, Object> record) throws IOException {
public boolean readRecord(Map<String, Object> record) throws IOException {
if (currentBlock == null || blockRowIndex >= currentBlock.getnRows()) {
if (!readBlock()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -38,6 +39,7 @@
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {

Expand Down Expand Up @@ -70,7 +72,18 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
protected Map<String, Object> nextRecord = new ConcurrentHashMap<>();


protected boolean readRecord(Map<String, Object> record) throws IOException {
/**
* Reads the next record from the response stream into the supplied map.
* This is still an internal method and should be used with care: it reads the next
* record directly and therefore interferes with the iterator state behind hasNext()/next().
* After calling this method:
* - hasNext() and next() should not be called anymore
* - the stream should be read fully through readRecord()
*
* @param record the map to fill with column-name to value pairs
* @return {@code true} if a record was read, {@code false} when the stream is exhausted
* @throws IOException if reading from the underlying stream fails
*/
public boolean readRecord(Map<String, Object> record) throws IOException {
boolean firstColumn = true;
for (ClickHouseColumn column : getSchema().getColumns()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,4 +346,14 @@ public LocalDateTime getLocalDateTime(String colName) {
public LocalDateTime getLocalDateTime(int index) {
return reader.getLocalDateTime(index);
}

@Override
public Object getObject(String colName) {
// Delegates to the wrapped reader; presumably returns the raw, un-coerced
// column value — TODO confirm against reader.readValue(String) semantics.
return reader.readValue(colName);
}

@Override
public Object getObject(int index) {
// Positional variant; index is presumably 1-based like the other accessors
// in this class — verify against reader.readValue(int).
return reader.readValue(index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,4 +480,14 @@ public LocalDateTime getLocalDateTime(String colName) {
public LocalDateTime getLocalDateTime(int index) {
return readValue(index);
}

@Override
public Object getObject(String colName) {
// Reads the raw column value for the given name via this reader's own
// readValue(String); no type coercion is visible here.
return readValue(colName);
}

@Override
public Object getObject(int index) {
// Positional variant; presumably the same index convention as the other
// indexed getters in this class — confirm against readValue(int).
return readValue(index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

public interface GenericRecord {


/**
* Reads column with name `colName` as a string.
*
Expand Down Expand Up @@ -486,4 +485,8 @@ public interface GenericRecord {
LocalDateTime getLocalDateTime(String colName);

LocalDateTime getLocalDateTime(int index);

Object getObject(String colName);

Object getObject(int index);
}
100 changes: 72 additions & 28 deletions client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.DataTypeUtils;
import com.clickhouse.client.api.ServerException;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.data_formats.NativeFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.metadata.TableSchema;
import com.clickhouse.client.api.metrics.ClientMetrics;
Expand Down Expand Up @@ -52,19 +52,17 @@
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand All @@ -77,6 +75,7 @@
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Collectors;

public class QueryTests extends BaseIntegrationTest {

Expand Down Expand Up @@ -150,16 +149,19 @@ public void testSimpleQueryWithTSV() {

@Test(groups = {"integration"})
public void testReadRecords() throws Exception {
prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
List<Map<String, Object>> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);

Records records = client.queryRecords("SELECT * FROM " + DATASET_TABLE).get(3, TimeUnit.SECONDS);
Assert.assertEquals(records.getResultRows(), 10, "Unexpected number of rows");

Iterator<Map<String, Object>> dataIterator = dataset.iterator();
for (GenericRecord record : records) {
System.out.println(record.getLong(1)); // UInt32 column col1
System.out.println(record.getInteger(2)); // Int32 column col2
System.out.println(record.getString(3)); // string column col3
System.out.println(record.getLong(4)); // Int64 column col4
System.out.println(record.getString(5)); // string column col5
Map<String,Object> dsRecords = dataIterator.next();
Assert.assertEquals(record.getLong("col1"), dsRecords.get("col1"));
Assert.assertEquals(record.getInteger("col2"), dsRecords.get("col2"));
Assert.assertEquals(record.getString("col3"), dsRecords.get("col3"));
Assert.assertEquals(record.getLong("col4"), dsRecords.get("col4"));
Assert.assertEquals(record.getString("col5"), dsRecords.get("col5"));
}
}

Expand All @@ -179,6 +181,33 @@ public void testBigUnsignedInt() throws Exception {
Assert.assertEquals(firstRecord.getBigInteger("i256"), expected256);
}

@Test(groups = {"integration"})
public void testReadRecordsWithStreamAPI() throws Exception {
final int tableCount = 10;

// Create a known set of tables so SHOW TABLES has predictable content.
Set<String> expectedTableNames = new HashSet<>();
for (int n = 0; n < tableCount; n++) {
String name = "a_" + n;
expectedTableNames.add(name);
client.execute("DROP TABLE IF EXISTS default." + name);
client.execute("CREATE TABLE " + name + " (x UInt32) ENGINE = Memory");
}

Records records = client.queryRecords("SHOW TABLES").get(3, TimeUnit.SECONDS);

// First pass consumes the underlying one-shot stream.
Set<String> actualTableNames = new HashSet<>();
records.forEach(r -> actualTableNames.add(r.getString(1)));
Assert.assertTrue(actualTableNames.containsAll(expectedTableNames));

// A second iteration over the same Records must fail because the
// response stream has already been consumed.
Assert.expectThrows(IllegalStateException.class,
() -> records.forEach(r -> System.out.println(r)));
}

@Test(groups = {"integration"})
public void testReadRecordsGetFirstRecord() throws Exception {
prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
Expand All @@ -200,12 +229,20 @@ public void testReadRecordsNoResult() throws Exception {

@Test(groups = {"integration"})
public void testQueryAll() throws Exception {
testQueryAll(10);
}
public void testQueryAll(int numberOfRecords) throws Exception {
prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, numberOfRecords);
GenericRecord hostnameRecord = client.queryAll("SELECT hostname()").stream().findFirst().get();
Assert.assertNotNull(hostnameRecord);
List<Map<String, Object>> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10);
List<GenericRecord> records = client.queryAll("SELECT * FROM " + DATASET_TABLE + " LIMIT " + dataset.size());
Assert.assertFalse(records.isEmpty());

for (String colDefinition : DATASET_COLUMNS) {
// result values
String colName = colDefinition.split(" ")[0];
List<Object> colValues = records.stream().map(r -> r.getObject(colName)).collect(Collectors.toList());
Assert.assertEquals(colValues.size(), dataset.size());

// dataset values
List<Object> dataValue = dataset.stream().map(d -> d.get(colName)).collect(Collectors.toList());
Assert.assertEquals(colValues, dataValue, "Failed for column " + colName);
}
}

@Test(groups = {"integration"})
Expand All @@ -223,6 +260,24 @@ public void testQueryAllNoResult() throws Exception {
Assert.assertTrue(records.isEmpty());
}

// Consistency fix: every other integration test in this class declares
// @Test(groups = {"integration"}); without it this test is silently skipped
// when the suite runs only that group.
@Test(groups = {"integration"})
public void testQueryAllTableNames() {
final int tables = 10;

// Create a known set of tables so SHOW TABLES has predictable content.
Set<String> expectedTableNames = new HashSet<>();
for (int i = 0; i < tables; i++) {
final String tableName = "a_" + i;
expectedTableNames.add(tableName);
client.execute("DROP TABLE IF EXISTS default." + tableName);
client.execute("CREATE TABLE " + tableName + " (x UInt32) ENGINE = Memory");
}

// queryAll materializes the full result; other tables may exist, so only
// assert a lower bound and containment.
List<GenericRecord> records = client.queryAll("SHOW TABLES");
Assert.assertTrue(records.size() >= tables);

Set<String> tableNames = records.stream().map(r -> r.getString(1)).collect(Collectors.toSet());
Assert.assertTrue(tableNames.containsAll(expectedTableNames));
}

@Test(groups = {"integration"})
public void testQueryJSON() throws ExecutionException, InterruptedException {
Map<String, Object> datasetRecord = prepareSimpleDataSet();
Expand Down Expand Up @@ -975,7 +1030,6 @@ public void testDataTypes(List<String> columns, List<Supplier<String>> valueGene
colIndex++;
try {
verifier.accept(reader);
System.out.println("Verified " + colIndex);
} catch (Exception e) {
Assert.fail("Failed to verify " + columns.get(colIndex), e);
}
Expand All @@ -1000,8 +1054,6 @@ public void testQueryMetrics() throws Exception {

// Stats should be available after the query is done
OperationMetrics metrics = response.getMetrics();
System.out.println("Server read rows: " + metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong());
System.out.println("Client stats: " + metrics.getMetric(ClientMetrics.OP_DURATION).getLong());

Assert.assertEquals(metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong(), 10); // 10 rows in the table
Assert.assertEquals(metrics.getMetric(ServerMetrics.RESULT_ROWS).getLong(), 3);
Expand All @@ -1018,8 +1070,6 @@ public void testQueryMetrics() throws Exception {
response = client.query(insertStmtBuilder.toString(), settings).get();

metrics = response.getMetrics();
System.out.println("Server read rows: " + metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong());
System.out.println("Client stats: " + metrics.getMetric(ClientMetrics.OP_DURATION).getLong());

Assert.assertEquals(metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong(), rowsToInsert); // 10 rows in the table
Assert.assertEquals(metrics.getMetric(ServerMetrics.RESULT_ROWS).getLong(), rowsToInsert);
Expand Down Expand Up @@ -1087,7 +1137,6 @@ private List<Map<String, Object>> prepareDataSet(String table, List<String> colu
insertStmtBuilder.append("), ");
data.add(values);
}
System.out.println("Insert statement: " + insertStmtBuilder);
request = client.write(getServer(ClickHouseProtocol.HTTP))
.query(insertStmtBuilder.toString());
try (ClickHouseResponse response = request.executeAndWait()) {}
Expand Down Expand Up @@ -1232,9 +1281,7 @@ public void testServerTimeZoneFromHeader() {
reader.next();

LocalDateTime serverTime = reader.getLocalDateTime(1);
System.out.println("Server time: " + serverTime);
LocalDateTime serverUtcTime = reader.getLocalDateTime(2);
System.out.println("Server UTC time: " + serverUtcTime);

ZonedDateTime serverTimeZ = serverTime.atZone(ZoneId.of(requestTimeZone));
ZonedDateTime serverUtcTimeZ = serverUtcTime.atZone(ZoneId.of("UTC"));
Expand Down Expand Up @@ -1265,11 +1312,8 @@ public void testClientUseOwnTimeZone() {
reader.next();

LocalDateTime serverTime = reader.getLocalDateTime(1); // in "America/Los_Angeles"
System.out.println("Server time: " + serverTime);
LocalDateTime serverUtcTime = reader.getLocalDateTime(2);
System.out.println("Server UTC time: " + serverUtcTime);
ZonedDateTime serverLisbonTime = reader.getZonedDateTime(3); // in "Europe/Lisbon"
System.out.println("Server Lisbon time: " + serverLisbonTime);

ZonedDateTime serverTimeZ = serverTime.atZone(ZoneId.of("America/Los_Angeles"));
ZonedDateTime serverUtcTimeZ = serverUtcTime.atZone(ZoneId.of("UTC"));
Expand Down
Loading