diff --git a/CHANGELOG.md b/CHANGELOG.md index da314025340..edbc6e6d958 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 6.5.0 [unreleased] +### Features +1. [#366](https://github.com/influxdata/influxdb-client-java/pull/356): Added an endpoint to query with InfluxQL (v1) for more info see [README.md](./client). + ### Dependencies Update dependencies: diff --git a/README.md b/README.md index 6e5e6b943e8..c59f1ec8ffe 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ This section contains links to the client library documentation. - InfluxDB 2.x client - Querying data using the Flux language + - Querying data using the InfluxQL - Writing data using - [Line Protocol](https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/) - [Data Point](https://github.com/influxdata/influxdb-client-java/blob/master/client/src/main/java/org/influxdata/client/write/Point.java#L46) diff --git a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java index 2fba79b6095..dde96fc10be 100644 --- a/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java +++ b/client-core/src/main/java/com/influxdb/internal/AbstractQueryApi.java @@ -71,6 +71,7 @@ public abstract class AbstractQueryApi extends AbstractRestClient { }; protected static final String DEFAULT_DIALECT; + static { Map dialect = new HashMap<>(); dialect.put("header", true); @@ -107,7 +108,7 @@ protected RequestBody createBody(@Nullable final String dialect, @Nonnull final if (dialect != null && !dialect.isEmpty()) { JsonElement dialectJson = new Gson().fromJson(dialect, JsonElement.class); - json.add("dialect", dialectJson); + json.add("dialect", dialectJson); } return createBody(json.toString()); @@ -156,11 +157,11 @@ protected RawIterator queryRawIterator(@Nonnull final Call queryCa return new RawIterator(queryCall, ERROR_CONSUMER); } - private void query(@Nonnull final Call query, - @Nonnull final BiConsumer consumer, - @Nonnull final Consumer onError, - @Nonnull final Runnable onComplete, - @Nonnull final Boolean asynchronously) { + protected void query(@Nonnull final Call query, + @Nonnull final BiConsumer consumer, + @Nonnull final Consumer onError, + @Nonnull final Runnable onComplete, + @Nonnull final Boolean asynchronously) { Arguments.checkNotNull(query, "query"); Arguments.checkNotNull(consumer, "consumer"); @@ -262,7 +263,7 @@ private void parseFluxResponseToLines(@Nonnull final Consumer onResponse } } - private class DefaultCancellable implements Cancellable { + private static class DefaultCancellable implements Cancellable { private volatile boolean wasCancelled = false; @@ -394,4 +395,4 @@ record = fluxRecordOrTable.record; return record != null; } } -} \ No newline at end of file +} diff --git a/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java new file mode 100644 index 00000000000..a0e37f810ac --- /dev/null +++ b/client-core/src/main/java/com/influxdb/query/InfluxQLQueryResult.java @@ -0,0 +1,196 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.query; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.influxdb.utils.Arguments; + +/** + * This class represents the result of an InfluxQL query. + */ +public final class InfluxQLQueryResult { + + private final List results; + + public InfluxQLQueryResult(@Nonnull final List results) { + Arguments.checkNotNull(results, "results"); + + this.results = results; + } + + /** + * @return the results + */ + @Nonnull + public List getResults() { + return this.results; + } + + /** + * Represents one result of an InfluxQL query. + */ + public static final class Result { + private final List series; + + private final int index; + + public Result(final int index, @Nonnull final List series) { + Arguments.checkNotNull(series, "series"); + + this.index = index; + this.series = series; + } + + /** + * @return the index of the result + */ + public int getIndex() { + return index; + } + + /** + * @return the series + */ + @Nonnull + public List getSeries() { + return this.series; + } + + } + + /** + * Represents one series within the {@link Result} of an InfluxQL query. + */ + public static final class Series { + @Nonnull + private final Map columns; + + @Nonnull + private final String name; + + private final List values; + + public Series(final @Nonnull String name, final @Nonnull Map columns) { + Arguments.checkNotNull(name, "name"); + Arguments.checkNotNull(columns, "columns"); + + this.name = name; + this.columns = columns; + this.values = new ArrayList<>(); + } + + /** + * @return the name + */ + @Nonnull + public String getName() { + return this.name; + } + + /** + * @return the columns + */ + @Nonnull + public Map getColumns() { + return this.columns; + } + + + /** + * @return the values + */ + @Nonnull + public List getValues() { + return this.values; + } + + public void addRecord(@Nonnull final Record record) { + Arguments.checkNotNull(record, "record"); + + this.values.add(record); + } + + /** + * A value extractor is used to convert the string value returned by query into a custom type. + */ + @FunctionalInterface + public interface ValueExtractor { + + /** + * The value extractor is called for each resulting column to convert the string value returned by query + * into a custom type. + * + * @param columnName the name of the column + * @param rawValue the string value returned from the query + * @param resultIndex the index of the result + * @param seriesName the name of the series + * @return the converted string value + */ + @Nullable + Object extractValue( + @Nonnull String columnName, + @Nonnull String rawValue, + int resultIndex, + @Nonnull String seriesName); + } + + /** + * Represents one data record within a {@link Series} of an InfluxQL query. + */ + public final class Record { + private final Object[] values; + + public Record(final Object[] values) { + this.values = values; + } + + /** + * Get value by key. + * + * @param key of value in CSV response + * @return value + */ + @Nullable + public Object getValueByKey(@Nonnull final String key) { + + Arguments.checkNonEmpty(key, "key"); + + Integer index = columns.get(key); + if (index == null) { + return null; + } + return values[index]; + } + + public Object[] getValues() { + return values; + } + } + + } + +} diff --git a/client/README.md b/client/README.md index 83fc61394ab..86519e0b99c 100644 --- a/client/README.md +++ b/client/README.md @@ -8,6 +8,7 @@ The reference Java client that allows query, write and management (bucket, organ - [Querying data using Flux language](#queries) - [Parameterized Queries](#parameterized-queries) +- [Querying data using InfluxQL](#influxql-queries) - [Writing data using](#writes) - [Line Protocol](#by-lineprotocol) - [Data Point](#by-data-point) @@ -411,6 +412,73 @@ public class ParameterizedQuery { } ``` +### InfluxQL Queries + +The `InfluxQL` can be used with `/query compatibility` endpoint which uses the **database** and **retention policy** specified in the query request to map the request to an InfluxDB bucket. +For more information, see: . + +- [/query 1.x compatibility API](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/query/) +- [Database and retention policy mapping](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/dbrp/) + +This is an example of how to use this library to run a query with influxQL: + +```java +package example; + +import java.math.BigDecimal; +import java.time.Instant; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.InfluxQLQueryApi; +import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.query.InfluxQLQueryResult; + +public class InfluxQLExample { + + private static char[] token = "my-token".toCharArray(); + private static String org = "my-org"; + + private static String database = "my-org"; + + public static void main(final String[] args) { + + try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org)) { + + // + // Query data + // + String influxQL = "SELECT FIRST(\"free\") FROM \"influxql\""; + + InfluxQLQueryApi queryApi = influxDBClient.getInfluxQLQueryApi(); + + // send request + InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(influxQL, database).setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS), + (columnName, rawValue, resultIndex, seriesName) -> { + // convert columns + switch (columnName) { + case "time": + return Instant.ofEpochSecond(Long.parseLong(rawValue)); + case "first": + return new BigDecimal(rawValue); + default: + throw new IllegalArgumentException("unexpected column " + columnName); + } + }); + + for (InfluxQLQueryResult.Result resultResult : result.getResults()) { + for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { + for (InfluxQLQueryResult.Series.Record record : series.getValues()) { + System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first")); + } + } + } + + } + } +} +``` + ## Writes The client offers two types of API to ingesting data: diff --git a/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java new file mode 100644 index 00000000000..c51257561c8 --- /dev/null +++ b/client/src/generated/java/com/influxdb/client/service/InfluxQLQueryService.java @@ -0,0 +1,28 @@ +package com.influxdb.client.service; + +import javax.annotation.Nonnull; + +import okhttp3.ResponseBody; +import retrofit2.Call; +import retrofit2.http.*; + +public interface InfluxQLQueryService { + + /** + * @param db Bucket to query (required) + * @param query (required) + * @param retentionPolicy Retention policy name (optional) + * @param zapTraceSpan OpenTracing span context (optional) + * @return response in csv format + */ + @Headers({ "Accept:application/csv", "Content-Type:application/vnd.influxql" }) + @POST("query") + Call query( + @Body String query, + @Nonnull @Query("db") String db, + @Query("rp") String retentionPolicy, + @Query("epoch") String epoch, + @Header("Zap-Trace-Span") String zapTraceSpan + ); + +} diff --git a/client/src/main/java/com/influxdb/client/InfluxDBClient.java b/client/src/main/java/com/influxdb/client/InfluxDBClient.java index 8a5687e71a4..21bc9305c71 100644 --- a/client/src/main/java/com/influxdb/client/InfluxDBClient.java +++ b/client/src/main/java/com/influxdb/client/InfluxDBClient.java @@ -31,6 +31,7 @@ import com.influxdb.client.domain.Check; import com.influxdb.client.domain.Dashboard; import com.influxdb.client.domain.HealthCheck; +import com.influxdb.client.domain.InfluxQLQuery; import com.influxdb.client.domain.Label; import com.influxdb.client.domain.NotificationEndpoint; import com.influxdb.client.domain.NotificationRules; @@ -252,6 +253,31 @@ public interface InfluxDBClient extends AutoCloseable { @Nonnull InvokableScriptsApi getInvokableScriptsApi(); + /** + * The InfluxQL can be used with /query compatibility endpoint which uses the + * {@link InfluxQLQuery#getDatabase() database} and + * {@link InfluxQLQuery#getRetentionPolicy() retention policy} specified in the query request to + * map the request to an InfluxDB bucket. + *
+ * For more information, see: + * + * + * @return InfluxQLQuery API instance + */ + @Nonnull + InfluxQLQueryApi getInfluxQLQueryApi(); + /** * Create an implementation of the API endpoints defined by the {@code service} interface. *

@@ -361,4 +387,4 @@ public interface InfluxDBClient extends AutoCloseable { */ @Override void close(); -} \ No newline at end of file +} diff --git a/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java new file mode 100644 index 00000000000..8bc3832e162 --- /dev/null +++ b/client/src/main/java/com/influxdb/client/InfluxQLQueryApi.java @@ -0,0 +1,95 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.query.InfluxQLQueryResult; + +/** + * The InfluxQL can be used with /query compatibility endpoint which uses the + * {@link InfluxQLQuery#getDatabase() database} and + * {@link InfluxQLQuery#getRetentionPolicy() retention policy} specified in the query request to + * map the request to an InfluxDB bucket. + *
+ * For more information, see: + *

+ **/ +@ThreadSafe +public interface InfluxQLQueryApi { + + /** + * Executes an InfluxQL query against the legacy endpoint. + * + * @param influxQlQuery the query + * @return the result + */ + @Nonnull + InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQlQuery); + + /** + * Executes an InfluxQL query against the legacy endpoint. + * The value extractor is called for each resulting column to convert the string value returned by query into a + * custom type. + * + *

Example

+ *
+     * InfluxQLQueryResult result = influxQLQueryApi.query(
+     *             new InfluxQLQuery("SELECT FIRST(\"free\") FROM \"influxql\"", DATABASE_NAME)
+     *                     .setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS),
+     *             (columnName, rawValue, resultIndex, seriesName) -> {
+     *                 switch (columnName) {
+     *                     case "time":
+     *                         return Instant.ofEpochSecond(Long.parseLong(rawValue));
+     *                     case "first":
+     *                         return new BigDecimal(rawValue);
+     *                     default:
+     *                         throw new IllegalArgumentException("unexpected column " + columnName);
+     *                 }
+     *             }
+     *     );
+     * 
+ * + * @param influxQlQuery the query + * @param valueExtractor a callback, to convert column values + * @return the result + */ + @Nonnull + InfluxQLQueryResult query( + @Nonnull InfluxQLQuery influxQlQuery, + @Nullable InfluxQLQueryResult.Series.ValueExtractor valueExtractor + ); +} diff --git a/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java new file mode 100644 index 00000000000..80d8673606c --- /dev/null +++ b/client/src/main/java/com/influxdb/client/domain/InfluxQLQuery.java @@ -0,0 +1,146 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.domain; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * A InfluxQL query. + */ +public class InfluxQLQuery { + private final String command; + private final String database; + private String retentionPolicy; + private InfluxQLPrecision precision; + + /** + * @param command the InfluxQL command to execute + * @param database the database to run this query against + */ + public InfluxQLQuery(@Nonnull final String command, @Nonnull final String database) { + this.command = command; + this.database = database; + } + + /** + * @return the InfluxQL command to execute + */ + @Nonnull + public String getCommand() { + return command; + } + + /** + * @return the database to run this query against + */ + @Nonnull + public String getDatabase() { + return database; + } + + /** + * @return the retention policy to use + */ + @Nullable + public String getRetentionPolicy() { + return retentionPolicy; + } + + /** + * @param retentionPolicy the retention policy to use + * @return this + */ + @Nonnull + public InfluxQLQuery setRetentionPolicy(@Nullable final String retentionPolicy) { + this.retentionPolicy = retentionPolicy; + return this; + } + + /** + * @return The precision used for the timestamps returned by the query + */ + @Nullable + public InfluxQLPrecision getPrecision() { + return precision; + } + + /** + * + * @param precision The precision used for the timestamps returned by the query + * @return this + */ + @Nonnull + public InfluxQLQuery setPrecision(@Nullable final InfluxQLPrecision precision) { + this.precision = precision; + return this; + } + + /** + * The precision used for the timestamps returned by InfluxQL queries. + */ + public enum InfluxQLPrecision { + HOURS("h"), + MINUTES("m"), + SECONDS("s"), + MILLISECONDS("ms"), + MICROSECONDS("u"), + NANOSECONDS("n"); + + private final String symbol; + + InfluxQLPrecision(final String symbol) { + this.symbol = symbol; + } + + /** + * @return the InfluxQL specific symbol + */ + @Nonnull + public String getSymbol() { + return symbol; + } + + @Nonnull + public static InfluxQLPrecision toTimePrecision(final TimeUnit t) { + switch (t) { + case HOURS: + return HOURS; + case MINUTES: + return MINUTES; + case SECONDS: + return SECONDS; + case MILLISECONDS: + return MILLISECONDS; + case MICROSECONDS: + return MICROSECONDS; + case NANOSECONDS: + return NANOSECONDS; + default: + throw new IllegalArgumentException("time precision must be one of:" + + Arrays.toString(InfluxQLPrecision.values())); + } + } + } +} diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxDBClientImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxDBClientImpl.java index 386bde83805..016be470c03 100644 --- a/client/src/main/java/com/influxdb/client/internal/InfluxDBClientImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/InfluxDBClientImpl.java @@ -34,6 +34,7 @@ import com.influxdb.client.DeleteApi; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientOptions; +import com.influxdb.client.InfluxQLQueryApi; import com.influxdb.client.InvokableScriptsApi; import com.influxdb.client.LabelsApi; import com.influxdb.client.NotificationEndpointsApi; @@ -59,6 +60,7 @@ import com.influxdb.client.service.ChecksService; import com.influxdb.client.service.DashboardsService; import com.influxdb.client.service.DeleteService; +import com.influxdb.client.service.InfluxQLQueryService; import com.influxdb.client.service.InvokableScriptsService; import com.influxdb.client.service.LabelsService; import com.influxdb.client.service.NotificationEndpointsService; @@ -247,6 +249,12 @@ public InvokableScriptsApi getInvokableScriptsApi() { return new InvokableScriptsApiImpl(retrofit.create(InvokableScriptsService.class)); } + @Nonnull + @Override + public InfluxQLQueryApi getInfluxQLQueryApi() { + return new InfluxQLQueryApiImpl(getService(InfluxQLQueryService.class)); + } + @Nonnull @Override public S getService(@Nonnull final Class service) { @@ -346,4 +354,4 @@ public boolean isGzipEnabled() { return this.gzipInterceptor.isEnabledGzip(); } -} \ No newline at end of file +} diff --git a/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java new file mode 100644 index 00000000000..1ec5839e324 --- /dev/null +++ b/client/src/main/java/com/influxdb/client/internal/InfluxQLQueryApiImpl.java @@ -0,0 +1,177 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.internal; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.influxdb.Cancellable; +import com.influxdb.client.InfluxQLQueryApi; +import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.client.service.InfluxQLQueryService; +import com.influxdb.internal.AbstractQueryApi; +import com.influxdb.query.InfluxQLQueryResult; +import com.influxdb.utils.Arguments; + +import okhttp3.ResponseBody; +import okio.BufferedSource; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import retrofit2.Call; + +public class InfluxQLQueryApiImpl extends AbstractQueryApi implements InfluxQLQueryApi { + + private final InfluxQLQueryService service; + + public InfluxQLQueryApiImpl(@Nonnull final InfluxQLQueryService service) { + + Arguments.checkNotNull(service, "service"); + + this.service = service; + } + + @Nonnull + @Override + public InfluxQLQueryResult query(@Nonnull final InfluxQLQuery influxQlQuery) { + return query(influxQlQuery, null); + } + + @Nonnull + @Override + public InfluxQLQueryResult query( + @Nonnull final InfluxQLQuery influxQlQuery, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor + ) { + Call call = service.query( + influxQlQuery.getCommand(), + influxQlQuery.getDatabase(), + influxQlQuery.getRetentionPolicy(), + influxQlQuery.getPrecision() != null ? influxQlQuery.getPrecision().getSymbol() : null, + null); + + AtomicReference atomicReference = new AtomicReference<>(); + BiConsumer consumer = (cancellable, bufferedSource) -> { + try { + InfluxQLQueryResult result = parseResponse(bufferedSource, cancellable, valueExtractor); + atomicReference.set(result); + } catch (IOException e) { + ERROR_CONSUMER.accept(e); + } + }; + query(call, consumer, ERROR_CONSUMER, EMPTY_ACTION, false); + return atomicReference.get(); + } + + private InfluxQLQueryResult parseResponse( + @Nonnull final BufferedSource bufferedSource, + @Nonnull final Cancellable cancellable, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor) throws IOException { + + Arguments.checkNotNull(bufferedSource, "bufferedSource"); + + try (Reader reader = new InputStreamReader(bufferedSource.inputStream(), StandardCharsets.UTF_8)) { + return readInfluxQLResult(reader, cancellable, valueExtractor); + } + } + + static InfluxQLQueryResult readInfluxQLResult( + @Nonnull final Reader reader, + @Nonnull final Cancellable cancellable, + @Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor + ) throws IOException { + List results = new ArrayList<>(); + + Map series = null; + Map headerCols = null; + int nameCol = 0; + // The first 3 columns are static (`name`, `tags` and `time`) and got skipped. + // All other columns are dynamically parsed + int dynamicColumnsStartIndex = 2; + + try (CSVParser parser = new CSVParser(reader, CSVFormat.DEFAULT.builder().setIgnoreEmptyLines(false).build())) { + for (CSVRecord csvRecord : parser) { + if (cancellable.isCancelled()) { + break; + } + int resultIndex = results.size(); + if (csvRecord.size() == 1 || csvRecord.get(0).equals("")) { + if (series != null) { + InfluxQLQueryResult.Result result = new InfluxQLQueryResult.Result( + resultIndex, + new ArrayList<>(series.values()) + ); + results.add(result); + } + series = null; + continue; + } + + if (series == null) { + + List header = csvRecord.toList(); + headerCols = new LinkedHashMap<>(); + for (int col = dynamicColumnsStartIndex; col < header.size(); col++) { + String colName = header.get(col); + headerCols.put(colName, col - dynamicColumnsStartIndex); + } + series = new LinkedHashMap<>(); + + } else { + String name = csvRecord.get(nameCol); + Map finalHeaderCols = headerCols; + InfluxQLQueryResult.Series serie = series.computeIfAbsent( + name, + n -> new InfluxQLQueryResult.Series(n, finalHeaderCols) + ); + Object[] values = headerCols.entrySet().stream().map(entry -> { + String value = csvRecord.get(entry.getValue() + dynamicColumnsStartIndex); + if (valueExtractor != null) { + return valueExtractor.extractValue(entry.getKey(), value, resultIndex, serie.getName()); + } + return value; + }).toArray(); + InfluxQLQueryResult.Series.Record record = serie.new Record(values); + serie.addRecord(record); + } + } + } + if (series != null) { + InfluxQLQueryResult.Result result = new InfluxQLQueryResult.Result( + results.size(), + new ArrayList<>(series.values()) + ); + results.add(result); + } + return new InfluxQLQueryResult(results); + } +} diff --git a/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java new file mode 100644 index 00000000000..be68ec44cbe --- /dev/null +++ b/client/src/test/java/com/influxdb/client/ITInfluxQLQueryApi.java @@ -0,0 +1,147 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client; + +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Instant; + +import com.influxdb.client.domain.Bucket; +import com.influxdb.client.domain.DBRPCreate; +import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.service.DbrPsService; +import com.influxdb.client.write.Point; +import com.influxdb.query.InfluxQLQueryResult; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import static org.assertj.core.api.InstanceOfAssertFactories.*; + +@RunWith(JUnitPlatform.class) +class ITInfluxQLQueryApi extends AbstractITClientTest { + + private static final String DATABASE_NAME = "my-database"; + private InfluxQLQueryApi influxQLQueryApi; + + + @BeforeEach + void setUp() throws IOException { + influxQLQueryApi = influxDBClient.getInfluxQLQueryApi(); + DbrPsService dbrPsService = influxDBClient.getService(DbrPsService.class); + + Bucket bucket = influxDBClient.getBucketsApi().findBucketByName("my-bucket"); + Assertions.assertThat(bucket).isNotNull(); + + dbrPsService.postDBRP(new DBRPCreate() + .bucketID(bucket.getId()) + .orgID(bucket.getOrgID()) + .database(DATABASE_NAME) + ._default(true) + .retentionPolicy("autogen") + , null) + .execute(); + influxDBClient.getWriteApiBlocking() + .writePoint(bucket.getId(), bucket.getOrgID(), new Point("influxql") + .time(1655900000, WritePrecision.S) + .addField("free", 10) + .addTag("host", "A") + .addTag("region", "west")); + } + + @Test + void testShowDatabases() { + InfluxQLQueryResult result = influxQLQueryApi.query(new InfluxQLQuery("SHOW DATABASES", DATABASE_NAME)); + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .extracting(record -> record.getValueByKey("name")) + .isEqualTo(DATABASE_NAME); + } + + + @Test + void testQueryData() { + InfluxQLQueryResult result = influxQLQueryApi.query(new InfluxQLQuery("SELECT FIRST(\"free\") FROM \"influxql\"", DATABASE_NAME)); + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("10"); + }); + } + + @Test + void testQueryDataWithConversion() { + InfluxQLQueryResult result = influxQLQueryApi.query( + new InfluxQLQuery("SELECT FIRST(\"free\") FROM \"influxql\"", DATABASE_NAME) + .setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS), + (columnName, rawValue, resultIndex, seriesName) -> { + switch (columnName) { + case "time": + return Instant.ofEpochSecond(Long.parseLong(rawValue)); + case "first": + return new BigDecimal(rawValue); + default: + throw new IllegalArgumentException("unexpected column " + columnName); + } + } + ); + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + Assertions.assertThat(record.getValueByKey("time")).asInstanceOf(INSTANT).isEqualTo("2022-06-22T12:13:20Z"); + Assertions.assertThat(record.getValueByKey("first")).asInstanceOf(BIG_DECIMAL).isEqualTo("10"); + }); + } + + @Test + void testSelectAll() { + InfluxQLQueryResult result = influxQLQueryApi.query(new InfluxQLQuery("SELECT * FROM \"influxql\"", DATABASE_NAME)); + assertSingleSeriesRecords(result) + .hasSize(1) + .first() + .satisfies(record -> { + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1655900000000000000"); + Assertions.assertThat(record.getValueByKey("free")).isEqualTo("10"); + Assertions.assertThat(record.getValueByKey("host")).isEqualTo("A"); + Assertions.assertThat(record.getValueByKey("region")).isEqualTo("west"); + }); + } + + private ListAssert assertSingleSeriesRecords(InfluxQLQueryResult result) { + return Assertions.assertThat(result) + .extracting(InfluxQLQueryResult::getResults, list(InfluxQLQueryResult.Result.class)) + .hasSize(1) + .first() + .extracting(InfluxQLQueryResult.Result::getSeries, list(InfluxQLQueryResult.Series.class)) + .hasSize(1) + .first() + .extracting(InfluxQLQueryResult.Series::getValues, list(InfluxQLQueryResult.Series.Record.class)); + } +} diff --git a/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java new file mode 100644 index 00000000000..f88fbc32978 --- /dev/null +++ b/client/src/test/java/com/influxdb/client/internal/InfluxQLQueryApiImplTest.java @@ -0,0 +1,131 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.internal; + +import java.io.IOException; +import java.io.StringReader; +import java.time.Instant; +import java.util.List; + +import com.influxdb.Cancellable; +import com.influxdb.query.InfluxQLQueryResult; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +class InfluxQLQueryApiImplTest { + + private static final Cancellable NO_CANCELLING = new Cancellable() { + @Override + public void cancel() { + } + + @Override + public boolean isCancelled() { + return false; + } + }; + + @Test + void readInfluxQLResult() throws IOException { + InfluxQLQueryResult.Series.ValueExtractor extractValues = (columnName, rawValue, resultIndex, seriesName) -> { + if (resultIndex == 0 && seriesName.equals("data2")){ + switch (columnName){ + case "time": return Instant.ofEpochSecond(Long.parseLong(rawValue)); + case "first": return Double.valueOf(rawValue); + } + } + return rawValue; + }; + + StringReader reader = new StringReader("name,tags,time,first\n" + + "data1,,1483225200,1\n" + + "data2,,1483225200,2\n" + + "\n" + + "name,tags,time,first,text\n" + + "data,,1500000000,42,foo\n" + + "\n" + + "name,tags,name\n" + + "databases,,measurement-1\n" + + "databases,,measurement-2"); + + InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLResult(reader, NO_CANCELLING, extractValues); + + List results = result.getResults(); + Assertions.assertThat(results).hasSize(3); + Assertions.assertThat(results.get(0)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(2); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data1"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1483225200"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("1"); + }); + Assertions.assertThat(series.get(1)) + .satisfies(series2 -> { + Assertions.assertThat(series2.getName()).isEqualTo("data2"); + Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time", "first"); + Assertions.assertThat(series2.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series2.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo(Instant.ofEpochSecond(1483225200L)); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo(2.); + }); + }); + + Assertions.assertThat(results.get(1)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("data"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time", "first", "text"); + Assertions.assertThat(series1.getValues()).hasSize(1); + InfluxQLQueryResult.Series.Record record = series1.getValues().get(0); + Assertions.assertThat(record.getValueByKey("time")).isEqualTo("1500000000"); + Assertions.assertThat(record.getValueByKey("first")).isEqualTo("42"); + Assertions.assertThat(record.getValueByKey("text")).isEqualTo("foo"); + }); + }); + + Assertions.assertThat(results.get(2)) + .extracting(InfluxQLQueryResult.Result::getSeries) + .satisfies(series -> { + Assertions.assertThat(series).hasSize(1); + Assertions.assertThat(series.get(0)) + .satisfies(series1 -> { + Assertions.assertThat(series1.getName()).isEqualTo("databases"); + Assertions.assertThat(series1.getColumns()).containsOnlyKeys("name"); + Assertions.assertThat(series1.getValues()).hasSize(2); + + Assertions.assertThat( series1.getValues().get(0).getValueByKey("name")) + .isEqualTo("measurement-1"); + Assertions.assertThat( series1.getValues().get(1).getValueByKey("name")) + .isEqualTo("measurement-2"); + }); + }); + } +} diff --git a/examples/README.md b/examples/README.md index 04cbbd91672..4a544e30a20 100644 --- a/examples/README.md +++ b/examples/README.md @@ -5,6 +5,13 @@ This directory contains Java, Kotlin and Scala examples. ## Java - [WriteDataEverySecond.java](src/main/java/example/WriteDataEverySecond.java) - Write data every second - [ParameterizedQuery.java](src/main/java/example/ParameterizedQuery.java) - How to use Parameterized Queries +- [InfluxQLExample.java](src/main/java/example/InfluxQLExample.java) - How to use queries with the old influxQL dialect + + The `InfluxQL` can be used with `/query compatibility` endpoint which uses the **database** and **retention policy** + specified in the query request to map the request to an InfluxDB bucket. + For more information, see: + - [/query 1.x compatibility API](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/query/) + - [Database and retention policy mapping](https://docs.influxdata.com/influxdb/latest/reference/api/influxdb-1x/dbrp/) ### Others - [InvokableScripts.java](src/main/java/example/InvokableScripts.java) - How to use Invokable scripts Cloud API to create custom endpoints that query data @@ -29,4 +36,4 @@ This directory contains Java, Kotlin and Scala examples. - [ScalaQueryDSL.scala](src/main/java/example/ScalaQueryDSL.scala) - How to use the [FluxDSL](../flux-dsl) to query data ### Writes -- [ScalaWriteApi.scala](src/main/java/example/ScalaWriteApi.scala) - How to ingest data by `DataPoint`, `LineProtocol` or `POJO` \ No newline at end of file +- [ScalaWriteApi.scala](src/main/java/example/ScalaWriteApi.scala) - How to ingest data by `DataPoint`, `LineProtocol` or `POJO` diff --git a/examples/src/main/java/example/InfluxQLExample.java b/examples/src/main/java/example/InfluxQLExample.java new file mode 100644 index 00000000000..327c8143ed9 --- /dev/null +++ b/examples/src/main/java/example/InfluxQLExample.java @@ -0,0 +1,75 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package example; + +import java.math.BigDecimal; +import java.time.Instant; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.InfluxQLQueryApi; +import com.influxdb.client.domain.InfluxQLQuery; +import com.influxdb.query.InfluxQLQueryResult; + +public class InfluxQLExample { + + private static char[] token = "my-token".toCharArray(); + private static String org = "my-org"; + + private static String database = "my-org"; + + public static void main(final String[] args) { + + try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org)) { + + // + // Query data + // + String influxQL = "SELECT FIRST(\"free\") FROM \"influxql\""; + + InfluxQLQueryApi queryApi = influxDBClient.getInfluxQLQueryApi(); + + // send request + InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(influxQL, database).setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS), + (columnName, rawValue, resultIndex, seriesName) -> { + // convert columns + switch (columnName) { + case "time": + return Instant.ofEpochSecond(Long.parseLong(rawValue)); + case "first": + return new BigDecimal(rawValue); + default: + throw new IllegalArgumentException("unexpected column " + columnName); + } + }); + + for (InfluxQLQueryResult.Result resultResult : result.getResults()) { + for (InfluxQLQueryResult.Series series : resultResult.getSeries()) { + for (InfluxQLQueryResult.Series.Record record : series.getValues()) { + System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first")); + } + } + } + + } + } +}