Skip to content
This repository has been archived by the owner on Apr 8, 2021. It is now read-only.

ISC-379: ElasticLogSource implementation #2

Open
wants to merge 9 commits into
base: feature/initial-release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ lazy val elasticsearch = project.in(file("instrumentation/elasticsearch"))
.settings(CommonProject.settings)
.settings(
name := "logging-testkit-elasticsearch",
libraryDependencies += aws.logs,
libraryDependencies ++= Seq(
elastic4s.aws,
elastic4s.core,
elastic4s.http
),
coverageMinimum := 100
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2018 Cake Solutions Limited

package net.cakesolutions.testkit.logging.elasticsearch

import scala.concurrent.Future
import com.sksamuel.elastic4s.aws.{Aws4ElasticClient, Aws4ElasticConfig}
import com.sksamuel.elastic4s.http.RequestFailure
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.searches.SearchDefinition
import monix.execution.Scheduler
import monix.reactive.Observable

/**
* Client for querying logs from ElasticSearch.
*
* @param config AWS elasticsearch configuration
*/
class ElasticSearchLogClient(config: Aws4ElasticConfig) {
import ElasticSearchLogClient._

private val elasticSearchClient = Aws4ElasticClient(config)

/**
* Query ElasticSearch and return the results as an Observable.
*
* @param searchDef search query
* @param scheduler monix scheduler
* @return query results as observable stream
*/
def search(searchDef: SearchDefinition)(
implicit scheduler: Scheduler
): Observable[String] = {
Observable
.fromFuture(
elasticSearchClient.execute(searchDef).flatMap {
case Left(failure) =>
Future.failed(ElasticSearchRequestFailureException(failure))
case Right(result) =>
Future.successful(result)
}
)
.flatMap { result =>
Observable.fromIterable(
result.result.hits.hits.map(_.sourceAsString)
)
}
}
}

object ElasticSearchLogClient {

/**
* Execution of an ElasticSearch request failed.
*
* @param requestFailure failure response
*/
case class ElasticSearchRequestFailureException(
requestFailure: RequestFailure
) extends Exception(
s"ElasticSearch request failed: ${requestFailure.error.reason}"
)
}

This file was deleted.

2 changes: 2 additions & 0 deletions project/CommonProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ object CommonProject {
"-Ypartial-unification",
"-Xfatal-warnings"
),
// Disable unused import warnings in Scala console.
scalacOptions in (Compile, console) -= "-Ywarn-unused-import",
scalacOptions in (Compile, doc) ++= {
val nm = (name in(Compile, doc)).value
val ver = (version in(Compile, doc)).value
Expand Down
8 changes: 8 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ object Dependencies {
val reactive: ModuleID = "io.monix" %% "monix-reactive" % version
}

object elastic4s {
private val version = "6.1.4"

val aws: ModuleID = "com.sksamuel.elastic4s" %% "elastic4s-aws" % version
val core: ModuleID = "com.sksamuel.elastic4s" %% "elastic4s-core" % version
val http: ModuleID = "com.sksamuel.elastic4s" %% "elastic4s-http" % version
}

val scalacheck: ModuleID = "org.scalacheck" %% "scalacheck" % "1.13.5"
val scalatest: ModuleID = "org.scalatest" %% "scalatest" % "3.0.5"
}