diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 506c326f9..ec35e64b3 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -67,6 +67,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -1421,10 +1422,11 @@ public List queryAll(String sqlQuery) { query(sqlQuery, settings).get(operationTimeout, TimeUnit.MILLISECONDS)) { List records = new ArrayList<>(); if (response.getResultRows() > 0) { - ClickHouseBinaryFormatReader reader = + RowBinaryWithNamesAndTypesFormatReader reader = new RowBinaryWithNamesAndTypesFormatReader(response.getInputStream(), response.getSettings()); + Map record; - while ((record = reader.next()) != null) { + while (reader.readRecord((record = new LinkedHashMap<>()))) { records.add(new MapBackedRecord(record, reader.getSchema())); } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java index 5e9123db3..8a69d3656 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java @@ -33,7 +33,7 @@ public NativeFormatReader(InputStream inputStream, QuerySettings settings) { } @Override - protected boolean readRecord(Map record) throws IOException { + public boolean readRecord(Map record) throws IOException { if (currentBlock == null || blockRowIndex >= currentBlock.getnRows()) { if (!readBlock()) { return false; diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java index f3c77007a..b4d1773c4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java @@ -30,6 +30,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -38,6 +39,7 @@ import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader { @@ -70,7 +72,18 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer protected Map nextRecord = new ConcurrentHashMap<>(); - protected boolean readRecord(Map record) throws IOException { + /** + * It is still internal method and should be used with care. + * Usually this method is called to read next record into internal object and affects hasNext() method. + * So after calling this one: + * - hasNext(), next() should not be called + * - stream should be read with readRecord() method fully + * + * @param record + * @return + * @throws IOException + */ + public boolean readRecord(Map record) throws IOException { boolean firstColumn = true; for (ClickHouseColumn column : getSchema().getColumns()) { try { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryReaderBackedRecord.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryReaderBackedRecord.java index 97a114f82..9ee312154 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryReaderBackedRecord.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryReaderBackedRecord.java @@ -346,4 +346,14 @@ public LocalDateTime getLocalDateTime(String colName) { public LocalDateTime getLocalDateTime(int index) { return reader.getLocalDateTime(index); } + + @Override + public Object getObject(String colName) { + return reader.readValue(colName); + } + + @Override + public Object getObject(int index) { + return reader.readValue(index); + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/MapBackedRecord.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/MapBackedRecord.java index d943e58e4..cd6f8809f 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/MapBackedRecord.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/MapBackedRecord.java @@ -480,4 +480,14 @@ public LocalDateTime getLocalDateTime(String colName) { public LocalDateTime getLocalDateTime(int index) { return readValue(index); } + + @Override + public Object getObject(String colName) { + return readValue(colName); + } + + @Override + public Object getObject(int index) { + return readValue(index); + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/GenericRecord.java b/client-v2/src/main/java/com/clickhouse/client/api/query/GenericRecord.java index 30c750fd6..bc8f6e25e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/GenericRecord.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/GenericRecord.java @@ -19,7 +19,6 @@ public interface GenericRecord { - /** * Reads column with name `colName` as a string. * @@ -486,4 +485,8 @@ public interface GenericRecord { LocalDateTime getLocalDateTime(String colName); LocalDateTime getLocalDateTime(int index); + + Object getObject(String colName); + + Object getObject(int index); } diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java index bb356e841..9c62cf672 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java @@ -14,12 +14,12 @@ import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.DataTypeUtils; import com.clickhouse.client.api.ServerException; -import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.data_formats.NativeFormatReader; import com.clickhouse.client.api.data_formats.RowBinaryFormatReader; import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader; import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader; +import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.insert.InsertSettings; import com.clickhouse.client.api.metadata.TableSchema; import com.clickhouse.client.api.metrics.ClientMetrics; @@ -52,19 +52,17 @@ import java.math.BigInteger; import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.OffsetDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Properties; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -77,6 +75,7 @@ import java.util.function.Supplier; import java.util.stream.BaseStream; import java.util.stream.IntStream; +import java.util.stream.Collectors; public class QueryTests extends BaseIntegrationTest { @@ -150,16 +149,19 @@ public void testSimpleQueryWithTSV() { @Test(groups = {"integration"}) public void testReadRecords() throws Exception { - prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10); + List> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10); Records records = client.queryRecords("SELECT * FROM " + DATASET_TABLE).get(3, TimeUnit.SECONDS); Assert.assertEquals(records.getResultRows(), 10, "Unexpected number of rows"); + + Iterator> dataIterator = dataset.iterator(); for (GenericRecord record : records) { - System.out.println(record.getLong(1)); // UInt32 column col1 - System.out.println(record.getInteger(2)); // Int32 column col2 - System.out.println(record.getString(3)); // string column col3 - System.out.println(record.getLong(4)); // Int64 column col4 - System.out.println(record.getString(5)); // string column col5 + Map dsRecords = dataIterator.next(); + Assert.assertEquals(record.getLong("col1"), dsRecords.get("col1")); + Assert.assertEquals(record.getInteger("col2"), dsRecords.get("col2")); + Assert.assertEquals(record.getString("col3"), dsRecords.get("col3")); + Assert.assertEquals(record.getLong("col4"), dsRecords.get("col4")); + Assert.assertEquals(record.getString("col5"), dsRecords.get("col5")); } } @@ -179,6 +181,33 @@ public void testBigUnsignedInt() throws Exception { Assert.assertEquals(firstRecord.getBigInteger("i256"), expected256); } + @Test(groups = {"integration"}) + public void testReadRecordsWithStreamAPI() throws Exception { + final int tables = 10; + + Set expectedTableNames = new HashSet<>(); + for (int i = 0; i < tables; i++) { + final String tableName = "a_" + i; + expectedTableNames.add(tableName); + client.execute("DROP TABLE IF EXISTS default." + tableName); + client.execute("CREATE TABLE " + tableName +" (x UInt32) ENGINE = Memory"); + } + + Records records = client.queryRecords("SHOW TABLES").get(3, TimeUnit.SECONDS); + + HashSet tableNames = new HashSet<>(); + records.forEach(r -> { + tableNames.add(r.getString(1)); + }); + Assert.assertTrue(tableNames.containsAll(expectedTableNames)); + + Assert.expectThrows(IllegalStateException.class, () -> { + records.forEach(r -> { + System.out.println(r); + }); + }); + } + @Test(groups = {"integration"}) public void testReadRecordsGetFirstRecord() throws Exception { prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10); @@ -200,12 +229,20 @@ public void testReadRecordsNoResult() throws Exception { @Test(groups = {"integration"}) public void testQueryAll() throws Exception { - testQueryAll(10); - } - public void testQueryAll(int numberOfRecords) throws Exception { - prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, numberOfRecords); - GenericRecord hostnameRecord = client.queryAll("SELECT hostname()").stream().findFirst().get(); - Assert.assertNotNull(hostnameRecord); + List> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10); + List records = client.queryAll("SELECT * FROM " + DATASET_TABLE + " LIMIT " + dataset.size()); + Assert.assertFalse(records.isEmpty()); + + for (String colDefinition : DATASET_COLUMNS) { + // result values + String colName = colDefinition.split(" ")[0]; + List colValues = records.stream().map(r -> r.getObject(colName)).collect(Collectors.toList()); + Assert.assertEquals(colValues.size(), dataset.size()); + + // dataset values + List dataValue = dataset.stream().map(d -> d.get(colName)).collect(Collectors.toList()); + Assert.assertEquals(colValues, dataValue, "Failed for column " + colName); + } } @Test(groups = {"integration"}) @@ -223,6 +260,24 @@ public void testQueryAllNoResult() throws Exception { Assert.assertTrue(records.isEmpty()); } + @Test + public void testQueryAllTableNames() { + final int tables = 10; + Set expectedTableNames = new HashSet<>(); + for (int i = 0; i < tables; i++) { + final String tableName = "a_" + i; + expectedTableNames.add(tableName); + client.execute("DROP TABLE IF EXISTS default." + tableName); + client.execute("CREATE TABLE " + tableName +" (x UInt32) ENGINE = Memory"); + } + + List records = client.queryAll("SHOW TABLES"); + Assert.assertTrue(records.size() >= tables); + + Set tableNames = records.stream().map(r -> r.getString(1)).collect(Collectors.toSet()); + Assert.assertTrue(tableNames.containsAll(expectedTableNames)); + } + @Test(groups = {"integration"}) public void testQueryJSON() throws ExecutionException, InterruptedException { Map datasetRecord = prepareSimpleDataSet(); @@ -975,7 +1030,6 @@ public void testDataTypes(List columns, List> valueGene colIndex++; try { verifier.accept(reader); - System.out.println("Verified " + colIndex); } catch (Exception e) { Assert.fail("Failed to verify " + columns.get(colIndex), e); } @@ -1000,8 +1054,6 @@ public void testQueryMetrics() throws Exception { // Stats should be available after the query is done OperationMetrics metrics = response.getMetrics(); - System.out.println("Server read rows: " + metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong()); - System.out.println("Client stats: " + metrics.getMetric(ClientMetrics.OP_DURATION).getLong()); Assert.assertEquals(metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong(), 10); // 10 rows in the table Assert.assertEquals(metrics.getMetric(ServerMetrics.RESULT_ROWS).getLong(), 3); @@ -1018,8 +1070,6 @@ public void testQueryMetrics() throws Exception { response = client.query(insertStmtBuilder.toString(), settings).get(); metrics = response.getMetrics(); - System.out.println("Server read rows: " + metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong()); - System.out.println("Client stats: " + metrics.getMetric(ClientMetrics.OP_DURATION).getLong()); Assert.assertEquals(metrics.getMetric(ServerMetrics.NUM_ROWS_READ).getLong(), rowsToInsert); // 10 rows in the table Assert.assertEquals(metrics.getMetric(ServerMetrics.RESULT_ROWS).getLong(), rowsToInsert); @@ -1087,7 +1137,6 @@ private List> prepareDataSet(String table, List colu insertStmtBuilder.append("), "); data.add(values); } - System.out.println("Insert statement: " + insertStmtBuilder); request = client.write(getServer(ClickHouseProtocol.HTTP)) .query(insertStmtBuilder.toString()); try (ClickHouseResponse response = request.executeAndWait()) {} @@ -1232,9 +1281,7 @@ public void testServerTimeZoneFromHeader() { reader.next(); LocalDateTime serverTime = reader.getLocalDateTime(1); - System.out.println("Server time: " + serverTime); LocalDateTime serverUtcTime = reader.getLocalDateTime(2); - System.out.println("Server UTC time: " + serverUtcTime); ZonedDateTime serverTimeZ = serverTime.atZone(ZoneId.of(requestTimeZone)); ZonedDateTime serverUtcTimeZ = serverUtcTime.atZone(ZoneId.of("UTC")); @@ -1265,11 +1312,8 @@ public void testClientUseOwnTimeZone() { reader.next(); LocalDateTime serverTime = reader.getLocalDateTime(1); // in "America/Los_Angeles" - System.out.println("Server time: " + serverTime); LocalDateTime serverUtcTime = reader.getLocalDateTime(2); - System.out.println("Server UTC time: " + serverUtcTime); ZonedDateTime serverLisbonTime = reader.getZonedDateTime(3); // in "Europe/Lisbon" - System.out.println("Server Lisbon time: " + serverLisbonTime); ZonedDateTime serverTimeZ = serverTime.atZone(ZoneId.of("America/Los_Angeles")); ZonedDateTime serverUtcTimeZ = serverUtcTime.atZone(ZoneId.of("UTC"));