Merge pull request #85 from anskarl/feature/new_queries
Adds support for Select, Scan and Search queries
krisgeus authored Jan 18, 2020
2 parents c9f738d + 4854bb1 commit a4317b6
Showing 16 changed files with 957 additions and 148 deletions.
119 changes: 110 additions & 9 deletions README.md
@@ -10,7 +10,7 @@ Currently the API is under heavy development, so changes might occur.

## Example queries:

Scruid provides three query constructors: `TopNQuery`, `GroupByQuery` and `TimeSeriesQuery` (see below for details). You can call the `execute` method ona query to send the query to Druid. This will return a `Future[DruidResponse]`. This response contains the [Circe](http://circe.io) JSON data without having it parsed to a specific case class yet. To interpret this JSON data you can run two methods on a `DruidResponse`:
Scruid provides query constructors for `TopNQuery`, `GroupByQuery`, `TimeSeriesQuery`, `ScanQuery`, `SelectQuery` and `SearchQuery` (see below for details). You can call the `execute` method on a query to send the query to Druid. This will return a `Future[DruidResponse]`. This response contains the [Circe](http://circe.io) JSON data without having it parsed to a specific case class yet. To interpret this JSON data you can run two methods on a `DruidResponse`:

- `.list[T](implicit decoder: Decoder[T]): List[T]` : This decodes the JSON to a list with items of type `T`.
- `.series[T](implicit decoder: Decoder[T]): Map[ZonedDateTime, T]` : This decodes the JSON to a timeseries map with the timestamp as key and `T` as value.
@@ -71,10 +71,55 @@ val response = TimeSeriesQuery(
val series: Future[Map[ZonedDateTime, TimeseriesCount]] = response.map(_.series[TimeseriesCount])
```

To get the timeseries data from this `Future[DruidRespones]` you can run `val series = result.series[TimeseriesCount]`.
### Select query

`TopNQuery`, `GroupByQuery` and `TimeSeriesQuery` can also configured using Druid [query context](https://druid.apache.org/docs/latest/querying/query-context.html),
such as `timeout`, `queryId` and `groupByStrategy`. All three types of query contain the argument `context` which
```scala
case class SelectResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])

val response = SelectQuery(
granularity = GranularityType.Hour,
intervals = List("2011-06-01/2017-06-01"),
pagingSpec = PagingSpec(10),
dimensions = List("channel", "cityName", "countryIsoCode", "user")
).execute()

val result: Future[List[SelectResult]] = response.map(_.list[SelectResult])
```

### Scan query

```scala
case class ScanResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])

val response = ScanQuery(
granularity = GranularityType.Hour,
intervals = List("2011-06-01/2017-06-01"),
dimensions = List("channel", "cityName", "countryIsoCode", "user"),
limit = 100
).execute()

val result: Future[List[ScanResult]] = response.map(_.list[ScanResult])
```

### Search query

The search query is a bit different: it does not take a type parameter, since its results are of type `ing.wbaa.druid.DruidSearchResult`.

```scala
val response = SearchQuery(
granularity = GranularityType.Hour,
intervals = List("2011-06-01/2017-06-01"),
query = ContainsInsensitive("GR"),
searchDimensions = List("countryIsoCode")
).execute()

val result: Future[List[DruidSearchResult]] = response.map(_.list)
```

## Query context

Queries can be configured using Druid [query context](https://druid.apache.org/docs/latest/querying/query-context.html),
such as `timeout`, `queryId` and `groupByStrategy`. All types of query contain the argument `context` which
associates query parameters with their corresponding values. The parameter names can also be accessed
via the `ing.wbaa.druid.definitions.QueryContext` object. Consider, for example, a timeseries query with a custom `query id`
and `priority`:
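The example itself is collapsed in this diff, but it can be sketched as follows. This is an illustrative sketch only, based on the constructor-style queries shown above: the aggregation, the `QueryContext` key names and the string values are assumptions, not the repository's actual code.

```scala
import ing.wbaa.druid._
import ing.wbaa.druid.definitions._

// Sketch: a time-series query carrying a custom query id and priority
// through the `context` argument (key/value details are assumed).
val response = TimeSeriesQuery(
  aggregations = List(CountAggregation(name = "count")),
  intervals    = List("2011-06-01/2017-06-01"),
  granularity  = GranularityType.Week,
  context = Map(
    QueryContext.QueryId  -> "some-unique-query-id",
    QueryContext.Priority -> "1"
  )
).execute()
```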
@@ -115,15 +160,68 @@ val response: Future[List[GroupByIsAnonymous]] = query.execute().map(_.list[Grou

For details and examples see the [DQL documentation](docs/dql.md).

## Print native Druid JSON representation

For all types of queries you can call the function `toDebugString`, in order to get the corresponding native Druid JSON
query representation.

For example the following:

```scala
import ing.wbaa.druid.dql.DSL._

val query: TopNQuery = DQL
.from("wikipedia")
.agg(count as "count")
.interval("2011-06-01/2017-06-01")
.topN(dimension = d"countryName", metric = "count", threshold = 5)
.build()

println(query.toDebugString)
```

will print to the standard output:

```json
{
"dimension" : {
"dimension" : "countryName",
"outputName" : "countryName",
"outputType" : null,
"type" : "default"
},
"threshold" : 5,
"metric" : "count",
"aggregations" : [
{
"name" : "count",
"type" : "count"
}
],
"intervals" : [
"2011-06-01/2017-06-01"
],
"granularity" : "all",
"filter" : null,
"postAggregations" : [
],
"context" : {

},
"queryType" : "topN",
"dataSource" : "wikipedia"
}
```

## Handling large payloads with Akka Streams

For queries with a large payload of results (e.g., half a million records), Scruid can transform the corresponding response into an [Akka Stream](https://doc.akka.io/docs/akka/2.5/stream/) Source.
The results can be processed, filtered and transformed using [Flows](https://doc.akka.io/docs/akka/2.5/stream/stream-flows-and-basics.html) and/or sent to Sinks as a continuous stream, without collecting the entire payload first.
To process the results with Akka Stream, you can call one of the following methods:

- `.stream`: gives a Source of `DruidResult`.
- `.streamAs[T](implicit decoder: Decoder[T])`: gives a Source where each JSON record is decoded to type `T`.
- `.streamSeriesAs[T](implicit decoder: Decoder[T])`: gives a Source where each JSON record is decoded to type `T`, accompanied by its corresponding timestamp.

All the methods above can be applied to any time-series, group-by or top-N query, created either directly using the query constructors or via DQL.
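As an illustrative sketch of the streaming API described above (the DQL query mirrors the examples elsewhere in this document; an implicit Akka `ActorSystem`/`Materializer` from the client setup, and circe's automatic decoder derivation, are assumed to be in scope):

```scala
import akka.stream.scaladsl.Sink
import io.circe.generic.auto._ // assumed: derives the implicit Decoder[TimeseriesCount]
import ing.wbaa.druid.definitions.GranularityType
import ing.wbaa.druid.dql.DSL._

case class TimeseriesCount(count: Long)

val query = DQL
  .from("wikipedia")
  .granularity(GranularityType.Hour)
  .interval("2011-06-01/2017-06-01")
  .agg(count as "count")
  .build()

// Process each decoded record as it arrives, instead of
// collecting the entire payload into memory first.
val done = query
  .streamAs[TimeseriesCount]
  .runWith(Sink.foreach(println))
```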

@@ -170,7 +268,10 @@ druid = {
client-backend = "ing.wbaa.druid.client.DruidHttpClient"
client-backend = ${?DRUID_CLIENT_BACKEND}
datasource = "wikiticker"
scan-query-legacy-mode = false
scan-query-legacy-mode = ${?DRUID_SCAN_QUERY_LEGACY_MODE}
datasource = "wikipedia"
datasource = ${?DRUID_DATASOURCE}
response-parsing-timeout = 5 seconds
@@ -189,7 +290,7 @@ import scala.concurrent.duration._

implicit val druidConf = DruidConfig(
hosts = Seq("localhost:8082"),
datasource = "wikiticker",
datasource = "wikipedia",
responseParsingTimeout = 10.seconds
)

94 changes: 79 additions & 15 deletions docs/dql.md
@@ -3,16 +3,18 @@
Scruid provides a rich Scala API for building queries using the fluent pattern.

In order to use DQL, you have to import `ing.wbaa.druid.dql.DSL._` and thereafter build a query using the `DQL` query
builder. The type of the query can be time-series (default), group-by or top-n.
builder. The type of the query can be time-series, group-by, top-n, select, scan or search.

For all three types of queries you can define the following:
For any type of query you can define the following:

- The datasource name to perform the query, or defaults to the one that has been defined in the configuration.
- The granularity of the query, e.g., `Hour`, `Day`, `Week`, etc. (default is `Week` for time-series and `All`
for top-n and group-by).
- The interval of the query, expressed as [ISO-8601 intervals](https://en.wikipedia.org/wiki/ISO_8601).
- Filter dimensions
- Aggregations and post-aggregations
- Query context properties
- Filters over dimensions.

Additionally, for time-series, group-by and top-n queries you can define aggregations and post-aggregations.

For example, consider the following fragment of a DQL query:

@@ -21,7 +23,7 @@ import ing.wbaa.druid.definitions.GranularityType
import ing.wbaa.druid.dql.DSL._

val query = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Hour)
.interval("2011-06-01/2017-06-01")
.where(d"countryName" === "Italy" or d"countryName" === "Greece")
@@ -35,7 +37,7 @@ temporal interval of the data expressed in ISO-8601, `where` defines which rows
computation for a query, `agg` defines functions that summarize data (e.g., count of rows) and `postAgg` defines
specifications of processing that should happen on aggregated values.

In the above example we are performing a query over the datasource `wikiticker`, using hourly granularity, for the
In the above example we are performing a query over the datasource `wikipedia`, using hourly granularity, for the
interval `2011-06-01` until `2017-06-01`. We are considering rows of data where the value of dimension `countryName`
is either `Italy` or `Greece`. Furthermore, we are interested in half of the row count. To achieve that, we define
the aggregation function `count`, name it `agg_count`, and thereafter define a post-aggregation function named
@@ -45,7 +47,7 @@ The equivalent fragment of a Druid query expressed in JSON is given below:

```
{
"dataSource" : "wikiticker",
"dataSource" : "wikipedia",
"granularity" : "hour",
"intervals" : [ "2011-06-01/2017-06-01"],
"filter" : {
@@ -642,7 +644,7 @@ For example, the following query is a time-series that counts the number of rows
case class TimeseriesCount(ts_count: Long)

val query: TimeSeriesQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Hour)
.interval("2011-06-01/2017-06-01")
.agg(count as "ts_count")
@@ -659,7 +661,7 @@ The following query computes the Top-5 `countryName` with respect to the aggrega
case class PostAggregationAnonymous(countryName: Option[String], agg_count: Double, half_count: Double)

val query: TopNQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Week)
.interval("2011-06-01/2017-06-01")
.agg(count as "agg_count")
@@ -678,7 +680,7 @@ The following query performs group-by count over the dimension `isAnonymous`:
case class GroupByIsAnonymous(isAnonymous: String, count: Int)

val query: GroupByQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.agg(count as "count")
@@ -697,7 +699,7 @@ the aggregation `count`.
case class GroupByIsAnonymous(isAnonymous: String, country: Option[String], count: Int)

val query: GroupByQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.agg(count as "count")
@@ -714,7 +716,7 @@ We can avoid null values in `country` by filtering the dimension `countryName`:
case class GroupByIsAnonymous(isAnonymous: String, country: String, count: Int)

val query: GroupByQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.agg(count as "count")
@@ -732,7 +734,7 @@ We can also keep only those records that they are having count above 100 and bel
case class GroupByIsAnonymous(isAnonymous: String, country: String, count: Int)

val query: GroupByQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.agg(count as "count")
@@ -745,6 +747,68 @@ val query: GroupByQuery = DQL
val response: Future[List[GroupByIsAnonymous]] = query.execute().map(_.list[GroupByIsAnonymous])
```

#### Select query

The following query performs select over the dimensions `channel`, `cityName`, `countryIsoCode` and `user`:

```scala
case class SelectResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])

val query: SelectQuery = DQL
.select(threshold = 10)
.from("wikipedia")
.dimensions(d"channel", d"cityName", d"countryIsoCode", d"user")
.granularity(GranularityType.Hour)
.interval("2011-06-01/2017-06-01")
.build()

val response: Future[List[SelectResult]] = query.execute().map(_.list[SelectResult])
```
Select queries support pagination. In the example above, the pagination threshold is set to 10 rows per block of
paginated results. The resulting response, however, is flattened to a single list of results
(due to `_.list[SelectResult]`). Pagination can be controlled with the parameters described in the official
[Druid documentation](https://druid.apache.org/docs/latest/querying/select-query.html#result-pagination).


#### Scan query

Similar to `SelectQuery`, the following query performs a scan over the dimensions `channel`, `cityName`, `countryIsoCode`
and `user`:

```scala
case class ScanResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])

val query: ScanQuery = DQL
.scan()
.from("wikipedia")
.columns("channel", "cityName", "countryIsoCode", "user")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.build()
```

The [scan query](https://druid.apache.org/docs/latest/querying/scan-query.html) is more efficient than the
[select query](https://druid.apache.org/docs/latest/querying/select-query.html). The main difference is that it does
not support pagination, but it can return a virtually unlimited number of results.

#### Search query

The following query performs a case-insensitive [search](https://druid.apache.org/docs/latest/querying/searchquery.html) over the dimension `countryIsoCode`:

```scala
val query: SearchQuery = DQL
.search(ContainsInsensitive("GR"))
.from("wikipedia")
.granularity(GranularityType.Hour)
.interval("2011-06-01/2017-06-01")
.dimensions("countryIsoCode")
.build()

val request: Future[List[DruidSearchResult]] = query.execute().map(_.list)
```

In contrast to the rest of the queries, the search query does not take a type parameter, since its results are always of type `ing.wbaa.druid.DruidSearchResult`.

## Query Context

Druid [query context](https://druid.apache.org/docs/latest/querying/query-context.html) is used for various query
@@ -756,7 +820,7 @@ Consider, for example, a group-by query with custom `query id` and `priority`:

```scala
val query: GroupByQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.agg(count as "count")
@@ -773,7 +837,7 @@ Alternatively, context parameters can also be specified one each time by using t

```scala
val query: GroupByQuery = DQL
.from("wikiticker")
.from("wikipedia")
.granularity(GranularityType.Day)
.interval("2011-06-01/2017-06-01")
.agg(count as "count")
4 changes: 4 additions & 0 deletions src/main/resources/reference.conf
@@ -20,6 +20,10 @@ druid = {
client-backend = "ing.wbaa.druid.client.DruidHttpClient"
client-backend = ${?DRUID_CLIENT_BACKEND}

// When it is required, Scruid also supports legacy mode in Scan Queries. By default, legacy mode is turned off.
// For details see https://druid.apache.org/docs/latest/querying/scan-query.html#legacy-mode
scan-query-legacy-mode = false
scan-query-legacy-mode = ${?DRUID_SCAN_QUERY_LEGACY_MODE}

client-config = {

