From 5eca362be45bde9e0155e1d0b4560d2093d94512 Mon Sep 17 00:00:00 2001 From: Anastasios Skarlatidis Date: Fri, 1 May 2020 03:21:46 +0300 Subject: [PATCH 1/2] Deletes Select queries. Since Druid 0.17.0, it has been removed and replaced by the Scan query Updates docker images - Druid 0.18.0 - Zookeeper 3.5.7 Updates documentation and unit tests --- .env | 4 +- README.md | 4 +- docker/druid/Dockerfile | 2 +- docs/dql.md | 32 +---------- .../scala/ing/wbaa/druid/DruidQuery.scala | 29 ---------- .../scala/ing/wbaa/druid/DruidResponse.scala | 43 +------------- .../ing/wbaa/druid/client/DruidClient.scala | 5 -- .../ing/wbaa/druid/dql/QueryBuilder.scala | 56 ------------------- .../druid/DruidAdvancedHttpClientSpec.scala | 2 +- .../ing/wbaa/druid/DruidClientSpec.scala | 2 +- .../scala/ing/wbaa/druid/dql/DQLSpec.scala | 37 ------------ 11 files changed, 12 insertions(+), 204 deletions(-) diff --git a/.env b/.env index e87942a..022db28 100644 --- a/.env +++ b/.env @@ -1,5 +1,5 @@ -ZOOKEEPER_VERSION=3.5.6 -DRUID_VERSION=0.16.1-incubating +ZOOKEEPER_VERSION=3.5.7 +DRUID_VERSION=0.18.0 OPENRESTY_VERSION=1.15.8.2-6 # ClusterF chaos proxy version is defined by the corresponding git commit hash id CHAOS_PROXY_VERSION=39874835 \ No newline at end of file diff --git a/README.md b/README.md index 796a81a..1488dd7 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Scruid (Scala+Druid) is an open source library that allows you to compose Druid queries easily in Scala. The library will take care of the translation of the query into json, parse the result in the case class that you define. -Currently the API is under heavy development, so changes might occur. +Currently, the API is under heavy development, so changes might occur. 
## Example queries: @@ -312,7 +312,7 @@ All parameters of `DruidConfig` are optional, and in case that some parameter is ## Druid Clients Scruid provides two client implementations, one for simple requests over a single Druid query host (default) and -an advanced one with queue, cached pool connections and a load balancer when multiple Druid query hosts are provided. +an advanced one with a queue, cached pool connections, and a load balancer when multiple Druid query hosts are provided. Depending on your use case, it is also possible to create a custom client. For details regarding clients, their configuration, as well the creation of a custom one see the [Scruid Clients](docs/scruid_clients.md) documentation. diff --git a/docker/druid/Dockerfile b/docker/druid/Dockerfile index 64d66da..280d749 100644 --- a/docker/druid/Dockerfile +++ b/docker/druid/Dockerfile @@ -13,7 +13,7 @@ RUN set -ex; apk add --no-cache bash perl python curl # druid RUN set -ex; \ - curl -fsLS "https://www.apache.org/dyn/closer.cgi?filename=/incubator/druid/$DRUID_VERSION/apache-druid-$DRUID_VERSION-bin.tar.gz&action=download" | gunzip | tar x -C /opt; \ + curl -fsLS "https://www.apache.org/dyn/closer.cgi?filename=/druid/$DRUID_VERSION/apache-druid-$DRUID_VERSION-bin.tar.gz&action=download" | gunzip | tar x -C /opt; \ mv /opt/apache-druid-${DRUID_VERSION} /opt/druid # zookeeper diff --git a/docs/dql.md b/docs/dql.md index 8bb9a32..040ccfe 100644 --- a/docs/dql.md +++ b/docs/dql.md @@ -3,7 +3,7 @@ Scruid provides a rich Scala API for building queries using the fluent pattern. In order to use DQL, you have to import `ing.wbaa.druid.dql.DSL._` and thereafter build a query using the `DQL` query -builder. The type of the query can be time-series, group-by, top-n, select, scan or search. +builder. The type of the query can be time-series, group-by, top-n, scan or search. 
For all any type of queries you can define the following: @@ -747,33 +747,9 @@ val query: GroupByQuery = DQL val response: Future[List[GroupByIsAnonymous]] = query.execute().map(_.list[GroupByIsAnonymous]) ``` -#### Select query - -The following query performs select over the dimensions `channel`, `cityName`, `countryIsoCode` and `user`: - -```scala -case class SelectResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String]) - -val query: SelectQuery = DQL - .select(threshold = 10) - .from("wikipedia") - .dimensions(d"channel", d"cityName", d"countryIsoCode", d"user") - .granularity(GranularityType.Hour) - .interval("2011-06-01/2017-06-01") - .build() - -val response: Future[List[SelectResult]] = query.execute().map(_.list[SelectResult]) -``` -Select queries support pagination, in the example above the pagination threshold is set to 10 rows per block of -paginated results. The resulting response, however, is being flattened to a single list of results -(due to `_.list[SelectResult]`). The pagination can be controlled with the parameters of the official -[Druid documentation](https://druid.apache.org/docs/latest/querying/select-query.html#result-pagination). - - #### Scan query -Similar to `SelectQuery`, the following query performs scan over the dimensions `channel`, `cityName`, `countryIsoCode` -and `user`: +The following query performs scan over the dimensions `channel`, `cityName`, `countryIsoCode` and `user`: ```scala case class ScanResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String]) @@ -787,10 +763,6 @@ val query: ScanQuery = DQL .build() ``` -[Scan query](https://druid.apache.org/docs/latest/querying/scan-query.html) is more efficient than -[Select query](https://druid.apache.org/docs/latest/querying/select-query.html). The main difference is that it does -not support pagination, but is able to return a virtually unlimited number of results. 
- #### Search query The following query performs case insensitive [search](https://druid.apache.org/docs/latest/querying/searchquery.html) over the dimensions `countryIsoCode`: diff --git a/src/main/scala/ing/wbaa/druid/DruidQuery.scala b/src/main/scala/ing/wbaa/druid/DruidQuery.scala index a94ed1e..2ab4c7c 100644 --- a/src/main/scala/ing/wbaa/druid/DruidQuery.scala +++ b/src/main/scala/ing/wbaa/druid/DruidQuery.scala @@ -36,7 +36,6 @@ object QueryType extends EnumCodec[QueryType] { case object GroupBy extends QueryType case object Timeseries extends QueryType case object Scan extends QueryType - case object Select extends QueryType case object Search extends QueryType val values: Set[QueryType] = sealerate.values[QueryType] } @@ -65,7 +64,6 @@ object DruidQuery { case x: TimeSeriesQuery => x.asJsonObject case x: TopNQuery => x.asJsonObject case x: ScanQuery => x.asJsonObject - case x: SelectQuery => x.asJsonObject case x: SearchQuery => x.asJsonObject }).add("queryType", query.queryType.asJson) .add("dataSource", query.dataSource.asJson) @@ -252,33 +250,6 @@ object ScanQuery { } } -case class SelectQuery( - granularity: Granularity, - intervals: Iterable[String], - pagingSpec: PagingSpec, - filter: Option[Filter] = None, - descending: Boolean = false, - dimensions: Iterable[Dimension] = Iterable.empty, - metrics: Iterable[String] = Iterable.empty, - context: Map[QueryContextParam, QueryContextValue] = Map.empty -)(implicit val config: DruidConfig = DruidConfig.DefaultConfig) - extends DruidQuery - with DruidQueryFunctions { - val queryType = QueryType.Select - val dataSource: String = config.datasource -} - -case class PagingSpec( - threshold: Int, - fromNext: Boolean = true, - pagingIdentifiers: Map[String, Int] = Map.empty -) - -object PagingSpec { - def legacy(threshold: Int, pagingIdentifiers: Map[String, Int] = Map.empty): PagingSpec = - new PagingSpec(threshold, false, pagingIdentifiers) -} - case class SearchQuery( granularity: Granularity, intervals: 
Iterable[String], diff --git a/src/main/scala/ing/wbaa/druid/DruidResponse.scala b/src/main/scala/ing/wbaa/druid/DruidResponse.scala index 8df718e..0a59c1e 100644 --- a/src/main/scala/ing/wbaa/druid/DruidResponse.scala +++ b/src/main/scala/ing/wbaa/druid/DruidResponse.scala @@ -38,9 +38,6 @@ sealed trait DruidResponse extends CirceDecoders { case class DruidResponseTimeseriesImpl(results: List[DruidResult], queryType: QueryType) extends DruidResponse { - private implicit val decoderDruidSelectEvent = DruidSelectEvent.decoder - private implicit val decoderDruidSelectResult = DruidSelectEvents.decoder - private def decodeList[T](implicit decoder: Decoder[T]): List[T] = results.map { result => result.as[T](decoder) } @@ -49,20 +46,14 @@ case class DruidResponseTimeseriesImpl(results: List[DruidResult], queryType: Qu decoder.decodeJson(result).toTry.get override def list[T](implicit decoder: Decoder[T]): List[T] = queryType match { - case QueryType.TopN => decodeList[List[T]].flatten - case QueryType.Select => decodeList[DruidSelectEvents].flatMap(_.events.map(_.as[T])) - case _ => decodeList[T] + case QueryType.TopN => decodeList[List[T]].flatten + case _ => decodeList[T] } override def series[T](implicit decoder: Decoder[T]): ListMap[ZonedDateTime, List[T]] = results.foldLeft[ListMap[ZonedDateTime, List[T]]](ListMap.empty) { case (acc, DruidResult(timestamp, result)) => - val elements = queryType match { - case QueryType.Select => - decode[DruidSelectEvents](result).events.map(_.as[T]) - case _ => - List(decode(result)(decoder)) - } + val elements = List(decode(result)(decoder)) acc ++ ListMap( timestamp -> (acc.getOrElse(timestamp, List.empty[T]) ++ elements) @@ -96,34 +87,6 @@ object DruidResult extends CirceDecoders { } } -case class DruidSelectEvent( - segmentId: String, - offset: Long, - event: Json -) extends BaseResult - with CirceDecoders { - - def as[T](implicit decoder: Decoder[T]): T = decoder.decodeJson(this.event).toTry.get - - override val timestamp: 
ZonedDateTime = - event.hcursor.downField("timestamp").as[ZonedDateTime].toTry.get -} - -object DruidSelectEvent { - implicit val decoder = deriveDecoder[DruidSelectEvent] -} - -case class DruidSelectEvents( - pagingIdentifiers: Map[String, Long], - dimensions: Seq[String], - metrics: Seq[String], - events: List[DruidSelectEvent] -) - -object DruidSelectEvents { - implicit val decoder = deriveDecoder[DruidSelectEvents] -} - case class DruidScanResponse(results: List[DruidScanResults]) extends DruidResponse { override def list[T](implicit decoder: Decoder[T]): List[T] = results.flatMap(_.as[T]) diff --git a/src/main/scala/ing/wbaa/druid/client/DruidClient.scala b/src/main/scala/ing/wbaa/druid/client/DruidClient.scala index b875fc7..4a0e5fc 100644 --- a/src/main/scala/ing/wbaa/druid/client/DruidClient.scala +++ b/src/main/scala/ing/wbaa/druid/client/DruidClient.scala @@ -178,11 +178,6 @@ trait DruidResponseHandler extends CirceDecoders { CirceStreamSupport .decode[List[DruidScanResults]](AsyncParser.ValueStream) .mapConcat(_.flatMap(_.events)) - case QueryType.Select => - CirceStreamSupport - .decode[DruidResult](AsyncParser.UnwrapArray) - .mapConcat(_.as[DruidSelectEvents].events) - case _ => CirceStreamSupport.decode[DruidResult](AsyncParser.UnwrapArray) } diff --git a/src/main/scala/ing/wbaa/druid/dql/QueryBuilder.scala b/src/main/scala/ing/wbaa/druid/dql/QueryBuilder.scala index f5a322f..48edd26 100644 --- a/src/main/scala/ing/wbaa/druid/dql/QueryBuilder.scala +++ b/src/main/scala/ing/wbaa/druid/dql/QueryBuilder.scala @@ -199,22 +199,6 @@ final class QueryBuilder private[dql] () def groupBy(dimensions: Iterable[Dim]): GroupByQueryBuilder = copyTo(new GroupByQueryBuilder(dimensions)) - /** - * Define that the query will be a select query - * - * @param pagingSpec the paging specification - * @return the builder for select queries - */ - def select(pagingSpec: PagingSpec): SelectQueryBuilder = - copyTo(new SelectQueryBuilder(pagingSpec)) - - def select( - 
threshold: Int, - fromNext: Boolean = true, - pagingIdentifiers: Map[String, Int] = Map.empty - ): SelectQueryBuilder = - copyTo(new SelectQueryBuilder(PagingSpec(threshold, fromNext, pagingIdentifiers))) - def scan(): ScanQueryBuilder = copyTo(new ScanQueryBuilder()) def search(q: SearchQuerySpec): SearchQueryBuilder = copyTo(new SearchQueryBuilder(q)) @@ -271,7 +255,6 @@ final class TopNQueryBuilder private[dql] (dimension: Dim, metric: String, n: In .map(ds => druidConfig.copy(datasource = ds)) .getOrElse(druidConfig) - TopNQuery( dimension = this.dimension.build(), threshold = n, @@ -441,45 +424,6 @@ final class ScanQueryBuilder private[dql] () extends QueryBuilderCommons { } } -final class SelectQueryBuilder private[dql] (pagingSpec: PagingSpec) - extends QueryBuilderCommons - with DescendingOption { - - private var allMetrics: List[String] = Nil - private var dimensions: List[Dim] = Nil - - def dimensions(dims: Dim*): this.type = this.dimensions(dims) - - def dimensions(dims: Iterable[Dim]): this.type = { - dimensions = dims.foldRight(dimensions)((dim, acc) => dim :: acc) - this - } - - def metrics(metrics: String*): this.type = this.metrics(metrics) - - def metrics(metrics: Iterable[String]): this.type = { - allMetrics = metrics.foldRight(allMetrics)((metric, acc) => metric :: acc) - this - } - - def build()(implicit druidConfig: DruidConfig = DruidConfig.DefaultConfig): SelectQuery = { - val conf = dataSourceOpt - .map(ds => druidConfig.copy(datasource = ds)) - .getOrElse(druidConfig) - - SelectQuery( - granularity = this.granularityOpt.getOrElse(GranularityType.All), - intervals = this.intervals, - pagingSpec = this.pagingSpec, - filter = this.getFilters, - descending = this.descending, - dimensions = this.dimensions.map(_.build()), - metrics = this.allMetrics, - context = this.queryContextParams - )(conf) - } -} - final class SearchQueryBuilder private[dql] (query: SearchQuerySpec) extends QueryBuilderCommons { private var sortOpt: 
Option[DimensionOrderType] = None diff --git a/src/test/scala/ing/wbaa/druid/DruidAdvancedHttpClientSpec.scala b/src/test/scala/ing/wbaa/druid/DruidAdvancedHttpClientSpec.scala index 2a7cc69..fb0e7bb 100644 --- a/src/test/scala/ing/wbaa/druid/DruidAdvancedHttpClientSpec.scala +++ b/src/test/scala/ing/wbaa/druid/DruidAdvancedHttpClientSpec.scala @@ -249,7 +249,7 @@ class DruidAdvancedHttpClientSpec extends WordSpec with Matchers with ScalaFutur case exception: HttpStatusException => exception.status shouldBe StatusCodes.InternalServerError exception.entity.isFailure shouldBe false - exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Instantiation of [simple type, class org.apache.druid.query.spec.LegacySegmentSpec] value failed: Format requires a '/' separator: invalid interval (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.JsonMappingException\",\"host\":null}" + exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Cannot construct instance of `org.apache.druid.query.spec.LegacySegmentSpec`, problem: Format requires a '/' separator: invalid interval\\n at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 186] (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.exc.ValueInstantiationException\",\"host\":null}" case response => fail(s"expected HttpStatusException, got $response") } diff --git a/src/test/scala/ing/wbaa/druid/DruidClientSpec.scala b/src/test/scala/ing/wbaa/druid/DruidClientSpec.scala index ad1ca6b..f50bf00 100644 --- a/src/test/scala/ing/wbaa/druid/DruidClientSpec.scala +++ b/src/test/scala/ing/wbaa/druid/DruidClientSpec.scala @@ -142,7 +142,7 @@ class DruidClientSpec extends WordSpec with Matchers with ScalaFutures { case exception: 
HttpStatusException => exception.status shouldBe StatusCodes.InternalServerError exception.entity.isFailure shouldBe false - exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Instantiation of [simple type, class org.apache.druid.query.spec.LegacySegmentSpec] value failed: Format requires a '/' separator: invalid interval (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.JsonMappingException\",\"host\":null}" + exception.entity.get.data.utf8String shouldBe "{\"error\":\"Unknown exception\",\"errorMessage\":\"Cannot construct instance of `org.apache.druid.query.spec.LegacySegmentSpec`, problem: Format requires a '/' separator: invalid interval\\n at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 186] (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery[\\\"intervals\\\"])\",\"errorClass\":\"com.fasterxml.jackson.databind.exc.ValueInstantiationException\",\"host\":null}" case response => fail(s"expected HttpStatusException, got $response") } diff --git a/src/test/scala/ing/wbaa/druid/dql/DQLSpec.scala b/src/test/scala/ing/wbaa/druid/dql/DQLSpec.scala index 7c915f5..8d2ddb4 100644 --- a/src/test/scala/ing/wbaa/druid/dql/DQLSpec.scala +++ b/src/test/scala/ing/wbaa/druid/dql/DQLSpec.scala @@ -429,43 +429,6 @@ class DQLSpec extends WordSpec with Matchers with ScalaFutures { } } - "DQL also work with 'select' queries" should { - val resultsThreshold = 10 - val numberOfResults = resultsThreshold * 24 - - val query: SelectQuery = DQL - .select(resultsThreshold) - .dimensions(d"channel", d"cityName", d"countryIsoCode", d"user") - .granularity(GranularityType.Hour) - .interval("2011-06-01/2017-06-01") - .build() - - "successfully be interpreted by Druid" in { - - val request = query.execute() - whenReady(request) { response => - val resultList = response.list[SelectResult] - 
resultList.size shouldBe numberOfResults - - val resultSeries = response.series[SelectResult] - resultSeries.size shouldBe 24 - resultSeries.map { case (_, results) => results.size }.sum shouldBe numberOfResults - } - } - - "successfully be streamed" in { - val requestSeq = query.streamAs[SelectResult].runWith(Sink.seq) - whenReady(requestSeq) { response => - response.size shouldBe numberOfResults - } - - val requestSeries = query.streamSeriesAs[SelectResult].runWith(Sink.seq) - whenReady(requestSeries) { response => - response.size shouldBe numberOfResults - } - } - } - "DQL also work with 'scan' queries" should { val numberOfResults = 100 From 3f6237b4d92379dbc63980a4e7913d9f33bcc526 Mon Sep 17 00:00:00 2001 From: Anastasios Skarlatidis Date: Sat, 2 May 2020 15:13:28 +0300 Subject: [PATCH 2/2] Updates the list of developers --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index edb8fef..0db010e 100644 --- a/build.sbt +++ b/build.sbt @@ -76,7 +76,8 @@ lazy val commonSettings: Seq[Setting[_]] = Seq( developers in ThisBuild := List( Developer("fokko", "Fokko Driesprong", "@fokkodriesprong", url("https://github.com/fokko")), Developer("bjgbeelen", "Bas Beelen", "", url("https://github.com/bjgbeelen")), - Developer("krisgeus", "Kris Geusebroek", "", url("https://github.com/krisgeus")) + Developer("krisgeus", "Kris Geusebroek", "", url("https://github.com/krisgeus")), + Developer("anskarl", "Anastasios Skarlatidis", "", url("https://github.com/anskarl")) ), scmInfo in ThisBuild := Some( ScmInfo(