Merge pull request #89 from anskarl/feature/druid018
Deletes Select queries, updates docker images, docs and tests
krisgeus authored May 2, 2020
2 parents d268515 + 3f6237b commit 6ca225b
Showing 12 changed files with 14 additions and 205 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ZOOKEEPER_VERSION=3.5.6
DRUID_VERSION=0.16.1-incubating
ZOOKEEPER_VERSION=3.5.7
DRUID_VERSION=0.18.0
OPENRESTY_VERSION=1.15.8.2-6
# ClusterF chaos proxy version is defined by the corresponding git commit hash id
CHAOS_PROXY_VERSION=39874835
4 changes: 2 additions & 2 deletions README.md
@@ -6,7 +6,7 @@

Scruid (Scala+Druid) is an open source library that allows you to compose Druid queries easily in Scala. The library takes care of translating the query into JSON and parses the result into the case class that you define.

Currently the API is under heavy development, so changes might occur.
Currently, the API is under heavy development, so changes might occur.

## Example queries:

@@ -312,7 +312,7 @@ All parameters of `DruidConfig` are optional, and in case that some parameter is
## Druid Clients

Scruid provides two client implementations, one for simple requests over a single Druid query host (default) and
an advanced one with queue, cached pool connections and a load balancer when multiple Druid query hosts are provided.
an advanced one with a queue, cached pool connections, and a load balancer when multiple Druid query hosts are provided.
Depending on your use case, it is also possible to create a custom client. For details regarding clients, their
configuration, as well as the creation of a custom one, see the [Scruid Clients](docs/scruid_clients.md) documentation.
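
The advanced client mentioned above is selected through `DruidConfig`. A minimal, hypothetical sketch of that configuration, assuming the `DruidAdvancedHttpClient` backend and `QueryHost` names used by the Scruid clients documentation (verify the exact identifiers there):

```scala
// Hedged sketch, not part of this diff: pointing DruidConfig at the advanced
// client with multiple query hosts. Host names and ports are placeholders.
import ing.wbaa.druid._
import ing.wbaa.druid.client.DruidAdvancedHttpClient

implicit val druidConf: DruidConfig = DruidConfig(
  clientBackend = classOf[DruidAdvancedHttpClient],
  hosts = Seq(QueryHost("broker-1", 8082), QueryHost("broker-2", 8082))
)
```

With such an implicit config in scope, queries executed via `query.execute()` would be queued and load-balanced across the listed brokers.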

3 changes: 2 additions & 1 deletion build.sbt
@@ -76,7 +76,8 @@ lazy val commonSettings: Seq[Setting[_]] = Seq(
developers in ThisBuild := List(
Developer("fokko", "Fokko Driesprong", "@fokkodriesprong", url("https://github.com/fokko")),
Developer("bjgbeelen", "Bas Beelen", "", url("https://github.com/bjgbeelen")),
Developer("krisgeus", "Kris Geusebroek", "", url("https://github.com/krisgeus"))
Developer("krisgeus", "Kris Geusebroek", "", url("https://github.com/krisgeus")),
Developer("anskarl", "Anastasios Skarlatidis", "", url("https://github.com/anskarl"))
),
scmInfo in ThisBuild := Some(
ScmInfo(
2 changes: 1 addition & 1 deletion docker/druid/Dockerfile
@@ -13,7 +13,7 @@ RUN set -ex; apk add --no-cache bash perl python curl

# druid
RUN set -ex; \
curl -fsLS "https://www.apache.org/dyn/closer.cgi?filename=/incubator/druid/$DRUID_VERSION/apache-druid-$DRUID_VERSION-bin.tar.gz&action=download" | gunzip | tar x -C /opt; \
curl -fsLS "https://www.apache.org/dyn/closer.cgi?filename=/druid/$DRUID_VERSION/apache-druid-$DRUID_VERSION-bin.tar.gz&action=download" | gunzip | tar x -C /opt; \
mv /opt/apache-druid-${DRUID_VERSION} /opt/druid

# zookeeper
32 changes: 2 additions & 30 deletions docs/dql.md
@@ -3,7 +3,7 @@
Scruid provides a rich Scala API for building queries using the fluent pattern.

In order to use DQL, you have to import `ing.wbaa.druid.dql.DSL._` and thereafter build a query using the `DQL` query
builder. The type of the query can be time-series, group-by, top-n, select, scan or search.
builder. The type of the query can be time-series, group-by, top-n, scan or search.

For any type of query, you can define the following:

@@ -747,33 +747,9 @@ val query: GroupByQuery = DQL
val response: Future[List[GroupByIsAnonymous]] = query.execute().map(_.list[GroupByIsAnonymous])
```

#### Select query

The following query performs select over the dimensions `channel`, `cityName`, `countryIsoCode` and `user`:

```scala
case class SelectResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])

val query: SelectQuery = DQL
.select(threshold = 10)
.from("wikipedia")
.dimensions(d"channel", d"cityName", d"countryIsoCode", d"user")
.granularity(GranularityType.Hour)
.interval("2011-06-01/2017-06-01")
.build()

val response: Future[List[SelectResult]] = query.execute().map(_.list[SelectResult])
```
Select queries support pagination, in the example above the pagination threshold is set to 10 rows per block of
paginated results. The resulting response, however, is being flattened to a single list of results
(due to `_.list[SelectResult]`). The pagination can be controlled with the parameters of the official
[Druid documentation](https://druid.apache.org/docs/latest/querying/select-query.html#result-pagination).


#### Scan query

Similar to `SelectQuery`, the following query performs scan over the dimensions `channel`, `cityName`, `countryIsoCode`
and `user`:
The following query performs a scan over the dimensions `channel`, `cityName`, `countryIsoCode` and `user`:

```scala
case class ScanResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])
@@ -787,10 +763,6 @@ val query: ScanQuery = DQL
.build()
```
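
Since this PR deletes Select queries, a former Select query has to be re-expressed as a Scan query. A hedged migration sketch of the Select example removed above, assuming the scan builder exposes `columns` and `limit` setters (verify against `ScanQueryBuilder` in `QueryBuilder.scala`):

```scala
// Hypothetical migration, not part of this diff: the removed Select example
// rewritten as a Scan query. `limit` stands in for the dropped pagination
// threshold (10 rows per block over 24 hourly blocks in the old test).
case class ScanResult(channel: Option[String], cityName: Option[String],
                      countryIsoCode: Option[String], user: Option[String])

val query: ScanQuery = DQL
  .scan()
  .from("wikipedia")
  .columns("channel", "cityName", "countryIsoCode", "user")
  .granularity(GranularityType.Hour)
  .interval("2011-06-01/2017-06-01")
  .limit(240)
  .build()

val response: Future[List[ScanResult]] = query.execute().map(_.list[ScanResult])
```

Unlike Select, Scan has no paging identifiers; bounding the result set is done with `limit` (and batch size), which is why the PR can drop `PagingSpec` entirely.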

[Scan query](https://druid.apache.org/docs/latest/querying/scan-query.html) is more efficient than
[Select query](https://druid.apache.org/docs/latest/querying/select-query.html). The main difference is that it does
not support pagination, but is able to return a virtually unlimited number of results.

#### Search query

The following query performs case insensitive [search](https://druid.apache.org/docs/latest/querying/searchquery.html) over the dimensions `countryIsoCode`:
29 changes: 0 additions & 29 deletions src/main/scala/ing/wbaa/druid/DruidQuery.scala
@@ -36,7 +36,6 @@ object QueryType extends EnumCodec[QueryType] {
case object GroupBy extends QueryType
case object Timeseries extends QueryType
case object Scan extends QueryType
case object Select extends QueryType
case object Search extends QueryType
val values: Set[QueryType] = sealerate.values[QueryType]
}
@@ -65,7 +64,6 @@ object DruidQuery {
case x: TimeSeriesQuery => x.asJsonObject
case x: TopNQuery => x.asJsonObject
case x: ScanQuery => x.asJsonObject
case x: SelectQuery => x.asJsonObject
case x: SearchQuery => x.asJsonObject
}).add("queryType", query.queryType.asJson)
.add("dataSource", query.dataSource.asJson)
@@ -252,33 +250,6 @@ object ScanQuery {
}
}

case class SelectQuery(
granularity: Granularity,
intervals: Iterable[String],
pagingSpec: PagingSpec,
filter: Option[Filter] = None,
descending: Boolean = false,
dimensions: Iterable[Dimension] = Iterable.empty,
metrics: Iterable[String] = Iterable.empty,
context: Map[QueryContextParam, QueryContextValue] = Map.empty
)(implicit val config: DruidConfig = DruidConfig.DefaultConfig)
extends DruidQuery
with DruidQueryFunctions {
val queryType = QueryType.Select
val dataSource: String = config.datasource
}

case class PagingSpec(
threshold: Int,
fromNext: Boolean = true,
pagingIdentifiers: Map[String, Int] = Map.empty
)

object PagingSpec {
def legacy(threshold: Int, pagingIdentifiers: Map[String, Int] = Map.empty): PagingSpec =
new PagingSpec(threshold, false, pagingIdentifiers)
}

case class SearchQuery(
granularity: Granularity,
intervals: Iterable[String],
43 changes: 3 additions & 40 deletions src/main/scala/ing/wbaa/druid/DruidResponse.scala
@@ -38,9 +38,6 @@ sealed trait DruidResponse extends CirceDecoders {
case class DruidResponseTimeseriesImpl(results: List[DruidResult], queryType: QueryType)
extends DruidResponse {

private implicit val decoderDruidSelectEvent = DruidSelectEvent.decoder
private implicit val decoderDruidSelectResult = DruidSelectEvents.decoder

private def decodeList[T](implicit decoder: Decoder[T]): List[T] = results.map { result =>
result.as[T](decoder)
}
@@ -49,20 +46,14 @@ case class DruidResponseTimeseriesImpl(results: List[DruidResult], queryType: Qu
decoder.decodeJson(result).toTry.get

override def list[T](implicit decoder: Decoder[T]): List[T] = queryType match {
case QueryType.TopN => decodeList[List[T]].flatten
case QueryType.Select => decodeList[DruidSelectEvents].flatMap(_.events.map(_.as[T]))
case _ => decodeList[T]
case QueryType.TopN => decodeList[List[T]].flatten
case _ => decodeList[T]
}

override def series[T](implicit decoder: Decoder[T]): ListMap[ZonedDateTime, List[T]] =
results.foldLeft[ListMap[ZonedDateTime, List[T]]](ListMap.empty) {
case (acc, DruidResult(timestamp, result)) =>
val elements = queryType match {
case QueryType.Select =>
decode[DruidSelectEvents](result).events.map(_.as[T])
case _ =>
List(decode(result)(decoder))
}
val elements = List(decode(result)(decoder))

acc ++ ListMap(
timestamp -> (acc.getOrElse(timestamp, List.empty[T]) ++ elements)
@@ -96,34 +87,6 @@ object DruidResult extends CirceDecoders {
}
}

case class DruidSelectEvent(
segmentId: String,
offset: Long,
event: Json
) extends BaseResult
with CirceDecoders {

def as[T](implicit decoder: Decoder[T]): T = decoder.decodeJson(this.event).toTry.get

override val timestamp: ZonedDateTime =
event.hcursor.downField("timestamp").as[ZonedDateTime].toTry.get
}

object DruidSelectEvent {
implicit val decoder = deriveDecoder[DruidSelectEvent]
}

case class DruidSelectEvents(
pagingIdentifiers: Map[String, Long],
dimensions: Seq[String],
metrics: Seq[String],
events: List[DruidSelectEvent]
)

object DruidSelectEvents {
implicit val decoder = deriveDecoder[DruidSelectEvents]
}

case class DruidScanResponse(results: List[DruidScanResults]) extends DruidResponse {

override def list[T](implicit decoder: Decoder[T]): List[T] = results.flatMap(_.as[T])
5 changes: 0 additions & 5 deletions src/main/scala/ing/wbaa/druid/client/DruidClient.scala
@@ -178,11 +178,6 @@ trait DruidResponseHandler extends CirceDecoders {
CirceStreamSupport
.decode[List[DruidScanResults]](AsyncParser.ValueStream)
.mapConcat(_.flatMap(_.events))
case QueryType.Select =>
CirceStreamSupport
.decode[DruidResult](AsyncParser.UnwrapArray)
.mapConcat(_.as[DruidSelectEvents].events)

case _ =>
CirceStreamSupport.decode[DruidResult](AsyncParser.UnwrapArray)
}
56 changes: 0 additions & 56 deletions src/main/scala/ing/wbaa/druid/dql/QueryBuilder.scala
@@ -199,22 +199,6 @@ final class QueryBuilder private[dql] ()
def groupBy(dimensions: Iterable[Dim]): GroupByQueryBuilder =
copyTo(new GroupByQueryBuilder(dimensions))

/**
* Define that the query will be a select query
*
* @param pagingSpec the paging specification
* @return the builder for select queries
*/
def select(pagingSpec: PagingSpec): SelectQueryBuilder =
copyTo(new SelectQueryBuilder(pagingSpec))

def select(
threshold: Int,
fromNext: Boolean = true,
pagingIdentifiers: Map[String, Int] = Map.empty
): SelectQueryBuilder =
copyTo(new SelectQueryBuilder(PagingSpec(threshold, fromNext, pagingIdentifiers)))

def scan(): ScanQueryBuilder = copyTo(new ScanQueryBuilder())

def search(q: SearchQuerySpec): SearchQueryBuilder = copyTo(new SearchQueryBuilder(q))
@@ -271,7 +255,6 @@ final class TopNQueryBuilder private[dql] (dimension: Dim, metric: String, n: In
.map(ds => druidConfig.copy(datasource = ds))
.getOrElse(druidConfig)


TopNQuery(
dimension = this.dimension.build(),
threshold = n,
@@ -441,45 +424,6 @@ final class ScanQueryBuilder private[dql] () extends QueryBuilderCommons {
}
}

final class SelectQueryBuilder private[dql] (pagingSpec: PagingSpec)
extends QueryBuilderCommons
with DescendingOption {

private var allMetrics: List[String] = Nil
private var dimensions: List[Dim] = Nil

def dimensions(dims: Dim*): this.type = this.dimensions(dims)

def dimensions(dims: Iterable[Dim]): this.type = {
dimensions = dims.foldRight(dimensions)((dim, acc) => dim :: acc)
this
}

def metrics(metrics: String*): this.type = this.metrics(metrics)

def metrics(metrics: Iterable[String]): this.type = {
allMetrics = metrics.foldRight(allMetrics)((metric, acc) => metric :: acc)
this
}

def build()(implicit druidConfig: DruidConfig = DruidConfig.DefaultConfig): SelectQuery = {
val conf = dataSourceOpt
.map(ds => druidConfig.copy(datasource = ds))
.getOrElse(druidConfig)

SelectQuery(
granularity = this.granularityOpt.getOrElse(GranularityType.All),
intervals = this.intervals,
pagingSpec = this.pagingSpec,
filter = this.getFilters,
descending = this.descending,
dimensions = this.dimensions.map(_.build()),
metrics = this.allMetrics,
context = this.queryContextParams
)(conf)
}
}

final class SearchQueryBuilder private[dql] (query: SearchQuerySpec) extends QueryBuilderCommons {

private var sortOpt: Option[DimensionOrderType] = None
@@ -249,7 +249,7 @@ class DruidAdvancedHttpClientSpec extends WordSpec with Matchers with ScalaFutur
case exception: HttpStatusException =>
exception.status shouldBe StatusCodes.InternalServerError
exception.entity.isFailure shouldBe false
exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Instantiation of [simple type, class org.apache.druid.query.spec.LegacySegmentSpec] value failed: Format requires a '/' separator: invalid interval (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.JsonMappingException\",\"host\":null}"
exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Cannot construct instance of `org.apache.druid.query.spec.LegacySegmentSpec`, problem: Format requires a '/' separator: invalid interval\\n at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 186] (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.exc.ValueInstantiationException\",\"host\":null}"
case response => fail(s"expected HttpStatusException, got $response")
}

2 changes: 1 addition & 1 deletion src/test/scala/ing/wbaa/druid/DruidClientSpec.scala
@@ -142,7 +142,7 @@ class DruidClientSpec extends WordSpec with Matchers with ScalaFutures {
case exception: HttpStatusException =>
exception.status shouldBe StatusCodes.InternalServerError
exception.entity.isFailure shouldBe false
exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Instantiation of [simple type, class org.apache.druid.query.spec.LegacySegmentSpec] value failed: Format requires a '/' separator: invalid interval (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.JsonMappingException\",\"host\":null}"
exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Cannot construct instance of `org.apache.druid.query.spec.LegacySegmentSpec`, problem: Format requires a '/' separator: invalid interval\\n at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 186] (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.exc.ValueInstantiationException\",\"host\":null}"
case response => fail(s"expected HttpStatusException, got $response")
}

37 changes: 0 additions & 37 deletions src/test/scala/ing/wbaa/druid/dql/DQLSpec.scala
@@ -429,43 +429,6 @@ class DQLSpec extends WordSpec with Matchers with ScalaFutures {
}
}

"DQL also work with 'select' queries" should {
val resultsThreshold = 10
val numberOfResults = resultsThreshold * 24

val query: SelectQuery = DQL
.select(resultsThreshold)
.dimensions(d"channel", d"cityName", d"countryIsoCode", d"user")
.granularity(GranularityType.Hour)
.interval("2011-06-01/2017-06-01")
.build()

"successfully be interpreted by Druid" in {

val request = query.execute()
whenReady(request) { response =>
val resultList = response.list[SelectResult]
resultList.size shouldBe numberOfResults

val resultSeries = response.series[SelectResult]
resultSeries.size shouldBe 24
resultSeries.map { case (_, results) => results.size }.sum shouldBe numberOfResults
}
}

"successfully be streamed" in {
val requestSeq = query.streamAs[SelectResult].runWith(Sink.seq)
whenReady(requestSeq) { response =>
response.size shouldBe numberOfResults
}

val requestSeries = query.streamSeriesAs[SelectResult].runWith(Sink.seq)
whenReady(requestSeries) { response =>
response.size shouldBe numberOfResults
}
}
}

"DQL also work with 'scan' queries" should {
val numberOfResults = 100

