Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Front end rehaul #212

Merged
merged 19 commits into from
Jan 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,24 @@ Add Tamer as a dependency to your project:
libraryDependencies += "io.laserdisc" %% "tamer" % version
```

See [here](example/src/main/scala/tamer/example/Main.scala) for a sample application that makes use of Tamer.
See [here](example/src/main/scala/tamer/example/DatabaseSimple.scala) for a sample application that makes use of Tamer.

## End to end testing

### Database module

Basic manual testing is available for the code in the example module `tamer.example.DatabaseSimple`
(and/or `tamer.example.DatabaseGeneralized`).
This code covers getting data from a synthetic Postgres database.

Make sure you have docker installed before proceeding.

From the `local` folder launch `docker-compose up` (you can enter `docker-compose down`
if you want to start from scratch). After that you should be able to access the kafka
gui from [http://localhost:8000](http://localhost:8000).

Start the `runDb.sh` program which contains some example environment variables.
sirocchj marked this conversation as resolved.
Show resolved Hide resolved
If tamer works you should see messages appearing in the kafka gui.

## License

Expand Down
22 changes: 16 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ lazy val D = new {
val zio = Seq(
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`,
"dev.zio" %% "zio-kafka" % V.`zio-kafka`,
"dev.zio" %% "zio-streams" % V.zio
"dev.zio" %% "zio-streams" % V.zio,
"dev.zio" %% "zio-test" % V.zio
)
}

Expand Down Expand Up @@ -135,15 +136,15 @@ lazy val commonSettings = Seq(
licenses += "MIT" -> url("http://opensource.org/licenses/MIT"),
developers += Developer("sirocchj", "Julien Sirocchi", "[email protected]", url("https://github.com/sirocchj")),
scalacOptions ++= versionDependent(scalaVersion.value),
resolvers += "confluent" at "https://packages.confluent.io/maven/"
resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven/")
)

lazy val tamer = project
.in(file("core"))
.settings(commonSettings)
.settings(
name := "tamer",
libraryDependencies ++= (D.cats ++ D.config ++ D.doobie ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio)
name := "tamer-core",
libraryDependencies ++= (D.cats ++ D.config ++ D.kafka ++ D.logs ++ D.refined ++ D.serialization ++ D.silencer ++ D.tests ++ D.zio)
.map(_.withSources)
.map(_.withJavadoc),
libraryDependencies ++= D.avro,
Expand All @@ -152,10 +153,19 @@ lazy val tamer = project
Test / console / scalacOptions := (Compile / console / scalacOptions).value
)

lazy val doobie = project
.in(file("doobie"))
.dependsOn(tamer)
.settings(commonSettings)
.settings(
name := "tamer-doobie",
libraryDependencies ++= D.doobie
)

lazy val example = project
.in(file("example"))
.enablePlugins(JavaAppPackaging)
.dependsOn(tamer)
.dependsOn(tamer, doobie)
.settings(commonSettings)
.settings(
libraryDependencies ++= D.postgres,
Expand All @@ -164,7 +174,7 @@ lazy val example = project

lazy val root = project
.in(file("."))
.aggregate(tamer, example)
.aggregate(tamer, example, doobie)
.settings(commonSettings)
.settings(
publish / skip := true,
Expand Down
50 changes: 8 additions & 42 deletions core/src/main/scala/tamer/Setup.scala
Original file line number Diff line number Diff line change
@@ -1,48 +1,14 @@
package tamer

import com.sksamuel.avro4s._
import doobie.util.query.Query0
import tamer.registry.{Registry, Topic}
import zio.UIO
import zio.kafka.serde.Serializer

final case class ResultMetadata(queryExecutionTime: Long)
final case class QueryResult[V](metadata: ResultMetadata, results: List[V])

final case class Setup[K, V, State](
keySerializer: Serializer[Registry with Topic, K],
valueSerializer: Serializer[Registry with Topic, V],
stateSerde: ZSerde[Registry with Topic, State],
valueToKey: V => K,
defaultState: State,
buildQuery: State => Query0[V],
stateFoldM: State => QueryResult[V] => UIO[State]
)

object Setup {
final def avro[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor, State <: Product: Decoder: Encoder: SchemaFor](
defaultState: State
)(
buildQuery: State => Query0[V]
)(
valueToKey: V => K,
stateFoldM: State => QueryResult[V] => UIO[State]
): Setup[K, V, State] =
Setup(Serde[K](isKey = true).serializer, Serde[V]().serializer, Serde[State]().serde, valueToKey, defaultState, buildQuery, stateFoldM)

final def avroSimple[K <: Product: Decoder: Encoder: SchemaFor, V <: Product: Decoder: Encoder: SchemaFor](
defaultState: V
)(
buildQuery: V => Query0[V],
valueToKey: V => K
): Setup[K, V, V] =
Setup(
Serde[K](isKey = true).serializer,
Serde[V]().serializer,
Serde[V]().serde,
valueToKey,
defaultState,
buildQuery,
_ => r => UIO(r.results.last)
)
abstract class Setup[-K, -V, S](
val keySerializer: Serializer[Registry with Topic, K],
val valueSerializer: Serializer[Registry with Topic, V],
val stateSerde: ZSerde[Registry with Topic, S],
val defaultState: S,
val stateKey: Int
) {
def show: String = "not available, please implement the show method to display setup"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not ?

Suggested change
def show: String = "not available, please implement the show method to display setup"
def show: String

Copy link
Member

@sirocchj sirocchj Jan 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @barambani here, can we remove the default message and leave def show: String abstract or is there a reason we're missing?

}
28 changes: 0 additions & 28 deletions core/src/main/scala/tamer/TamerApp.scala

This file was deleted.

25 changes: 2 additions & 23 deletions core/src/main/scala/tamer/config/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import zio.interop.catz._
import scala.concurrent.duration.FiniteDuration

object Config {
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like there's two refinement types left in config's package object that should be moved to the doobie module in the db package object: UriString and Password

final case class Query(fetchChunkSize: PosInt)
final case class KafkaSink(topic: NonEmptyString)
final case class KafkaState(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString)
final case class Kafka(
Expand All @@ -25,20 +23,10 @@ object Config {
sink: KafkaSink,
state: KafkaState
)
final case class Tamer(db: Db, query: Query, kafka: Kafka)

private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] =
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode)

private[this] val dbConfigValue = (
env("DATABASE_DRIVER").as[NonEmptyString],
env("DATABASE_URL").as[UriString],
env("DATABASE_USERNAME").as[NonEmptyString],
env("DATABASE_PASSWORD").as[Password].redacted
).parMapN(Db)

private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query)

private[this] val kafkaSinkConfigValue = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSink)
private[this] val kafkaStateConfigValue = (
env("KAFKA_STATE_TOPIC").as[NonEmptyString],
Expand All @@ -53,17 +41,8 @@ object Config {
kafkaSinkConfigValue,
kafkaStateConfigValue
).parMapN(Kafka)
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply)

trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
val kafkaConfig: URIO[KafkaConfig, Kafka]
}

val live: Layer[TamerError, TamerConfig] = ZLayer.fromEffectMany {
tamerConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
case Tamer(db, query, kafka) => Has(db) ++ Has(query) ++ Has(kafka)
}
val live: Layer[TamerError, KafkaConfig] = ZLayer.fromEffect {
kafkaConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce))
}
}
11 changes: 1 addition & 10 deletions core/src/main/scala/tamer/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,11 @@ import eu.timepit.refined.api.Refined
import eu.timepit.refined.boolean.{And, Or}
import eu.timepit.refined.collection.{Forall, NonEmpty}
import eu.timepit.refined.string.{IPv4, Uri, Url}
import zio.{Has, URIO, ZIO}
import zio.Has

package object config {
type HostList = List[String] Refined (NonEmpty And Forall[IPv4 Or Uri])
type Password = String
type UriString = String Refined Uri
type UrlString = String Refined Url

type DbConfig = Has[Config.Db]
type QueryConfig = Has[Config.Query]
type KafkaConfig = Has[Config.Kafka]
type TamerConfig = DbConfig with QueryConfig with KafkaConfig

val dbConfig: URIO[DbConfig, Config.Db] = ZIO.access(_.get)
val queryConfig: URIO[QueryConfig, Config.Query] = ZIO.access(_.get)
val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get)
}
80 changes: 0 additions & 80 deletions core/src/main/scala/tamer/db/Db.scala

This file was deleted.

12 changes: 0 additions & 12 deletions core/src/main/scala/tamer/db/package.scala

This file was deleted.

Loading